UN-3554 [FEAT] PG Queue 9d slice 2 — reaper process + barrier-orphan sweep#2058
Conversation
…sweep
Builds on the leader-election lease (UN-3553): stands up the reaper process —
the leader-elected recovery loop — with its first recovery job, the
barrier-orphan sweep. Ships dark (launched explicitly, never in the default
worker set).
- reaper.py:
- sweep_expired_barriers(conn): DELETE pg_barrier_state WHERE expires_at <
now() RETURNING + loud per-orphan WARNING. The documented PgBarrier
backstop — reclaims barriers whose header tasks never all completed; a
late in-flight decrement then finds no row and abandons (existing
semantics). Execution terminal-status recovery is 9e's job.
- PgReaper: leader-elected loop. Each cycle renews (steps down to standby if
renew() returns False), else tries to acquire; sweeps ONLY while leader.
run() loops with graceful SIGTERM/SIGINT shutdown + lease release on exit.
Guard: cycle interval must be shorter than the lease window, or the leader
thrashes leadership between renews.
- reaper_interval_from_env() (WORKER_PG_REAPER_INTERVAL_SECONDS, default 5s),
main(), python -m queue_backend.pg_queue.reaper entrypoint.
- leader_election.py: expose lease_seconds property (reaper validates its
cycle against it).
- test_pg_reaper.py: 15 tests — env/construction guards (interval < lease),
leadership gating (sweeps only when leader, steps down on renew-fail,
releases on stop), real-PG sweep (reclaims only expired, leaves fresh).
Out of scope: run-worker.sh wiring + liveness (followup, like 9c-followup);
pipeline recovery (counter reconstruction, per-stage re-enqueue) deferred to
9e where there's a real PG pipeline to test against.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — comprehensive review (#2058 · PG Queue 9d slice 2)
Ran six specialist passes (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). The slice is well-built and closely follows the established leader_election.py conventions — leadership gating, the interval-vs-lease guard, signal-handler fallback, and release-on-exit are all sound and tested.
One issue is worth blocking on and was independently found by three passes: sweep_expired_barriers omits the rollback-and-discard discipline every other PG component in this package uses, so a single transient DB error permanently wedges the reaper while it keeps holding leadership and logging look-alike tracebacks. Verified: create_pg_connection runs in manual-commit mode (no autocommit), and LeaderLease._cursor is the canonical pattern this path should mirror.
Inline findings below, prioritised. Severity is prefixed on each comment.
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — comprehensive review (#2058 · PG Queue 9d slice 2)
Ran six specialist passes (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). The slice is well-built and closely follows the established leader_election.py conventions — leadership gating, the interval-vs-lease guard, signal-handler fallback, and release-on-exit are all sound and tested.
One issue is worth blocking on and was independently found by three passes: sweep_expired_barriers omits the rollback-and-discard discipline every other PG component in this package uses, so a single transient DB error permanently wedges the reaper while it keeps holding leadership and logging look-alike tracebacks. Verified: create_pg_connection runs in manual-commit mode (no autocommit), and LeaderLease._cursor is the canonical pattern this path should mirror.
Inline findings below, prioritised. Severity is prefixed on each comment.
…ntract Toolkit + SonarCloud review on #2058: - [CRITICAL] sweep_expired_barriers now rolls back on error before re-raising (conn is manual-commit; an un-rolled-back failure left it in an aborted-txn state, poisoning every later cycle → silent self-perpetuating stall). This also clears SonarCloud's C-reliability gate. - [HIGH] On a failed sweep PgReaper discards its OWNED connection so the next tick reconnects — covers a poisoned/dead handle that `.closed` alone misses. - [MEDIUM] renew() raising now sets _is_leader=False before propagating (honours the lease's "raise == stop acting" contract). - [MEDIUM] release() failure on shutdown is logged (with the lease-window note) instead of silently suppressed. - [MEDIUM] signal-handler ValueError is re-raised unless we're off the main thread (don't mislabel an unrelated ValueError). - [MEDIUM/type-design] tick() returns a TickOutcome(was_leader, reclaimed) NamedTuple instead of an overloaded `-1` int sentinel; added an is_leader property; lease param typed against a new LeaderLeaseLike Protocol. - [LOW] Reworded the sweep race comment, the step-down log (same-cycle re-acquire), and the run() self-recovery comment for accuracy. - Tests: +8 — run() swallows a tick error; owned-conn recreated-when-closed; injected-conn never swapped; failed-sweep discards owned conn; sweep SQL contract (no DB); sweep rolls back on error; step-down-then-reacquire; renew-raising steps down. 23 total; drive paths via is_leader, no private-flag poking. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review addressed — commit
|
…d S1244) The two reaper-interval asserts compared float returns with ==; the values are exactly representable so it was harmless, but pytest.approx is the correct idiom and clears the S1244 reliability bugs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
SonarCloud S1244 (float-equality) ×2 fixed in |
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_queue/reaper.py | New reaper module — PgReaper loop, sweep_expired_barriers, LeaderLeaseLike Protocol, and reaper_interval_from_env. Interval-shorter-than-lease guard, owned-connection lifecycle, and graceful shutdown are all well-handled. |
| workers/queue_backend/pg_queue/leader_election.py | Minimal additive change: exposes lease_seconds as a read-only property so PgReaper can validate its cycle interval against the lease window at construction time. |
| workers/tests/test_pg_reaper.py | 15 tests across 4 layers with good separation of concerns; _FakeLease duck-types the Protocol cleanly, and Layer 4 seeds real Postgres rows and verifies only expired ones are reclaimed. |
| workers/queue_backend/pg_queue/init.py | Exports the new reaper symbols (PgReaper, LeaderLeaseLike, TickOutcome, reaper_interval_from_env, sweep_expired_barriers) alongside existing package exports. |
Reviews (2): Last reviewed commit: "UN-3554 [FEAT] Address Greptile: manual-..." | Re-trigger Greptile
… owned conn - The real-Postgres fixture (barrier_conn) was autocommit, which made sweep_expired_barriers' own commit() a no-op and its rollback unreachable — so Layer 4 tested a different mode than the production reaper (create_pg_connection is manual-commit). Switched the fixture to manual-commit and added explicit commits to the seed/read/cleanup helpers, so the real-DB tests now exercise the sweep's commit (and rollback) in production mode. - run() now closes its OWNED sweep connection on shutdown (an injected one is the caller's). Harmless for the main() process but keeps PgReaper clean if ever embedded / test-driven. 23 tests pass; ruff clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Greptile review addressed — commit
|
|
649e06b
into
feat/UN-3445-pg-queue-integration



What
Second slice of 9d. Builds on the leader-election lease (UN-3553) to stand up the reaper process — the leader-elected recovery loop — with its first recovery job, the barrier-orphan sweep. Ships dark: launched explicitly, never part of the default worker set, and a no-op until the PG barrier is used.
Why now (and why not folded into 9e)
9d is 9e's safety net. The reaper harness (lease-maintenance loop + graceful shutdown + entrypoint) is transport-agnostic — identical regardless of what recovery runs inside it — so building it now isn't throwaway; 9e just drops recovery functions into a proven loop. Keeping it out of 9e shrinks that phase's blast radius, and the barrier sweep makes
WORKER_BARRIER_BACKEND=pgproduction-safe independently.What's in it
workers/queue_backend/pg_queue/reaper.pysweep_expired_barriers(conn)—DELETE FROM pg_barrier_state WHERE expires_at < now() RETURNING execution_id+ a loud per-orphanWARNING. The documentedPgBarrierbackstop: reclaims a barrier whose header tasks never all completed. By PgBarrier's existing semantics, a late in-flight decrement then finds no row and abandons (no spurious callback). Marking the owning execution terminal needs the backend + the pipeline's PG shape — that's 9e.PgReaper— leader-elected loop. Each cycle: renew (step down to standby ifrenew()returnsFalse), else try to acquire; sweep only while leader.run()loops with graceful SIGTERM/SIGINT shutdown + lease release on exit. Guard: cycle interval must be shorter than the lease window, or the leader thrashes leadership between renews.reaper_interval_from_env()(WORKER_PG_REAPER_INTERVAL_SECONDS, default 5s),main(), and apython -m queue_backend.pg_queue.reaperentrypoint (no worker-app bootstrap needed — it's pure SQL).leader_election.py— exposes alease_secondsproperty (the reaper validates its cycle against it).Tests (
test_pg_reaper.py) — 15:expires_at, leaves fresh barriers intact; no-op when nothing expired).Out of scope (follow-ups)
run-worker.shwiring + liveness probe — separate followup (mirrors 9c → 9c-followup).WorkflowFileExecution, per-stage re-enqueue of stuck files, marking orphaned executions terminal — deferred to 9e, where there's a real PG pipeline to test against. This slice is the storage/orphan backstop only.Testing
15/15 pass, stable across reruns; 70/70 across reaper + leader-election + pg_barrier (no regression). Ruff clean. Pre-commit green.
Base:
feat/UN-3445-pg-queue-integration(notmain). Sub-task UN-3554 under UN-3536 (Phase 9), builds on UN-3553.