UN-3608 [FEAT] PG Queue 9i — executor async/callback path on PG (self-chained continuations)#2097
…-chained continuations)

Migrate the executor RPC's async/callback path off Celery onto the PG queue, completing the executor-transport migration (the blocking path landed in 9h-c).

- dispatch_async / dispatch_with_callback now route PG-vs-Celery per call via the single pg_queue_enabled flag (was always Celery). Backend + workers.
- §5 fire-and-forget self-chaining: on_success/on_error Celery Signatures are translated to serialisable ContinuationSpecs carried in the payload; after the executor runs execute_extraction the consumer self-chains the matching continuation onto the callback queue (result prepended on success, dispatch task_id on error), acking regardless to avoid an LLM double-spend.
- New gated worker-pg-ide-callback consumer drains the ide_callback queue (Prompt Studio run/index/extract, lookups). Compose profile pg-queue.
- ContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload wire contract (unstract.core).

Zero-regression: every new path is gated and fails closed to Celery; gate OFF is byte-identical to the prior Celery behaviour. Call sites are unchanged: they keep passing Celery Signatures, and the dispatcher translates only on the PG branch.

Tests: payload set/unset, signature->spec translation, consumer self-chain success/error + enqueue-failure guard, routing gate (backend + workers).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
muhammad-ali-e
left a comment
Automated PR review — PG Queue 9i (executor async/callback path)
Ran the PR Review Toolkit (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) over ab80a494..7afc6d19. Overall the implementation is well-structured and unusually well-commented, and the happy-path test coverage is solid. The inline comments below are the deduplicated findings, prioritised.
Critical (1): the early-drop branches (malformed / poison / unknown-task) never chain on_error, so a dispatch_with_callback caller hangs forever with no user-facing error.
Important (5): the reply_key XOR callback invariant is documented but unenforced; the PG on_error callback can't recover the real executor error via AsyncResult; success-result self-chaining and the backend producer don't JSON-coerce callback payloads (silent drop / insert error on UUID/datetime); _signature_to_spec accepts an empty task_name and silently drops positional callback args.
Suggestions: _DispatchHandle + _signature_to_spec are duplicated byte-for-byte across backend/workers; returned ExecutionResult(success=False) routes to on_success; over-broad swallow in _continuation_org; consumer helpers re-widen typed payloads to bare dict; minor comment/jargon polish.
Test gaps (no inline anchor — the branches aren't in this diff): add coverage for on_error firing on the drop branches, the both-keys-present precedence, and dispatch_async/dispatch_with_callback enqueue-failure propagation.
| reply_key, | ||
| ) | ||
| return | ||
| if on_error: |
Critical — on_error callback dropped on the early-drop branches. This new block correctly self-chains on_error when the run raises, but the three early-drop branches above — malformed payload, poison/max_attempts exceeded, and unknown/unregistered task — only call _fail_reply(reply_key, ...) and then delete(). For a dispatch_with_callback message (no reply_key, has on_error) those branches delete the row and never chain on_error. A poison executor task is exactly the realistic failure mode, and it's the case where the user most needs the error surfaced; instead the HTTP-202 caller waits on a WebSocket error event that never arrives (run appears to hang, no terminal state). This is asymmetric with the reply_key path, which does fail those branches.
Fix: in each early-drop branch, in addition to _fail_reply, chain the error continuation when present, e.g. a helper _fail_dispatch(payload, error=...) that does _fail_reply(reply_key, ...) and if on_error: self._chain_continuation(on_error, prepend=payload.get('task_id') or '', payload=payload) before the delete(). Add tests for the unknown-task / poison / malformed cases with on_error set.
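A minimal sketch of the suggested helper, written as a free function so it runs standalone. The callables fail_reply and chain_continuation are hypothetical stand-ins for the consumer's _fail_reply and _chain_continuation, whose real signatures are not shown in this diff; the payload keys (reply_key, on_error, task_id) are the ones named above.

```python
from typing import Any, Callable, Optional


def fail_dispatch(
    payload: dict[str, Any],
    error: str,
    fail_reply: Callable[[str, str], None],
    chain_continuation: Callable[..., None],
) -> None:
    """Surface an early-drop failure to whichever caller is waiting."""
    reply_key: Optional[str] = payload.get("reply_key")
    on_error: Optional[dict[str, Any]] = payload.get("on_error")
    if reply_key:
        # Blocking caller: unblock it with the error.
        fail_reply(reply_key, error)
    if on_error:
        # Callback caller: chain the error continuation with the dispatch
        # task_id prepended, the same shape the raise branch uses.
        chain_continuation(
            on_error, prepend=payload.get("task_id") or "", payload=payload
        )


# Usage: a poison message that carries on_error but no reply_key.
replies: list[tuple[str, str]] = []
chained: list[tuple[str, str]] = []
fail_dispatch(
    {
        "task_id": "t-1",
        "on_error": {"task_name": "ide_index_error", "kwargs": {}, "queue": "ide_callback"},
    },
    "poison: max_attempts exceeded",
    fail_reply=lambda key, err: replies.append((key, err)),
    chain_continuation=lambda spec, *, prepend, payload: chained.append(
        (spec["task_name"], prepend)
    ),
)
```

Each early-drop branch would call the helper once, immediately before delete(), so the reply_key and callback paths fail symmetrically.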
| # regardless — same anti-double-spend reasoning as reply_key (a | ||
| # vt-redelivery would re-run the executor / re-spend LLM tokens). | ||
| # _chain_continuation is best-effort, so the ack never wedges. | ||
| self._chain_continuation( |
Important — PG on_error loses the real executor error text. prepend=payload.get('task_id') hands the callback (e.g. ide_index_error) the dispatch task_id as failed_task_id, but that callback resolves the actual message via AsyncResult(failed_task_id, app=app).result. On the PG path the executor ran eagerly via task.apply and never wrote to a Celery result backend under that id, so AsyncResult(...).result is empty and the callback degrades to its generic default error string. Parity is structural (right positional shape) but not behavioural — the real error is dropped.
Fix: the consumer has the exception in hand at the failure branch — carry the real error text into the on_error continuation (e.g. prepend f"{type(exc).__name__}: {exc}", or pass it via the continuation kwargs), or store the error under task_id in a backend the callback can read.
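One way the kwargs variant could look, as a sketch only. The kwarg name error_message is hypothetical, and the error callback would need to read it instead of (or before) falling back to AsyncResult; the spec shape (task_name/kwargs/queue) follows the PR description.

```python
from typing import Any


def with_error_text(spec: dict[str, Any], exc: BaseException) -> dict[str, Any]:
    """Return a copy of the on_error spec that carries the real error text."""
    kwargs = dict(spec.get("kwargs") or {})
    # Hypothetical kwarg: the callback must be taught to look for it.
    kwargs["error_message"] = f"{type(exc).__name__}: {exc}"
    return {**spec, "kwargs": kwargs}


spec = {"task_name": "ide_index_error", "kwargs": {"run_id": "r1"}, "queue": "ide_callback"}
try:
    raise ValueError("bad prompt")
except ValueError as exc:
    enriched = with_error_text(spec, exc)
```

This keeps the positional shape (task_id first) intact, so the Celery path and the PG path can share one callback signature.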
| # Async/callback: self-chain the success continuation onto the callback | ||
| # queue before the ack — the §5 hand-off (PG analogue of Celery's link). | ||
| # Best-effort (never raises): a chain failure logs + still acks, so the | ||
| # executor is not re-run (LLM double-spend); the callback is lost. |
Important — success callback silently dropped on non-JSON-safe results. prepend=eager.result is enqueued as args=[eager.result]; client.send serialises with plain json.dumps(message) (no default=str). An executor result dict containing a UUID/datetime — exactly the kind of payload these results carry — makes json.dumps raise TypeError, which _chain_continuation's broad except swallows ("FAILED to self-chain"), so the success callback and its user-facing WebSocket event are lost. The Celery path serialises via its configured serializer and has no such gap.
Fix: JSON-coerce the prepended result before enqueuing (mirror backend/pg_queue/producer.py:_json_safe), or give client.send's json.dumps a default=str.
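A sketch of the coercion, assuming a plain round-trip through json with default=str is acceptable. The real _json_safe in backend/pg_queue/producer.py is not shown in this diff and may behave differently; json_safe below is an illustrative name.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any


def json_safe(value: Any) -> Any:
    """Round-trip through JSON, stringifying anything json cannot encode."""
    return json.loads(json.dumps(value, default=str))


result = {
    "run_id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "finished_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "pages": 3,
}
safe = json_safe(result)
# safe["run_id"] and safe["finished_at"] are now plain strings, so a later
# json.dumps(message) in the queue client no longer raises TypeError.
```

Coercing at the call site (prepend=json_safe(eager.result)) keeps the change local to the self-chain path; adding default=str to client.send would cover every producer at once.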
| message.msg_id, | ||
| reply_key, | ||
| ) | ||
| elif on_success: |
Suggestion — a returned ExecutionResult(success=False) chains on_success. The success/error split keys purely on whether task.apply(...) raised, but execute_extraction can return a failed ExecutionResult (success=False, via ExecutionResult.failure(...)) without raising. That takes this elif on_success branch and chains the success continuation with a failure payload; on_error never fires. This matches Celery semantics (link_error also only fires on raised exceptions), so it's parity rather than a regression — but given the user-facing impact, either inspect eager.result.success is False and route to on_error, or add a comment documenting that returned-failure deliberately follows the success path, so a future reader doesn't mistake it for the drop-branch bug above.
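If the first option is chosen, the routing decision could be sketched as follows. Whether eager.result is an ExecutionResult object or its dict form is not visible in this diff, so the helper accepts both; the function names are illustrative.

```python
from typing import Any, Optional


def returned_failure(result: Any) -> bool:
    """True when the executor returned (rather than raised) a failure."""
    if isinstance(result, dict):
        return result.get("success") is False
    return getattr(result, "success", None) is False


def pick_continuation(
    result: Any,
    on_success: Optional[dict[str, Any]],
    on_error: Optional[dict[str, Any]],
) -> Optional[dict[str, Any]]:
    """Route a returned failure to on_error when one is registered."""
    if returned_failure(result) and on_error is not None:
        return on_error
    return on_success


ok_spec = {"task_name": "ide_index_complete", "kwargs": {}, "queue": "ide_callback"}
err_spec = {"task_name": "ide_index_error", "kwargs": {}, "queue": "ide_callback"}
chosen = pick_continuation({"success": False, "error": "no text"}, ok_spec, err_spec)
```

Note that this deliberately diverges from Celery, where link_error fires only on raised exceptions, so it would need to be called out as a behaviour change rather than parity.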
| self._result_backend = PgResultBackend() | ||
| self._result_backend.store_result(reply_key, result=result, error=error) | ||
|
|
||
| def _chain_continuation(self, spec: dict, *, prepend: object, payload: dict) -> None: |
Suggestion — restore the typed contract at the consumer boundary. _chain_continuation(self, spec: dict, *, prepend, payload: dict) re-widens to bare dict, discarding the new ContinuationSpec / TaskPayload types the PR added precisely for these shapes. A key rename across the producer↔consumer boundary (e.g. task_name → name) wouldn't be caught by the type checker. Annotate spec: ContinuationSpec, payload: TaskPayload (imports already available in this tree).
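To illustrate what the narrower annotations buy, here is a sketch using TypedDict stand-ins. The real ContinuationSpec and TaskPayload live in unstract.core and are not shown here, so the classes below (suffixed Sketch) and the message shape returned by chain_message are assumptions.

```python
from typing import Any, TypedDict


class ContinuationSpecSketch(TypedDict):
    task_name: str
    kwargs: dict[str, Any]
    queue: str


class TaskPayloadSketch(TypedDict, total=False):
    task_id: str
    reply_key: str
    on_success: ContinuationSpecSketch
    on_error: ContinuationSpecSketch


def chain_message(
    spec: ContinuationSpecSketch, *, prepend: object, payload: TaskPayloadSketch
) -> dict[str, Any]:
    """Build the callback message; a typo such as spec["name"] is now a type error."""
    return {
        "task_name": spec["task_name"],
        "args": [prepend],  # chained value as the first positional arg
        "kwargs": spec["kwargs"],
        "queue": spec["queue"],
        "task_id": payload.get("task_id"),  # hypothetical passthrough
    }


message = chain_message(
    {"task_name": "ide_index_complete", "kwargs": {"run_id": "r1"}, "queue": "ide_callback"},
    prepend={"output": "ok"},
    payload={"task_id": "t-1"},
)
```

TypedDict is not enforced at runtime; the benefit comes from a static checker flagging a renamed or misspelled key on either side of the producer/consumer boundary.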
| if reply_key is not None: | ||
| message["reply_key"] = reply_key | ||
| if on_success is not None: | ||
| message["on_success"] = on_success |
Important — on_success/on_error bypass _json_safe. args/kwargs go through _json_safe(...) above (lines 96-97), but the continuation specs (each with a nested kwargs dict) are written into the JSONField verbatim. If a callback's kwargs carry a UUID/datetime, PgQueueMessage.objects.create raises at insert time — and unlike the worker path this is caller-visible at dispatch. Fix: message["on_success"] = _json_safe(on_success) (and likewise on_error).
| "PG self-chaining routes by the row's queue and cannot default it" | ||
| ) | ||
| return ContinuationSpec( | ||
| task_name=sig.task, |
Important — _signature_to_spec validates queue but not task, and silently drops positional args. Two gaps with the same fail-fast philosophy already applied to queue:
- task_name=sig.task is read with no guard. If sig.task is empty/None (a malformed signature), the continuation enqueues with a falsy task_name and is dropped far downstream as "malformed payload: missing task_name": a lost callback with a misleading log. Add if not getattr(sig, 'task', None): raise ValueError(...).
- ContinuationSpec has no args, and only sig.kwargs is read. A Celery Signature may carry positional args; on the PG branch they're silently dropped, so the callback runs with a different arg list than on Celery, violating the "call sites unchanged" contract. Today's call sites are kwargs-only (latent), but make it loud: if getattr(sig, 'args', None): raise ValueError("callback signature has positional args; PG self-chaining supports kwargs only").
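Putting both guards next to the existing queue check, a fail-fast translation could look roughly like this. FakeSignature is a stand-in exposing the task/args/kwargs/options attributes of a Celery Signature so the sketch runs without Celery; reading the queue from options["queue"] and returning a plain dict instead of ContinuationSpec are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class FakeSignature:
    """Stand-in for the attributes read off a Celery Signature."""
    task: Optional[str]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    options: dict = field(default_factory=dict)


def signature_to_spec(sig: Any) -> dict[str, Any]:
    """Translate a callback signature, failing fast on anything PG cannot carry."""
    task_name = getattr(sig, "task", None)
    if not task_name:
        raise ValueError("callback signature has no task name")
    if getattr(sig, "args", None):
        raise ValueError(
            "callback signature has positional args; "
            "PG self-chaining supports kwargs only"
        )
    queue = (getattr(sig, "options", None) or {}).get("queue")
    if not queue:
        raise ValueError("callback signature has no queue")
    return {
        "task_name": task_name,
        "kwargs": dict(getattr(sig, "kwargs", None) or {}),
        "queue": queue,
    }


spec = signature_to_spec(
    FakeSignature(
        task="ide_index_complete",
        kwargs={"run_id": "r1"},
        options={"queue": "ide_callback"},
    )
)
```

All three failures surface at dispatch time in the caller, rather than as a dropped row on the consumer side.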
| _DEFAULT_TIMEOUT = 3600 | ||
|
|
||
|
|
||
| class _DispatchHandle: |
Suggestion — _DispatchHandle and _signature_to_spec are duplicated byte-for-byte with backend/pg_queue/executor_rpc.py. Both files now carry identical copies of this class and the _signature_to_spec helper (and near-identical dispatch_async/dispatch_with_callback bodies). The new translation logic — especially the correctness-bearing "fail fast on missing queue" rule — can now drift silently between the two copies. ContinuationSpec/PgTaskStatus were hoisted into unstract.core for exactly this reason; _DispatchHandle and _signature_to_spec (no Django/worker deps) belong there too. Import both copies from one shared definition.
| the prompt-studio call sites keep passing Celery ``Signature``s unchanged and | ||
| the dispatcher translates them only on the PG branch. | ||
|
|
||
| The consumer prepends the chained argument the callback expects (the executor |
Suggestion — "before kwargs" is slightly misleading. _chain_continuation passes the chained value as a separate positional args=[prepend] list and kwargs as a distinct mapping — they aren't concatenated into one positional sequence, so "before kwargs" reads as if kwargs were positional. Reword to e.g.: "… passes the chained argument as the callback's first positional arg, alongside the spec's kwargs — mirroring how Celery's link prepends the parent task's return value."
| # IDE callback consumer (③c) — drains the ``ide_callback`` queue the executor | ||
| # self-chains onto for async/callback dispatch (Prompt Studio run/index/extract, | ||
| # lookups). Registers the ide_callback worker's tasks (ide_prompt_complete, | ||
| # ide_index_complete, extraction_complete, …). Dark until dispatch_with_callback |
Suggestion — "Dark" is jargon for a compose file. Clear to the authors (idle / no traffic) but opaque to an operator scanning services. Suggest: "Idle (receives no work) until dispatch_with_callback routes to PG, i.e. the gating flag is on". Also worth confirming: the sibling worker-pg-callback/worker-pg-scheduler declare depends_on: rabbitmq; this service doesn't. Likely fine since ide_callback tasks only do API writes + ws emits, but verify none dispatch onward to a Celery broker.


What
Migrates the executor RPC's async/callback path off Celery onto the PG queue, completing the executor-transport migration (the blocking path landed in 9h-c). Targets the feat/UN-3445-pg-queue-integration branch (not main).

Today dispatch_with_callback (Prompt Studio run/index/extract, lookups) always rides Celery link/link_error. This routes it through Postgres instead, via §5 fire-and-forget self-chaining: the executor consumer, after running execute_extraction, enqueues the continuation itself.

How
- dispatch_async/dispatch_with_callback now pick PG-vs-Celery per call via the single pg_queue_enabled flag (same gate as the rest of the feature), in both backend and workers executor_rpc.py.
- on_success/on_error Celery Signatures are translated to a serialisable ContinuationSpec (task_name/kwargs/queue) carried in the task payload. After the executor runs, the consumer self-chains the matching continuation onto the callback queue: the result dict prepended on success, the dispatch task_id on error (Celery link_error parity). It acks regardless of chain outcome: a vt-redelivery would re-run the executor (LLM double-spend); a chain failure is logged loud and best-effort.
- worker-pg-ide-callback service drains the ide_callback queue (compose profile pg-queue).
- ContinuationSpec + on_success/on_error/task_id added to the shared TaskPayload in unstract.core (all optional, set only when present).

Zero-regression
Every new path is gated and fails closed to Celery. Gate OFF is byte-identical to the prior behaviour: dispatch_with_callback delegates to the unchanged SDK ExecutionDispatcher (send_task(..., link=, link_error=)), no continuation payload, no self-chain. Call sites are unchanged: they keep passing Celery Signatures; the dispatcher translates them only on the PG branch.

Tests
- signature → spec translation (incl. missing-queue fail-fast)

Dev-tested end-to-end (flag ON)
backend dispatch_with_callback → PG → worker-pg-executor execute_extraction → self-chain → worker-pg-ide-callback ide_prompt_complete → emit-websocket 200, output persisted; zero RabbitMQ. Non-regression confirmed with a flag ON/OFF A/B (identical behaviour).

After this slice the executor transport is fully on PG; RabbitMQ can be decommissioned next.
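For readers new to the §5 hand-off, the consumer-side flow described under How can be sketched as below. This is an illustration under assumptions, not the PR's code: run_executor, enqueue and ack are hypothetical callables standing in for the executor run, the PG queue client and the row ack, and the message shape is invented for the example.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def chain(enqueue: Callable[[str, dict], None], spec: dict[str, Any], prepend: Any) -> bool:
    """Best-effort enqueue of a continuation; never raises."""
    try:
        enqueue(
            spec["queue"],
            {
                "task_name": spec["task_name"],
                "args": [prepend],
                "kwargs": dict(spec.get("kwargs") or {}),
            },
        )
        return True
    except Exception:
        logger.exception("FAILED to self-chain %s", spec.get("task_name"))
        return False


def consume(
    payload: dict[str, Any],
    run_executor: Callable[[dict], Any],
    enqueue: Callable[[str, dict], None],
    ack: Callable[[], None],
) -> None:
    """Run the executor once, self-chain the matching continuation, then ack."""
    try:
        result = run_executor(payload)
    except Exception:
        # Error path: prepend the dispatch task_id (Celery link_error parity).
        spec, prepend = payload.get("on_error"), payload.get("task_id") or ""
    else:
        # Success path: prepend the executor result.
        spec, prepend = payload.get("on_success"), result
    if spec:
        chain(enqueue, spec, prepend)
    # Ack regardless: a redelivery would re-run the executor and re-spend tokens.
    ack()
```

The trade-off is explicit in the last line: a failed chain loses the callback but never re-runs the executor.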
🤖 Generated with Claude Code