UN-3576 [FEAT] PG Queue 9g — docker-compose PG consumer + reaper services (gated profile)#2074
Conversation
…ices (gated profile) Make the already-built PG-queue pipeline runnable under docker compose, gated OFF by default, so it runs in containers (not just the host run-worker script). Designed one-service-per-component for a mechanical K8s mapping later. - run-worker-docker.sh (image ENTRYPOINT): recognise `pg-queue-consumer` and `pg-queue-reaper` launch types -> exec the dedicated Python module instead of building a Celery worker command. Dispatched before the Celery two-path; no Dockerfile change. - docker-compose.yaml: 5 services behind the off-by-default `pg-queue` profile (4 consumers: orchestrator-api / orchestrator-general / fileproc / callback, plus a single-instance reaper). Orchestrators + reaper are broker-free; fileproc + callback keep rabbitmq (they still hand off to the Celery executor / notifications until those are migrated). - sample.env: document the new env vars + profile; the backend transport gate stays off (ramping traffic is a later step). Non-regression: default `docker compose up` starts zero PG services. Even with them running, executions route to Celery until the gate + Flipt flag are set. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — automated review (#2074)
Ran six specialized agents (Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, Code Simplifier) over the 3 changed files. No Critical/High issues — the profile gating is correct and a plain docker compose up is a true no-op. The inline comments below are the actionable items, dominated by defaults/dispatch that mask operator misconfiguration in run-worker-docker.sh and a few comment-accuracy nits in docker-compose.yaml.
Non-inline notes (whole-file / repo-wide, can't anchor to a diff line):
run-worker-docker.shlacksset -u/set -o pipefail(onlyset -eat line 10). The new functions lean heavily on${VAR:-default}expansion;set -uwould harden against future unguarded reads. Verify the rest of the script tolerates-ufirst.depends_onuses short-form (nocondition: service_healthy) on all 5 services — consumers can start before db/redis/rabbitmq are ready and crash-loop underrestart: unless-stopped. This matches the entire existing compose file (zero healthchecks anywhere), so it's a repo-wide convention, not a PR blocker.- No shellcheck /
docker compose configCI covers these files (hadolint is scoped toDockerfile$). Ashellcheck workers/*.shpre-commit hook + adocker compose config -qsmoke step would give cheap regression protection as both launchers grow. - Test coverage: nothing actionable — the underlying
pg_queue_consumer/pg_queue_reapermodules are already unit-tested (workers/tests/test_pg_queue_consumer.py,test_pg_reaper.py); infra config has no test precedent in this repo.
…r config, reject typo'd PG commands, comment accuracy - run-worker-docker.sh: ensure_pg_interpreter() fails loudly if the venv python is missing (avoids a silent restart:unless-stopped crash-loop); WARN when WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are unset (non-compose launches); add a `pg-*|*-reaper` catch arm so a typo'd PG command fails loudly instead of silently becoming a default Celery worker (log-consumer etc. still pass through — verified by routing test). - docker-compose.yaml: soften "broker-free" wording (fail-closed-to-Celery on a missing transport); spell out the executor/notification migration steps instead of the ③/④ markers; use the exact `pg_orchestrator_lock` name. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review round 1 addressed —
|
Re: the non-inline (whole-file / repo-wide) notesThanks — all four are fair; my take on each:
None of these change the diff; the inline items are all fixed in |
…ch case The PG-queue dispatch case relied on an implicit no-match fall-through to the Celery logic below. SonarCloud flags a case without a default; add an explicit `*)` no-op arm documenting the intentional fall-through. Behaviour unchanged (verified by routing test: legit worker types still reach Celery, typo'd PG commands still rejected). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
| Filename | Overview |
|---|---|
| docker/docker-compose.yaml | Adds 5 PG-queue services behind the pg-queue compose profile; host ports 8093–8097 are free, dependencies are correct per worker role, and the profile gate keeps default behaviour identical to today. |
| workers/run-worker-docker.sh | Adds run_pg_consumer/run_pg_reaper functions and a case-dispatch block; the dispatch fires before Celery path logic, PYTHONPATH is in scope, and the pg-* safety-net catch-all prevents typos from silently falling through to a Celery worker. |
| docker/sample.env | Documents the new PG-queue env vars; WORKER_PG_REAPER_INTERVAL_SECONDS=5 is the only active assignment in the section, consistent with the rest of the file's style. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
subgraph "docker compose up (default)"
W1[worker-api-deployment-v2]
W2[worker-general-v2]
W3[worker-file-processing-v2]
W4[worker-callback-v2]
RMQ[(RabbitMQ)]
W1 & W2 & W3 & W4 -->|consume| RMQ
end
subgraph "docker compose --profile pg-queue up"
PGO1[worker-pg-orchestrator-api]
PGO2[worker-pg-orchestrator-general]
PGF[worker-pg-fileproc]
PGC[worker-pg-callback]
PGR[worker-pg-reaper]
DB[(PostgreSQL)]
RMQ2[(RabbitMQ)]
PGO1 & PGO2 -->|pg_queue_consumer poll| DB
PGF & PGC -->|pg_queue_consumer poll| DB
PGF & PGC -->|tool RPC / notifications| RMQ2
PGR -->|leader-elected sweep| DB
end
GATE{PG_QUEUE_TRANSPORT_ENABLED + Flipt flag}
Backend -->|task dispatch| GATE
GATE -->|OFF default| RMQ
GATE -->|ON future rollout| DB
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
subgraph "docker compose up (default)"
W1[worker-api-deployment-v2]
W2[worker-general-v2]
W3[worker-file-processing-v2]
W4[worker-callback-v2]
RMQ[(RabbitMQ)]
W1 & W2 & W3 & W4 -->|consume| RMQ
end
subgraph "docker compose --profile pg-queue up"
PGO1[worker-pg-orchestrator-api]
PGO2[worker-pg-orchestrator-general]
PGF[worker-pg-fileproc]
PGC[worker-pg-callback]
PGR[worker-pg-reaper]
DB[(PostgreSQL)]
RMQ2[(RabbitMQ)]
PGO1 & PGO2 -->|pg_queue_consumer poll| DB
PGF & PGC -->|pg_queue_consumer poll| DB
PGF & PGC -->|tool RPC / notifications| RMQ2
PGR -->|leader-elected sweep| DB
end
GATE{PG_QUEUE_TRANSPORT_ENABLED + Flipt flag}
Backend -->|task dispatch| GATE
GATE -->|OFF default| RMQ
GATE -->|ON future rollout| DB
Reviews (2): Last reviewed commit: "UN-3576 address greptile: scope PG near-..." | Re-trigger Greptile
Narrow the safety-net catch arm from `pg-*|*-reaper` to `pg-*` so it can never intercept a pluggable worker whose name ends in `-reaper` (e.g. a future bulk-reaper / log-history-reaper) — those now fall through to the Celery path. The `pg-` prefix is reserved for PG-queue components; the exact reaper aliases are already matched above, and pg-prefixed typos (pg-reapr, pg-queue-reapr) are still rejected loudly. Verified by routing test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Greptile round addressed —
|
|
b2ff1b5
into
feat/UN-3445-pg-queue-integration



What & why
Makes the already-built PG-queue pipeline (orchestrator, fan-out, barrier, callback on Postgres) runnable under docker compose, gated OFF by default — so it runs in containers, not just the host
run-worker.sh. Each component is its own service (one-service-per-component), so the eventual Kubernetes mapping is a mechanical "one service → one Deployment".This is the docker analog of an ops rollout: the services exist and are runnable, but no traffic moves until the backend gate (
PG_QUEUE_TRANSPORT_ENABLED) + Flipt flag are flipped (a later step).Changes
workers/run-worker-docker.sh(the image ENTRYPOINT) — recognises two new launch types and execs the dedicated Python module instead of building a Celery worker command:pg-queue-consumer/pg-consumer→python -m pg_queue_consumerpg-queue-reaper/pg-reaper/reaper→python -m pg_queue_reaperdocker/docker-compose.yaml— 5 services behind the off-by-defaultpg-queueprofile:worker-pg-orchestrator-api(api_deployment;celery_api_deployments)worker-pg-orchestrator-general(general;celery)worker-pg-fileproc(file_processing;file_processing,api_file_processing)worker-pg-callback(callback; both callback queues)worker-pg-reaper(single-instance, leader-elected)fileproc+callbackkeep arabbitmqdependency because they still hand off to the Celery executor (tool-execution RPC) / notifications until those are migrated.docker/sample.env— documents the new env vars + the profile, and that the backend gate stays off.Non-regression
docker compose upstarts zero PG services (profile off) — behaviour identical to today.Dev-test (passed end-to-end)
Built
worker-unified:dev,docker compose --profile pg-queue up -d:async_execute_bin→process_file_batch→ PgBarrier decrementremaining=0→ callback → execution COMPLETED; barrier/dedup tables cleaned up.Targets
feat/UN-3445-pg-queue-integration(notmain). Sub-task UN-3576.🤖 Generated with Claude Code