UN-3513 [FEAT] Type chord-callback boundary with BatchExecutionResult / FileExecutionResult#2020
UN-3513 [FEAT] Type chord-callback boundary with BatchExecutionResult / FileExecutionResult#2020muhammad-ali-e wants to merge 4 commits into
Conversation
… / FileExecutionResult Producers in workers/file_processing/tasks.py now build typed dataclasses (from unstract.core.worker_models) and emit their ``.to_dict()`` instead of hand-rolled dicts. Locks the wire shape to the dataclass schema so downstream refactors fail loud. Scope Producer-side typing only. Consumer (workers/callback/tasks.py + aggregate_file_batch_results) already reads via ``.get(..., default)`` — tolerant by construction — so no consumer-side change needed. Dataclass extensions (unstract.core.worker_models, additive only) * BatchExecutionResult gains 3 optional fields: skipped_already_completed, skipped_active_duplicate, organization_id. * FileExecutionResult gains 3 optional fields for the API path's legacy dict vocabulary: file_name (alias for file), result_data (alias for result), skipped (marker like "already_completed"). * Both from_dict updated to populate the new fields. Producer migrations (workers/file_processing/tasks.py) * L901 (general path, process_file_batch return): BatchExecutionResult(...).to_dict(). Wire dict gains file_results: [] and errors: [] defaults — strictly additive. * L1706, L1798, L1823 (API path returns from _process_file_batch_api_core helpers): FileExecutionResult(...).to_dict(). L1798 preserves the legacy storage_result field via dict-spread merge. Domain-vocabulary correction on the API path API-path producers previously returned status="completed" / "failed" — lowercase strings matching neither ExecutionStatus (workflow-level, uppercase) nor ApiDeploymentResultStatus (per-file, Success/Failed, the canonical per-file vocab). Producers now emit "Success" / "Failed" via FileExecutionResult. Audit: no Python equality consumer was found reading the lowercase variants (grep clean). Observability tooling pattern-matching the old strings would need updating; this is a domain-correctness fix. Tests New tests/test_chord_callback_boundary.py — 14 tests, 3 classes: * Wire-shape characterisation for BatchExecutionResult. * Wire-shape characterisation for FileExecutionResult with alias fields and canonical Success/Failed vocab. * Consumer tolerance: aggregate_file_batch_results-style .get() reads return expected values from the new wire shape. sdk1's 80 worker_models tests still pass — the dataclass extensions are strictly additive. Regression risk: zero on consumer side, zero on backend (doesn't import these classes; has its own FileExecutionResult in dto.py — untouched). Status-vocab shift on API path is a deliberate domain correction. Test count: workers boundary suite +14 (new); sdk1 dispatcher 80/80. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (2)
Summary by CodeRabbit
WalkthroughThis PR establishes wire-level skip reason tracking for file-processing results. It introduces a ChangesSkip reason tracking and result model standardization
🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
PR Review Toolkit — multi-agent review of UN-3513 (devil's-advocate stance)
Ran 6 agents (code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier) against the single commit d5a6e87f. Findings cross-validated against the actual source — links verified, not just agent assertions.
Top-line
- The dataclass extensions are mostly additive as claimed, and consumer tolerance via
.get()largely holds. The PR is mergeable in spirit. - But one BLOCKER reappeared in 5/6 agents independently:
__post_init__silently overwrites the constructor-passedstatusargument. Every migrated call site looks like it's setting status; none of them are. Today it's accidentally correct because each site happens to match theerror-presence rule, but the contract is a lie.from_dicthas the same pathology one layer deeper — it discards the wirestatusand re-derives. - One HIGH that contradicts the PR's stated scope:
_process_file_batch_apiattasks.py:1659still returns a hand-rolled{successful_files, failed_files}2-key dict — the actual API batch boundary feedingprocess_batch_callback_api. The PR description says "L1706/L1798/L1823 (API path returns from_process_file_batch_api_corehelpers)" but those are per-file helpers, not the batch return. The boundary is half-typed. - One HIGH that's subtle:
serialize_dataclass_to_dictdropsNone-valued keys (data_models.py:333). That means the new alias fields (file_name,result_data,skipped) DISAPPEAR from the wire whenever a producer doesn't set them. The test guarantees about aliases hold only because the tests explicitly populate both sides; any future call site that omits the alias silently changes the wire-shape contract for downstream consumers doingif "file_name" in wiremembership checks. - Other findings cluster on: missing producer-side tests, hollow consumer-tolerance test, pre-existing dead
status == "error"branch intime_utils.py:181(PR neither breaks nor fixes), no deprecation marker on aliases, no invariant ontotal_files == sum(...).
Prioritized summary
| # | Severity | File:line | Issue |
|---|---|---|---|
| 1 | BLOCKER | worker_models.py:268 |
__post_init__ ignores status= arg — silently coerces |
| 2 | BLOCKER | worker_models.py:296-301 (via :313) |
from_dict discards wire status, recomputes from error |
| 3 | HIGH | tasks.py:1659 (anchored at :913) |
API batch return still hand-rolled 2-key dict — boundary half-typed |
| 4 | HIGH | worker_models.py:264 |
serialize_dataclass_to_dict strips Nones — aliases drop off wire silently |
| 5 | HIGH | tasks.py:1806 |
Dict-spread merge can wrap storage soft-failures as SUCCESS |
| 6 | HIGH | tasks.py:1709 |
Already-completed skip indistinguishable from SUCCESS at aggregation |
| 7 | MEDIUM | tasks.py:1839 (time_utils.py:181) |
Pre-existing dead status == "error" consumer branch |
| 8 | MEDIUM | test_chord_callback_boundary.py:25 |
No producer-side tests; legacy-dict regression at producer is invisible |
| 9 | MEDIUM | test_chord_callback_boundary.py:184 |
Hollow consumer-tolerance test re-implements .get(); doesn't drive real consumer |
| 10 | LOW | worker_models.py:246 |
No deprecation marker / migration plan on aliases — becomes permanent |
| 11 | LOW | worker_models.py:347 |
No invariant total_files == sum(...); organization_id is context, not outcome |
| 12 | NIT | test_chord_callback_boundary.py:207 |
Flag pre-existing skipped_files read/no-write asymmetry |
Inline comments below carry the per-site detail and suggested fix.
…-binding tests) A+B from the triage on PR #2020: * tasks.py:1659 (API-path BATCH return) — migrated to BatchExecutionResult.to_dict(). Fixes the half-typed boundary the reviewer flagged. file_results, total_files, skipped_already_completed and organization_id are now on the wire. Successful/skipped counter semantic preserved (separating them is deferred to a follow-up). * New SkipReason StrEnum (worker_models.py) with ALREADY_COMPLETED + ACTIVE_DUPLICATE — mirrors the batch-level skip counters on BatchExecutionResult. FileExecutionResult.skipped is now SkipReason | None. from_dict coerces. Producer uses the enum; the ACTIVE_DUPLICATE value has no current per-file producer but is exercised end-to-end via a round-trip test. * TODO(UN-3516) marker on the three alias fields (file_name, result_data, skipped) — sunset ticket filed. * Tests strengthened: - TestProducerBinding drives real _compile_batch_result with a minimal SimpleNamespace context, and drives _process_single_file_api via mocked api_client for the already-completed branch. - TestRealConsumerTolerance imports the real aggregate_file_batch_results — producer-consumer contract driven end-to-end. - test_none_valued_optional_fields_stripped_from_wire documents serialize_dataclass_to_dict's None-strip behaviour. - test_active_duplicate_skip_reason_round_trips proves the second enum value isn't dead. - SonarCloud python:S1244 fixed — pytest.approx. - skipped_files==0 NIT assertion removed. Test count: workers boundary suite 14 -> 18; sdk1 worker_models 80/80 still green. Deferred (separate tickets to follow): __post_init__ silent status clobber, from_dict status discard, BatchExecutionResult invariant, storage soft-failure, dead aggregator branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Review feedback addressed —
|
| # | Finding | What changed |
|---|---|---|
| 4 (LOW) | Aliases lack sunset plan | TODO(UN-3516) markers on the three alias fields. UN-3516 filed with full audit/migration/removal acceptance criteria. |
| 6 (HIGH) | L1659 not migrated — half-typed boundary | Migrated to BatchExecutionResult.to_dict(). file_results, total_files, skipped_already_completed, organization_id now on the API-path wire. |
| 10 (MEDIUM) | Tests don't drive real producers | New TestProducerBinding drives _compile_batch_result + _process_single_file_api directly. A revert at the producer site now fails loud. |
| 11 (MEDIUM) | Hollow consumer-tolerance test | Replaced with TestRealConsumerTolerance that imports the real aggregate_file_batch_results. |
| 12 (NIT) | skipped_files == 0 reads as endorsement |
Assertion removed. |
| — (SonarCloud python:S1244) | Float equality in test | pytest.approx. |
| — (your typo-safety question) | "already_completed" as bare string |
New SkipReason StrEnum with ALREADY_COMPLETED and ACTIVE_DUPLICATE (mirroring the batch-counter vocab on BatchExecutionResult). Producer + tests use the enum. |
Partially fixed (2)
| # | Finding | What changed |
|---|---|---|
| 3 (HIGH) | serialize_dataclass_to_dict strips None |
Touching shared infra is out of scope; added test_none_valued_optional_fields_stripped_from_wire to document and assert the strip behaviour. Membership-check consumers now get a loud test failure if the wire shape ever diverges. |
| 7 (HIGH) | Skipped invisible + reprocessing on except | Skipped-invisible fixed via #6. Reprocessing on except Exception at L1719 is a pre-existing latent bug — deferred. |
Pushed back / deferred (5) — separate tickets to follow
| # | Finding | Why deferred |
|---|---|---|
| 1 (BLOCKER) | __post_init__ silently clobbers status |
Pre-existing dataclass design; touches a class shared with backend. Own design discussion. |
| 2 (BLOCKER) | from_dict discards wire status |
Same pathology as #1; bundled into the same ticket. |
| 5 (MEDIUM) | No total_files invariant + organization_id data-model question + add_file_result not updated |
Strict invariant requires separating successful/skipped counter semantic — that's a behavioural regression for the API path's existing counter. Bundled as a follow-up. |
| 8 (HIGH) | Storage soft-failure → SUCCESS | Pre-existing pattern (legacy dict had the same behaviour). This PR preserves it via dict-spread — does not make it worse. Separate ticket. |
| 9 (MEDIUM) | Dead aggregator branch (status == "error") |
Pre-existing dead code surfaced by the vocab shift, not caused by it. Bundled with the other latent-bug ticket. |
Tests
tests/test_chord_callback_boundary.py: 14 → 18 (addedTestProducerBinding, the None-strip doc test,test_active_duplicate_skip_reason_round_trips, andTestRealConsumerTolerance's second multi-batch case)- sdk1
tests/test_execution.py: 80/80 still green - SonarCloud float-equality finding: resolved
Branch state
UN-3513-chord-callback-result-typing at 8e16eb60e16e2dd55f49163678267b20eb7c2467. Two commits on the branch: d5a6e87f (initial) + 8e16eb60 (this review-fix round). CI / SonarCloud / Greptile / CodeRabbit re-running on the new HEAD.
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Second-pass review — three new findings that escaped the first round. All correctness-level: one data-loss / broken-contract at the batch boundary, one rolling-deploy regression on the consumer read path, and a test-coverage gap where the TestProducerBinding docstring overpromises relative to what the class actually exercises.
…ipped + missing producer tests) Three findings from the second review round on PR #2020: * HIGH — storage_result silent data loss at batch boundary. The per-file dict-spread at tasks.py:1816 preserved storage_result on the immediate return, but the value was dropped when wrapped into BatchExecutionResult.file_results (from_dict didn't know the key). Promoted to a typed FileExecutionResult.storage_result: Any | None field; producer now emits via the constructor; from_dict reads it back. The round-trip preserves it end-to-end. * HIGH — strict SkipReason parsing would crash entire batches during rolling deploys if a newer producer ever emitted an unknown value. Added FileExecutionResult._parse_skipped, which catches ValueError + logs a warning + falls back to None. Standard "strict on emit, lenient on receive" posture for wire compat. * MEDIUM — TestProducerBinding only covered 2 of 5 producer branches. Added three more tests: - _process_single_file_api success branch (asserts storage_result survives the typed wire — would catch the dict-spread revert). - _process_single_file_api failure branch (asserts canonical "Failed" vocab — catches reverts to the legacy lowercase "failed"). - process_file_batch_api batch wrapper via task.apply() with an in-memory result_backend (asserts BatchExecutionResult shape + skipped_already_completed counter derived from SkipReason.ALREADY_COMPLETED.value). Strengthened the existing already-completed branch test to assert result_data + metadata propagation. Bug caught by the new batch-wrapper test: process_file_batch_api was missing execution_time on its BatchExecutionResult(...) call — BatchExecutionResult.execution_time is a required positional, so the API-path batch task would have crashed with TypeError on every run. Introduced batch_start_time = time.time() at task entry and pass execution_time = time.time() - batch_start_time. The new test would have caught this immediately at PR time; logging it here as the exact value of producer-binding coverage. Test count: 18 -> 21; all green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
| Filename | Overview |
|---|---|
| unstract/core/src/unstract/core/worker_models.py | Extends both dataclasses with optional fields (all safe defaults) and adds the SkipReason enum; fixes None-stripping asymmetry in BatchExecutionResult.to_dict() with an explicit post-process loop; from_dict updated to hydrate new fields. |
| workers/file_processing/tasks.py | Three producer sites migrated from hand-rolled dicts to typed dataclass constructors; status vocabulary corrected from lowercase ad-hoc strings to ApiDeploymentResultStatus enum values; batch_start_time added for execution_time tracking on the API path. |
| workers/tests/test_chord_callback_boundary.py | New 14-test file covering wire-shape round-trips, producer binding, and real-consumer tolerance; includes the None-stripping symmetry test that documents the BatchExecutionResult.to_dict() fixup. |
| workers/tests/test_callback_sanity.py | Fixes flaky healthcheck test by switching from endswith to an exact 'callback.worker.healthcheck' match, avoiding non-deterministic task-registry ordering across workers. |
| unstract/core/src/unstract/core/init.py | Adds SkipReason to the package's public imports and all; purely additive. |
Sequence Diagram
sequenceDiagram
participant PFB as process_file_batch
participant CBR as _compile_batch_result
participant PFBA as process_file_batch_api
participant PSFA as _process_single_file_api
participant BER as BatchExecutionResult
participant FER as FileExecutionResult
participant CB as process_batch_callback
PFB->>CBR: context (WorkflowContextData)
CBR->>BER: "BatchExecutionResult(total, successful, failed, skipped_*, org_id)"
BER-->>CBR: .to_dict() wire dict
CBR-->>PFB: wire dict
PFB->>CB: chord callback
PFBA->>PSFA: file_data (per file)
PSFA->>FER: FileExecutionResult(file, file_execution_id, status, file_name, result_data, skipped?, storage_result?)
FER-->>PSFA: .to_dict() per-file wire dict
PSFA-->>PFBA: per-file wire dict
PFBA->>BER: "BatchExecutionResult(total, successful, failed, execution_time, file_results, skipped_*, org_id)"
BER-->>PFBA: .to_dict() with None-stripped nested file_results
PFBA->>CB: chord callback
CB->>CB: aggregate_file_batch_results (tolerant via .get()))
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
unstract/core/src/unstract/core/worker_models.py:297-301
The `status` parameter is accepted by the constructor but is always unconditionally overridden by `__post_init__` — the `else` branch sets `SUCCESS` regardless of what was passed. Every new producer call site in `tasks.py` now passes an explicit `status=ApiDeploymentResultStatus.SUCCESS/FAILED`, but those values are silently discarded. All current sites happen to pass a value consistent with what `__post_init__` would derive, so there is no runtime bug today. The risk is a future caller passing `status=FAILED` with no `error` and receiving `SUCCESS` back unexpectedly. Making `__post_init__` the sole source of truth or adding a docstring warning would make the contract unambiguous.
```suggestion
def __post_init__(self) -> None:
# NOTE: ``status`` is derived solely from the presence of ``error``.
# Any value passed to the constructor is overridden here.
if self.error:
self.status = ApiDeploymentResultStatus.FAILED
else:
self.status = ApiDeploymentResultStatus.SUCCESS
```
Reviews (2): Last reviewed commit: "UN-3513 [FIX] Symmetric None-stripping f..." | Re-trigger Greptile
…rministic callback healthcheck picker Greptile P2 #2 — None-stripping was asymmetric for nested FileExecutionResult objects. ``serialize_dataclass_to_dict`` only filters None at the outermost level, so a standalone ``FileExecutionResult.to_dict()`` would omit unset optional fields while ``batch.to_dict()["file_results"][i]`` would carry explicit ``"file_name": None`` etc. for the same input. A consumer doing ``"x" in result`` membership checks would behave differently depending on whether it read the standalone wire or the nested-in- batch wire — a real contract divergence. Fixed locally on ``BatchExecutionResult.to_dict()`` (not by touching the shared ``serialize_dataclass_to_dict`` infra): post-process ``wire["file_results"]`` to drop None-valued keys, mirroring the top-level strip. ``BatchExecutionResult.from_dict`` was already tolerant via ``.get(...)`` so the round-trip stays clean. Greptile P2 #1 (``status`` constructor parameter clobbered by ``__post_init__``) is the same pathology I flagged as BLOCKER #1 in the first review round — deferred to a separate ticket with the shared-infra dataclass redesign. Test coverage: extended the existing ``test_none_valued_optional_fields_stripped_from_wire`` to also assert nested symmetry — same test method, no new method added. This keeps the pytest collection profile stable (a separate test method would perturb celery's shared task-registry insertion order during pytest collection and amplify a pre-existing flake in ``test_callback_sanity.py``). Test infra fix (bundled because it would have flaked CI on this PR's HEAD): ``test_callback_sanity.TestEagerHealthcheckRoundTrip`` selected the healthcheck task via ``endswith(".healthcheck")`` against ``eager_app.tasks``. That registry is a shared celery global with at least 5 worker modules registering ``healthcheck`` (callback, executor, file_processing, log_consumer, scheduler). ``next(...)`` returned whichever was inserted first, which depends on pytest module-collection order across the whole suite. The test would assert ``worker_type == "callback"`` and intermittently get ``"executor"`` or ``"file_processing"`` instead — empirically a ~10% flake rate on this branch's HEAD, climbing to ~90% with any test-collection perturbation. Replaced with an exact-name lookup (``name == "callback.worker.healthcheck"``); 30/30 green across deterministic + randomised probes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Unstract test resultsPer-group results
Critical paths
|
…test-infra fix) Seven of Vishnu's findings against ``524ae9184`` addressed. Three flagged IMPORTANT (silent-failure + missing test coverage), four SUGGESTION (drift hazard, comment/behaviour mismatch, hollow canary, duplicate-test cleanup). The other three (hand-built fixture, SDK1 ``dict[str, Any]`` boundary, ``as_header`` TypedDict refactor) deferred — see PR thread acknowledgments. * **#11a (SUGGESTION, drift)** — ``workers/queue_backend/dispatch.py`` still hand-built the fairness header instead of calling ``fairness.as_header()``. Wire-format encoding now has a single source so the two sites can't drift. ``FAIRNESS_HEADER_NAME`` import dropped (no longer used here). * **#9 (SUGGESTION, comment/behaviour mismatch)** — ``ExecutionDispatcher._build_send_kwargs`` was forwarding ``{}`` as-is despite the docstring calling it "likely indicates a producer-side build bug". Changed ``if headers is not None`` -> ``if headers``: falsy is dropped so the on-wire shape matches the no-headers baseline and a miswired producer surfaces immediately. Docstring rewritten to describe the new contract. * **#3 (SUGGESTION, wording)** — ``canary_helpers`` docstring overstated adoption. Softened to "intended single home" and named the two characterisation walkers still inlining the logic as a known follow-up. * **#5 + #6 (IMPORTANT cleanup + SUGGESTION fixture)** — six structurally-identical ``*_forwards_headers`` / ``*_omits_headers_when_none`` tests collapsed to three parametrized methods over the three dispatch entry points. Fixture now uses ``FairnessKey(...).as_header()`` rather than hand-built dicts, so the wire shape exercised matches what real producers emit (including ``pipeline_priority``). Net: ~60 LOC removed, per-method failure granularity preserved via parametrize IDs. Also added empty-dict drop assertions covering #9. * **#7 (SUGGESTION, missing combined test)** — new ``test_dispatch_with_callback_combines_headers_and_callbacks`` passes ``on_success``, ``on_error``, ``task_id``, and ``headers`` together and asserts all four land on the same ``send_task`` call. A key-merge regression in ``_build_send_kwargs`` would have slipped through the single-kwarg forwarding tests. * **#8 (SUGGESTION, hollow canary)** — the ``execute_extraction`` dispatch canary only ever asserted the empty (passing) case against the live tree. Added a positive- detection unit test feeding ``ast.parse`` of a known-bad snippet and a blind-spot lock test (constant ref, f-string, ``apply_async`` all evade the detector — documenting the scope so a future widening intentionally trips the asserts). * **#4 (IMPORTANT, untested helper)** — ``_fairness_headers`` in ``structure_tool_task`` was untested; a regression flipping ``NON_API`` -> ``API`` or dropping ``headers=`` at any of the three call sites would have stayed green. Added focused unit tests in new ``test_structure_tool_task.py`` (wire shape, org_id propagation, ``NON_API`` not ``API``) and extended ``test_sanity_phase5.TestStructureToolSingleDispatch`` to assert ``dispatch.call_args.kwargs["headers"]`` carries the expected shape. * **#2 (IMPORTANT, vacuous-pass)** — ``iter_production_trees`` warned-and-continued on ``SyntaxError`` but neither canary module promoted the warning to error. A botched merge in a production file would have dropped silently from the audit set and every canary would have passed vacuously over a smaller tree. Added ``pytestmark = pytest.mark.filterwarnings( "error::UserWarning")`` on both ``test_executor_dispatch`` and ``test_fairness_key``, plus a new ``test_canary_helpers.py`` that unit-tests both the warn-on-broken behaviour and the promote-to-error contract the canary modules rely on. **Bundled test-infra fix (unrelated but unblocks CI):** the ``test_callback_sanity.TestEagerHealthcheckRoundTrip`` tests selected the healthcheck task via ``endswith(".healthcheck")`` against ``eager_app.tasks``, which is a shared celery global registry containing ``callback.worker.healthcheck``, ``executor.worker.healthcheck``, ``file_processing.worker.healthcheck`` etc. The bare ``next(...)`` returned whichever was inserted first — non-deterministic across pytest module-collection orders. Without this fix, the new tests added in this commit perturb the collection profile enough to flip the failure rate from ~10% to nearly 100%. Replaced with exact-name lookup ``name == "callback.worker.healthcheck"``. Identical fix already landed on the UN-3513 branch (see #2020). Test count: 31 -> 42 on the UN-3508-touched modules. Full workers suite: 6 failures pre-existing baseline, unchanged by this commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* UN-3508 [FEAT] Plumb fairness through ExecutionDispatcher
Phase 5.2 of the PG Queue rollout (epic UN-3445). Adds fairness-header
support to the third dispatch path (sdk1's ExecutionDispatcher) so
``execute_extraction`` tasks emitted by file_processing carry the same
routing metadata as workflow-execution dispatches that go through
queue_backend.
What
* sdk1/execution/dispatcher.py: ``dispatch``, ``dispatch_async``,
``dispatch_with_callback`` all accept an optional ``headers`` kwarg.
When non-None, forwarded to Celery's send_task; when None, omitted
so the call shape stays identical to pre-Phase-5.2 for callers that
don't opt in (sdk1's existing tests remain green unchanged).
* queue_backend/fairness.py: new ``FairnessKey.as_header()`` method
returns the wire-ready ``{"x-fairness-key": ...}`` dict. Producers
no longer need to reference ``FAIRNESS_HEADER_NAME`` directly —
keeps the additive-only canary in test_fairness_key.py happy.
* file_processing/structure_tool_task.py: small ``_fairness_headers``
helper builds the header (defaulting workload_type to NON_API;
propagating the real type is Phase 6 work). All three
``dispatcher.dispatch(...)`` sites (lines 468, 507, 720) now pass
``headers=_fairness_headers(organization_id)``.
* tests/test_executor_dispatch.py: new file. Covers header forwarding
through all three dispatcher methods (including the "omit when
None" pre-existing shape preservation), the FairnessKey.as_header()
shape, and an AST inventory canary that forbids raw
``*.send_task("execute_extraction", ...)`` outside
ExecutionDispatcher.
Why
UN-3501 plumbed fairness on bare dispatch() call sites. The
``execute_extraction`` task is the most workflow-execution-y dispatch
in the codebase but bypasses queue_backend (uses ExecutionDispatcher
directly), so it had no fairness header. The canary in
test_fairness_key.py audits only bare-name dispatch() and missed it.
No regression risk
* Additive: ``headers`` is optional and defaults to None on all three
dispatcher methods; the existing 78 sdk1 tests pass unchanged.
* Producer-side only — no consumer reads ``x-fairness-key`` yet.
* No queue routing, task name, or args/kwargs change.
Test count: workers seam suite 53 -> 60 (new test_executor_dispatch.py
with 7 tests). sdk1 dispatcher suite 80/80 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* UN-3508 [REFACTOR] Extract shared canary helpers to drop SonarCloud duplication
SonarCloud flagged 7.8% duplicated lines in the new
test_executor_dispatch.py — the file-walking helper and skip-dir
constants were copy-pasted from test_fairness_key.py.
Move them into tests/canary_helpers.py:
* WORKERS_ROOT, DEFAULT_SKIP_TOP_DIRS constants.
* iter_production_trees(skip_top_dirs=…) generator.
Both canary tests use relative imports (from .canary_helpers import …)
to keep one canonical import path — tests/ is already a package via
__init__.py, no pyproject change needed. (An earlier attempt added
pythonpath = ["tests"], reverted — it would have created a second
top-level import path for every test file and a dual-module-object
hazard.)
The fairness canary widens its skip set with ``queue_backend`` (where
the seam legitimately defines fairness constants); the executor canary
keeps the default. Tests stay at 60/60 — pure dedup, no behavioural
change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* UN-3508 [FIX] Address 14 PR review findings (HIGH/MED/NIT)
* dispatcher.py: factor _build_send_kwargs helper; document headers kwarg on dispatch_with_callback; reference FAIRNESS_HEADER_NAME symbol instead of bare string; document empty-dict caller-bug semantic
* structure_tool_task.py: narrow _fairness_headers return type; replace 'Phase 6 work' with TODO(UN-3504) anchor
* fairness.py: concrete as_header() docstring with explicit shape
* canary_helpers.py: surface SyntaxError via UserWarning (real silent-failure bug; canaries no longer pass vacuously on unparseable files)
* test_executor_dispatch.py: switch to dict[str, Any] dropping type-ignore; use WorkloadType.NON_API.value instead of invalid 'etl' literal; new test_dispatch_async_omits_headers_when_none; tighten canary docstring + note blind spots; drop plan-stage vocab; reorder relative import; new test_fairness_header_shape_orgless for org_id=None case
Tests: workers 60 -> 62, sdk1 dispatcher 80/80 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* UN-3508 [DOCS] Fix iter_production_trees docstring: 'Yield' -> 'Return a list'
Greptile P2: function builds and returns a list — it is not a
generator — but the docstring opened with 'Yield ...', which would
mislead a reader into expecting lazy consumption / generator semantics
(early break, send(), etc.).
Pure docstring fix, no behaviour change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* UN-3508 [FIX] Address vishnuszipstack review (7 real fixes + bundled test-infra fix)
Seven of Vishnu's findings against ``524ae9184`` addressed. Three
flagged IMPORTANT (silent-failure + missing test coverage), four
SUGGESTION (drift hazard, comment/behaviour mismatch, hollow canary,
duplicate-test cleanup). The other three (hand-built fixture,
SDK1 ``dict[str, Any]`` boundary, ``as_header`` TypedDict refactor)
deferred — see PR thread acknowledgments.
* **#11a (SUGGESTION, drift)** — ``workers/queue_backend/dispatch.py``
still hand-built the fairness header instead of calling
``fairness.as_header()``. Wire-format encoding now has a single
source so the two sites can't drift. ``FAIRNESS_HEADER_NAME``
import dropped (no longer used here).
* **#9 (SUGGESTION, comment/behaviour mismatch)** —
``ExecutionDispatcher._build_send_kwargs`` was forwarding ``{}``
as-is despite the docstring calling it "likely indicates a
producer-side build bug". Changed ``if headers is not None`` ->
``if headers``: falsy is dropped so the on-wire shape matches the
no-headers baseline and a miswired producer surfaces immediately.
Docstring rewritten to describe the new contract.
* **#3 (SUGGESTION, wording)** — ``canary_helpers`` docstring
overstated adoption. Softened to "intended single home" and
named the two characterisation walkers still inlining the logic
as a known follow-up.
* **#5 + #6 (IMPORTANT cleanup + SUGGESTION fixture)** — six
structurally-identical ``*_forwards_headers`` /
``*_omits_headers_when_none`` tests collapsed to three
parametrized methods over the three dispatch entry points.
Fixture now uses ``FairnessKey(...).as_header()`` rather than
hand-built dicts, so the wire shape exercised matches what real
producers emit (including ``pipeline_priority``). Net: ~60 LOC
removed, per-method failure granularity preserved via parametrize
IDs. Also added empty-dict drop assertions covering #9.
* **#7 (SUGGESTION, missing combined test)** — new
``test_dispatch_with_callback_combines_headers_and_callbacks``
passes ``on_success``, ``on_error``, ``task_id``, and ``headers``
together and asserts all four land on the same ``send_task``
call. A key-merge regression in ``_build_send_kwargs`` would
have slipped through the single-kwarg forwarding tests.
* **#8 (SUGGESTION, hollow canary)** — the
``execute_extraction`` dispatch canary only ever asserted the
empty (passing) case against the live tree. Added a positive-
detection unit test feeding ``ast.parse`` of a known-bad snippet
and a blind-spot lock test (constant ref, f-string,
``apply_async`` all evade the detector — documenting the scope
so a future widening intentionally trips the asserts).
* **#4 (IMPORTANT, untested helper)** — ``_fairness_headers`` in
``structure_tool_task`` was untested; a regression flipping
``NON_API`` -> ``API`` or dropping ``headers=`` at any of the
three call sites would have stayed green. Added focused unit
tests in new ``test_structure_tool_task.py`` (wire shape,
org_id propagation, ``NON_API`` not ``API``) and extended
``test_sanity_phase5.TestStructureToolSingleDispatch`` to assert
``dispatch.call_args.kwargs["headers"]`` carries the expected
shape.
* **#2 (IMPORTANT, vacuous-pass)** — ``iter_production_trees``
warned-and-continued on ``SyntaxError`` but neither canary
module promoted the warning to error. A botched merge in a
production file would have dropped silently from the audit set
and every canary would have passed vacuously over a smaller
tree. Added ``pytestmark = pytest.mark.filterwarnings(
"error::UserWarning")`` on both ``test_executor_dispatch`` and
``test_fairness_key``, plus a new ``test_canary_helpers.py``
that unit-tests both the warn-on-broken behaviour and the
promote-to-error contract the canary modules rely on.
**Bundled test-infra fix (unrelated but unblocks CI):** the
``test_callback_sanity.TestEagerHealthcheckRoundTrip`` tests
selected the healthcheck task via ``endswith(".healthcheck")``
against ``eager_app.tasks``, which is a shared celery global
registry containing ``callback.worker.healthcheck``,
``executor.worker.healthcheck``,
``file_processing.worker.healthcheck`` etc. The bare ``next(...)``
returned whichever was inserted first — non-deterministic across
pytest module-collection orders. Without this fix, the new tests
added in this commit perturb the collection profile enough to
flip the failure rate from ~10% to nearly 100%. Replaced with
exact-name lookup ``name == "callback.worker.healthcheck"``.
Identical fix already landed on the UN-3513 branch (see #2020).
Test count: 31 -> 42 on the UN-3508-touched modules. Full workers
suite: 6 failures pre-existing baseline, unchanged by this commit.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>



What
unstract/core/src/unstract/core/worker_models.py(strictly additive — every new field is optional with a safe default):BatchExecutionResultgainsskipped_already_completed: int = 0,skipped_active_duplicate: int = 0,organization_id: str | None = None.FileExecutionResultgainsfile_name: str | None = None,result_data: Any | None = None,skipped: str | None = None.from_dictupdated to populate the new fields.workers/file_processing/tasks.pyproducers now build the typed dataclasses and emit.to_dict()instead of hand-rolled dicts:process_file_batch) →BatchExecutionResult(...).to_dict()._process_file_batch_api_corehelpers) →FileExecutionResult(...).to_dict(). L1798 preserves the legacystorage_resultfield via dict-spread merge.workers/tests/test_chord_callback_boundary.py— 14 tests, 3 classes (wire-shape characterisation for both dataclasses + consumer-tolerance check).Why
Phase 5 of the rollout asks for every queue-crossing payload to be machine-readable so a future scheduler has structured data to dispatch on. The chord-callback boundary was the last loose dict-shaped handoff in workers — typed dataclasses already existed in
unstract.core.worker_modelsbut producers were returning ad-hoc dicts. This PR closes that gap on the producer side. Consumer side is already tolerant (.get(..., default)everywhere inaggregate_file_batch_results), so no consumer change is needed.Builds on UN-3501 (#2003 — fairness key) and runs in parallel with UN-3508 (#2009 — fairness through ExecutionDispatcher). Unblocks Phase 6 (Barrier interface) which needs a typed result contract to declare on
Barrier.coordinate(...).How
return {dict}→return Class(...).to_dict(). Wire shape gains a small number of default values (empty list, None, 0) that consumers ignore via.get().statusstring changes from lowercase"completed"/"failed"(ad-hoc, matched no canonical enum) →"Success"/"Failed"(the actualApiDeploymentResultStatusper-file vocabulary). See "Can this PR break any existing features" below.Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
No on consumer code; no on backend; yes on log/observability tooling that pattern-matches the lowercase API-path status strings (deliberate, see below).
aggregate_file_batch_results(shared/processing/files/time_utils.py:130) reads every field via.get(..., default). Adding fields to the wire dict is silently ignored; missing fields fall back to defaults. New tests assert this tolerance holds for the new wire shape.BatchExecutionResult. Has its own unrelatedFileExecutionResultinworkflow_manager/endpoint_v2/dto.py:123that uses**dataconstruction — untouched.status="completed"/"failed"— lowercase strings matching neitherExecutionStatus(uppercase, workflow-level) norApiDeploymentResultStatus(Success/Failed, the canonical per-file vocabulary). The lowercase strings were a pre-existing ad-hoc third variant. After this PR they emit"Success"/"Failed"— the correct per-file vocabulary.workers/andbackend/forstatus == "completed"/status == "failed"against these chord-callback dicts is clean (matches only unrelated cleanup-result paths).api-deployment/tasks.py:538-542already usesApiDeploymentResultStatus.SUCCESS.value/FAILED.valuewhen building the API response. The lowercase strings were internal chord-callback noise, never the external contract.Database Migrations
None.
Env Config
None.
Relevant Docs
None — dataclass docstrings document the new optional fields inline.
Related Issues or PRs
Dependencies Versions
None.
Notes on Testing
Dev-tested locally against a running worker stack — workflow execution completes end-to-end; callback aggregates correctly; API-path responses unchanged.
Screenshots
N/A (no UI surface).
Checklist
I have read and understood the Contribution Guidelines.