Skip to content

UN-3508 [FEAT] Plumb fairness through ExecutionDispatcher#2009

Merged
muhammad-ali-e merged 5 commits into
mainfrom
UN-3508-executor-fairness
Jun 8, 2026
Merged

UN-3508 [FEAT] Plumb fairness through ExecutionDispatcher#2009
muhammad-ali-e merged 5 commits into
mainfrom
UN-3508-executor-fairness

Conversation

@muhammad-ali-e

Copy link
Copy Markdown
Contributor

What

  • Add optional headers: dict | None kwarg to all three dispatch methods on unstract.sdk1.execution.dispatcher.ExecutionDispatcher (dispatch, dispatch_async, dispatch_with_callback). When non-None, forwarded as Celery message headers; when None, the kwarg is omitted from send_task so existing callers see the identical pre-change call shape.
  • New FairnessKey.as_header() helper in workers/queue_backend/fairness.py returns the wire-ready {"x-fairness-key": ...} dict, so producers don't need to import the slot-name constant.
  • Wire fairness from the three dispatcher.dispatch(...) sites in workers/file_processing/structure_tool_task.py (lines 468, 507, 720). Each passes headers=_fairness_headers(organization_id). A small in-file helper defaults workload_type to NON_API; propagating the real type from upstream is Phase 6 work.
  • New workers/tests/test_executor_dispatch.py (7 tests):
    • Header forwarding through all three dispatcher methods.
    • "Omit when None" — call shape preservation for callers that don't opt in (sdk1 round-trip tests still green unchanged).
    • FairnessKey.as_header() shape.
    • AST inventory canary: no production code calls *.send_task("execute_extraction", ...) outside ExecutionDispatcher.

Why

UN-3501 (#2003) plumbed fairness on bare dispatch(...) call sites — but execute_extraction, the most workflow-execution-y dispatch in the codebase, bypasses queue_backend.dispatch() entirely (it uses sdk1's ExecutionDispatcher). The Phase 5.1 canary audits bare-name dispatch() and missed it. This PR brings the third dispatch path under the same fairness umbrella.

Under Celery today the header sits inertly — no consumer reads it. Under the future PG Queue substrate, execute_extraction tasks land in the staging queue and the fairness scheduler sorts on the same three fields. Producer-side plumbing now means the consumer can land without backfilling in-flight payloads.

How

  • ExecutionDispatcher only attaches headers= to send_task when the caller provides them. That preserves the pre-change mock.assert_called_with(...) shape used by sdk1's existing 78 dispatcher tests — they all stay green without modification.
  • FairnessKey.as_header() shifts the wire-shape knowledge into the seam module, so producers compose a FairnessKey(...) and call .as_header(). The constant FAIRNESS_HEADER_NAME stays internal to queue_backend.fairness — keeps the "no consumer reads it yet" canary green at the producer level.
  • The _fairness_headers helper in structure_tool_task.py is intentionally one function: same workload_type for all three sites today; Phase 6 (chord lift) is the right place to thread the real workload type from the workflow's class.

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

No.

  • headers is optional and defaults to None on all three dispatcher methods. Callers that don't opt in see the identical send_task call shape.
  • All 78 existing sdk1 dispatcher tests pass unchanged.
  • Worker seam suite extended (53 → 60 tests, all green).
  • Producer-side only: no consumer in workers/ reads x-fairness-key yet (canary in test_fairness_key.py still asserts this).
  • No queue routing, task name, or args/kwargs change.

Database Migrations

  • None.

Env Config

  • None.

Relevant Docs

  • N/A. Module-level docstrings in queue_backend/fairness.py and the dispatcher cover the header semantics.

Related Issues or PRs

Dependencies Versions

  • None.

Notes on Testing

```
cd workers
.venv/bin/python -m pytest \
tests/test_executor_dispatch.py \
tests/test_fairness_key.py \
tests/test_queue_backend_seam.py \
tests/test_dispatch_sites_characterisation.py

60 passed in ~6s

cd ../unstract/sdk1
uv run pytest tests/test_execution.py

80 passed

```

Verification:

```

Every execute_extraction dispatch goes through ExecutionDispatcher:

grep -rn "send_task.execute_extraction" workers/ --include=".py" \
| grep -v "ExecutionDispatcher\|tests/\|sdk1/"

(empty — only sdk1's ExecutionDispatcher.dispatch* methods)

```

Screenshots

N/A (no UI surface).

Checklist

I have read and understood the Contribution Guidelines.

@coderabbitai

coderabbitai Bot commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a10df623-988a-4a67-af8a-a87c47a064c9

📥 Commits

Reviewing files that changed from the base of the PR and between 524ae91 and b1c61c4.

📒 Files selected for processing (9)
  • unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py
  • workers/queue_backend/dispatch.py
  • workers/tests/canary_helpers.py
  • workers/tests/test_callback_sanity.py
  • workers/tests/test_canary_helpers.py
  • workers/tests/test_executor_dispatch.py
  • workers/tests/test_fairness_key.py
  • workers/tests/test_sanity_phase5.py
  • workers/tests/test_structure_tool_task.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • workers/tests/canary_helpers.py
  • unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py
  • workers/tests/test_fairness_key.py
  • workers/tests/test_executor_dispatch.py

Summary by CodeRabbit

  • Refactor

    • Dispatching now consistently accepts and forwards optional message headers and centralizes send-argument construction.
    • Worker dispatch sites updated to include fairness headers for workload routing.
  • Tests

    • Added coverage for header propagation, fairness-header shape, and dispatch-call inventory (canary) across production code.
    • New shared test helpers and updates to several test suites for deterministic task lookups and header assertions.

Walkthrough

ExecutionDispatcher accepts optional Celery headers via a shared _build_send_kwargs helper. FairnessKey gains as_header(); callers in structure_tool_task now pass fairness headers. Shared AST canary helpers were added and tests validate header forwarding, fairness serialization, and detector behavior.

Changes

Executor Header Forwarding and Fairness Integration

Layer / File(s) Summary
ExecutionDispatcher header forwarding contract and helper
unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py
ExecutionDispatcher.dispatch, dispatch_async, and dispatch_with_callback signatures all accept optional `headers: dict[str, Any]
FairnessKey header serialization & dispatch wiring
workers/queue_backend/fairness.py, workers/queue_backend/dispatch.py
FairnessKey.as_header() formats fairness metadata into the Celery send_task headers payload shape; dispatch.py now uses fairness.as_header() when fairness is provided.
Caller integration with fairness headers
workers/file_processing/structure_tool_task.py
Imports FairnessKey/WorkloadType, implements _fairness_headers(organization_id) that returns executor dispatch headers with WorkloadType.NON_API, and passes headers=_fairness_headers(organization_id) to three dispatcher.dispatch call sites.
Shared AST canary helpers
workers/tests/canary_helpers.py
Adds WORKERS_ROOT, DEFAULT_SKIP_TOP_DIRS, and iter_production_trees() for production-tree AST audits, emitting UserWarning for unparseable files.
Executor dispatch test coverage and AST detector
workers/tests/test_executor_dispatch.py, workers/tests/test_fairness_key.py
New tests assert ExecutionDispatcher forwards headers when provided and omits them when falsy; verify FairnessKey.as_header() shape including org_id=None; and include an AST canary that detects raw app.send_task("execute_extraction", ...) calls outside the dispatcher seam. test_fairness_key.py switched to shared canary helpers.
Canary helper tests and sanity/test updates
workers/tests/test_canary_helpers.py, workers/tests/test_structure_tool_task.py, workers/tests/test_sanity_phase5.py, workers/tests/test_callback_sanity.py
Add unit tests for iter_production_trees warning behavior; add tests asserting _fairness_headers produces x-fairness-key wire shape and that integration dispatch uses headers; narrow callback healthcheck selection to a deterministic task name.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 56.76% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: adding fairness header plumbing through ExecutionDispatcher across three dispatch methods.
Description check ✅ Passed The PR description is comprehensive and well-structured, covering all required template sections with detailed explanations of what changed, why, how, testing, and potential impacts.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3508-executor-fairness

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@muhammad-ali-e muhammad-ali-e left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR review — six specialised review agents (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) ran against this branch's 6-file scope. Inline comments below are deduplicated and prioritised. HIGH/MEDIUM items are worth resolving before merge; LOW/NIT items are optional polish.

Comment thread unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py
Comment thread unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py Outdated
Comment thread unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py Outdated
Comment thread workers/file_processing/structure_tool_task.py Outdated
Comment thread workers/file_processing/structure_tool_task.py Outdated
Comment thread workers/tests/test_executor_dispatch.py Outdated
Comment thread workers/tests/test_executor_dispatch.py Outdated
Comment thread workers/tests/test_executor_dispatch.py Outdated
Comment thread workers/tests/test_executor_dispatch.py
Comment thread workers/tests/test_executor_dispatch.py
@muhammad-ali-e

Copy link
Copy Markdown
Contributor Author

Review feedback addressed — `d6300bc0`

14/14 actioned. One-to-one accounting:

HIGH (4/4)

# File:Line Status
1 dispatcher.py:224dispatch_with_callback Args missing headers: ✅ Added the line, mirroring dispatch_async's entry
2 structure_tool_task.py:43 — return type widening ✅ Narrowed to dict[str, dict[str, str | int | None]] (matches as_header())
3 structure_tool_task.py:49 — "Phase 6 work" with no TODO anchor ✅ Replaced with TODO(UN-3504): propagate caller's WorkloadType... — grep-able + ticket-anchored
4 canary_helpers.py:35 — silent SyntaxError swallow ✅ Real silent-failure bug; now emits UserWarning so the regression surfaces. Tests can promote via warnings.simplefilter("error", UserWarning). Both the fairness canary and the execute-extraction canary inherit the fix automatically since they share this helper.

MEDIUM (5/5)

# File:Line Status
5 dispatcher.py:147 — empty-dict footgun + 3 copies of send_kwargs block ✅ Extracted _build_send_kwargs(context, queue, *, headers=None, link=None, link_error=None, task_id=None) static helper. All three dispatch methods now share one source of truth (kills ~25 LOC of duplication). Empty-dict semantic documented in dispatch()'s headers: Args block as "caller responsible for the header schema; {} is forwarded as-is and likely indicates a producer-side build bug".
6 test_executor_dispatch.py:32# type: ignore[arg-type] ✅ Switched to dict[str, Any] + **overrides: Any; dropped the silencer
7 test_executor_dispatch.py:48,67 — invalid "etl" literal ✅ Replaced with WorkloadType.NON_API.value (and WorkloadType.API.value where appropriate). Renames now propagate
8 test_executor_dispatch.py:64 — missing async "omits when None" test ✅ Added test_dispatch_async_omits_headers_when_none
9 test_executor_dispatch.py:128 — canary blind spots vs docstring ✅ Tightened docstring to "raw string-literal dispatch" + listed the known blind spots (constant refs, f-strings, apply_async) in the assertion message. Closed via docstring honesty rather than AST resolution widening — the latter is over-engineering for what's a 1-line dispatcher seam

LOW / NIT (5/5)

# File:Line Status
10 dispatcher.py:117 — bare string instead of symbol ✅ Docstring now references queue_backend.fairness.FAIRNESS_HEADER_NAME symbol with "x-fairness-key" in parens for grep continuity. sdk1 can't import the constant (no workers dep), but naming the symbol survives renames.
11 fairness.py:58 — vague "wire-ready" ✅ Docstring now states the concrete shape {FAIRNESS_HEADER_NAME: self.to_dict()}
12 test_executor_dispatch.py:1,55 — plan-stage vocab ✅ Module docstring rewritten to describe what tests cover; L55 comment no longer says "pre-Phase-5.2"
13 test_executor_dispatch.py:13 — import order ✅ Relative from .canary_helpers moved below unstract.sdk1 block
14 test_executor_dispatch.py:93 — missing org_id=None case ✅ Added test_fairness_header_shape_orgless pinning the JSON-null serialisation contract

Tests

  • workers/tests/test_executor_dispatch.py: 7 → 9 (added the two missing coverage tests)
  • workers/tests/test_fairness_key.py: 22 (unchanged)
  • workers/tests/test_queue_backend_seam.py: 17 (unchanged)
  • workers/tests/test_dispatch_sites_characterisation.py: 14 (unchanged)
  • unstract/sdk1/tests/test_execution.py: 80 (unchanged — _build_send_kwargs refactor preserves call-shape semantics)
  • Workers seam suite total: 62 (was 60). sdk1: 80. All green.

Notable structural changes

  • _build_send_kwargs in dispatcher.py is the second time this rollout introduces a private factoring (the first was _fairness_headers in structure_tool_task.py). Both serve the same principle: one place to audit the optional-kwarg contract.
  • The canary-helpers SyntaxError fix is the kind of silent-failure I'd want banked as a pattern for future canary code. The reviewer's framing — "the docstring acknowledges the bug as justification" — was exactly right.

PR head now: d6300bc0.

muhammad-ali-e and others added 3 commits June 8, 2026 11:18
Phase 5.2 of the PG Queue rollout (epic UN-3445). Adds fairness-header
support to the third dispatch path (sdk1's ExecutionDispatcher) so
``execute_extraction`` tasks emitted by file_processing carry the same
routing metadata as workflow-execution dispatches that go through
queue_backend.

What

* sdk1/execution/dispatcher.py: ``dispatch``, ``dispatch_async``,
  ``dispatch_with_callback`` all accept an optional ``headers`` kwarg.
  When non-None, forwarded to Celery's send_task; when None, omitted
  so the call shape stays identical to pre-Phase-5.2 for callers that
  don't opt in (sdk1's existing tests remain green unchanged).
* queue_backend/fairness.py: new ``FairnessKey.as_header()`` method
  returns the wire-ready ``{"x-fairness-key": ...}`` dict. Producers
  no longer need to reference ``FAIRNESS_HEADER_NAME`` directly —
  keeps the additive-only canary in test_fairness_key.py happy.
* file_processing/structure_tool_task.py: small ``_fairness_headers``
  helper builds the header (defaulting workload_type to NON_API;
  propagating the real type is Phase 6 work). All three
  ``dispatcher.dispatch(...)`` sites (lines 468, 507, 720) now pass
  ``headers=_fairness_headers(organization_id)``.
* tests/test_executor_dispatch.py: new file. Covers header forwarding
  through all three dispatcher methods (including the "omit when
  None" pre-existing shape preservation), the FairnessKey.as_header()
  shape, and an AST inventory canary that forbids raw
  ``*.send_task("execute_extraction", ...)`` outside
  ExecutionDispatcher.

Why

UN-3501 plumbed fairness on bare dispatch() call sites. The
``execute_extraction`` task is the most workflow-execution-y dispatch
in the codebase but bypasses queue_backend (uses ExecutionDispatcher
directly), so it had no fairness header. The canary in
test_fairness_key.py audits only bare-name dispatch() and missed it.

No regression risk

* Additive: ``headers`` is optional and defaults to None on all three
  dispatcher methods; the existing 78 sdk1 tests pass unchanged.
* Producer-side only — no consumer reads ``x-fairness-key`` yet.
* No queue routing, task name, or args/kwargs change.

Test count: workers seam suite 53 -> 60 (new test_executor_dispatch.py
with 7 tests). sdk1 dispatcher suite 80/80 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…uplication

SonarCloud flagged 7.8% duplicated lines in the new
test_executor_dispatch.py — the file-walking helper and skip-dir
constants were copy-pasted from test_fairness_key.py.

Move them into tests/canary_helpers.py:

* WORKERS_ROOT, DEFAULT_SKIP_TOP_DIRS constants.
* iter_production_trees(skip_top_dirs=…) generator.

Both canary tests use relative imports (from .canary_helpers import …)
to keep one canonical import path — tests/ is already a package via
__init__.py, no pyproject change needed. (An earlier attempt added
pythonpath = ["tests"], reverted — it would have created a second
top-level import path for every test file and a dual-module-object
hazard.)

The fairness canary widens its skip set with ``queue_backend`` (where
the seam legitimately defines fairness constants); the executor canary
keeps the default. Tests stay at 60/60 — pure dedup, no behavioural
change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* dispatcher.py: factor _build_send_kwargs helper; document headers kwarg on dispatch_with_callback; reference FAIRNESS_HEADER_NAME symbol instead of bare string; document empty-dict caller-bug semantic
* structure_tool_task.py: narrow _fairness_headers return type; replace 'Phase 6 work' with TODO(UN-3504) anchor
* fairness.py: concrete as_header() docstring with explicit shape
* canary_helpers.py: surface SyntaxError via UserWarning (real silent-failure bug; canaries no longer pass vacuously on unparseable files)
* test_executor_dispatch.py: switch to dict[str, Any] dropping type-ignore; use WorkloadType.NON_API.value instead of invalid 'etl' literal; new test_dispatch_async_omits_headers_when_none; tighten canary docstring + note blind spots; drop plan-stage vocab; reorder relative import; new test_fairness_header_shape_orgless for org_id=None case

Tests: workers 60 -> 62, sdk1 dispatcher 80/80 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@muhammad-ali-e muhammad-ali-e marked this pull request as ready for review June 8, 2026 05:51
@greptile-apps

greptile-apps Bot commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR wires fairness headers through ExecutionDispatcher, the third (and previously unplumbed) dispatch path for execute_extraction tasks. It adds an optional headers kwarg to all three dispatcher methods, introduces FairnessKey.as_header() as the canonical header-shape builder, and attaches fairness metadata to the three dispatcher.dispatch(...) call sites in structure_tool_task.py, all with a NON_API default pending Phase 6 workload-type propagation.

  • ExecutionDispatcher._build_send_kwargs centralises the send_task kwargs construction and uses a falsy-drop (if headers:) to preserve the pre-change call shape for callers that pass no headers, keeping 78 existing sdk1 tests green without modification.
  • FairnessKey.as_header() hides the FAIRNESS_HEADER_NAME wire-name constant from producers, keeping the "no consumer reads it yet" invariant at the producer level; dispatch.py is updated to call it instead of inlining the dict.
  • Seven new tests in test_executor_dispatch.py cover header forwarding, the omit-when-falsy contract across all three dispatcher methods, and an AST canary ensuring no raw send_task("execute_extraction", …) escapes the ExecutionDispatcher seam.

Confidence Score: 5/5

Safe to merge. The change is producer-side only — no consumer reads the header yet — so there is no on-wire behaviour change for existing tasks.

All three dispatcher methods receive an optional headers kwarg that defaults to None and is dropped when falsy, leaving the send_task call shape identical for every existing caller. The _build_send_kwargs refactor is a pure consolidation with no logic change. The three new dispatch(...) call sites in structure_tool_task.py only add metadata and do not alter task routing, args, or kwargs.

No files require special attention.

Important Files Changed

Filename Overview
unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py Adds _build_send_kwargs to centralise send_task kwargs construction; adds optional headers kwarg to all three dispatch methods. Falsy-drop on headers preserves backward compatibility.
workers/file_processing/structure_tool_task.py Wires fairness headers at all three dispatcher.dispatch(...) call sites via a small _fairness_headers helper. Hard-coded NON_API default is intentional and well-documented with a TODO for Phase 6.
workers/queue_backend/fairness.py Adds as_header() convenience method to FairnessKey; keeps FAIRNESS_HEADER_NAME encapsulated within the module as intended.
workers/queue_backend/dispatch.py Simplified to call fairness.as_header() instead of inlining the dict; removes the now-unnecessary FAIRNESS_HEADER_NAME import. Functionally equivalent to prior behaviour.
workers/tests/test_executor_dispatch.py New test file covering header forwarding across all three dispatcher methods, falsy-drop contract, combined-kwargs correctness, and an AST canary guarding against raw send_task calls outside ExecutionDispatcher.
workers/tests/canary_helpers.py New shared AST-walk helper for inventory canaries. Emits UserWarning on unparseable files rather than silently skipping, which callers promote to test failures via pytestmark.
workers/tests/test_callback_sanity.py Fixes a flaky healthcheck lookup by matching the fully-qualified task name callback.worker.healthcheck instead of the ambiguous endswith pattern that was sensitive to Celery registry insertion order.
workers/tests/test_structure_tool_task.py New tests locking the _fairness_headers wire shape, including a pinned assertion that workload_type is NON_API to catch a silent priority-inversion regression.
workers/tests/test_sanity_phase5.py Adds a headers= assertion to the existing TestStructureToolSingleDispatch test, pairing integration-level coverage with the unit tests in test_structure_tool_task.py.
workers/tests/test_fairness_key.py Refactored to use shared canary_helpers; skip set composition is identical to the original; adds pytestmark filterwarnings for stricter canary enforcement.
workers/tests/test_canary_helpers.py Tests that iter_production_trees emits a UserWarning on unparseable files and that promoting that warning to an error raises correctly — locking the fail-loud contract the canary modules depend on.

Reviews (3): Last reviewed commit: "UN-3508 [FIX] Address vishnuszipstack re..." | Re-trigger Greptile

Comment thread workers/tests/canary_helpers.py
…n a list'

Greptile P2: function builds and returns a list — it is not a
generator — but the docstring opened with 'Yield ...', which would
mislead a reader into expecting lazy consumption / generator semantics
(early break, send(), etc.).

Pure docstring fix, no behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
workers/tests/canary_helpers.py (1)

21-24: ⚡ Quick win

Naming and docstring mismatch: "iter" and "Yield" suggest generator, but implementation returns a list.

The function name iter_production_trees and docstring verb "Yield" strongly imply generator/iterator semantics (lazy evaluation, one-time consumption), but the implementation builds and returns a complete list[tuple[...]] (lines 32, 47). This misleads readers familiar with Python's iterator protocol and naming conventions (e.g., itertools, dict.items()).

The list approach is reasonable here (allows multiple iterations, manageable file count), but the contract should be explicit.

📝 Proposed fix

Option 1 (preferred): Rename to reflect the actual behavior and fix the docstring:

-def iter_production_trees(
+def get_production_trees(
     skip_top_dirs: frozenset[str] = DEFAULT_SKIP_TOP_DIRS,
 ) -> list[tuple[pathlib.Path, ast.AST]]:
-    """Yield ``(rel_path, parsed_tree)`` for every .py file outside the skip set.
+    """Return ``(rel_path, parsed_tree)`` for every .py file outside the skip set.

Then update call sites in test_executor_dispatch.py line 147 and test_fairness_key.py line 33.

Option 2: Make it an actual generator (only if lazy parsing is valuable):

 def iter_production_trees(
     skip_top_dirs: frozenset[str] = DEFAULT_SKIP_TOP_DIRS,
-) -> list[tuple[pathlib.Path, ast.AST]]:
+) -> Iterator[tuple[pathlib.Path, ast.AST]]:
     """Yield ``(rel_path, parsed_tree)`` for every .py file outside the skip set.
     ...
     """
-    out: list[tuple[pathlib.Path, ast.AST]] = []
     for py in WORKERS_ROOT.rglob("*.py"):
         rel = py.relative_to(WORKERS_ROOT)
         if rel.parts and rel.parts[0] in skip_top_dirs:
             continue
         try:
             tree = ast.parse(py.read_text(), filename=str(py))
         except SyntaxError as exc:
             warnings.warn(
                 f"canary scan skipping unparseable file {rel}: {exc}",
                 UserWarning,
                 stacklevel=2,
             )
             continue
-        out.append((rel, tree))
+        yield (rel, tree)
-    return out

(Add from collections.abc import Iterator to imports.)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/tests/canary_helpers.py` around lines 21 - 24, The function
iter_production_trees has iterator-style naming and docstring but returns a
list; rename the function to list_production_trees (and update its docstring to
say "Return" not "Yield") to reflect its actual behavior, update any references
(e.g., calls in test_executor_dispatch.py and test_fairness_key.py) to the new
name, and run tests to ensure imports/usages are consistent; if you prefer lazy
behavior instead, convert iter_production_trees to a generator by yielding each
(path, parsed_tree) and update its type hint to Iterator[tuple[pathlib.Path,
ast.AST]].
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@workers/tests/canary_helpers.py`:
- Around line 21-24: The function iter_production_trees has iterator-style
naming and docstring but returns a list; rename the function to
list_production_trees (and update its docstring to say "Return" not "Yield") to
reflect its actual behavior, update any references (e.g., calls in
test_executor_dispatch.py and test_fairness_key.py) to the new name, and run
tests to ensure imports/usages are consistent; if you prefer lazy behavior
instead, convert iter_production_trees to a generator by yielding each (path,
parsed_tree) and update its type hint to Iterator[tuple[pathlib.Path, ast.AST]].

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 47d676de-b680-4f86-8e09-bdc94cb470b2

📥 Commits

Reviewing files that changed from the base of the PR and between 35a5b95 and 981c758.

📒 Files selected for processing (6)
  • unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py
  • workers/file_processing/structure_tool_task.py
  • workers/queue_backend/fairness.py
  • workers/tests/canary_helpers.py
  • workers/tests/test_executor_dispatch.py
  • workers/tests/test_fairness_key.py

@vishnuszipstack vishnuszipstack left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated PR Review (PR Review Toolkit)

Ran six specialised review agents — Comment Analyzer, PR Test Analyzer, Silent Failure Hunter, Type Design Analyzer, Code Reviewer, Code Simplifier — against origin/main...HEAD. No CRITICAL correctness defects. The change is well-constructed and tests pass (31/31). Inline comments below, prioritised.

Highest-value items: (1) the canary UserWarning is not enforced as an error by either caller, so an unparseable file silently shrinks the audited tree and lets canaries pass vacuously; (2) _fairness_headers (the producer) has no test asserting the NON_API default or that the 3 dispatch sites attach the header.

One finding can't be inlined because it's outside the diff: workers/queue_backend/dispatch.py:41 hand-builds {FAIRNESS_HEADER_NAME: fairness.to_dict()}, an exact duplicate of the new FairnessKey.as_header() — refactor it to fairness.as_header() so the wire format has a single encoding (see comment on fairness.py:57).

Comment thread workers/tests/canary_helpers.py
Comment thread workers/tests/canary_helpers.py Outdated
Comment thread workers/file_processing/structure_tool_task.py
Comment thread workers/tests/test_executor_dispatch.py Outdated
Comment thread workers/tests/test_executor_dispatch.py Outdated
Comment thread workers/tests/test_executor_dispatch.py Outdated
Comment thread workers/tests/test_executor_dispatch.py
Comment thread unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py Outdated
Comment thread unstract/sdk1/src/unstract/sdk1/execution/dispatcher.py
Comment thread workers/queue_backend/fairness.py
…test-infra fix)

Seven of Vishnu's findings against ``524ae9184`` addressed. Three
flagged IMPORTANT (silent-failure + missing test coverage), four
SUGGESTION (drift hazard, comment/behaviour mismatch, hollow canary,
duplicate-test cleanup). The other three (hand-built fixture,
SDK1 ``dict[str, Any]`` boundary, ``as_header`` TypedDict refactor)
deferred — see PR thread acknowledgments.

* **#11a (SUGGESTION, drift)** — ``workers/queue_backend/dispatch.py``
  still hand-built the fairness header instead of calling
  ``fairness.as_header()``. Wire-format encoding now has a single
  source so the two sites can't drift. ``FAIRNESS_HEADER_NAME``
  import dropped (no longer used here).

* **#9 (SUGGESTION, comment/behaviour mismatch)** —
  ``ExecutionDispatcher._build_send_kwargs`` was forwarding ``{}``
  as-is despite the docstring calling it "likely indicates a
  producer-side build bug". Changed ``if headers is not None`` ->
  ``if headers``: falsy is dropped so the on-wire shape matches the
  no-headers baseline and a miswired producer surfaces immediately.
  Docstring rewritten to describe the new contract.

* **#3 (SUGGESTION, wording)** — ``canary_helpers`` docstring
  overstated adoption. Softened to "intended single home" and
  named the two characterisation walkers still inlining the logic
  as a known follow-up.

* **#5 + #6 (IMPORTANT cleanup + SUGGESTION fixture)** — six
  structurally-identical ``*_forwards_headers`` /
  ``*_omits_headers_when_none`` tests collapsed to three
  parametrized methods over the three dispatch entry points.
  Fixture now uses ``FairnessKey(...).as_header()`` rather than
  hand-built dicts, so the wire shape exercised matches what real
  producers emit (including ``pipeline_priority``). Net: ~60 LOC
  removed, per-method failure granularity preserved via parametrize
  IDs. Also added empty-dict drop assertions covering #9.

* **#7 (SUGGESTION, missing combined test)** — new
  ``test_dispatch_with_callback_combines_headers_and_callbacks``
  passes ``on_success``, ``on_error``, ``task_id``, and ``headers``
  together and asserts all four land on the same ``send_task``
  call. A key-merge regression in ``_build_send_kwargs`` would
  have slipped through the single-kwarg forwarding tests.

* **#8 (SUGGESTION, hollow canary)** — the
  ``execute_extraction`` dispatch canary only ever asserted the
  empty (passing) case against the live tree. Added a positive-
  detection unit test feeding ``ast.parse`` of a known-bad snippet
  and a blind-spot lock test (constant ref, f-string,
  ``apply_async`` all evade the detector — documenting the scope
  so a future widening intentionally trips the asserts).

* **#4 (IMPORTANT, untested helper)** — ``_fairness_headers`` in
  ``structure_tool_task`` was untested; a regression flipping
  ``NON_API`` -> ``API`` or dropping ``headers=`` at any of the
  three call sites would have stayed green. Added focused unit
  tests in new ``test_structure_tool_task.py`` (wire shape,
  org_id propagation, ``NON_API`` not ``API``) and extended
  ``test_sanity_phase5.TestStructureToolSingleDispatch`` to assert
  ``dispatch.call_args.kwargs["headers"]`` carries the expected
  shape.

* **#2 (IMPORTANT, vacuous-pass)** — ``iter_production_trees``
  warned-and-continued on ``SyntaxError`` but neither canary
  module promoted the warning to error. A botched merge in a
  production file would have dropped silently from the audit set
  and every canary would have passed vacuously over a smaller
  tree. Added ``pytestmark = pytest.mark.filterwarnings(
  "error::UserWarning")`` on both ``test_executor_dispatch`` and
  ``test_fairness_key``, plus a new ``test_canary_helpers.py``
  that unit-tests both the warn-on-broken behaviour and the
  promote-to-error contract the canary modules rely on.

**Bundled test-infra fix (unrelated but unblocks CI):** the
``test_callback_sanity.TestEagerHealthcheckRoundTrip`` tests
selected the healthcheck task via ``endswith(".healthcheck")``
against ``eager_app.tasks``, which is a shared celery global
registry containing ``callback.worker.healthcheck``,
``executor.worker.healthcheck``,
``file_processing.worker.healthcheck`` etc. The bare ``next(...)``
returned whichever was inserted first — non-deterministic across
pytest module-collection orders. Without this fix, the new tests
added in this commit perturb the collection profile enough to
flip the failure rate from ~10% to nearly 100%. Replaced with
exact-name lookup ``name == "callback.worker.healthcheck"``.
Identical fix already landed on the UN-3513 branch (see #2020).

Test count: 31 -> 42 on the UN-3508-touched modules. Full workers
suite: 6 failures pre-existing baseline, unchanged by this commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sonarqubecloud

sonarqubecloud Bot commented Jun 8, 2026

Copy link
Copy Markdown

@github-actions

github-actions Bot commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Unstract test results

Per-group results

Status Group Tier Passed Failed Errors Skipped Duration (s)
unit-connectors unit 64 12 0 3 17.1
unit-core unit 0 0 2 0 1.3
unit-platform-service unit 9 0 1 0 1.4
unit-prompt-service unit 15 0 0 0 20.4
unit-rig unit 53 0 0 0 3.4
unit-runner unit 11 0 0 0 3.3
unit-sdk1 unit 381 0 0 0 24.6
unit-tool-registry unit 0 0 1 0 1.4
unit-workers unit 0 0 0 0 17.9
TOTAL 533 12 4 3 90.6

Critical paths

⚠️ Critical paths not yet covered

  • auth-login — User can log in and obtain a session cookie. (entry: POST /api/v1/auth/login; declared coverage: no groups declared)
  • adapter-register-llm — Register and validate an LLM adapter. (entry: POST /api/v1/adapter/; declared coverage: no groups declared)
  • workflow-create-execute — Create a workflow, configure source+destination, execute, poll, fetch result. (entry: POST /api/v1/workflow/{id}/execute/; declared coverage: e2e-workflow)
  • api-deployment-run — Deploy a workflow as an API, POST a document, receive structured JSON. (entry: POST /deployment/api/{org}/{name}/; declared coverage: e2e-api-deployment)
  • prompt-studio-fetch-response — Prompt Studio: create project, add prompt, run single-pass, get response. (entry: POST /api/v1/prompt-studio/prompt-studio-tool/{id}/fetch_response/; declared coverage: e2e-prompt-studio)
  • pipeline-etl-execute — Run an ETL pipeline from source connector to destination. (entry: POST /api/v1/pipeline/{id}/execute/; declared coverage: no groups declared)
  • usage-token-tracking — Per-execution token usage is recorded and retrievable. (entry: GET /api/v1/usage/get_token_usage/; declared coverage: no groups declared)
  • workflow-execution-fan-out — Multi-file workflow execution fans out to file-processing workers and rejoins. (entry: internal: backend → rabbitmq → workers/file_processing; declared coverage: no groups declared)
  • callback-result-delivery — Async results are posted back via the callback worker. (entry: internal: workers/callback → backend /internal endpoints; declared coverage: no groups declared)
✅ Covered critical paths
  • tool-sandbox-exec — covered by unit-runner

@muhammad-ali-e muhammad-ali-e merged commit 7c0e251 into main Jun 8, 2026
9 checks passed
@muhammad-ali-e muhammad-ali-e deleted the UN-3508-executor-fairness branch June 8, 2026 11:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants