From c53eac801a0d563dfd5a73e6f4e9f4476a671216 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 18 Jun 2026 17:02:03 +0530 Subject: [PATCH 1/4] =?UTF-8?q?UN-3576=20[FEAT]=20PG=20Queue=209g=20?= =?UTF-8?q?=E2=80=94=20docker-compose=20PG=20consumer=20+=20reaper=20servi?= =?UTF-8?q?ces=20(gated=20profile)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the already-built PG-queue pipeline runnable under docker compose, gated OFF by default, so it runs in containers (not just the host run-worker script). Designed one-service-per-component for a mechanical K8s mapping later. - run-worker-docker.sh (image ENTRYPOINT): recognise `pg-queue-consumer` and `pg-queue-reaper` launch types -> exec the dedicated Python module instead of building a Celery worker command. Dispatched before the Celery two-path; no Dockerfile change. - docker-compose.yaml: 5 services behind the off-by-default `pg-queue` profile (4 consumers: orchestrator-api / orchestrator-general / fileproc / callback, plus a single-instance reaper). Orchestrators + reaper are broker-free; fileproc + callback keep rabbitmq (they still hand off to the Celery executor / notifications until those are migrated). - sample.env: document the new env vars + profile; the backend transport gate stays off (ramping traffic is a later step). Non-regression: default `docker compose up` starts zero PG services. Even with them running, executions route to Celery until the gate + Flipt flag are set. Co-Authored-By: Claude Opus 4.8 --- docker/docker-compose.yaml | 161 +++++++++++++++++++++++++++++++++++ docker/sample.env | 23 +++++ workers/run-worker-docker.sh | 50 +++++++++++ 3 files changed, 234 insertions(+) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index b7b49a7618..bfc10de905 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -541,6 +541,167 @@ services: - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config - prompt_studio_data:/app/prompt-studio-data + # =========================================================================== + # PG-queue services (Postgres-backed transport) — gated behind the `pg-queue` + # compose profile, so a plain `docker compose up` starts NONE of them and + # behaviour is identical to today. Bring them up with: + # docker compose --profile pg-queue up -d + # They consume from Postgres, not the broker. The orchestrator consumers and + # the reaper are fully broker-free (no rabbitmq dependency). The fan-out and + # callback consumers still hand off to the Celery executor (tool execution + # RPC) / Celery notifications, so they keep a rabbitmq dependency until those + # are migrated (steps ③/④). Each maps 1:1 to a future K8s Deployment. + # Executions still route to Celery until the backend gate + # (PG_QUEUE_TRANSPORT_ENABLED) + Flipt flag are flipped — that is the ramp, + # not this change. + # =========================================================================== + + # Orchestrator consumer for API-deployment executions (async_execute_bin_api). + worker-pg-orchestrator-api: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-orchestrator-api + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8093:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-orchestrator-api + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=api_deployment + - WORKER_PG_QUEUE_CONSUMER_QUEUE=celery_api_deployments + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Orchestrator consumer for general / scheduled-ETL executions (async_execute_bin). + worker-pg-orchestrator-general: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-orchestrator-general + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8094:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-orchestrator-general + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=general + - WORKER_PG_QUEUE_CONSUMER_QUEUE=celery + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Fan-out consumer (process_file_batch) — both the ETL and API file queues. + worker-pg-fileproc: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-fileproc + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8095:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + - rabbitmq # still dispatches the tool-execution RPC to the Celery executor (until ③) + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-fileproc + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing + - WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Fan-in callback consumer (process_batch_callback / _api) — both callback queues. + worker-pg-callback: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-callback + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8096:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + - rabbitmq # may dispatch Celery notifications when not PG-routed (until ④) + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-callback + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=callback + - WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing_callback,api_file_processing_callback + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Reaper — leader-elected recovery loop. Run exactly ONE instance (it elects a + # single leader via orchestrator_lock; extra replicas idle as standby). + worker-pg-reaper: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-reaper + restart: unless-stopped + command: ["pg-queue-reaper"] + ports: + - "8097:8086" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-reaper + - WORKER_PG_REAPER_INTERVAL_SECONDS=${WORKER_PG_REAPER_INTERVAL_SECONDS:-5} + - WORKER_PG_REAPER_HEALTH_PORT=8086 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + volumes: prompt_studio_data: unstract_data: diff --git a/docker/sample.env b/docker/sample.env index 2feb36cf2b..7f114461bd 100644 --- a/docker/sample.env +++ b/docker/sample.env @@ -107,3 +107,26 @@ CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60 HEALTH_CHECK_INTERVAL=30 HEALTH_CHECK_TIMEOUT=10 ENABLE_METRICS=true + +# ============================================================================= +# PG-Queue Services (Postgres-backed transport) — gated, opt-in +# ============================================================================= +# The PG consumer/reaper services live behind the `pg-queue` compose profile and +# are OFF by default. Bring them up with: +# docker compose --profile pg-queue up -d +# They consume from Postgres (not the broker); their worker-type/queue identity +# is set per service in docker-compose.yaml, so nothing here is required to run +# them. The values below are documented for reference / overrides only. +# +# WORKER_BARRIER_BACKEND - fan-in barrier substrate; the PG services set +# this to `pg` (default elsewhere: chord/redis). +# WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE - which worker's tasks the consumer loads. +# WORKER_PG_QUEUE_CONSUMER_QUEUE - comma-separated queues the consumer polls. +# WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT - opt-in consumer liveness port (unset = off). +# WORKER_PG_REAPER_HEALTH_PORT - opt-in reaper liveness port (unset = off). +WORKER_PG_REAPER_INTERVAL_SECONDS=5 # Reaper sweep interval (seconds) +# +# NOTE: routing executions to PG is a SEPARATE, later step. Running these +# services does NOT move any traffic — the backend gate PG_QUEUE_TRANSPORT_ENABLED +# (in backend/.env, default off) plus the Flipt flag still decide per-execution +# transport, and both stay off until the rollout ramp. diff --git a/workers/run-worker-docker.sh b/workers/run-worker-docker.sh index 1b8926a063..e0890f93a5 100755 --- a/workers/run-worker-docker.sh +++ b/workers/run-worker-docker.sh @@ -25,6 +25,11 @@ ENV_FILE="/app/.env" # Worker type constant for the executor worker readonly EXECUTOR_WORKER_TYPE="executor" +# Python interpreter for PG-queue components (consumer / reaper). Unlike Celery +# workers, these launch a dedicated module rather than a `celery ... worker` +# command (see run_pg_consumer / run_pg_reaper below). +readonly PG_QUEUE_PYTHON_BIN="/app/.venv/bin/python" + # Available core workers (OSS) declare -A WORKERS=( ["api"]="api_deployment" @@ -486,6 +491,39 @@ run_worker() { exec $celery_cmd $celery_args } +# ============================================================================= +# PG-queue components (Postgres-backed transport) +# ============================================================================= +# These do NOT run a Celery worker — they exec a dedicated Python module that +# polls the Postgres queue. The consumer picks which worker's tasks to register +# and which queues to poll from the environment (set per compose service / +# K8s Deployment); the reaper needs neither. Mirrors the host launcher +# run-worker.sh (`pg-queue-consumer` / `reaper`). + +run_pg_consumer() { + local source_type="${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-notification}" + local queues="${WORKER_PG_QUEUE_CONSUMER_QUEUE:-}" + export WORKER_NAME="${WORKER_NAME:-pg-consumer-${source_type}}" + + print_status $GREEN "Starting PG-queue consumer..." + print_status $BLUE "Source worker type: $source_type" + print_status $BLUE "Queues: ${queues:-}" + print_status $BLUE "Health port: ${WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT:-}" + + # WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are read by the module itself. + exec "$PG_QUEUE_PYTHON_BIN" -m pg_queue_consumer +} + +run_pg_reaper() { + export WORKER_NAME="${WORKER_NAME:-pg-reaper}" + + print_status $GREEN "Starting PG-queue reaper (leader-elected)..." + print_status $BLUE "Interval: ${WORKER_PG_REAPER_INTERVAL_SECONDS:-5}s" + print_status $BLUE "Health port: ${WORKER_PG_REAPER_HEALTH_PORT:-}" + + exec "$PG_QUEUE_PYTHON_BIN" -m pg_queue_reaper +} + # Main execution # Load environment first for any needed variables load_env "$ENV_FILE" @@ -496,6 +534,18 @@ discover_pluggable_workers # Add PYTHONPATH for imports - include both /app and /unstract for packages export PYTHONPATH="/app:/unstract/core/src:/unstract/connectors/src:/unstract/filesystem/src:/unstract/flags/src:/unstract/tool-registry/src:/unstract/tool-sandbox/src:/unstract/workflow-execution/src:${PYTHONPATH:-}" +# PG-queue components run a dedicated module, not a Celery worker — dispatch +# them before the Celery command-building logic below. PYTHONPATH (exported +# above) is required for the module imports. +case "${1:-}" in + pg-queue-consumer|pg-consumer) + run_pg_consumer + ;; + pg-queue-reaper|pg-reaper|reaper) + run_pg_reaper + ;; +esac + # Two-path logic: Full Celery command vs Traditional worker type if [[ "$1" == *"celery"* ]] || [[ "$1" == *".venv"* ]]; then # ============================================================================= From c7bfe1b073d3f636ec8cc05e85249caecef1b87c Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 18 Jun 2026 17:19:44 +0530 Subject: [PATCH 2/4] UN-3576 address review: guard PG interpreter, warn on missing consumer config, reject typo'd PG commands, comment accuracy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - run-worker-docker.sh: ensure_pg_interpreter() fails loudly if the venv python is missing (avoids a silent restart:unless-stopped crash-loop); WARN when WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are unset (non-compose launches); add a `pg-*|*-reaper` catch arm so a typo'd PG command fails loudly instead of silently becoming a default Celery worker (log-consumer etc. still pass through — verified by routing test). - docker-compose.yaml: soften "broker-free" wording (fail-closed-to-Celery on a missing transport); spell out the executor/notification migration steps instead of the ③/④ markers; use the exact `pg_orchestrator_lock` name. Co-Authored-By: Claude Opus 4.8 --- docker/docker-compose.yaml | 16 +++++++++------- workers/run-worker-docker.sh | 31 +++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index bfc10de905..a85d3e6ca6 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -547,10 +547,12 @@ services: # behaviour is identical to today. Bring them up with: # docker compose --profile pg-queue up -d # They consume from Postgres, not the broker. The orchestrator consumers and - # the reaper are fully broker-free (no rabbitmq dependency). The fan-out and - # callback consumers still hand off to the Celery executor (tool execution - # RPC) / Celery notifications, so they keep a rabbitmq dependency until those - # are migrated (steps ③/④). Each maps 1:1 to a future K8s Deployment. + # the reaper are broker-free for PG-routed executions (fan-out honors the + # carried transport; a missing transport fails closed to Celery, which these + # consumers aren't expected to receive). The fan-out and callback consumers + # still hand off to the Celery executor (tool-execution RPC) / Celery + # notifications, so they keep a rabbitmq dependency until the executor and + # notifications themselves move to PG. Each maps 1:1 to a future K8s Deployment. # Executions still route to Celery until the backend gate # (PG_QUEUE_TRANSPORT_ENABLED) + Flipt flag are flipped — that is the ramp, # not this change. @@ -628,7 +630,7 @@ services: depends_on: - db - redis - - rabbitmq # still dispatches the tool-execution RPC to the Celery executor (until ③) + - rabbitmq # still dispatches the tool-execution RPC to the Celery executor (until the executor/tool-RPC moves to PG) environment: - ENVIRONMENT=development - APPLICATION_NAME=unstract-worker-pg-fileproc @@ -658,7 +660,7 @@ services: depends_on: - db - redis - - rabbitmq # may dispatch Celery notifications when not PG-routed (until ④) + - rabbitmq # may dispatch Celery notifications when not PG-routed (until notifications move to PG) environment: - ENVIRONMENT=development - APPLICATION_NAME=unstract-worker-pg-callback @@ -675,7 +677,7 @@ services: - pg-queue # Reaper — leader-elected recovery loop. Run exactly ONE instance (it elects a - # single leader via orchestrator_lock; extra replicas idle as standby). + # single leader via pg_orchestrator_lock; extra replicas idle as standby). worker-pg-reaper: image: unstract/worker-unified:${VERSION} container_name: unstract-worker-pg-reaper diff --git a/workers/run-worker-docker.sh b/workers/run-worker-docker.sh index e0890f93a5..d08216516b 100755 --- a/workers/run-worker-docker.sh +++ b/workers/run-worker-docker.sh @@ -500,11 +500,33 @@ run_worker() { # K8s Deployment); the reaper needs neither. Mirrors the host launcher # run-worker.sh (`pg-queue-consumer` / `reaper`). +# Fail loudly if the interpreter is missing (e.g. venv path moved in an image +# refactor) rather than letting `exec` die with a terse "not found" that +# restart:unless-stopped turns into a silent crash-loop. +ensure_pg_interpreter() { + if [[ ! -x "$PG_QUEUE_PYTHON_BIN" ]]; then + print_status $RED "PG-queue interpreter not found/executable: $PG_QUEUE_PYTHON_BIN — check the image build / venv path." + exit 1 + fi +} + run_pg_consumer() { + ensure_pg_interpreter local source_type="${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-notification}" local queues="${WORKER_PG_QUEUE_CONSUMER_QUEUE:-}" export WORKER_NAME="${WORKER_NAME:-pg-consumer-${source_type}}" + # Warn (don't fail) on missing config: all compose services set these, but a + # hand-rolled docker run / K8s Deployment that forgets them would otherwise + # start a mislabelled or idle-looking consumer silently. The module logs its + # resolved worker type + queue set at startup, so these stay diagnosable. + if [[ -z "${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-}" ]]; then + print_status $YELLOW "WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE unset — defaulting source worker type to '$source_type'; WORKER_NAME label may not match the module's actual role." + fi + if [[ -z "$queues" ]]; then + print_status $YELLOW "WORKER_PG_QUEUE_CONSUMER_QUEUE unset — consumer will use the module default; check the startup log for the resolved queue set." + fi + print_status $GREEN "Starting PG-queue consumer..." print_status $BLUE "Source worker type: $source_type" print_status $BLUE "Queues: ${queues:-}" @@ -515,6 +537,7 @@ run_pg_consumer() { } run_pg_reaper() { + ensure_pg_interpreter export WORKER_NAME="${WORKER_NAME:-pg-reaper}" print_status $GREEN "Starting PG-queue reaper (leader-elected)..." @@ -544,6 +567,14 @@ case "${1:-}" in pg-queue-reaper|pg-reaper|reaper) run_pg_reaper ;; + pg-*|*-reaper) + # Obviously-PG-intended but unrecognized (e.g. a typo'd command) — fail + # loudly instead of silently coercing it into a default Celery worker. + # (No legitimate worker type starts with `pg-` or ends with `-reaper`.) + print_status $RED "Unrecognized PG-queue command: '$1' (did you mean pg-queue-consumer / pg-queue-reaper?)." + exit 1 + ;; + # Any other token intentionally falls through to the Celery command logic below. esac # Two-path logic: Full Celery command vs Traditional worker type From 719e19283f116285e15b227e06543410bb8c2ee3 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 18 Jun 2026 17:27:15 +0530 Subject: [PATCH 3/4] UN-3576 address SonarCloud: add explicit default (*) arm to PG-dispatch case The PG-queue dispatch case relied on an implicit no-match fall-through to the Celery logic below. SonarCloud flags a case without a default; add an explicit `*)` no-op arm documenting the intentional fall-through. Behaviour unchanged (verified by routing test: legit worker types still reach Celery, typo'd PG commands still rejected). Co-Authored-By: Claude Opus 4.8 --- workers/run-worker-docker.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/workers/run-worker-docker.sh b/workers/run-worker-docker.sh index d08216516b..cd2bf44a3c 100755 --- a/workers/run-worker-docker.sh +++ b/workers/run-worker-docker.sh @@ -574,7 +574,11 @@ case "${1:-}" in print_status $RED "Unrecognized PG-queue command: '$1' (did you mean pg-queue-consumer / pg-queue-reaper?)." exit 1 ;; - # Any other token intentionally falls through to the Celery command logic below. + *) + # Not a PG-queue command — intentionally fall through to the Celery + # command logic below (handles every existing worker type + full + # Celery commands). + ;; esac # Two-path logic: Full Celery command vs Traditional worker type From d8ac1e601136457a41b7f692b3c22b0bb226ad78 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 18 Jun 2026 17:34:32 +0530 Subject: [PATCH 4/4] UN-3576 address greptile: scope PG near-miss catch to pg- prefix only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Narrow the safety-net catch arm from `pg-*|*-reaper` to `pg-*` so it can never intercept a pluggable worker whose name ends in `-reaper` (e.g. a future bulk-reaper / log-history-reaper) — those now fall through to the Celery path. The `pg-` prefix is reserved for PG-queue components; the exact reaper aliases are already matched above, and pg-prefixed typos (pg-reapr, pg-queue-reapr) are still rejected loudly. Verified by routing test. Co-Authored-By: Claude Opus 4.8 --- workers/run-worker-docker.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/workers/run-worker-docker.sh b/workers/run-worker-docker.sh index cd2bf44a3c..be5aa86789 100755 --- a/workers/run-worker-docker.sh +++ b/workers/run-worker-docker.sh @@ -567,10 +567,12 @@ case "${1:-}" in pg-queue-reaper|pg-reaper|reaper) run_pg_reaper ;; - pg-*|*-reaper) + pg-*) # Obviously-PG-intended but unrecognized (e.g. a typo'd command) — fail # loudly instead of silently coercing it into a default Celery worker. - # (No legitimate worker type starts with `pg-` or ends with `-reaper`.) + # Scoped to the `pg-` prefix (reserved for PG-queue components) so it + # never intercepts a pluggable worker name; the exact reaper aliases are + # already matched above. print_status $RED "Unrecognized PG-queue command: '$1' (did you mean pg-queue-consumer / pg-queue-reaper?)." exit 1 ;;