Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,169 @@ services:
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
- prompt_studio_data:/app/prompt-studio-data

# ===========================================================================
# PG-queue services (Postgres-backed transport) — gated behind the `pg-queue`
# compose profile, so a plain `docker compose up` starts NONE of them and
# behaviour is identical to today. Bring them up with:
# docker compose --profile pg-queue up -d
# They consume from Postgres, not the broker. The orchestrator consumers and
# the reaper are broker-free for PG-routed executions (fan-out honors the
# carried transport; a missing transport fails closed to Celery, which these
# consumers aren't expected to receive). The fan-out and callback consumers
# still hand off to the Celery executor (tool-execution RPC) / Celery
# notifications, so they keep a rabbitmq dependency until the executor and
# notifications themselves move to PG. Each maps 1:1 to a future K8s Deployment.
# Executions still route to Celery until the backend gate
# (PG_QUEUE_TRANSPORT_ENABLED) + Flipt flag are flipped — that is the ramp,
# not this change.
# ===========================================================================

# Orchestrator consumer for API-deployment executions (async_execute_bin_api).
worker-pg-orchestrator-api:
image: unstract/worker-unified:${VERSION}
container_name: unstract-worker-pg-orchestrator-api
restart: unless-stopped
command: ["pg-queue-consumer"]
ports:
- "8093:8090"
env_file:
- ../workers/.env
- ./essentials.env
depends_on:
- db
- redis
environment:
- ENVIRONMENT=development
- APPLICATION_NAME=unstract-worker-pg-orchestrator-api
- WORKER_BARRIER_BACKEND=pg
- WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=api_deployment
Comment thread
muhammad-ali-e marked this conversation as resolved.
- WORKER_PG_QUEUE_CONSUMER_QUEUE=celery_api_deployments
- WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090
labels:
- traefik.enable=false
volumes:
- ./workflow_data:/data
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
profiles:
- pg-queue

# Orchestrator consumer for general / scheduled-ETL executions (async_execute_bin).
worker-pg-orchestrator-general:
image: unstract/worker-unified:${VERSION}
container_name: unstract-worker-pg-orchestrator-general
restart: unless-stopped
command: ["pg-queue-consumer"]
ports:
- "8094:8090"
env_file:
- ../workers/.env
- ./essentials.env
depends_on:
- db
- redis
environment:
- ENVIRONMENT=development
- APPLICATION_NAME=unstract-worker-pg-orchestrator-general
- WORKER_BARRIER_BACKEND=pg
- WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=general
- WORKER_PG_QUEUE_CONSUMER_QUEUE=celery
- WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090
labels:
- traefik.enable=false
volumes:
- ./workflow_data:/data
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
profiles:
- pg-queue

# Fan-out consumer (process_file_batch) — both the ETL and API file queues.
worker-pg-fileproc:
image: unstract/worker-unified:${VERSION}
container_name: unstract-worker-pg-fileproc
restart: unless-stopped
command: ["pg-queue-consumer"]
ports:
- "8095:8090"
env_file:
- ../workers/.env
- ./essentials.env
depends_on:
- db
- redis
- rabbitmq # still dispatches the tool-execution RPC to the Celery executor (until the executor/tool-RPC moves to PG)
environment:
- ENVIRONMENT=development
- APPLICATION_NAME=unstract-worker-pg-fileproc
- WORKER_BARRIER_BACKEND=pg
- WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=file_processing
- WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing,api_file_processing
- WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090
labels:
- traefik.enable=false
volumes:
- ./workflow_data:/data
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
profiles:
- pg-queue

# Fan-in callback consumer (process_batch_callback / _api) — both callback queues.
worker-pg-callback:
image: unstract/worker-unified:${VERSION}
container_name: unstract-worker-pg-callback
restart: unless-stopped
command: ["pg-queue-consumer"]
ports:
- "8096:8090"
env_file:
- ../workers/.env
- ./essentials.env
depends_on:
- db
- redis
- rabbitmq # may dispatch Celery notifications when not PG-routed (until notifications move to PG)
environment:
- ENVIRONMENT=development
- APPLICATION_NAME=unstract-worker-pg-callback
- WORKER_BARRIER_BACKEND=pg
- WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE=callback
- WORKER_PG_QUEUE_CONSUMER_QUEUE=file_processing_callback,api_file_processing_callback
- WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT=8090
labels:
- traefik.enable=false
volumes:
- ./workflow_data:/data
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
profiles:
- pg-queue

# Reaper — leader-elected recovery loop. Run exactly ONE instance (it elects a
# single leader via pg_orchestrator_lock; extra replicas idle as standby).
worker-pg-reaper:
image: unstract/worker-unified:${VERSION}
container_name: unstract-worker-pg-reaper
restart: unless-stopped
command: ["pg-queue-reaper"]
ports:
- "8097:8086"
env_file:
- ../workers/.env
- ./essentials.env
depends_on:
- db
- redis
environment:
- ENVIRONMENT=development
- APPLICATION_NAME=unstract-worker-pg-reaper
- WORKER_PG_REAPER_INTERVAL_SECONDS=${WORKER_PG_REAPER_INTERVAL_SECONDS:-5}
- WORKER_PG_REAPER_HEALTH_PORT=8086
labels:
- traefik.enable=false
volumes:
- ./workflow_data:/data
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
profiles:
- pg-queue

volumes:
prompt_studio_data:
unstract_data:
Expand Down
23 changes: 23 additions & 0 deletions docker/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,26 @@ CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60
HEALTH_CHECK_INTERVAL=30
HEALTH_CHECK_TIMEOUT=10
ENABLE_METRICS=true

# =============================================================================
# PG-Queue Services (Postgres-backed transport) — gated, opt-in
# =============================================================================
# The PG consumer/reaper services live behind the `pg-queue` compose profile and
# are OFF by default. Bring them up with:
# docker compose --profile pg-queue up -d
# They consume from Postgres (not the broker); their worker-type/queue identity
# is set per service in docker-compose.yaml, so nothing here is required to run
# them. The values below are documented for reference / overrides only.
#
# WORKER_BARRIER_BACKEND - fan-in barrier substrate; the PG services set
# this to `pg` (default elsewhere: chord/redis).
# WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE - which worker's tasks the consumer loads.
# WORKER_PG_QUEUE_CONSUMER_QUEUE - comma-separated queues the consumer polls.
# WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT - opt-in consumer liveness port (unset = off).
# WORKER_PG_REAPER_HEALTH_PORT - opt-in reaper liveness port (unset = off).
WORKER_PG_REAPER_INTERVAL_SECONDS=5 # Reaper sweep interval (seconds)
#
# NOTE: routing executions to PG is a SEPARATE, later step. Running these
# services does NOT move any traffic — the backend gate PG_QUEUE_TRANSPORT_ENABLED
# (in backend/.env, default off) plus the Flipt flag still decide per-execution
# transport, and both stay off until the rollout ramp.
87 changes: 87 additions & 0 deletions workers/run-worker-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ ENV_FILE="/app/.env"
# Worker type constant for the executor worker
readonly EXECUTOR_WORKER_TYPE="executor"

# Python interpreter for PG-queue components (consumer / reaper). Unlike Celery
# workers, these launch a dedicated module rather than a `celery ... worker`
# command (see run_pg_consumer / run_pg_reaper below).
readonly PG_QUEUE_PYTHON_BIN="/app/.venv/bin/python"
Comment thread
muhammad-ali-e marked this conversation as resolved.

# Available core workers (OSS)
declare -A WORKERS=(
["api"]="api_deployment"
Expand Down Expand Up @@ -486,6 +491,62 @@ run_worker() {
exec $celery_cmd $celery_args
}

# =============================================================================
# PG-queue components (Postgres-backed transport)
# =============================================================================
# These do NOT run a Celery worker — they exec a dedicated Python module that
# polls the Postgres queue. The consumer picks which worker's tasks to register
# and which queues to poll from the environment (set per compose service /
# K8s Deployment); the reaper needs neither. Mirrors the host launcher
# run-worker.sh (`pg-queue-consumer` / `reaper`).

# Fail loudly if the interpreter is missing (e.g. venv path moved in an image
# refactor) rather than letting `exec` die with a terse "not found" that
# restart:unless-stopped turns into a silent crash-loop.
ensure_pg_interpreter() {
if [[ ! -x "$PG_QUEUE_PYTHON_BIN" ]]; then
print_status $RED "PG-queue interpreter not found/executable: $PG_QUEUE_PYTHON_BIN — check the image build / venv path."
exit 1
fi
}

run_pg_consumer() {
ensure_pg_interpreter
local source_type="${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-notification}"
Comment thread
muhammad-ali-e marked this conversation as resolved.
Comment thread
muhammad-ali-e marked this conversation as resolved.
local queues="${WORKER_PG_QUEUE_CONSUMER_QUEUE:-}"
Comment thread
muhammad-ali-e marked this conversation as resolved.
export WORKER_NAME="${WORKER_NAME:-pg-consumer-${source_type}}"

# Warn (don't fail) on missing config: all compose services set these, but a
# hand-rolled docker run / K8s Deployment that forgets them would otherwise
# start a mislabelled or idle-looking consumer silently. The module logs its
# resolved worker type + queue set at startup, so these stay diagnosable.
if [[ -z "${WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE:-}" ]]; then
print_status $YELLOW "WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE unset — defaulting source worker type to '$source_type'; WORKER_NAME label may not match the module's actual role."
fi
if [[ -z "$queues" ]]; then
print_status $YELLOW "WORKER_PG_QUEUE_CONSUMER_QUEUE unset — consumer will use the module default; check the startup log for the resolved queue set."
fi

print_status $GREEN "Starting PG-queue consumer..."
print_status $BLUE "Source worker type: $source_type"
print_status $BLUE "Queues: ${queues:-<unset — module default>}"
print_status $BLUE "Health port: ${WORKER_PG_QUEUE_CONSUMER_HEALTH_PORT:-<disabled>}"

# WORKER_PG_QUEUE_CONSUMER_WORKER_TYPE / _QUEUE are read by the module itself.
exec "$PG_QUEUE_PYTHON_BIN" -m pg_queue_consumer
}

run_pg_reaper() {
ensure_pg_interpreter
export WORKER_NAME="${WORKER_NAME:-pg-reaper}"

print_status $GREEN "Starting PG-queue reaper (leader-elected)..."
print_status $BLUE "Interval: ${WORKER_PG_REAPER_INTERVAL_SECONDS:-5}s"
print_status $BLUE "Health port: ${WORKER_PG_REAPER_HEALTH_PORT:-<disabled>}"

exec "$PG_QUEUE_PYTHON_BIN" -m pg_queue_reaper
}

# Main execution
# Load environment first for any needed variables
load_env "$ENV_FILE"
Expand All @@ -496,6 +557,32 @@ discover_pluggable_workers
# Add PYTHONPATH for imports - include both /app and /unstract for packages
export PYTHONPATH="/app:/unstract/core/src:/unstract/connectors/src:/unstract/filesystem/src:/unstract/flags/src:/unstract/tool-registry/src:/unstract/tool-sandbox/src:/unstract/workflow-execution/src:${PYTHONPATH:-}"

# PG-queue components run a dedicated module, not a Celery worker — dispatch
# them before the Celery command-building logic below. PYTHONPATH (exported
# above) is required for the module imports.
case "${1:-}" in
Comment thread
muhammad-ali-e marked this conversation as resolved.
pg-queue-consumer|pg-consumer)
run_pg_consumer
;;
pg-queue-reaper|pg-reaper|reaper)
run_pg_reaper
;;
pg-*)
# Obviously-PG-intended but unrecognized (e.g. a typo'd command) — fail
# loudly instead of silently coercing it into a default Celery worker.
# Scoped to the `pg-` prefix (reserved for PG-queue components) so it
# never intercepts a pluggable worker name; the exact reaper aliases are
# already matched above.
print_status $RED "Unrecognized PG-queue command: '$1' (did you mean pg-queue-consumer / pg-queue-reaper?)."
exit 1
;;
*)
# Not a PG-queue command — intentionally fall through to the Celery
# command logic below (handles every existing worker type + full
# Celery commands).
;;
esac

# Two-path logic: Full Celery command vs Traditional worker type
if [[ "$1" == *"celery"* ]] || [[ "$1" == *".venv"* ]]; then
# =============================================================================
Expand Down