UN-3554 [FEAT] PG Queue 9d slice 2 — reaper process + barrier-orphan sweep #2058

Merged
muhammad-ali-e merged 4 commits into feat/UN-3445-pg-queue-integration from UN-3554-reaper-process
Jun 16, 2026

@muhammad-ali-e

What

Second slice of 9d. Builds on the leader-election lease (UN-3553) to stand up the reaper process — the leader-elected recovery loop — with its first recovery job, the barrier-orphan sweep. Ships dark: launched explicitly, never part of the default worker set, and a no-op until the PG barrier is used.

Why now (and why not folded into 9e)

9d is 9e's safety net. The reaper harness (lease-maintenance loop + graceful shutdown + entrypoint) is transport-agnostic — identical regardless of what recovery runs inside it — so building it now isn't throwaway; 9e just drops recovery functions into a proven loop. Keeping it out of 9e shrinks that phase's blast radius, and the barrier sweep makes WORKER_BARRIER_BACKEND=pg production-safe independently.

What's in it

workers/queue_backend/pg_queue/reaper.py

  • sweep_expired_barriers(conn): runs DELETE FROM pg_barrier_state WHERE expires_at < now() RETURNING execution_id and logs a loud per-orphan WARNING. The documented PgBarrier backstop: reclaims a barrier whose header tasks never all completed. By PgBarrier's existing semantics, a late in-flight decrement then finds no row and abandons (no spurious callback). Marking the owning execution terminal needs the backend + the pipeline's PG shape; that's 9e.
  • PgReaper — leader-elected loop. Each cycle: renew (step down to standby if renew() returns False), else try to acquire; sweep only while leader. run() loops with graceful SIGTERM/SIGINT shutdown + lease release on exit. Guard: cycle interval must be shorter than the lease window, or the leader thrashes leadership between renews.
  • reaper_interval_from_env() (WORKER_PG_REAPER_INTERVAL_SECONDS, default 5s), main(), and a python -m queue_backend.pg_queue.reaper entrypoint (no worker-app bootstrap needed — it's pure SQL).
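
As a rough illustration of the cycle and the interval guard described above, here is a minimal sketch. It is not the code in reaper.py: `ReaperSketch` and `FakeLease` are hypothetical names, and the lease method name `acquire()` is an assumption (the description only names `renew()` and `lease_seconds`).

```python
class FakeLease:
    """Hypothetical stand-in for the UN-3553 lease; method names are assumed."""

    def __init__(self, lease_seconds=30.0, can_acquire=True):
        self.lease_seconds = lease_seconds
        self.can_acquire = can_acquire
        self.renew_ok = True

    def acquire(self):
        # True when this process wins (or already holds) the lease.
        return self.can_acquire

    def renew(self):
        # False signals that leadership was lost.
        return self.renew_ok


class ReaperSketch:
    """Leader-gated cycle: renew, else try to acquire; sweep only as leader."""

    def __init__(self, lease, sweep, interval_seconds):
        if interval_seconds <= 0:
            raise ValueError("interval must be positive")
        # A cycle that is not shorter than the lease window would let the
        # lease lapse between renews, so leadership would thrash.
        if interval_seconds >= lease.lease_seconds:
            raise ValueError("interval must be shorter than the lease window")
        self._lease = lease
        self._sweep = sweep
        self.interval_seconds = interval_seconds
        self.is_leader = False

    def tick(self):
        """Run one cycle; return rows reclaimed, or None while on standby."""
        if self.is_leader and not self._lease.renew():
            self.is_leader = False  # step down to standby
        if not self.is_leader:
            self.is_leader = self._lease.acquire()
        if not self.is_leader:
            return None
        return self._sweep()
```

A standby instance keeps calling `tick()` at the same interval, so it takes over on the first cycle in which `acquire()` succeeds.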

leader_election.py — exposes a lease_seconds property (the reaper validates its cycle against it).

Tests (test_pg_reaper.py) — 15:

  • env/construction guards (interval-shorter-than-lease, non-positive rejected);
  • leadership gating (sweeps only while leader; steps down on renew-fail; releases the lease on stop);
  • real-PG sweep (reclaims only rows past expires_at, leaves fresh barriers intact; no-op when nothing expired).

Out of scope (follow-ups)

  • run-worker.sh wiring + liveness probe — separate followup (mirrors 9c → 9c-followup).
  • Pipeline recovery — counter reconstruction from WorkflowFileExecution, per-stage re-enqueue of stuck files, marking orphaned executions terminal — deferred to 9e, where there's a real PG pipeline to test against. This slice is the storage/orphan backstop only.

Testing

15/15 pass, stable across reruns; 70/70 across reaper + leader-election + pg_barrier (no regression). Ruff clean. Pre-commit green.

Base: feat/UN-3445-pg-queue-integration (not main). Sub-task UN-3554 under UN-3536 (Phase 9), builds on UN-3553.

…sweep

Builds on the leader-election lease (UN-3553): stands up the reaper process —
the leader-elected recovery loop — with its first recovery job, the
barrier-orphan sweep. Ships dark (launched explicitly, never in the default
worker set).

- reaper.py:
  - sweep_expired_barriers(conn): DELETE pg_barrier_state WHERE expires_at <
    now() RETURNING + loud per-orphan WARNING. The documented PgBarrier
    backstop — reclaims barriers whose header tasks never all completed; a
    late in-flight decrement then finds no row and abandons (existing
    semantics). Execution terminal-status recovery is 9e's job.
  - PgReaper: leader-elected loop. Each cycle renews (steps down to standby if
    renew() returns False), else tries to acquire; sweeps ONLY while leader.
    run() loops with graceful SIGTERM/SIGINT shutdown + lease release on exit.
    Guard: cycle interval must be shorter than the lease window, or the leader
    thrashes leadership between renews.
  - reaper_interval_from_env() (WORKER_PG_REAPER_INTERVAL_SECONDS, default 5s),
    main(), python -m queue_backend.pg_queue.reaper entrypoint.
- leader_election.py: expose lease_seconds property (reaper validates its
  cycle against it).
- test_pg_reaper.py: 15 tests — env/construction guards (interval < lease),
  leadership gating (sweeps only when leader, steps down on renew-fail,
  releases on stop), real-PG sweep (reclaims only expired, leaves fresh).

Out of scope: run-worker.sh wiring + liveness (followup, like 9c-followup);
pipeline recovery (counter reconstruction, per-stage re-enqueue) deferred to
9e where there's a real PG pipeline to test against.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 81a9e57e-4f98-4d2f-b6c9-f7c565b5f467

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

@muhammad-ali-e muhammad-ali-e left a comment

PR Review Toolkit — comprehensive review (#2058 · PG Queue 9d slice 2)

Ran six specialist passes (Code Reviewer, Silent-Failure Hunter, Type-Design Analyzer, Test Analyzer, Comment Analyzer, Code Simplifier). The slice is well-built and closely follows the established leader_election.py conventions — leadership gating, the interval-vs-lease guard, signal-handler fallback, and release-on-exit are all sound and tested.

One issue is worth blocking on and was independently found by three passes: sweep_expired_barriers omits the rollback-and-discard discipline every other PG component in this package uses, so a single transient DB error permanently wedges the reaper while it keeps holding leadership and logging look-alike tracebacks. Verified: create_pg_connection runs in manual-commit mode (no autocommit), and LeaderLease._cursor is the canonical pattern this path should mirror.

Inline findings below, prioritised. Severity is prefixed on each comment.

Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_queue/reaper.py
Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_queue/reaper.py Outdated
Comment thread workers/queue_backend/pg_queue/reaper.py
Comment thread workers/tests/test_pg_reaper.py
Comment thread workers/tests/test_pg_reaper.py
Comment thread workers/tests/test_pg_reaper.py Outdated
Comment thread workers/tests/test_pg_reaper.py

…ntract

Toolkit + SonarCloud review on #2058:

- [CRITICAL] sweep_expired_barriers now rolls back on error before re-raising
  (conn is manual-commit; an un-rolled-back failure left it in an aborted-txn
  state, poisoning every later cycle → silent self-perpetuating stall). This
  also clears SonarCloud's C-reliability gate.
- [HIGH] On a failed sweep PgReaper discards its OWNED connection so the next
  tick reconnects — covers a poisoned/dead handle that `.closed` alone misses.
- [MEDIUM] renew() raising now sets _is_leader=False before propagating
  (honours the lease's "raise == stop acting" contract).
- [MEDIUM] release() failure on shutdown is logged (with the lease-window
  note) instead of silently suppressed.
- [MEDIUM] signal-handler ValueError is re-raised unless we're off the main
  thread (don't mislabel an unrelated ValueError).
- [MEDIUM/type-design] tick() returns a TickOutcome(was_leader, reclaimed)
  NamedTuple instead of an overloaded `-1` int sentinel; added an is_leader
  property; lease param typed against a new LeaderLeaseLike Protocol.
- [LOW] Reworded the sweep race comment, the step-down log (same-cycle
  re-acquire), and the run() self-recovery comment for accuracy.
- Tests: +8 — run() swallows a tick error; owned-conn recreated-when-closed;
  injected-conn never swapped; failed-sweep discards owned conn; sweep SQL
  contract (no DB); sweep rolls back on error; step-down-then-reacquire;
  renew-raising steps down. 23 total; drive paths via is_leader, no
  private-flag poking.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Review addressed — commit 59cc79f83

All toolkit findings + the SonarCloud reliability gate are in. Validated each against the code first; all were valid.

CRITICAL / HIGH (the real bug)

  • Missing rollback poisoned the sweep connection. create_pg_connection is manual-commit, so an un-rolled-back DELETE/commit failure left the connection in an aborted-transaction state — and _get_sweep_conn (which only rebuilt on .closed) handed the same poisoned handle back forever → a silent self-perpetuating stall. Fixed: sweep_expired_barriers now rolls back before re-raising, and PgReaper discards its owned connection on a sweep error so the next tick reconnects. This is also what cleared SonarCloud's C-reliability gate.
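
The rollback-and-discard pattern described above could look roughly like the sketch below. It is written against a generic DB-API connection and is not the merged code: `sweep_expired_barriers_sketch`, `OwnedConnSweeper`, and the `connect` factory are hypothetical names; only the SQL is taken from the PR description.

```python
import logging

logger = logging.getLogger("reaper_sketch")

SWEEP_SQL = (
    "DELETE FROM pg_barrier_state "
    "WHERE expires_at < now() "
    "RETURNING execution_id"
)


def sweep_expired_barriers_sketch(conn):
    """Delete expired barrier rows; roll back before re-raising on failure."""
    cur = conn.cursor()
    try:
        cur.execute(SWEEP_SQL)
        rows = cur.fetchall()
        conn.commit()
    except Exception:
        # On a manual-commit connection a failed statement leaves the
        # transaction aborted; rolling back clears that state.
        try:
            conn.rollback()
        except Exception:
            logger.exception("rollback failed; the connection should be discarded")
        raise
    finally:
        cur.close()
    for (execution_id,) in rows:
        logger.warning("reclaimed orphaned barrier for execution %s", execution_id)
    return len(rows)


class OwnedConnSweeper:
    """Owns its connection and drops it after any failed sweep."""

    def __init__(self, connect):
        self._connect = connect  # assumed zero-argument connection factory
        self._conn = None

    def sweep_once(self):
        if self._conn is None or getattr(self._conn, "closed", False):
            self._conn = self._connect()
        try:
            return sweep_expired_barriers_sketch(self._conn)
        except Exception:
            self._discard()  # next call reconnects instead of reusing the handle
            raise

    def _discard(self):
        conn, self._conn = self._conn, None
        try:
            conn.close()
        except Exception:
            logger.debug("closing a discarded connection failed", exc_info=True)
```

Checking `.closed` alone would not catch a handle that is open but stuck in an aborted transaction, which is why the sketch discards on every sweep error rather than only on a closed connection.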

MEDIUM (correctness + design)

  • renew() raising now sets _is_leader=False before propagating (honours the lease's "raise == stop acting" contract).
  • release() failure on shutdown is logged (with the lease-window note), not silently suppressed.
  • signal-handler except ValueError re-raises unless we're off the main thread.
  • tick() returns a TickOutcome(was_leader, reclaimed) NamedTuple — the truthy -1 sentinel is gone; added an is_leader property; lease param typed against a new LeaderLeaseLike(Protocol).
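
To make the type-design change concrete, here is a hedged sketch of what a `TickOutcome` NamedTuple and a `LeaderLeaseLike` Protocol might look like. The two field names come from the comment above; the Protocol's members (`acquire`, `renew`, `release`, `lease_seconds`) and the `LeaderGate` class are assumptions for illustration, not the merged definitions.

```python
from typing import Callable, NamedTuple, Protocol


class TickOutcome(NamedTuple):
    was_leader: bool  # whether this cycle ran as leader
    reclaimed: int    # barrier rows reclaimed (0 on standby)


class LeaderLeaseLike(Protocol):
    """Structural type: any object with these members qualifies (assumed set)."""

    @property
    def lease_seconds(self) -> float: ...

    def acquire(self) -> bool: ...

    def renew(self) -> bool: ...

    def release(self) -> None: ...


class LeaderGate:
    """Tracks leadership and honours the "raise == stop acting" contract."""

    def __init__(self, lease: LeaderLeaseLike) -> None:
        self._lease = lease
        self._is_leader = False

    @property
    def is_leader(self) -> bool:
        return self._is_leader

    def tick(self, sweep: Callable[[], int]) -> TickOutcome:
        if self._is_leader:
            try:
                still_leader = self._lease.renew()
            except Exception:
                # Stop acting as leader before the error propagates.
                self._is_leader = False
                raise
            self._is_leader = still_leader
        if not self._is_leader:
            self._is_leader = self._lease.acquire()
        if not self._is_leader:
            return TickOutcome(was_leader=False, reclaimed=0)
        return TickOutcome(was_leader=True, reclaimed=sweep())
```

The named fields remove the ambiguity of the old sentinel: `bool(-1)` is `True` in Python, so a caller writing `if reaper.tick():` would have treated a standby cycle as productive leader work.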

LOW — reworded the sweep race comment, the step-down log (same-cycle re-acquire), and the run() self-recovery comment; kept the load-bearing interval<=0 guard, trimmed a duplicated rationale comment.

Tests: 15 → 23. Added: run() swallows a tick error; owned-conn recreated-when-closed; injected-conn never swapped; failed-sweep discards owned conn; sweep SQL contract (no-DB, so the predicate has coverage even without live PG); sweep rolls back on error; step-down-then-reacquire; renew-raising steps down. Paths driven via is_leader, no private-flag poking.

23/23 pass; 78/78 across reaper + leader-election + pg_barrier; ruff + pre-commit green.

…d S1244)

The two reaper-interval asserts compared float returns with ==; the values
are exactly representable so it was harmless, but pytest.approx is the correct
idiom and clears the S1244 reliability bugs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

SonarCloud S1244 (float-equality) ×2 fixed in ceb059b73 — the two reaper-interval asserts now use pytest.approx(5.0) / pytest.approx(2.5) instead of == 5.0 / == 2.5. Values were exactly representable (harmless), but approx is the correct idiom and clears the reliability bugs. 23/23 tests still green.
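
For context, a sketch of an env-driven interval helper and a tolerance-based check on its float result. This is an assumption-laden illustration: `reaper_interval_from_env_sketch` is a hypothetical name, the exact validation in the real helper is not shown in this thread, and `math.isclose` is used here only as a dependency-free stand-in for the `pytest.approx` asserts the PR actually uses.

```python
import math
import os

ENV_VAR = "WORKER_PG_REAPER_INTERVAL_SECONDS"
DEFAULT_INTERVAL_SECONDS = 5.0


def reaper_interval_from_env_sketch(environ=None):
    """Read the cycle interval in seconds, falling back to the 5s default."""
    env = os.environ if environ is None else environ
    raw = env.get(ENV_VAR)
    if raw is None or raw.strip() == "":
        return DEFAULT_INTERVAL_SECONDS
    value = float(raw)  # raises ValueError for non-numeric input
    if not math.isfinite(value) or value <= 0:
        raise ValueError(f"{ENV_VAR} must be a positive number, got {raw!r}")
    return value


# Tolerance-based comparisons instead of == on floats.
assert math.isclose(reaper_interval_from_env_sketch({}), 5.0)
assert math.isclose(reaper_interval_from_env_sketch({ENV_VAR: "2.5"}), 2.5)
```

Passing a plain dict instead of `os.environ` keeps the check independent of the process environment.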

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 16, 2026 04:53
@greptile-apps

greptile-apps Bot commented Jun 16, 2026

Greptile Summary

This PR adds the reaper process — a leader-elected recovery loop built on top of the LeaderLease from the previous slice (UN-3553). It ships the process harness (lease-maintenance loop + graceful SIGTERM/SIGINT shutdown) together with one concrete recovery job: sweeping expired rows from pg_barrier_state.

  • reaper.py introduces PgReaper, sweep_expired_barriers, TickOutcome, and supporting helpers; PgReaper enforces at construction that the cycle interval is strictly shorter than the lease window to prevent leadership thrashing, and the finally block of run() releases the lease and closes owned connections.
  • leader_election.py adds a lease_seconds property to expose the lease window to the reaper's guard check.
  • test_pg_reaper.py provides 15 tests across 4 layers (env/construction, leadership gating with a fake lease, connection handling with mocks, real-Postgres barrier sweep).

Confidence Score: 5/5

Safe to merge — the reaper ships dark and is a no-op until PG barrier rows exist, so there is no production blast radius today.

The harness is well-guarded: the interval-shorter-than-lease check at construction prevents leadership thrashing, the finally block correctly releases the lease and closes owned connections, sweep errors discard the connection and let the loop continue, and renew failures atomically step leadership down before any recovery work runs. The four-layer test suite covers the failure modes that matter.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| workers/queue_backend/pg_queue/reaper.py | New reaper module: PgReaper loop, sweep_expired_barriers, LeaderLeaseLike Protocol, and reaper_interval_from_env. Interval-shorter-than-lease guard, owned-connection lifecycle, and graceful shutdown are all well-handled. |
| workers/queue_backend/pg_queue/leader_election.py | Minimal additive change: exposes lease_seconds as a read-only property so PgReaper can validate its cycle interval against the lease window at construction time. |
| workers/tests/test_pg_reaper.py | 15 tests across 4 layers with good separation of concerns; _FakeLease duck-types the Protocol cleanly, and Layer 4 seeds real Postgres rows and verifies only expired ones are reclaimed. |
| workers/queue_backend/pg_queue/__init__.py | Exports the new reaper symbols (PgReaper, LeaderLeaseLike, TickOutcome, reaper_interval_from_env, sweep_expired_barriers) alongside existing package exports. |

Reviews (2): Last reviewed commit: "UN-3554 [FEAT] Address Greptile: manual-..."

Comment thread workers/tests/test_pg_reaper.py
Comment thread workers/queue_backend/pg_queue/reaper.py
… owned conn

- The real-Postgres fixture (barrier_conn) was autocommit, which made
  sweep_expired_barriers' own commit() a no-op and its rollback unreachable —
  so Layer 4 tested a different mode than the production reaper
  (create_pg_connection is manual-commit). Switched the fixture to manual-commit
  and added explicit commits to the seed/read/cleanup helpers, so the real-DB
  tests now exercise the sweep's commit (and rollback) in production mode.
- run() now closes its OWNED sweep connection on shutdown (an injected one is
  the caller's). Harmless for the main() process but keeps PgReaper clean if
  ever embedded / test-driven.

23 tests pass; ruff clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Greptile review addressed — commit 7ea670313

Both findings validated and fixed:

  • Layer-4 fixture was autocommit — which made the sweep's own commit() a no-op and its rollback unreachable, so the real-Postgres tests ran in a different mode than the production reaper (create_pg_connection is manual-commit). Switched the fixture to manual-commit + explicit commits in the seed/read/cleanup helpers, so Layer 4 now verifies the sweep actually persists its DELETE and exercises the rollback path in production mode.
  • Owned sweep connection not closed on shutdown: run() now closes its owned sweep connection in the finally (an injected one stays the caller's).
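
Why an autocommit fixture hides the rollback path can be shown with a small, self-contained demonstration. It uses Python's stdlib sqlite3 as a stand-in for PostgreSQL, so it illustrates the general autocommit versus manual-commit difference rather than the PR's actual fixture; the table and function names are hypothetical.

```python
import sqlite3


def rows_surviving_rollback(autocommit):
    """Insert one row, roll back, and report how many rows remain."""
    # isolation_level=None puts the sqlite3 module in autocommit mode;
    # "DEFERRED" makes it open a transaction implicitly before the INSERT.
    conn = sqlite3.connect(
        ":memory:", isolation_level=None if autocommit else "DEFERRED"
    )
    try:
        conn.execute("CREATE TABLE barrier_demo (execution_id TEXT)")
        conn.commit()
        conn.execute("INSERT INTO barrier_demo VALUES ('e1')")
        conn.rollback()  # only has an effect when a transaction is open
        return conn.execute("SELECT COUNT(*) FROM barrier_demo").fetchone()[0]
    finally:
        conn.close()


# Autocommit: the INSERT is already durable, so rollback cannot undo it.
assert rows_surviving_rollback(autocommit=True) == 1
# Manual commit: the INSERT sits in an open transaction and is rolled back.
assert rows_surviving_rollback(autocommit=False) == 0
```

A test suite running in the first mode never observes an uncommitted or rolled-back write, which is the mismatch with the manual-commit production connection that the fixture change above removes.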

23/23 pass; ruff + pre-commit green. Greptile confidence was 4/5 with these two as the only items — both resolved.

@muhammad-ali-e muhammad-ali-e merged commit 649e06b into feat/UN-3445-pg-queue-integration Jun 16, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3554-reaper-process branch June 16, 2026 05:40