Skip to content

UN-3576 [FEAT] PG Queue 9g — docker-compose PG consumer + reaper services (gated profile)#2074

Merged
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationfrom
feat/UN-3576-FEAT_docker_compose_pg_services
Jun 18, 2026
Merged

UN-3576 [FEAT] PG Queue 9g — docker-compose PG consumer + reaper services (gated profile)#2074
muhammad-ali-e merged 4 commits into
feat/UN-3445-pg-queue-integrationfrom
feat/UN-3576-FEAT_docker_compose_pg_services

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What & why

Makes the already-built PG-queue pipeline (orchestrator, fan-out, barrier, callback on Postgres) runnable under docker compose, gated OFF by default — so it runs in containers, not just the host run-worker.sh. Each component is its own service (one-service-per-component), so the eventual Kubernetes mapping is a mechanical "one service → one Deployment".

This is the docker analog of an ops rollout: the services exist and are runnable, but no traffic moves until the backend gate (PG_QUEUE_TRANSPORT_ENABLED) + Flipt flag are flipped (a later step).

Changes

  • workers/run-worker-docker.sh (the image ENTRYPOINT) — recognises two new launch types and execs the dedicated Python module instead of building a Celery worker command:
    • pg-queue-consumer / pg-consumerpython -m pg_queue_consumer
    • pg-queue-reaper / pg-reaper / reaperpython -m pg_queue_reaper
    • Dispatched before the Celery two-path logic; existing worker types are unaffected. No Dockerfile change (the script is already the entrypoint).
  • docker/docker-compose.yaml — 5 services behind the off-by-default pg-queue profile:
    • worker-pg-orchestrator-api (api_deployment; celery_api_deployments)
    • worker-pg-orchestrator-general (general; celery)
    • worker-pg-fileproc (file_processing; file_processing,api_file_processing)
    • worker-pg-callback (callback; both callback queues)
    • worker-pg-reaper (single-instance, leader-elected)
    • The orchestrators + reaper are broker-free. fileproc + callback keep a rabbitmq dependency because they still hand off to the Celery executor (tool-execution RPC) / notifications until those are migrated.
  • docker/sample.env — documents the new env vars + the profile, and that the backend gate stays off.

Non-regression

  • Default docker compose up starts zero PG services (profile off) — behaviour identical to today.
  • Even with the PG services running, executions still route to Celery until the gate + flag are flipped.

Dev-test (passed end-to-end)

Built worker-unified:dev, docker compose --profile pg-queue up -d:

  • All 5 services boot and stay up; each consumer registers the correct worker-type tasks and polls the correct queues; consumer/reaper liveness endpoints healthy.
  • Reaper leader-election verified: deferred as standby while another reaper held the lease, then acquired leadership once that stopped.
  • Triggered an API deployment → the full pipeline drained through the containerized consumers: orchestrator ran async_execute_binprocess_file_batch → PgBarrier decrement remaining=0 → callback → execution COMPLETED; barrier/dedup tables cleaned up.

Targets feat/UN-3445-pg-queue-integration (not main). Sub-task UN-3576.

🤖 Generated with Claude Code

…ices (gated profile)

Make the already-built PG-queue pipeline runnable under docker compose, gated
OFF by default, so it runs in containers (not just the host run-worker script).
Designed one-service-per-component for a mechanical K8s mapping later.

- run-worker-docker.sh (image ENTRYPOINT): recognise `pg-queue-consumer` and
  `pg-queue-reaper` launch types -> exec the dedicated Python module instead of
  building a Celery worker command. Dispatched before the Celery two-path; no
  Dockerfile change.
- docker-compose.yaml: 5 services behind the off-by-default `pg-queue` profile
  (4 consumers: orchestrator-api / orchestrator-general / fileproc / callback,
  plus a single-instance reaper). Orchestrators + reaper are broker-free;
  fileproc + callback keep rabbitmq (they still hand off to the Celery executor
  / notifications until those are migrated).
- sample.env: document the new env vars + profile; the backend transport gate
  stays off (ramping traffic is a later step).

Non-regression: default `docker compose up` starts zero PG services. Even with
them running, executions route to Celery until the gate + Flipt flag are set.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3200c91b-5d3a-45ac-bf3b-48dae508064b

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/UN-3576-FEAT_docker_compose_pg_services

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Review Toolkit — automated review (#2074)

Ran six specialized agents (Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier) over the 3 changed files. No Critical/High issues — the profile gating is correct and a plain docker compose up is a true no-op. The inline comments below are the actionable items, dominated by defaults/dispatch that mask operator misconfiguration in run-worker-docker.sh and a few comment-accuracy nits in docker-compose.yaml.

Non-inline notes (whole-file / repo-wide, can't anchor to a diff line):

  • run-worker-docker.sh lacks set -u/set -o pipefail (only set -e at line 10). The new functions lean heavily on ${VAR:-default} expansion; set -u would harden against future unguarded reads. Verify the rest of the script tolerates -u first.
  • depends_on uses short-form (no condition: service_healthy) on all 5 services — consumers can start before db/redis/rabbitmq are ready and crash-loop under restart: unless-stopped. This matches the entire existing compose file (zero healthchecks anywhere), so it's a repo-wide convention, not a PR blocker.
  • No shellcheck / docker compose config CI covers these files (hadolint is scoped to Dockerfile$). A shellcheck workers/*.sh pre-commit hook + a docker compose config -q smoke step would give cheap regression protection as both launchers grow.
  • Test coverage: nothing actionable — the underlying pg_queue_consumer/pg_queue_reaper modules are already unit-tested (workers/tests/test_pg_queue_consumer.py, test_pg_reaper.py); infra config has no test precedent in this repo.

Comment thread workers/run-worker-docker.sh
Comment thread workers/run-worker-docker.sh
Comment thread workers/run-worker-docker.sh
Comment thread workers/run-worker-docker.sh
Comment thread docker/docker-compose.yaml Outdated
Comment thread docker/docker-compose.yaml Outdated
Comment thread docker/docker-compose.yaml
Comment thread docker/docker-compose.yaml Outdated
…r config, reject typo'd PG commands, comment accuracy

- run-worker-docker.sh: ensure_pg_interpreter() fails loudly if the venv python
  is missing (avoids a silent restart:unless-stopped crash-loop); WARN when
  WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are unset (non-compose launches);
  add a `pg-*|*-reaper` catch arm so a typo'd PG command fails loudly instead of
  silently becoming a default Celery worker (log-consumer etc. still pass
  through — verified by routing test).
- docker-compose.yaml: soften "broker-free" wording (fail-closed-to-Celery on a
  missing transport); spell out the executor/notification migration steps
  instead of the ③/④ markers; use the exact `pg_orchestrator_lock` name.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review round 1 addressed — c7bfe1b07

Thanks for the review. 7 fixed, 1 acknowledged-as-follow-up.

Fixed (run-worker-docker.sh):

  • [Med] interpreter guardensure_pg_interpreter() fails loudly if the venv python is missing (no silent crash-loop under restart: unless-stopped).
  • [Med] unset worker-type — WARN when WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE is unset.
  • [Low] unset queue — WARN when WORKER_PG_QUEUE_CONSUMER_QUEUE is unset; the module also logs its resolved queue set at startup.
  • [Med] near-miss commandspg-*|*-reaper) arm rejects typo'd PG commands loudly; deliberate fall-through documented. Verified no legit worker type (incl. log-consumer) is caught.

Fixed (docker-compose.yaml):

  • [Low] "broker-free" wording — softened to "broker-free for PG-routed executions" (fail-closed-to-Celery on a missing transport).
  • [Low] ③/④ comment rot — spelled out as "until the executor/tool-RPC moves to PG" / "until notifications move to PG".
  • [Low] lock name — exact pg_orchestrator_lock.

Acknowledged — follow-up (out of this PR's file scope):

  • [Low] enum validationWORKER_PG_QUEUE_CONSUMER_WORKER_TYPE/_QUEUE are consumed as unvalidated strings upstream (pg_queue_consumer/__main__.py, consumer.py), not in compose. Worth tightening those boundaries to raise on unknown values like WORKER_BARRIER_BACKEND does — better as its own change; mitigated here by the new unset-WARN.

Script changes validated by bash -n + a routing-precedence test; compose by docker compose config. The happy-path e2e (containerized pipeline → COMPLETED) was already passing and these are defensive guards/comments only.

@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Re: the non-inline (whole-file / repo-wide) notes

Thanks — all four are fair; my take on each:

  • set -u / set -o pipefail — the new PG functions are written set -u-safe (all reads use ${VAR:-default}), but run-worker-docker.sh is the shared entrypoint for every worker, and flipping shell options there risks tripping unguarded reads elsewhere in the 590-line script. Per your "verify the rest tolerates -u first" caveat, I'd rather not ride a global shell-option change on this docker-wiring PR — better as a focused hardening pass over the whole launcher (with a re-test of all worker types). Noting it for that.
  • depends_on short-form (no condition: service_healthy) — agreed, and as you note it matches the entire existing compose file (zero healthchecks anywhere) — a repo-wide convention, not introduced here. The new consumers/reaper self-recover (the reaper's connection is self-healing; consumers retry the poll), so a brief pre-readiness crash-loop self-resolves. Worth a repo-wide healthcheck pass separately.
  • No shellcheck / docker compose config CI — good cheap regression net as both launchers grow; out of scope for this change (CI config) but I'll flag a shellcheck workers/*.sh + docker compose config -q pre-commit hook as a follow-up.
  • Test coverage — agreed, nothing actionable: the pg_queue_consumer/pg_queue_reaper modules are unit-tested already, and infra config has no test precedent here.

None of these change the diff; the inline items are all fixed in c7bfe1b07.

@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 18, 2026 11:55
…ch case

The PG-queue dispatch case relied on an implicit no-match fall-through to the
Celery logic below. SonarCloud flags a case without a default; add an explicit
`*)` no-op arm documenting the intentional fall-through. Behaviour unchanged
(verified by routing test: legit worker types still reach Celery, typo'd PG
commands still rejected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps

greptile-apps Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

Adds five Postgres-backed worker services (worker-pg-orchestrator-api, worker-pg-orchestrator-general, worker-pg-fileproc, worker-pg-callback, worker-pg-reaper) to docker-compose behind the off-by-default pg-queue compose profile, along with corresponding dispatch logic in the container entrypoint script. No existing service or routing is changed — executions stay on Celery until the backend gate and Flipt flag are flipped in a later step.

  • run-worker-docker.sh: Two new functions (run_pg_consumer, run_pg_reaper) exec a dedicated Python module rather than building a Celery command. A case block dispatching on pg-queue-consumer, pg-queue-reaper, and a pg-* safety catch-all runs before all Celery path logic.
  • docker/docker-compose.yaml: Five services behind the pg-queue profile; orchestrators/reaper carry no RabbitMQ dependency (broker-free for PG-routed paths); fileproc and callback retain rabbitmq until the executor/notification migration completes. Host ports 8093–8097 are new and don't collide with existing 8085–8092 range.
  • docker/sample.env: Documents the new env-var set and records WORKER_PG_REAPER_INTERVAL_SECONDS=5 as an active default, consistent with other sample values in the file.

Confidence Score: 5/5

Safe to merge — all five new services are completely inactive until the pg-queue compose profile is explicitly passed; a plain docker compose up starts none of them and the Celery path is untouched.

The change is purely additive: new services hidden behind an opt-in profile, new case branches that exit before the existing Celery logic, and documentation comments. Host ports 8093–8097 don't collide with the existing 8085–8092 range; the pg-* safety-net prevents mistyped commands from silently falling through to a Celery worker; and the fanout/callback services correctly retain a RabbitMQ dependency until their executor migration is complete.

No files require special attention.

Important Files Changed

Filename Overview
docker/docker-compose.yaml Adds 5 PG-queue services behind the pg-queue compose profile; host ports 8093–8097 are free, dependencies are correct per worker role, and the profile gate keeps default behaviour identical to today.
workers/run-worker-docker.sh Adds run_pg_consumer/run_pg_reaper functions and a case-dispatch block; the dispatch fires before Celery path logic, PYTHONPATH is in scope, and the pg-* safety-net catch-all prevents typos from silently falling through to a Celery worker.
docker/sample.env Documents the new PG-queue env vars; WORKER_PG_REAPER_INTERVAL_SECONDS=5 is the only active assignment in the section, consistent with the rest of the file's style.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    subgraph "docker compose up (default)"
        W1[worker-api-deployment-v2]
        W2[worker-general-v2]
        W3[worker-file-processing-v2]
        W4[worker-callback-v2]
        RMQ[(RabbitMQ)]
        W1 & W2 & W3 & W4 -->|consume| RMQ
    end

    subgraph "docker compose --profile pg-queue up"
        PGO1[worker-pg-orchestrator-api]
        PGO2[worker-pg-orchestrator-general]
        PGF[worker-pg-fileproc]
        PGC[worker-pg-callback]
        PGR[worker-pg-reaper]
        DB[(PostgreSQL)]
        RMQ2[(RabbitMQ)]
        PGO1 & PGO2 -->|pg_queue_consumer poll| DB
        PGF & PGC -->|pg_queue_consumer poll| DB
        PGF & PGC -->|tool RPC / notifications| RMQ2
        PGR -->|leader-elected sweep| DB
    end

    GATE{PG_QUEUE_TRANSPORT_ENABLED + Flipt flag}
    Backend -->|task dispatch| GATE
    GATE -->|OFF default| RMQ
    GATE -->|ON future rollout| DB
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
    subgraph "docker compose up (default)"
        W1[worker-api-deployment-v2]
        W2[worker-general-v2]
        W3[worker-file-processing-v2]
        W4[worker-callback-v2]
        RMQ[(RabbitMQ)]
        W1 & W2 & W3 & W4 -->|consume| RMQ
    end

    subgraph "docker compose --profile pg-queue up"
        PGO1[worker-pg-orchestrator-api]
        PGO2[worker-pg-orchestrator-general]
        PGF[worker-pg-fileproc]
        PGC[worker-pg-callback]
        PGR[worker-pg-reaper]
        DB[(PostgreSQL)]
        RMQ2[(RabbitMQ)]
        PGO1 & PGO2 -->|pg_queue_consumer poll| DB
        PGF & PGC -->|pg_queue_consumer poll| DB
        PGF & PGC -->|tool RPC / notifications| RMQ2
        PGR -->|leader-elected sweep| DB
    end

    GATE{PG_QUEUE_TRANSPORT_ENABLED + Flipt flag}
    Backend -->|task dispatch| GATE
    GATE -->|OFF default| RMQ
    GATE -->|ON future rollout| DB
Loading

Reviews (2): Last reviewed commit: "UN-3576 address greptile: scope PG near-..." | Re-trigger Greptile

Comment thread workers/run-worker-docker.sh Outdated
Comment thread workers/run-worker-docker.sh
Narrow the safety-net catch arm from `pg-*|*-reaper` to `pg-*` so it can never
intercept a pluggable worker whose name ends in `-reaper` (e.g. a future
bulk-reaper / log-history-reaper) — those now fall through to the Celery path.
The `pg-` prefix is reserved for PG-queue components; the exact reaper aliases
are already matched above, and pg-prefixed typos (pg-reapr, pg-queue-reapr) are
still rejected loudly. Verified by routing test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Greptile round addressed — d8ac1e601

Greptile passed (no Critical/High). Both P2s:

  • *-reaper glob could catch a pluggable workerfixed: narrowed the near-miss catch to pg-* only (reserved prefix), so no pluggable name is ever intercepted; pg-prefixed typos still rejected. Verified by routing test.
  • fallback notificationunknownkept notification with rationale: it mirrors the module's own default (pg_queue_consumer/__main__.py defaults WORKER_TYPE to notification when the env is unset), so the label accurately reflects what the consumer actually registers as; unknown would create a label/reality divergence. The unset-WARN already surfaces the misconfig.

@sonarqubecloud

Copy link
Copy Markdown

@muhammad-ali-e muhammad-ali-e merged commit b2ff1b5 into feat/UN-3445-pg-queue-integration Jun 18, 2026
6 checks passed
@muhammad-ali-e muhammad-ali-e deleted the feat/UN-3576-FEAT_docker_compose_pg_services branch June 18, 2026 12:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant