diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index b7b49a7618..a85d3e6ca6 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -541,6 +541,169 @@ services: - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config - prompt_studio_data:/app/prompt-studio-data + # =========================================================================== + # PG-queue services (Postgres-backed transport) — gated behind the `pg-queue` + # compose profile, so a plain `docker compose up` starts NONE of them and + # behaviour is identical to today. Bring them up with: + # docker compose --profile pg-queue up -d + # They consume from Postgres, not the broker. The orchestrator consumers and + # the reaper are broker-free for PG-routed executions (fan-out honors the + # carried transport; a missing transport fails closed to Celery, which these + # consumers aren't expected to receive). The fan-out and callback consumers + # still hand off to the Celery executor (tool-execution RPC) / Celery + # notifications, so they keep a rabbitmq dependency until the executor and + # notifications themselves move to PG. Each maps 1:1 to a future K8s Deployment. + # Executions still route to Celery until the backend gate + # (PG_QUEUE_TRANSPORT_ENABLED) + Flipt flag are flipped — that is the ramp, + # not this change. + # =========================================================================== + + # Orchestrator consumer for API-deployment executions (async_execute_bin_api). + worker-pg-orchestrator-api: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-orchestrator-api + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8093:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-orchestrator-api + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=api_deployment + - WORKER_PG_QUEUE_CONSUMER_QUEUE=celery_api_deployments + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Orchestrator consumer for general / scheduled-ETL executions (async_execute_bin). + worker-pg-orchestrator-general: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-orchestrator-general + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8094:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-orchestrator-general + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=general + - WORKER_PG_QUEUE_CONSUMER_QUEUE=celery + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Fan-out consumer (process_file_batch) — both the ETL and API file queues. + worker-pg-fileproc: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-fileproc + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8095:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + - rabbitmq # still dispatches the tool-execution RPC to the Celery executor (until the executor/tool-RPC moves to PG) + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-fileproc + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing + - WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Fan-in callback consumer (process_batch_callback / _api) — both callback queues. + worker-pg-callback: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-callback + restart: unless-stopped + command: ["pg-queue-consumer"] + ports: + - "8096:8090" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + - rabbitmq # may dispatch Celery notifications when not PG-routed (until notifications move to PG) + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-callback + - WORKER_BARRIER_BACKEND=pg + - WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=callback + - WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing_callback,api_file_processing_callback + - WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + + # Reaper — leader-elected recovery loop. Run exactly ONE instance (it elects a + # single leader via pg_orchestrator_lock; extra replicas idle as standby). + worker-pg-reaper: + image: unstract/worker-unified:${VERSION} + container_name: unstract-worker-pg-reaper + restart: unless-stopped + command: ["pg-queue-reaper"] + ports: + - "8097:8086" + env_file: + - ../workers/.env + - ./essentials.env + depends_on: + - db + - redis + environment: + - ENVIRONMENT=development + - APPLICATION_NAME=unstract-worker-pg-reaper + - WORKER_PG_REAPER_INTERVAL_SECONDS=${WORKER_PG_REAPER_INTERVAL_SECONDS:-5} + - WORKER_PG_REAPER_HEALTH_PORT=8086 + labels: + - traefik.enable=false + volumes: + - ./workflow_data:/data + - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config + profiles: + - pg-queue + volumes: prompt_studio_data: unstract_data: diff --git a/docker/sample.env b/docker/sample.env index 2feb36cf2b..7f114461bd 100644 --- a/docker/sample.env +++ b/docker/sample.env @@ -107,3 +107,26 @@ CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60 HEALTH_CHECK_INTERVAL=30 HEALTH_CHECK_TIMEOUT=10 ENABLE_METRICS=true + +# ============================================================================= +# PG-Queue Services (Postgres-backed transport) — gated, opt-in +# ============================================================================= +# The PG consumer/reaper services live behind the `pg-queue` compose profile and +# are OFF by default. Bring them up with: +# docker compose --profile pg-queue up -d +# They consume from Postgres (not the broker); their worker-type/queue identity +# is set per service in docker-compose.yaml, so nothing here is required to run +# them. The values below are documented for reference / overrides only. +# +# WORKER_BARRIER_BACKEND - fan-in barrier substrate; the PG services set +# this to `pg` (default elsewhere: chord/redis). +# WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE - which worker's tasks the consumer loads. +# WORKER_PG_QUEUE_CONSUMER_QUEUE - comma-separated queues the consumer polls. +# WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT - opt-in consumer liveness port (unset = off). +# WORKER_PG_REAPER_HEALTH_PORT - opt-in reaper liveness port (unset = off). +WORKER_PG_REAPER_INTERVAL_SECONDS=5 # Reaper sweep interval (seconds) +# +# NOTE: routing executions to PG is a SEPARATE, later step. Running these +# services does NOT move any traffic — the backend gate PG_QUEUE_TRANSPORT_ENABLED +# (in backend/.env, default off) plus the Flipt flag still decide per-execution +# transport, and both stay off until the rollout ramp. diff --git a/workers/run-worker-docker.sh b/workers/run-worker-docker.sh index 1b8926a063..be5aa86789 100755 --- a/workers/run-worker-docker.sh +++ b/workers/run-worker-docker.sh @@ -25,6 +25,11 @@ ENV_FILE="/app/.env" # Worker type constant for the executor worker readonly EXECUTOR_WORKER_TYPE="executor" +# Python interpreter for PG-queue components (consumer / reaper). Unlike Celery +# workers, these launch a dedicated module rather than a `celery ... worker` +# command (see run_pg_consumer / run_pg_reaper below). +readonly PG_QUEUE_PYTHON_BIN="/app/.venv/bin/python" + # Available core workers (OSS) declare -A WORKERS=( ["api"]="api_deployment" @@ -486,6 +491,62 @@ run_worker() { exec $celery_cmd $celery_args } +# ============================================================================= +# PG-queue components (Postgres-backed transport) +# ============================================================================= +# These do NOT run a Celery worker — they exec a dedicated Python module that +# polls the Postgres queue. The consumer picks which worker's tasks to register +# and which queues to poll from the environment (set per compose service / +# K8s Deployment); the reaper needs neither. Mirrors the host launcher +# run-worker.sh (`pg-queue-consumer` / `reaper`). + +# Fail loudly if the interpreter is missing (e.g. venv path moved in an image +# refactor) rather than letting `exec` die with a terse "not found" that +# restart:unless-stopped turns into a silent crash-loop. +ensure_pg_interpreter() { + if [[ ! -x "$PG_QUEUE_PYTHON_BIN" ]]; then + print_status $RED "PG-queue interpreter not found/executable: $PG_QUEUE_PYTHON_BIN — check the image build / venv path." + exit 1 + fi +} + +run_pg_consumer() { + ensure_pg_interpreter + local source_type="${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-notification}" + local queues="${WORKER_PG_QUEUE_CONSUMER_QUEUE:-}" + export WORKER_NAME="${WORKER_NAME:-pg-consumer-${source_type}}" + + # Warn (don't fail) on missing config: all compose services set these, but a + # hand-rolled docker run / K8s Deployment that forgets them would otherwise + # start a mislabelled or idle-looking consumer silently. The module logs its + # resolved worker type + queue set at startup, so these stay diagnosable. + if [[ -z "${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-}" ]]; then + print_status $YELLOW "WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE unset — defaulting source worker type to '$source_type'; WORKER_NAME label may not match the module's actual role." + fi + if [[ -z "$queues" ]]; then + print_status $YELLOW "WORKER_PG_QUEUE_CONSUMER_QUEUE unset — consumer will use the module default; check the startup log for the resolved queue set." + fi + + print_status $GREEN "Starting PG-queue consumer..." + print_status $BLUE "Source worker type: $source_type" + print_status $BLUE "Queues: ${queues:-}" + print_status $BLUE "Health port: ${WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT:-}" + + # WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are read by the module itself. + exec "$PG_QUEUE_PYTHON_BIN" -m pg_queue_consumer +} + +run_pg_reaper() { + ensure_pg_interpreter + export WORKER_NAME="${WORKER_NAME:-pg-reaper}" + + print_status $GREEN "Starting PG-queue reaper (leader-elected)..." + print_status $BLUE "Interval: ${WORKER_PG_REAPER_INTERVAL_SECONDS:-5}s" + print_status $BLUE "Health port: ${WORKER_PG_REAPER_HEALTH_PORT:-}" + + exec "$PG_QUEUE_PYTHON_BIN" -m pg_queue_reaper +} + # Main execution # Load environment first for any needed variables load_env "$ENV_FILE" @@ -496,6 +557,32 @@ discover_pluggable_workers # Add PYTHONPATH for imports - include both /app and /unstract for packages export PYTHONPATH="/app:/unstract/core/src:/unstract/connectors/src:/unstract/filesystem/src:/unstract/flags/src:/unstract/tool-registry/src:/unstract/tool-sandbox/src:/unstract/workflow-execution/src:${PYTHONPATH:-}" +# PG-queue components run a dedicated module, not a Celery worker — dispatch +# them before the Celery command-building logic below. PYTHONPATH (exported +# above) is required for the module imports. +case "${1:-}" in + pg-queue-consumer|pg-consumer) + run_pg_consumer + ;; + pg-queue-reaper|pg-reaper|reaper) + run_pg_reaper + ;; + pg-*) + # Obviously-PG-intended but unrecognized (e.g. a typo'd command) — fail + # loudly instead of silently coercing it into a default Celery worker. + # Scoped to the `pg-` prefix (reserved for PG-queue components) so it + # never intercepts a pluggable worker name; the exact reaper aliases are + # already matched above. + print_status $RED "Unrecognized PG-queue command: '$1' (did you mean pg-queue-consumer / pg-queue-reaper?)." + exit 1 + ;; + *) + # Not a PG-queue command — intentionally fall through to the Celery + # command logic below (handles every existing worker type + full + # Celery commands). + ;; +esac + # Two-path logic: Full Celery command vs Traditional worker type if [[ "$1" == *"celery"* ]] || [[ "$1" == *".venv"* ]]; then # =============================================================================