Commit 7983c98

Authored by harini-venkataraman, Deepak-Kesavan, pre-commit-ci[bot], claude, ritwik-g
[FIX] Executor Queue for Agentic extraction (#1893)
* Execution backend - revamp * async flow * Streaming progress to FE * Removing multi hop in Prompt studio ide and structure tool * UN-3234 [FIX] Add beta tag to agentic prompt studio navigation item * Added executors for agentic prompt studio * Added executors for agentic prompt studio * Removed redundant envs * Removed redundant envs * Removed redundant envs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Removed redundant envs * Removed redundant envs * Removed redundant envs * Removed redundant envs * Removed redundant envs * Removed redundant envs * Removed redundant envs * Removed redundant envs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Removed redundant envs * adding worker for callbacks * adding worker for callbacks * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Pluggable apps and plugins to fit the new async prompt execution architecture * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Pluggable apps and plugins to fit the new async prompt execution architecture * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Pluggable apps and plugins to fit the new async prompt execution architecture * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * adding worker for callbacks * fix: write output files in agentic extraction pipeline Agentic extraction returned early without writing INFILE (JSON) or METADATA.json, causing destination 
connectors to read the original PDF and fail with "Expected tool output type: TXT, got: application/pdf". Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: replace hardcoded /tmp paths with secure temp dirs in tests (#1850) * UN-3266 fix: replace hardcoded /tmp paths with secure temp dirs in tests Replace hardcoded /tmp/ paths (SonarCloud S5443 security hotspots) with pytest's tmp_path fixture or module-level tempfile.mkdtemp() constants in all affected test files to avoid world-writable directory vulnerabilities. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update docs * UN-3266 fix: remove dead code with undefined names in fetch_response Remove unreachable code block after the async callback return in fetch_response that still referenced output_count_before and response from the old synchronous implementation, causing ruff F821 errors. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Un 3266 fix security hotspot tmp paths (#1851) * UN-3266 fix: replace hardcoded /tmp paths with secure temp dirs in tests Replace hardcoded /tmp/ paths (SonarCloud S5443 security hotspots) with pytest's tmp_path fixture or module-level tempfile.mkdtemp() constants in all affected test files to avoid world-writable directory vulnerabilities. 
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: resolve ruff linting failures across multiple files - B026: pass url positionally in worker_celery.py to avoid star-arg after keyword - N803: rename MockAsyncResult to mock_async_result in test_tasks.py - E501/I001: fix long line and import sort in llm_whisperer helper - ANN401: replace Any with object|None in dispatcher.py; add noqa in test helpers - F841: remove unused workflow_id and result assignments Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> * UN-3266 fix: resolve SonarCloud bugs S2259 and S1244 in PR #1849 - S2259: guard against None after _discover_plugins() in loader.py to satisfy static analysis on the dict[str,type]|None field type - S1244: replace float equality checks with pytest.approx() in test_answer_prompt.py and test_phase2h.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: resolve SonarCloud code smells in PR #1849 - S5799: Merge all implicit string concatenations in log messages (legacy_executor.py, tasks.py, dispatcher.py, orchestrator.py, registry.py, variable_replacement.py, structure_tool_task.py) - S1192: Extract duplicate literal to _NO_CELERY_APP_MSG constant in dispatcher.py - S1871: Merge identical elif/else branches in tasks.py and test_sanity_phase6j.py - S1186: Add comment to empty stub method in test_sanity_phase6a.py - S1481: Remove unused local variables in test_sanity_phase6d/e/f/g/h/j and test_phase5d.py - S117: Rename PascalCase local variables to snake_case in test_sanity_phase3/5/6i.py - S5655: Broaden tool type annotation to StreamMixin in IndexingUtils.generate_index_key and PlatformHelper.get_adapter_config - docker:S7031: Merge consecutive RUN 
instructions in worker-unified.Dockerfile - javascript:S1128: Remove unused pollForCompletion import in usePromptRun.js Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * UN-3266 fix: wrap long log message in dispatcher.py to fix E501 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: resolve remaining SonarCloud S117 naming violations Rename PascalCase local variables to snake_case to comply with S117: - legacy_executor.py: rename tuple-unpacked _get_prompt_deps() results (AnswerPromptService→answer_prompt_svc, RetrievalService→retrieval_svc, VariableReplacementService→variable_replacement_svc, LLM→llm_cls, EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls) and update all downstream usages including _apply_type_conversion and _handle_summarize - test_phase1_log_streaming.py: rename Mock* local variables to mock_* snake_case equivalents - test_sanity_phase3.py: rename MockDispatcher→mock_dispatcher_cls and MockShim→mock_shim_cls across all 10 test methods - test_sanity_phase5.py: rename MockShim→mock_shim, MockX2Text→mock_x2text in 6 test methods; MockDispatcher→mock_dispatcher_cls in dispatch test; fix LLM_cls→llm_cls, EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls in _mock_prompt_deps helper Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * UN-3266 fix: resolve remaining SonarCloud code smells in PR #1849 - test_sanity_phase2/4.py, test_answer_prompt.py: rename PascalCase local variables in _mock_prompt_deps/_mock_deps to snake_case (RetrievalService→retrieval_svc, VariableReplacementService→ variable_replacement_svc, Index→index_cls, LLM_cls→llm_cls, EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls, AnswerPromptService→answer_prompt_svc_cls) — fixes S117 - test_sanity_phase3.py: 
remove unused local variable "result" — fixes S1481 - structure_tool_task.py: remove redundant json.JSONDecodeError from except clause (subclass of ValueError) — fixes S5713 - shared/workflow/execution/service.py: replace generic Exception with RuntimeError for structure tool failure — fixes S112 - run-worker-docker.sh: define EXECUTOR_WORKER_TYPE constant and replace 10 literal "executor" occurrences — fixes S1192 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: resolve SonarCloud cognitive complexity and code smell violations - Reduce cognitive complexity in answer_prompt.py: - Extract _build_grammar_notes, _run_webhook_postprocess helpers - _is_safe_public_url: extracted _resolve_host_addresses helper - handle_json: early-return pattern eliminates nesting - construct_prompt: delegates grammar loop to _build_grammar_notes - Reduce cognitive complexity in legacy_executor.py: - Extract _execute_single_prompt, _run_table_extraction helpers - Extract _run_challenge_if_enabled, _run_evaluation_if_enabled - Extract _inject_table_settings, _finalize_pipeline_result - Extract _convert_number_answer, _convert_scalar_answer - Extract _sanitize_dict_values helper - _handle_answer_prompt CC reduced from 50 to ~7 - Reduce CC in structure_tool_task.py: guard-clause refactor - Reduce CC in backend: dto.py, deployment_helper.py, api_deployment_views.py, prompt_studio_helper.py - Fix S117: rename PascalCase local vars in test_answer_prompt.py - Fix S1192: extract EXECUTOR_WORKER_TYPE constant in run-worker.sh - Fix S1172: remove unused params from structure_tool_task.py - Fix S5713: remove redundant JSONDecodeError in json_repair_helper.py - Fix S112/S5727 in test_execution.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: remove unused RetrievalStrategy import from _handle_answer_prompt Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see 
https://pre-commit.ci * UN-3266 fix: rename UsageHelper params to lowercase (N803) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * UN-3266 fix: resolve remaining SonarCloud issues from check run 66691002192 - Add @staticmethod to _sanitize_null_values (fixes S2325 missing self) - Reduce _execute_single_prompt params from 25 to 11 (S107) by grouping services as deps tuple and extracting exec params from context.executor_params - Add NOSONAR suppression for raise exc in test helper (S112) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * UN-3266 fix: remove unused locals in _handle_answer_prompt (F841) execution_id, file_hash, log_events_id, custom_data are now extracted inside _execute_single_prompt from context.executor_params. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: resolve Biome linting errors in frontend source files Auto-fixed 48 lint errors across 56 files: import ordering, block statements, unused variable prefixing, and formatting issues. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: replace dynamic import of SharePermission with static import in Workflows Resolves vite build warning about SharePermission.jsx being both dynamically and statically imported across the codebase. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: resolve SonarCloud warnings in frontend components - Remove unnecessary try-catch around PostHog event calls - Flip negated condition in PromptOutput.handleTable for clarity Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Address PR #1849 review comments: fix null guards, dead code, and test drift - Remove redundant inline `import uuid as _uuid` in views.py (use module-level uuid) - URL-encode DB_USER in worker_celery.py result backend connection string - Remove misleading task_queues=[Queue("executor")] from dispatch-only Celery app - Remove dead `if not tool:` guards after objects.get() (already raises DoesNotExist) - Move profile_manager/default_profile null checks before first dereference - Reorder ProfileManager.objects.get before mark_document_indexed in tasks.py - Handle ProfileManager.DoesNotExist as warning, not hard failure - Wrap PostHog analytics in try/catch so failures don't block prompt execution - Handle pending-indexing 200 response in usePromptRun.js (clear RUNNING status) - Reset formData when metadata is missing in ConfigureDs.jsx - Fix test_should_skip_extraction tests: function now takes 1 arg (outputs only) - Fix agentic routing tests: mock X2Text.process, remove stale platform_helper kwarg Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix missing llm_usage_reason for summarize LLM usage tracking Add PSKeys.LLM_USAGE_REASON to usage_kwargs in _handle_summarize() so summarization costs appear under summarize_llm in API response metadata. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * UN-3266 [FIX] Fix single-pass extraction routing in LegacyExecutor - Route _handle_structure_pipeline to _handle_single_pass_extraction when is_single_pass=True (was always calling _handle_answer_prompt) - Delegate _handle_single_pass_extraction to cloud plugin via ExecutorRegistry, falling back to _handle_answer_prompt if plugin not installed Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fixing API depployment response mismatches * Fix single-pass extraction showing only one prompt result in real-time - Fix accumulation bug in usePromptOutput updateCoverage() where each loop iteration spread the original promptOutputs instead of the accumulating updatedPromptOutputs, causing only the last prompt to render - Improve index success toast to show document name - Strip adapter names from index key configs for consistent hashing - Update sdk1 uv.lock Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Move summarize from sync Django plugin to executor worker for IDE index When "Summarize" is enabled and a user indexes a new document in Prompt Studio, the backend returned a 500 because the sync Django plugin tried to read the extracted .txt file before extraction happened in the worker. Fix: defer summarization to the executor worker's _handle_ide_index (extract → summarize → index), build summarize_params in the payload, and track the summarize index in the ide_index_complete callback. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Address PR #1849 review comments: null guards, thread safety - Fix null guard ordering in build_index_payload: add DefaultProfileError check before calling validators on default_profile - Fix null guard ordering in _fetch_single_pass_response: move check before .llm.id access and validators (was dead code after dereference) - Add threading.Lock to worker_celery singleton to prevent race under concurrent gunicorn threads Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add documentation to ExecutionResponse DTO describing result structure Documents the ExecutionResponse dataclass fields, especially the result attribute's per-file dict structure (output, metadata, metrics, error keys) as requested in PR review. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix PR review issues: IDOR, null guards, rollback, spinner, summarize, prompt lookup - Strip result payload from task_status to prevent IDOR data leak - Move null guard before validators in build_fetch_response_payload - Roll back indexing flag on broker failure in index_document - Use explicit IDs for spinner clearing instead of ORM serialization format - Catch broad exceptions in summarize tracking to prevent false failures - Guard prompt_id lookup in fetch_response with 400/404 responses Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix CI, tests, and add async prompt studio improvements - Fix biome import ordering in frontend test files - Fix 22 stale backend test assertions in test_tasks.py (patch targets, return values, view dispatch pattern, remove TestCeleryConfig) - Add ide_prompt_complete callback tests - Add frontend vitest config and regression tests for null guards, stale closures, and single-pass loading guard - Add LLMCompat llama-index compatibility wrapper in SDK1 - Add litellm cohere embed timeout 
monkey-patch (v1.82.3) - Improve S3 filesystem helper (region_name, empty credential handling) - Add RetrieverLLM and lazy LLM creation for retrievers - Improve worker cleanup: api_client.close() in finally blocks, early return on setup failure in API deployment tasks - Add workers conftest.py for test environment setup Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix pre-existing biome CI errors: import ordering and formatting Auto-fix 18 pre-existing organizeImports and formatting errors across 17 frontend files that were blocking biome ci on the entire codebase. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix ruff F821: add missing transaction import in prompt_studio_helper Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add input validation guards to bulk_fetch_response endpoint Validate prompt_ids (non-empty), document_id (required), and handle DoesNotExist for both prompts and document to return proper 400/404 instead of dispatching no-op tasks or raising unhandled 500 errors. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * IDE Call backs * Sonar issues fix * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix ruff errors: restore summary_profile variable, suppress TC001 in dispatcher Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update bun.lock to match package.json dependency ranges Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix all biome lint warnings: empty blocks, missing braces, forEach returns, unused vars Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Move ExecutionContext import into TYPE_CHECKING block Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix SonarQube issues: duplication, naming, nesting, unused var - Extract _parse_json_body() helper and _ERR_INVALID_JSON constant to deduplicate 5 identical JSON parsing blocks in internal_views.py - Rename `User` local variable to `user_model` (naming convention) - Merge _emit_result/_emit_error into unified _emit_event() in ide_callback tasks to reduce code duplication - Extract _get_task_error() helper to deduplicate AsyncResult error retrieval in ide_index_error and ide_prompt_error - Remove unused `mock_ar_cls` variable in test_ide_callback.py - Add security note documenting why @csrf_exempt is safe on internal endpoints Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Replace worker-ide-callback Dockerfile with worker-unified The IDE callback worker should use the unified worker image (worker-unified.Dockerfile) consistent with all other v2 workers. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add celery_executor_agentic queue to executor worker The executor worker only consumed from celery_executor_legacy, but the agentic prompt studio dispatches tasks to celery_executor_agentic. This caused agentic operations to sit in RabbitMQ with no consumer, resulting in timeouts and stuck-in-progress states. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * FIxing email enforce type * Removing line-item from select choices * Update workers/shared/enums/worker_enums_base.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com> * Update backend/workflow_manager/workflow_v2/workflow_helper.py Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix false success logs and silent failures in ETL destination pipelines - Widen except clause in structure_tool_task to catch FileOperationError and log write paths for diagnostics - Add diagnostic logging at all silent return-None points in destination connector so missing INFILE/METADATA paths are visible in logs - Raise RuntimeError instead of silently skipping when no tool execution result is available for DB/FS destinations, preventing false success - Remove dead retry config from execute_bin task (max_retries=0) - Fix duplicate EXECUTOR/IDE_CALLBACK enum entries in WorkerType Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Revert ETL destination pipeline changes — deferring to next cut Reverts diagnostic logging and error-raising changes in structure_tool_task.py and 
destination_connector.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com> Co-authored-by: Ghost Jake <89829542+Deepak-Kesavan@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> Co-authored-by: Kirtiman Mishra <110175055+kirtimanmishrazipstack@users.noreply.github.com> Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent 20b0ad8 commit 7983c98

8 files changed

Lines changed: 29 additions & 28 deletions

backend/prompt_studio/prompt_studio_core_v2/static/select_choices.json

Lines changed: 0 additions & 1 deletion
@@ -14,7 +14,6 @@
     "date":"date",
     "boolean":"boolean",
     "json":"json",
-    "line_item":"line-item",
     "table":"table"
   },
   "output_processing":{

backend/workflow_manager/workflow_v2/workflow_helper.py

Lines changed: 0 additions & 4 deletions
@@ -598,11 +598,7 @@ def execute_workflow_async(
     @staticmethod
     @celery_app.task(
         name="async_execute_bin",
-        autoretry_for=(Exception,),
         max_retries=0,
-        retry_backoff=True,
-        retry_backoff_max=500,
-        retry_jitter=True,
     )
     def execute_bin(
         schema_name: str,
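The commit message calls the removed options dead retry config because the task keeps `max_retries=0`. The toy model below is not Celery's implementation; `run_with_retries` and `flaky` are hypothetical names used only to illustrate why an auto-retry policy has nothing to do when zero retries are allowed.

```python
def run_with_retries(func, max_retries, autoretry_for=(Exception,)):
    """Toy retry loop: re-run func on matching errors, up to max_retries extra attempts.

    Returns (result, attempts). Illustrative only; this is not Celery code.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return func(), attempts
        except autoretry_for:
            # attempts - 1 retries have been used so far.
            if attempts - 1 >= max_retries:
                raise


calls = []


def flaky():
    """Fail on the first two calls, succeed on the third."""
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("transient")
    return "ok"


# With two retries allowed, the third attempt succeeds.
result, attempts = run_with_retries(flaky, max_retries=2)
print(result, attempts)  # ok 3
```

With `max_retries=0` the first failure propagates immediately, so backoff and jitter settings would never be consulted in this model.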

docker/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
@@ -529,7 +529,7 @@ services:
       - EXECUTOR_METRICS_PORT=8088
       - HEALTH_PORT=8088
       # Configurable Celery options
-      - CELERY_QUEUES_EXECUTOR=${CELERY_QUEUES_EXECUTOR:-celery_executor_legacy}
+      - CELERY_QUEUES_EXECUTOR=${CELERY_QUEUES_EXECUTOR:-celery_executor_legacy,celery_executor_agentic}
       - CELERY_POOL=${WORKER_EXECUTOR_POOL:-prefork}
       - CELERY_PREFETCH_MULTIPLIER=${WORKER_EXECUTOR_PREFETCH_MULTIPLIER:-1}
       - CELERY_CONCURRENCY=${WORKER_EXECUTOR_CONCURRENCY:-2}

workers/executor/executors/legacy_executor.py

Lines changed: 14 additions & 13 deletions
@@ -1059,9 +1059,7 @@ def _sanitize_null_values(
     ) -> dict[str, Any]:
         """Replace 'NA' strings with None in structured output."""
         for k, v in structured_output.items():
-            if isinstance(v, str) and v.lower() == "na":
-                structured_output[k] = None
-            elif isinstance(v, list):
+            if isinstance(v, list):
                 for i, item in enumerate(v):
                     if isinstance(item, str) and item.lower() == "na":
                         v[i] = None
@@ -1698,16 +1696,19 @@ def _apply_type_conversion(
             )

         elif output_type == PSKeys.EMAIL:
-            email_prompt = (
-                f"Extract the email from the following text:\n{answer}"
-                f"\n\nOutput just the email. "
-                f"The email should be directly assignable to a string "
-                f"variable. No explanation is required. If you cannot "
-                f'extract the email, output "NA".'
-            )
-            structured_output[prompt_name] = LegacyExecutor._convert_scalar_answer(
-                answer, llm, answer_prompt_svc, email_prompt
-            )
+            if answer.lower() == "na":
+                structured_output[prompt_name] = answer
+            else:
+                email_prompt = (
+                    f"Extract the email from the following text:\n{answer}"
+                    f"\n\nOutput just the email. "
+                    f"The email should be directly assignable to a string "
+                    f"variable. No explanation is required. If you cannot "
+                    f'extract the email, output "NA".'
+                )
+                structured_output[prompt_name] = answer_prompt_svc.run_completion(
+                    llm=llm, prompt=email_prompt
+                )

         elif output_type == PSKeys.DATE:
             date_prompt = (
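The email branch now returns an "NA" answer as-is and only calls the LLM for other answers. A minimal sketch of that control flow, assuming a plain callable in place of `answer_prompt_svc.run_completion`; `convert_email` and `fake_llm` are hypothetical names, and the prompt text is shortened:

```python
from typing import Callable

llm_calls: list[str] = []


def fake_llm(prompt: str) -> str:
    """Stand-in for the LLM completion call; records each prompt it receives."""
    llm_calls.append(prompt)
    return "jane@example.com"


def convert_email(answer: str, run_completion: Callable[[str], str]) -> str:
    """Pass "NA" answers through untouched; otherwise ask the LLM for the email."""
    if answer.lower() == "na":
        return answer  # no LLM round trip for an answer that is already "NA"
    prompt = (
        f"Extract the email from the following text:\n{answer}"
        "\n\nOutput just the email."
    )
    return run_completion(prompt)


print(convert_email("NA", fake_llm))  # NA
print(convert_email("Reach Jane at jane@example.com", fake_llm))  # jane@example.com
```

In this sketch the "NA" case costs no completion call, and the original casing of the answer ("NA" or "na") is kept.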

workers/executor/worker.py

Lines changed: 4 additions & 1 deletion
@@ -5,6 +5,7 @@
 """

 import logging
+import os

 from shared.enums.worker_enums import WorkerType
 from shared.infrastructure.config.builder import WorkerBuilder
@@ -43,7 +44,9 @@ def check_executor_health():
             "worker_type": "executor",
             "registered_executors": executors,
             "executor_count": len(executors),
-            "queues": ["celery_executor_legacy"],
+            "queues": os.environ.get(
+                "CELERY_QUEUES_EXECUTOR", "celery_executor_legacy"
+            ).split(","),
         },
     )
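The health check now reports whatever queues `CELERY_QUEUES_EXECUTOR` lists. A small sketch of that parsing follows; `executor_queues` is a hypothetical helper, and the whitespace stripping and empty-entry filtering are assumptions added here (the diff itself uses a plain `.split(",")`):

```python
import os
from typing import Mapping, Optional


def executor_queues(
    env: Optional[Mapping[str, str]] = None,
    default: str = "celery_executor_legacy",
) -> list[str]:
    """Return the queue names listed in CELERY_QUEUES_EXECUTOR.

    Falls back to `default` when the variable is unset. Unlike the plain
    split in the diff, this also trims spaces and drops empty entries.
    """
    source = os.environ if env is None else env
    raw = source.get("CELERY_QUEUES_EXECUTOR", default)
    return [name.strip() for name in raw.split(",") if name.strip()]


print(executor_queues({}))
# ['celery_executor_legacy']
print(executor_queues(
    {"CELERY_QUEUES_EXECUTOR": "celery_executor_legacy, celery_executor_agentic"}
))
# ['celery_executor_legacy', 'celery_executor_agentic']
```

Passing the environment as a parameter keeps the helper easy to test without mutating `os.environ`.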

workers/shared/enums/worker_enums_base.py

Lines changed: 1 addition & 0 deletions
@@ -155,6 +155,7 @@ class QueueName(str, Enum):
     # The dispatcher derives queue names as ``celery_executor_{executor_name}``.
     # The "legacy" executor is the default OSS executor.
     EXECUTOR = "celery_executor_legacy"
+    EXECUTOR_AGENTIC = "celery_executor_agentic"

     # IDE callback queue (prompt studio post-execution callbacks)
     IDE_CALLBACK = "ide_callback"
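The comment in this hunk says the dispatcher derives queue names as `celery_executor_{executor_name}`. A sketch of how that convention lines up with the enum values; the trimmed `QueueName` copy and the `derive_queue_name` helper are illustrative assumptions, not the dispatcher's actual code:

```python
from enum import Enum


class QueueName(str, Enum):
    """Trimmed copy of the enum, limited to the executor queues in the hunk."""

    EXECUTOR = "celery_executor_legacy"
    EXECUTOR_AGENTIC = "celery_executor_agentic"


def derive_queue_name(executor_name: str) -> str:
    """Apply the naming convention quoted in the enum's comment."""
    return f"celery_executor_{executor_name}"


# Every derived name should match a queue some worker consumes.
known = {q.value for q in QueueName}
for name in ("legacy", "agentic"):
    queue = derive_queue_name(name)
    print(queue, queue in known)
# celery_executor_legacy True
# celery_executor_agentic True
```

A check like this is one way to catch the situation the commit message describes, where tasks were routed to a queue that no worker consumed.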

workers/shared/infrastructure/config/registry.py

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ class WorkerRegistry:
         ),
         WorkerType.EXECUTOR: WorkerQueueConfig(
             primary_queue=QueueName.EXECUTOR,
+            additional_queues=[QueueName.EXECUTOR_AGENTIC],
         ),
         WorkerType.IDE_CALLBACK: WorkerQueueConfig(
             primary_queue=QueueName.IDE_CALLBACK,
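The registry entry pairs a primary queue with additional queues. The stand-in below shows how such a config could be flattened into the comma-separated list a Celery worker takes for its queues option. This `WorkerQueueConfig` is a hypothetical simplification with only the two fields visible in the diff, and `all_queues` and `to_cli_arg` are assumed helper names.

```python
from dataclasses import dataclass, field


@dataclass
class WorkerQueueConfig:
    """Hypothetical stand-in: one primary queue plus optional extras."""

    primary_queue: str
    additional_queues: list[str] = field(default_factory=list)

    def all_queues(self) -> list[str]:
        """Primary queue first, then the additional queues in order."""
        return [self.primary_queue, *self.additional_queues]

    def to_cli_arg(self) -> str:
        """Comma-separated form, e.g. for a worker's queue list."""
        return ",".join(self.all_queues())


executor = WorkerQueueConfig(
    primary_queue="celery_executor_legacy",
    additional_queues=["celery_executor_agentic"],
)
print(executor.to_cli_arg())
# celery_executor_legacy,celery_executor_agentic
```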

workers/tests/test_answer_prompt.py

Lines changed: 8 additions & 8 deletions
@@ -562,8 +562,8 @@ def test_invalid_strategy_skips_retrieval(
         )
         result = executor._handle_answer_prompt(ctx)

-        # Answer stays "NA" which gets sanitized to None
-        assert result.data[PSKeys.OUTPUT]["field_a"] is None
+        # Answer stays "NA" (top-level NA is preserved, not sanitized)
+        assert result.data[PSKeys.OUTPUT]["field_a"] == "NA"


 class TestHandleAnswerPromptMultiPrompt:
@@ -687,21 +687,21 @@ def test_vectordb_closed(self, mock_shim_cls, mock_deps):
 class TestNullSanitization:
     """Tests for _sanitize_null_values."""

-    def test_na_string_becomes_none(self):
-        """Top-level 'NA' string None."""
+    def test_na_string_preserved(self):
+        """Top-level 'NA' string is preserved (not sanitized to None)."""
         from executor.executors.legacy_executor import LegacyExecutor

         output = {"field": "NA"}
         result = LegacyExecutor._sanitize_null_values(output)
-        assert result["field"] is None
+        assert result["field"] == "NA"

-    def test_na_case_insensitive(self):
-        """'na' (lowercase) None."""
+    def test_na_case_insensitive_preserved(self):
+        """Top-level 'na' (lowercase) is preserved (not sanitized to None)."""
         from executor.executors.legacy_executor import LegacyExecutor

         output = {"field": "na"}
         result = LegacyExecutor._sanitize_null_values(output)
-        assert result["field"] == "na"
-        assert result["field"] is None
+        assert result["field"] == "na"

     def test_nested_list_na(self):
         """NA in nested list items → None."""

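The updated tests pin down the new sanitization rule: top-level "NA" strings survive, while "NA" items inside lists still become `None`. A minimal sketch of just the branches visible in the `legacy_executor.py` hunk; `sanitize_null_values` is a standalone approximation, and the real method may handle further cases that the diff does not show:

```python
from typing import Any


def sanitize_null_values(structured_output: dict[str, Any]) -> dict[str, Any]:
    """Replace 'NA' list items with None; leave top-level 'NA' strings alone."""
    for value in structured_output.values():
        if isinstance(value, list):
            for i, item in enumerate(value):
                if isinstance(item, str) and item.lower() == "na":
                    value[i] = None
    return structured_output


print(sanitize_null_values({"field": "NA"}))
# {'field': 'NA'}
print(sanitize_null_values({"items": ["NA", "x", "na"]}))
# {'items': [None, 'x', None]}
```

Note that the sketch mutates its input in place and returns the same dict, mirroring the in-place assignments shown in the hunk.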