Skip to content

UN-3608 [FEAT] PG Queue 9i — executor async/callback path on PG (self-chained continuations)#2097

Draft
muhammad-ali-e wants to merge 1 commit into
feat/UN-3445-pg-queue-integrationfrom
UN-3608-pg-queue-executor-callback
Draft

UN-3608 [FEAT] PG Queue 9i — executor async/callback path on PG (self-chained continuations)#2097
muhammad-ali-e wants to merge 1 commit into
feat/UN-3445-pg-queue-integrationfrom
UN-3608-pg-queue-executor-callback

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

Migrates the executor RPC's async/callback path off Celery onto the PG queue, completing the executor-transport migration (the blocking path landed in 9h-c). Targets the feat/UN-3445-pg-queue-integration branch (not main).

Today dispatch_with_callback (Prompt Studio run/index/extract, lookups) always rides Celery link/link_error. This routes it through Postgres instead, via §5 fire-and-forget self-chaining: the executor consumer, after running execute_extraction, enqueues the continuation itself.

How

  • Routingdispatch_async / dispatch_with_callback now pick PG-vs-Celery per call via the single pg_queue_enabled flag (same gate as the rest of the feature), in both backend and workers executor_rpc.py.
  • Self-chaining — the on_success/on_error Celery Signatures are translated to a serialisable ContinuationSpec (task_name/kwargs/queue) carried in the task payload. After the executor runs, the consumer self-chains the matching continuation onto the callback queue: the result dict prepended on success, the dispatch task_id on error (Celery link_error parity). It acks regardless of chain outcome — a vt-redelivery would re-run the executor (LLM double-spend); a chain failure is logged loud and best-effort.
  • Consumer — new gated worker-pg-ide-callback service drains the ide_callback queue (compose profile pg-queue).
  • Wire contractContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload in unstract.core (all optional, set only when present).

Zero-regression

Every new path is gated and fails closed to Celery. Gate OFF is byte-identical to the prior behaviour: dispatch_with_callback delegates to the unchanged SDK ExecutionDispatcher (send_task(..., link=, link_error=)), no continuation payload, no self-chain. Call sites are unchanged — they keep passing Celery Signatures; the dispatcher translates them only on the PG branch.

Tests

  • payload set/unset (optional keys), signature → spec translation (incl. missing-queue fail-fast)
  • consumer self-chain success/error branches + chain-enqueue-failure-still-acks guard
  • routing gate ON/OFF for all three dispatch modes (backend + workers)

Dev-tested end-to-end (flag ON)

backend dispatch_with_callback → PG → worker-pg-executor execute_extraction → self-chain → worker-pg-ide-callback ide_prompt_complete → emit-websocket 200, output persisted — zero RabbitMQ. Non-regression confirmed with a flag ON/OFF A/B (identical behaviour).

After this slice the executor transport is fully on PG; RabbitMQ can be decommissioned next.

🤖 Generated with Claude Code

…-chained continuations)

Migrate the executor RPC's async/callback path off Celery onto the PG queue,
completing the executor-transport migration (the blocking path landed in 9h-c).

- dispatch_async / dispatch_with_callback now route PG-vs-Celery per call via the
  single pg_queue_enabled flag (was always Celery). Backend + workers.
- §5 fire-and-forget self-chaining: on_success/on_error Celery Signatures are
  translated to serialisable ContinuationSpecs carried in the payload; after the
  executor runs execute_extraction the consumer self-chains the matching
  continuation onto the callback queue (result prepended on success, dispatch
  task_id on error), acking regardless to avoid an LLM double-spend.
- New gated worker-pg-ide-callback consumer drains the ide_callback queue
  (Prompt Studio run/index/extract, lookups). Compose profile pg-queue.
- ContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload
  wire contract (unstract.core).

Zero-regression: every new path is gated and fails closed to Celery; gate OFF is
byte-identical to the prior Celery behaviour. Call sites unchanged — they keep
passing Celery Signatures; the dispatcher translates only on the PG branch.

Tests: payload set/unset, signature->spec translation, consumer self-chain
success/error + enqueue-failure guard, routing gate (backend + workers).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 20, 2026

Copy link
Copy Markdown
Contributor

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f7a04ff4-b1fc-4e51-a051-a2ec07f85926

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3608-pg-queue-executor-callback

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sonarqubecloud

Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
30.5% Duplication on New Code (required ≤ 3%)

See analysis details on SonarQube Cloud

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR review — PG Queue 9i (executor async/callback path)

Ran the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) over ab80a494..7afc6d19. Overall the implementation is well-structured and unusually well-commented, and the happy-path test coverage is solid. The inline comments below are the deduplicated findings, prioritised.

Critical (1): the early-drop branches (malformed / poison / unknown-task) never chain on_error, so a dispatch_with_callback caller hangs forever with no user-facing error.

Important (5): the reply_key XOR callback invariant is documented but unenforced; the PG on_error callback can't recover the real executor error via AsyncResult; success-result self-chaining and the backend producer don't JSON-coerce callback payloads (silent drop / insert error on UUID/datetime); _signature_to_spec accepts an empty task_name and silently drops positional callback args.

Suggestions: _DispatchHandle + _signature_to_spec are duplicated byte-for-byte across backend/workers; returned ExecutionResult(success=False) routes to on_success; over-broad swallow in _continuation_org; consumer helpers re-widen typed payloads to bare dict; minor comment/jargon polish.

Test gaps (no inline anchor — the branches aren't in this diff): add coverage for on_error firing on the drop branches, the both-keys-present precedence, and dispatch_async/dispatch_with_callback enqueue-failure propagation.

reply_key,
)
return
if on_error:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical — on_error callback dropped on the early-drop branches. This new block correctly self-chains on_error when the run raises, but the three early-drop branches above — malformed payload, poison/max_attempts exceeded, and unknown/unregistered task — only call _fail_reply(reply_key, ...) and then delete(). For a dispatch_with_callback message (no reply_key, has on_error) those branches delete the row and never chain on_error. A poison executor task is exactly the realistic failure mode, and it's the case where the user most needs the error surfaced; instead the HTTP-202 caller waits on a WebSocket error event that never arrives (run appears to hang, no terminal state). This is asymmetric with the reply_key path, which does fail those branches.

Fix: in each early-drop branch, in addition to _fail_reply, chain the error continuation when present, e.g. a helper _fail_dispatch(payload, error=...) that does _fail_reply(reply_key, ...) and if on_error: self._chain_continuation(on_error, prepend=payload.get('task_id') or '', payload=payload) before the delete(). Add tests for the unknown-task / poison / malformed cases with on_error set.

# regardless — same anti-double-spend reasoning as reply_key (a
# vt-redelivery would re-run the executor / re-spend LLM tokens).
# _chain_continuation is best-effort, so the ack never wedges.
self._chain_continuation(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important — PG on_error loses the real executor error text. prepend=payload.get('task_id') hands the callback (e.g. ide_index_error) the dispatch task_id as failed_task_id, but that callback resolves the actual message via AsyncResult(failed_task_id, app=app).result. On the PG path the executor ran eagerly via task.apply and never wrote to a Celery result backend under that id, so AsyncResult(...).result is empty and the callback degrades to its generic default error string. Parity is structural (right positional shape) but not behavioural — the real error is dropped.

Fix: the consumer has the exception in hand at the failure branch — carry the real error text into the on_error continuation (e.g. prepend f"{type(exc).__name__}: {exc}", or pass it via the continuation kwargs), or store the error under task_id in a backend the callback can read.

# Async/callback: self-chain the success continuation onto the callback
# queue before the ack — the §5 hand-off (PG analogue of Celery's link).
# Best-effort (never raises): a chain failure logs + still acks, so the
# executor is not re-run (LLM double-spend); the callback is lost.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important — success callback silently dropped on non-JSON-safe results. prepend=eager.result is enqueued as args=[eager.result]; client.send serialises with plain json.dumps(message) (no default=str). An executor result dict containing a UUID/datetime — exactly the kind of payload these results carry — makes json.dumps raise TypeError, which _chain_continuation's broad except swallows ("FAILED to self-chain"), so the success callback and its user-facing WebSocket event are lost. The Celery path serialises via its configured serializer and has no such gap.

Fix: JSON-coerce the prepended result before enqueuing (mirror backend/pg_queue/producer.py:_json_safe), or give client.send's json.dumps a default=str.

message.msg_id,
reply_key,
)
elif on_success:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion — a returned ExecutionResult(success=False) chains on_success. The success/error split keys purely on whether task.apply(...) raised, but execute_extraction can return a failed ExecutionResult (success=False, via ExecutionResult.failure(...)) without raising. That takes this elif on_success branch and chains the success continuation with a failure payload; on_error never fires. This matches Celery semantics (link_error also only fires on raised exceptions), so it's parity rather than a regression — but given the user-facing impact, either inspect eager.result.success is False and route to on_error, or add a comment documenting that returned-failure deliberately follows the success path, so a future reader doesn't mistake it for the drop-branch bug above.

self._result_backend = PgResultBackend()
self._result_backend.store_result(reply_key, result=result, error=error)

def _chain_continuation(self, spec: dict, *, prepend: object, payload: dict) -> None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion — restore the typed contract at the consumer boundary. _chain_continuation(self, spec: dict, *, prepend, payload: dict) re-widens to bare dict, discarding the new ContinuationSpec / TaskPayload types the PR added precisely for these shapes. A key rename across the producer↔consumer boundary (e.g. task_namename) wouldn't be caught by the type checker. Annotate spec: ContinuationSpec, payload: TaskPayload (imports already available in this tree).

if reply_key is not None:
message["reply_key"] = reply_key
if on_success is not None:
message["on_success"] = on_success

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important — on_success/on_error bypass _json_safe. args/kwargs go through _json_safe(...) above (lines 96-97), but the continuation specs (each with a nested kwargs dict) are written into the JSONField verbatim. If a callback's kwargs carry a UUID/datetime, PgQueueMessage.objects.create raises at insert time — and unlike the worker path this is caller-visible at dispatch. Fix: message["on_success"] = _json_safe(on_success) (and likewise on_error).

"PG self-chaining routes by the row's queue and cannot default it"
)
return ContinuationSpec(
task_name=sig.task,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important — _signature_to_spec validates queue but not task, and silently drops positional args. Two gaps with the same fail-fast philosophy already applied to queue:

  1. task_name=sig.task is read with no guard. If sig.task is empty/None (a malformed signature), the continuation enqueues with a falsy task_name and is dropped far downstream as "malformed payload: missing task_name" — a lost callback with a misleading log. Add if not getattr(sig, 'task', None): raise ValueError(...).
  2. ContinuationSpec has no args, and only sig.kwargs is read. A Celery Signature may carry positional args; on the PG branch they're silently dropped, so the callback runs with a different arg list than on Celery — violating the "call sites unchanged" contract. Today's call sites are kwargs-only (latent), but make it loud: if getattr(sig, 'args', None): raise ValueError("callback signature has positional args; PG self-chaining supports kwargs only").

_DEFAULT_TIMEOUT = 3600


class _DispatchHandle:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion — _DispatchHandle and _signature_to_spec are duplicated byte-for-byte with backend/pg_queue/executor_rpc.py. Both files now carry identical copies of this class and the _signature_to_spec helper (and near-identical dispatch_async/dispatch_with_callback bodies). The new translation logic — especially the correctness-bearing "fail fast on missing queue" rule — can now drift silently between the two copies. ContinuationSpec/PgTaskStatus were hoisted into unstract.core for exactly this reason; _DispatchHandle and _signature_to_spec (no Django/worker deps) belong there too. Import both copies from one shared definition.

the prompt-studio call sites keep passing Celery ``Signature``s unchanged and
the dispatcher translates them only on the PG branch.

The consumer prepends the chained argument the callback expects (the executor

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion — "before kwargs" is slightly misleading. _chain_continuation passes the chained value as a separate positional args=[prepend] list and kwargs as a distinct mapping — they aren't concatenated into one positional sequence, so "before kwargs" reads as if kwargs were positional. Reword to e.g.: "… passes the chained argument as the callback's first positional arg, alongside the spec's kwargs — mirroring how Celery's link prepends the parent task's return value."

# IDE callback consumer (③c) — drains the ``ide_callback`` queue the executor
# self-chains onto for async/callback dispatch (Prompt Studio run/index/extract,
# lookups). Registers the ide_callback worker's tasks (ide_prompt_complete,
# ide_index_complete, extraction_complete, …). Dark until dispatch_with_callback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion — "Dark" is jargon for a compose file. Clear to the authors (idle / no traffic) but opaque to an operator scanning services. Suggest: "Idle (receives no work) until dispatch_with_callback routes to PG, i.e. the gating flag is on". Also worth confirming: the sibling worker-pg-callback/worker-pg-scheduler declare depends_on: rabbitmq; this service doesn't. Likely fine since ide_callback tasks only do API writes + ws emits, but verify none dispatch onward to a Celery broker.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant