UN-3508 [FEAT] Plumb fairness through ExecutionDispatcher #2009
Summary by CodeRabbit
Walkthrough
ExecutionDispatcher accepts optional Celery headers via a shared _build_send_kwargs helper. FairnessKey gains as_header(); callers in structure_tool_task now pass fairness headers. Shared AST canary helpers were added and tests validate header forwarding, fairness serialization, and detector behavior.
Changes
Executor Header Forwarding and Fairness Integration
Estimated code review effort: 3 (Moderate), ~20 minutes
Pre-merge checks: 4 passed, 1 failed (1 warning)
muhammad-ali-e
left a comment
Automated PR review — six specialised review agents (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) ran against this branch's 6-file scope. Inline comments below are deduplicated and prioritised. HIGH/MEDIUM items are worth resolving before merge; LOW/NIT items are optional polish.
Review feedback addressed in `d6300bc0`: 14/14 actioned. One-to-one accounting: HIGH (4/4), MEDIUM (5/5), LOW / NIT (5/5).
Phase 5.2 of the PG Queue rollout (epic UN-3445). Adds fairness-header
support to the third dispatch path (sdk1's ExecutionDispatcher) so
``execute_extraction`` tasks emitted by file_processing carry the same
routing metadata as workflow-execution dispatches that go through
queue_backend.
What
* sdk1/execution/dispatcher.py: ``dispatch``, ``dispatch_async``,
``dispatch_with_callback`` all accept an optional ``headers`` kwarg.
When non-None, forwarded to Celery's send_task; when None, omitted
so the call shape stays identical to pre-Phase-5.2 for callers that
don't opt in (sdk1's existing tests remain green unchanged).
* queue_backend/fairness.py: new ``FairnessKey.as_header()`` method
returns the wire-ready ``{"x-fairness-key": ...}`` dict. Producers
no longer need to reference ``FAIRNESS_HEADER_NAME`` directly —
keeps the additive-only canary in test_fairness_key.py happy.
* file_processing/structure_tool_task.py: small ``_fairness_headers``
helper builds the header (defaulting workload_type to NON_API;
propagating the real type is Phase 6 work). All three
``dispatcher.dispatch(...)`` sites (lines 468, 507, 720) now pass
``headers=_fairness_headers(organization_id)``.
* tests/test_executor_dispatch.py: new file. Covers header forwarding
through all three dispatcher methods (including the "omit when
None" pre-existing shape preservation), the FairnessKey.as_header()
shape, and an AST inventory canary that forbids raw
``*.send_task("execute_extraction", ...)`` outside
ExecutionDispatcher.
Why
UN-3501 plumbed fairness on bare dispatch() call sites. The
``execute_extraction`` task is the most workflow-execution-y dispatch
in the codebase but bypasses queue_backend (uses ExecutionDispatcher
directly), so it had no fairness header. The canary in
test_fairness_key.py audits only bare-name dispatch() and missed it.
No regression risk
* Additive: ``headers`` is optional and defaults to None on all three
dispatcher methods; the existing 78 sdk1 tests pass unchanged.
* Producer-side only — no consumer reads ``x-fairness-key`` yet.
* No queue routing, task name, or args/kwargs change.
Test count: workers seam suite 53 -> 60 (new test_executor_dispatch.py
with 7 tests). sdk1 dispatcher suite 80/80 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
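The optional `headers` contract described in the commit message above can be illustrated with a minimal sketch. This is not the dispatcher's actual code: `build_send_kwargs`, `RecordingApp`, and the `"executor"` queue name are hypothetical stand-ins, and the only behaviour taken from the commit message is "forward when non-None, omit the key when None".

```python
from __future__ import annotations

from typing import Any


def build_send_kwargs(
    queue: str,
    task_id: str | None = None,
    headers: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Hypothetical stand-in for the dispatcher's send_task kwargs assembly."""
    kwargs: dict[str, Any] = {"queue": queue}
    if task_id is not None:
        kwargs["task_id"] = task_id
    # Attach the key only when the caller opted in, so callers that pass
    # nothing produce exactly the same call shape as before the change.
    if headers is not None:
        kwargs["headers"] = headers
    return kwargs


class RecordingApp:
    """Minimal fake with a send_task-like method that records its calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    def send_task(self, name: str, **kwargs: Any) -> None:
        self.calls.append((name, kwargs))


app = RecordingApp()
# Caller that does not opt in: no "headers" key reaches send_task.
app.send_task("execute_extraction", **build_send_kwargs("executor"))
# Caller that opts in: the dict is forwarded untouched.
app.send_task(
    "execute_extraction",
    **build_send_kwargs("executor", headers={"x-fairness-key": {"org_id": "org-1"}}),
)
```

Omitting the key, rather than passing `headers=None`, is what lets pre-existing `assert_called_with(...)` style tests keep passing without edits.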
…uplication
SonarCloud flagged 7.8% duplicated lines in the new
test_executor_dispatch.py: the file-walking helper and skip-dir
constants were copy-pasted from test_fairness_key.py. Move them into
tests/canary_helpers.py:
* WORKERS_ROOT, DEFAULT_SKIP_TOP_DIRS constants.
* iter_production_trees(skip_top_dirs=…) generator.
Both canary tests use relative imports (from .canary_helpers import …)
to keep one canonical import path; tests/ is already a package via
__init__.py, so no pyproject change is needed. (An earlier attempt
added pythonpath = ["tests"] and was reverted: it would have created a
second top-level import path for every test file and a
dual-module-object hazard.)
The fairness canary widens its skip set with ``queue_backend`` (where
the seam legitimately defines fairness constants); the executor canary
keeps the default.
Tests stay at 60/60: pure dedup, no behavioural change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* dispatcher.py: factor _build_send_kwargs helper; document headers
kwarg on dispatch_with_callback; reference FAIRNESS_HEADER_NAME symbol
instead of bare string; document empty-dict caller-bug semantic
* structure_tool_task.py: narrow _fairness_headers return type; replace
'Phase 6 work' with TODO(UN-3504) anchor
* fairness.py: concrete as_header() docstring with explicit shape
* canary_helpers.py: surface SyntaxError via UserWarning (real
silent-failure bug; canaries no longer pass vacuously on unparseable
files)
* test_executor_dispatch.py: switch to dict[str, Any] dropping
type-ignore; use WorkloadType.NON_API.value instead of invalid 'etl'
literal; new test_dispatch_async_omits_headers_when_none; tighten
canary docstring + note blind spots; drop plan-stage vocab; reorder
relative import; new test_fairness_header_shape_orgless for
org_id=None case
Tests: workers 60 -> 62, sdk1 dispatcher 80/80 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| Filename | Overview |
|---|---|
| unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py | Adds _build_send_kwargs to centralise send_task kwargs construction; adds optional headers kwarg to all three dispatch methods. Falsy-drop on headers preserves backward compatibility. |
| workers/file_processing/structure_tool_task.py | Wires fairness headers at all three dispatcher.dispatch(...) call sites via a small _fairness_headers helper. Hard-coded NON_API default is intentional and well-documented with a TODO for Phase 6. |
| workers/queue_backend/fairness.py | Adds as_header() convenience method to FairnessKey; keeps FAIRNESS_HEADER_NAME encapsulated within the module as intended. |
| workers/queue_backend/dispatch.py | Simplified to call fairness.as_header() instead of inlining the dict; removes the now-unnecessary FAIRNESS_HEADER_NAME import. Functionally equivalent to prior behaviour. |
| workers/tests/test_executor_dispatch.py | New test file covering header forwarding across all three dispatcher methods, falsy-drop contract, combined-kwargs correctness, and an AST canary guarding against raw send_task calls outside ExecutionDispatcher. |
| workers/tests/canary_helpers.py | New shared AST-walk helper for inventory canaries. Emits UserWarning on unparseable files rather than silently skipping, which callers promote to test failures via pytestmark. |
| workers/tests/test_callback_sanity.py | Fixes a flaky healthcheck lookup by matching the fully-qualified task name callback.worker.healthcheck instead of the ambiguous endswith pattern that was sensitive to Celery registry insertion order. |
| workers/tests/test_structure_tool_task.py | New tests locking the _fairness_headers wire shape, including a pinned assertion that workload_type is NON_API to catch a silent priority-inversion regression. |
| workers/tests/test_sanity_phase5.py | Adds a headers= assertion to the existing TestStructureToolSingleDispatch test, pairing integration-level coverage with the unit tests in test_structure_tool_task.py. |
| workers/tests/test_fairness_key.py | Refactored to use shared canary_helpers; skip set composition is identical to the original; adds pytestmark filterwarnings for stricter canary enforcement. |
| workers/tests/test_canary_helpers.py | Tests that iter_production_trees emits a UserWarning on unparseable files and that promoting that warning to an error raises correctly — locking the fail-loud contract the canary modules depend on. |
Reviews (3): Last reviewed commit: "UN-3508 [FIX] Address vishnuszipstack re..."
…n a list'
Greptile P2: function builds and returns a list (it is not a
generator), but the docstring opened with 'Yield ...', which would
mislead a reader into expecting lazy consumption / generator semantics
(early break, send(), etc.).
Pure docstring fix, no behaviour change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🧹 Nitpick comments (1)
workers/tests/canary_helpers.py (1)
21-24: ⚡ Quick win
Naming and docstring mismatch: "iter" and "Yield" suggest generator, but implementation returns a list.
The function name `iter_production_trees` and docstring verb "Yield" strongly imply generator/iterator semantics (lazy evaluation, one-time consumption), but the implementation builds and returns a complete `list[tuple[...]]` (lines 32, 47). This misleads readers familiar with Python's iterator protocol and naming conventions (e.g., `itertools`, `dict.items()`).
The list approach is reasonable here (allows multiple iterations, manageable file count), but the contract should be explicit.
📝 Proposed fix
Option 1 (preferred): Rename to reflect the actual behavior and fix the docstring:
```diff
-def iter_production_trees(
+def get_production_trees(
     skip_top_dirs: frozenset[str] = DEFAULT_SKIP_TOP_DIRS,
 ) -> list[tuple[pathlib.Path, ast.AST]]:
-    """Yield ``(rel_path, parsed_tree)`` for every .py file outside the skip set.
+    """Return ``(rel_path, parsed_tree)`` for every .py file outside the skip set.
```
Then update call sites in `test_executor_dispatch.py` line 147 and `test_fairness_key.py` line 33.
Option 2: Make it an actual generator (only if lazy parsing is valuable):
```diff
 def iter_production_trees(
     skip_top_dirs: frozenset[str] = DEFAULT_SKIP_TOP_DIRS,
-) -> list[tuple[pathlib.Path, ast.AST]]:
+) -> Iterator[tuple[pathlib.Path, ast.AST]]:
     """Yield ``(rel_path, parsed_tree)`` for every .py file outside the skip set.
     ...
     """
-    out: list[tuple[pathlib.Path, ast.AST]] = []
     for py in WORKERS_ROOT.rglob("*.py"):
         rel = py.relative_to(WORKERS_ROOT)
         if rel.parts and rel.parts[0] in skip_top_dirs:
             continue
         try:
             tree = ast.parse(py.read_text(), filename=str(py))
         except SyntaxError as exc:
             warnings.warn(
                 f"canary scan skipping unparseable file {rel}: {exc}",
                 UserWarning,
                 stacklevel=2,
             )
             continue
-        out.append((rel, tree))
+        yield (rel, tree)
-    return out
```
(Add `from collections.abc import Iterator` to imports.)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@workers/tests/canary_helpers.py` around lines 21 - 24, The function iter_production_trees has iterator-style naming and docstring but returns a list; rename the function to list_production_trees (and update its docstring to say "Return" not "Yield") to reflect its actual behavior, update any references (e.g., calls in test_executor_dispatch.py and test_fairness_key.py) to the new name, and run tests to ensure imports/usages are consistent; if you prefer lazy behavior instead, convert iter_production_trees to a generator by yielding each (path, parsed_tree) and update its type hint to Iterator[tuple[pathlib.Path, ast.AST]].
981c758 to 524ae91
vishnuszipstack
left a comment
Automated PR Review (PR Review Toolkit)
Ran six specialised review agents — Comment Analyzer, PR Test Analyzer, Silent Failure Hunter, Type Design Analyzer, Code Reviewer, Code Simplifier — against origin/main...HEAD. No CRITICAL correctness defects. The change is well-constructed and tests pass (31/31). Inline comments below, prioritised.
Highest-value items: (1) the canary UserWarning is not enforced as an error by either caller, so an unparseable file silently shrinks the audited tree and lets canaries pass vacuously; (2) _fairness_headers (the producer) has no test asserting the NON_API default or that the 3 dispatch sites attach the header.
One finding can't be inlined because it's outside the diff: workers/queue_backend/dispatch.py:41 hand-builds {FAIRNESS_HEADER_NAME: fairness.to_dict()}, an exact duplicate of the new FairnessKey.as_header() — refactor it to fairness.as_header() so the wire format has a single encoding (see comment on fairness.py:57).
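To make the "single encoding" point concrete, here is a rough sketch of how an `as_header()` method can wrap `to_dict()` so that no call site hand-builds the header dict. The field names, their types, the enum values, and the `pipeline_priority` default are assumptions for illustration; only the `x-fairness-key` header name, the `FAIRNESS_HEADER_NAME` symbol, and the `to_dict()` / `as_header()` method names come from this PR.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any

FAIRNESS_HEADER_NAME = "x-fairness-key"  # header name as quoted in the PR


class WorkloadType(str, Enum):
    # Member values are assumptions; the PR only names API and NON_API.
    API = "api"
    NON_API = "non_api"


@dataclass(frozen=True)
class FairnessKey:
    # Field names and types are assumptions based on names used in the PR.
    org_id: str | None
    workload_type: WorkloadType
    pipeline_priority: int = 0

    def to_dict(self) -> dict[str, Any]:
        return {
            "org_id": self.org_id,
            "workload_type": self.workload_type.value,
            "pipeline_priority": self.pipeline_priority,
        }

    def as_header(self) -> dict[str, dict[str, Any]]:
        # The one place that knows the wire shape of the header.
        return {FAIRNESS_HEADER_NAME: self.to_dict()}


key = FairnessKey(org_id="org-1", workload_type=WorkloadType.NON_API)
hand_built = {FAIRNESS_HEADER_NAME: key.to_dict()}  # the duplicated form
```

With this shape, a producer that calls `key.as_header()` and one that hand-builds the dict emit the same payload today, but only the former follows along if the encoding changes.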
…test-infra fix)
Seven of Vishnu's findings against ``524ae9184`` addressed. Three flagged IMPORTANT (silent-failure + missing test coverage), four SUGGESTION (drift hazard, comment/behaviour mismatch, hollow canary, duplicate-test cleanup). The other three (hand-built fixture, SDK1 ``dict[str, Any]`` boundary, ``as_header`` TypedDict refactor) deferred; see PR thread acknowledgments.
* **#11a (SUGGESTION, drift)**: ``workers/queue_backend/dispatch.py`` still hand-built the fairness header instead of calling ``fairness.as_header()``. Wire-format encoding now has a single source so the two sites can't drift. ``FAIRNESS_HEADER_NAME`` import dropped (no longer used here).
* **#9 (SUGGESTION, comment/behaviour mismatch)**: ``ExecutionDispatcher._build_send_kwargs`` was forwarding ``{}`` as-is despite the docstring calling it "likely indicates a producer-side build bug". Changed ``if headers is not None`` -> ``if headers``: falsy is dropped so the on-wire shape matches the no-headers baseline and a miswired producer surfaces immediately. Docstring rewritten to describe the new contract.
* **#3 (SUGGESTION, wording)**: ``canary_helpers`` docstring overstated adoption. Softened to "intended single home" and named the two characterisation walkers still inlining the logic as a known follow-up.
* **#5 + #6 (IMPORTANT cleanup + SUGGESTION fixture)**: six structurally-identical ``*_forwards_headers`` / ``*_omits_headers_when_none`` tests collapsed to three parametrized methods over the three dispatch entry points. Fixture now uses ``FairnessKey(...).as_header()`` rather than hand-built dicts, so the wire shape exercised matches what real producers emit (including ``pipeline_priority``). Net: ~60 LOC removed, per-method failure granularity preserved via parametrize IDs. Also added empty-dict drop assertions covering #9.
* **#7 (SUGGESTION, missing combined test)**: new ``test_dispatch_with_callback_combines_headers_and_callbacks`` passes ``on_success``, ``on_error``, ``task_id``, and ``headers`` together and asserts all four land on the same ``send_task`` call. A key-merge regression in ``_build_send_kwargs`` would have slipped through the single-kwarg forwarding tests.
* **#8 (SUGGESTION, hollow canary)**: the ``execute_extraction`` dispatch canary only ever asserted the empty (passing) case against the live tree. Added a positive-detection unit test feeding ``ast.parse`` of a known-bad snippet and a blind-spot lock test (constant ref, f-string, ``apply_async`` all evade the detector, documenting the scope so a future widening intentionally trips the asserts).
* **#4 (IMPORTANT, untested helper)**: ``_fairness_headers`` in ``structure_tool_task`` was untested; a regression flipping ``NON_API`` -> ``API`` or dropping ``headers=`` at any of the three call sites would have stayed green. Added focused unit tests in new ``test_structure_tool_task.py`` (wire shape, org_id propagation, ``NON_API`` not ``API``) and extended ``test_sanity_phase5.TestStructureToolSingleDispatch`` to assert ``dispatch.call_args.kwargs["headers"]`` carries the expected shape.
* **#2 (IMPORTANT, vacuous-pass)**: ``iter_production_trees`` warned-and-continued on ``SyntaxError`` but neither canary module promoted the warning to error. A botched merge in a production file would have dropped silently from the audit set and every canary would have passed vacuously over a smaller tree. Added ``pytestmark = pytest.mark.filterwarnings("error::UserWarning")`` on both ``test_executor_dispatch`` and ``test_fairness_key``, plus a new ``test_canary_helpers.py`` that unit-tests both the warn-on-broken behaviour and the promote-to-error contract the canary modules rely on.
**Bundled test-infra fix (unrelated but unblocks CI):** the ``test_callback_sanity.TestEagerHealthcheckRoundTrip`` tests selected the healthcheck task via ``endswith(".healthcheck")`` against ``eager_app.tasks``, which is a shared celery global registry containing ``callback.worker.healthcheck``, ``executor.worker.healthcheck``, ``file_processing.worker.healthcheck`` etc. The bare ``next(...)`` returned whichever was inserted first, which is non-deterministic across pytest module-collection orders. Without this fix, the new tests added in this commit perturb the collection profile enough to flip the failure rate from ~10% to nearly 100%. Replaced with exact-name lookup ``name == "callback.worker.healthcheck"``. Identical fix already landed on the UN-3513 branch (see #2020).
Test count: 31 -> 42 on the UN-3508-touched modules. Full workers suite: 6 failures pre-existing baseline, unchanged by this commit.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
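The vacuous-pass hazard behind finding #2 can be reproduced in a few lines. The sketch below is an illustration under stated assumptions, not the repository's helper: `parse_sources` is a hypothetical in-memory analogue of the file walker, and it uses the standard `warnings` filters directly where the test modules use the pytest `filterwarnings` mark.

```python
from __future__ import annotations

import ast
import warnings


def parse_sources(sources: dict[str, str]) -> list[tuple[str, ast.AST]]:
    """Parse each source; warn (rather than raise) on unparseable input."""
    out: list[tuple[str, ast.AST]] = []
    for name, text in sources.items():
        try:
            tree = ast.parse(text, filename=name)
        except SyntaxError as exc:
            warnings.warn(
                f"canary scan skipping unparseable file {name}: {exc}",
                UserWarning,
                stacklevel=2,
            )
            continue
        out.append((name, tree))
    return out


sources = {"good.py": "x = 1\n", "broken.py": "def oops(:\n"}

# Warning left as a warning: the broken file quietly drops out of the
# audit set, and any check over `trees` sees a smaller tree.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    trees = parse_sources(sources)

# Warning promoted to an error: the same scan now fails loudly.
with warnings.catch_warnings():
    warnings.simplefilter("error", UserWarning)
    try:
        parse_sources(sources)
        raised = False
    except UserWarning:
        raised = True
```

Keeping the helper at "warn" and letting each caller decide whether to promote leaves the policy with the test module, which is the split the commit describes.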
What
- Adds a `headers: dict | None` kwarg to all three dispatch methods on `unstract.sdk1.execution.dispatcher.ExecutionDispatcher` (`dispatch`, `dispatch_async`, `dispatch_with_callback`). When non-None, it is forwarded as Celery message headers; when None, the kwarg is omitted from `send_task` so existing callers see the identical pre-change call shape.
- The `FairnessKey.as_header()` helper in `workers/queue_backend/fairness.py` returns the wire-ready `{"x-fairness-key": ...}` dict, so producers don't need to import the slot-name constant.
- Fairness headers at the three `dispatcher.dispatch(...)` sites in `workers/file_processing/structure_tool_task.py` (lines 468, 507, 720). Each passes `headers=_fairness_headers(organization_id)`. A small in-file helper defaults `workload_type` to `NON_API`; propagating the real type from upstream is Phase 6 work.
- `workers/tests/test_executor_dispatch.py` (7 tests):
  - Header forwarding through all three dispatcher methods, including omission when `headers` is None.
  - `FairnessKey.as_header()` shape.
  - AST canary that forbids raw `*.send_task("execute_extraction", ...)` outside `ExecutionDispatcher`.
Why
UN-3501 (#2003) plumbed fairness on bare `dispatch(...)` call sites, but `execute_extraction`, the most workflow-execution-y dispatch in the codebase, bypasses `queue_backend.dispatch()` entirely (it uses sdk1's `ExecutionDispatcher`). The Phase 5.1 canary audits bare-name `dispatch()` and missed it. This PR brings the third dispatch path under the same fairness umbrella.
Under Celery today the header sits inertly: no consumer reads it. Under the future PG Queue substrate, `execute_extraction` tasks land in the staging queue and the fairness scheduler sorts on the same three fields. Producer-side plumbing now means the consumer can land without backfilling in-flight payloads.
How
- `ExecutionDispatcher` only attaches `headers=` to `send_task` when the caller provides them. That preserves the pre-change `mock.assert_called_with(...)` shape used by sdk1's existing 78 dispatcher tests; they all stay green without modification.
- `FairnessKey.as_header()` shifts the wire-shape knowledge into the seam module, so producers compose a `FairnessKey(...)` and call `.as_header()`. The constant `FAIRNESS_HEADER_NAME` stays internal to `queue_backend.fairness`, which keeps the "no consumer reads it yet" canary green at the producer level.
- The `_fairness_headers` helper in `structure_tool_task.py` is intentionally one function: same workload_type for all three sites today; Phase 6 (chord lift) is the right place to thread the real workload type from the workflow's class.
Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
No.
- `headers` is optional and defaults to None on all three dispatcher methods. Callers that don't opt in see the identical `send_task` call shape.
- No consumer reads `x-fairness-key` yet (canary in test_fairness_key.py still asserts this).
Database Migrations
Env Config
Relevant Docs
- `queue_backend/fairness.py` and the dispatcher cover the header semantics.
Related Issues or PRs
Dependencies Versions
Notes on Testing
```
cd workers
.venv/bin/python -m pytest \
  tests/test_executor_dispatch.py \
  tests/test_fairness_key.py \
  tests/test_queue_backend_seam.py \
  tests/test_dispatch_sites_characterisation.py
# 60 passed in ~6s
cd ../unstract/sdk1
uv run pytest tests/test_execution.py
# 80 passed
```
Verification:
```
# Every execute_extraction dispatch goes through ExecutionDispatcher:
grep -rn "send_task.*execute_extraction" workers/ --include="*.py" \
  | grep -v "ExecutionDispatcher\|tests/\|sdk1/"
# (empty: only sdk1's ExecutionDispatcher.dispatch* methods)
```
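The grep check above has an AST-based counterpart in the test suite. As a rough sketch of what such a detector can look like (the function name `find_raw_send_task` and its exact matching rule are assumptions, not the code in `test_executor_dispatch.py`), the following flags `<anything>.send_task("execute_extraction", ...)` calls whose first argument is a string literal:

```python
from __future__ import annotations

import ast


def find_raw_send_task(
    tree: ast.AST, task_name: str = "execute_extraction"
) -> list[int]:
    """Return line numbers of ``x.send_task("<task_name>", ...)`` calls."""
    hits: list[int] = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Only attribute calls named send_task are considered.
        if not (isinstance(func, ast.Attribute) and func.attr == "send_task"):
            continue
        if not node.args:
            continue
        first = node.args[0]
        # Only a literal string first argument is matched.
        if isinstance(first, ast.Constant) and first.value == task_name:
            hits.append(node.lineno)
    return hits


bad = ast.parse('app.send_task("execute_extraction", args=[1])\n')

# Forms a literal-matching detector does not see: a name bound to the
# string, an f-string, and a different dispatch method altogether.
evasive = ast.parse(
    'TASK = "execute_extraction"\n'
    "app.send_task(TASK)\n"
    'app.send_task(f"execute_{kind}")\n'
    "task.apply_async(args=[1])\n"
)
```

Here `find_raw_send_task(bad)` reports line 1 while `find_raw_send_task(evasive)` reports nothing, which mirrors the blind spots the review thread asks to be locked in by a test.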
Screenshots
N/A (no UI surface).
Checklist
I have read and understood the Contribution Guidelines.