Skip to content

Commit 2459e87

Browse files
Merge branch 'main' into misc/UN-3396-MISC_design_rules_prototype
2 parents 66ab764 + 8dec64e commit 2459e87

File tree

7 files changed

+184
-11
lines changed

7 files changed

+184
-11
lines changed

backend/prompt_studio/prompt_studio_core_v2/internal_urls.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
path("output/", internal_views.prompt_output, name="prompt-output"),
1111
path("index/", internal_views.index_update, name="index-update"),
1212
path("indexing-status/", internal_views.indexing_status, name="indexing-status"),
13+
path(
14+
"extraction-status/",
15+
internal_views.extraction_status,
16+
name="extraction-status",
17+
),
1318
path(
1419
"profile/<str:profile_id>/",
1520
internal_views.profile_detail,

backend/prompt_studio/prompt_studio_core_v2/internal_views.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,73 @@ def index_update(request):
154154
)
155155

156156

157+
@csrf_exempt
@require_http_methods(["POST"])
def extraction_status(request):
    """Mark IndexManager.extraction_status for a document+profile pair.

    Called by the ide_callback worker after a successful ide_index run so
    that subsequent Answer Prompt dispatches can short-circuit extraction
    via PromptStudioIndexHelper.check_extraction_status.

    Expected JSON payload:
        {
            "document_id": str,
            "profile_manager_id": str,
            "x2text_config_hash": str,
            "enable_highlight": bool (optional, default false),
            "extracted": bool (optional, default true),
            "error_message": str | null (optional)
        }

    Returns:
        JsonResponse: ``{"success": <bool>}`` carrying the value returned by
        ``PromptStudioIndexHelper.mark_extraction_status``. Responds with
        HTTP 400 when ``document_id``, ``profile_manager_id`` or
        ``x2text_config_hash`` is missing or empty, HTTP 404 when no
        ProfileManager matches ``profile_manager_id``, and HTTP 500 on any
        other failure.
    """
    data, err = _parse_json_body(request)
    if err:
        return err

    document_id = data.get("document_id", "")
    profile_manager_id = data.get("profile_manager_id", "")
    x2text_config_hash = data.get("x2text_config_hash", "")
    enable_highlight = data.get("enable_highlight", False)
    extracted = data.get("extracted", True)
    error_message = data.get("error_message")

    if not document_id or not profile_manager_id or not x2text_config_hash:
        return JsonResponse(
            {
                "success": False,
                "error": (
                    "document_id, profile_manager_id, and x2text_config_hash "
                    "are required"
                ),
            },
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        from prompt_studio.prompt_profile_manager_v2.models import ProfileManager
        from prompt_studio.prompt_studio_index_manager_v2.prompt_studio_index_helper import (
            PromptStudioIndexHelper,
        )

        # An unknown profile id is a caller error, not a server fault:
        # answer 404 instead of falling through to the generic 500 handler
        # (which would also log a misleading traceback).
        try:
            profile_manager = ProfileManager.objects.get(pk=profile_manager_id)
        except ProfileManager.DoesNotExist:
            logger.warning(
                "extraction_status: ProfileManager %s not found",
                profile_manager_id,
            )
            return JsonResponse(
                {
                    "success": False,
                    "error": f"ProfileManager {profile_manager_id} not found",
                },
                status=status.HTTP_404_NOT_FOUND,
            )

        success = PromptStudioIndexHelper.mark_extraction_status(
            document_id=document_id,
            profile_manager=profile_manager,
            x2text_config_hash=x2text_config_hash,
            enable_highlight=enable_highlight,
            extracted=extracted,
            error_message=error_message,
        )
        return JsonResponse({"success": success})

    except Exception as e:
        # Top-level boundary for the internal API: log with traceback and
        # report the failure to the calling worker.
        logger.exception("extraction_status internal API failed")
        return JsonResponse(
            {"success": False, "error": str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
222+
223+
157224
@csrf_exempt
158225
@require_http_methods(["POST"])
159226
def indexing_status(request):

backend/prompt_studio/prompt_studio_index_manager_v2/prompt_studio_index_helper.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,6 @@ def mark_extraction_status(
9797
with transaction.atomic():
9898
document = DocumentManager.objects.get(pk=document_id)
9999

100-
args = {
101-
"document_manager": document,
102-
"profile_manager": profile_manager,
103-
}
104-
105100
# Build extraction status data
106101
status_data = {
107102
"extracted": extracted,
@@ -112,13 +107,23 @@ def mark_extraction_status(
112107
if not extracted and error_message:
113108
status_data["error"] = error_message
114109

115-
defaults = {"extraction_status": {x2text_config_hash: status_data}}
116-
117-
index_manager, created = IndexManager.objects.update_or_create(
118-
**args,
119-
defaults=defaults,
110+
# Lock the row (or create an empty one) so concurrent callers
111+
# merge into the same dict rather than clobbering each other.
112+
index_manager, created = (
113+
IndexManager.objects.select_for_update().get_or_create(
114+
document_manager=document,
115+
profile_manager=profile_manager,
116+
defaults={"extraction_status": {}},
117+
)
120118
)
121119

120+
# Merge in place — update_or_create(defaults=...) would replace
121+
# the whole dict and wipe any prior hash entries.
122+
extraction_status = dict(index_manager.extraction_status or {})
123+
extraction_status[x2text_config_hash] = status_data
124+
index_manager.extraction_status = extraction_status
125+
index_manager.save(update_fields=["extraction_status"])
126+
122127
logger.info(
123128
f"Index manager {index_manager} {index_manager.index_ids_history}"
124129
)

workers/executor/executors/legacy_executor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1026,9 +1026,20 @@ def _handle_index(self, context: ExecutionContext) -> ExecutionResult:
10261026
doc_id_found,
10271027
reindex,
10281028
)
1029+
if doc_id_found and not reindex:
1030+
shim.stream_log(
1031+
"Document already indexed in vector store; skipping re-index."
1032+
)
1033+
logger.info(
1034+
"Skipping re-index: doc_id=%s already in vector DB and "
1035+
"reindex=False",
1036+
doc_id,
1037+
)
1038+
return ExecutionResult(success=True, data={IKeys.DOC_ID: doc_id})
1039+
10291040
if doc_id_found and reindex:
10301041
shim.stream_log("Document already indexed, re-indexing...")
1031-
elif not doc_id_found:
1042+
else:
10321043
shim.stream_log("Indexing document for the first time...")
10331044
shim.stream_log("Indexing document into vector store...")
10341045
index.perform_indexing(

workers/ide_callback/tasks.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,31 @@ def ide_index_complete(
211211
profile_manager_id,
212212
)
213213

214+
# Mark extraction_status so subsequent Answer Prompt dispatches
215+
# can short-circuit re-extraction. The Phase 4 backend payload
216+
# already stashes x2text_config_hash and enable_highlight in
217+
# cb_kwargs for exactly this purpose. Failure here is non-fatal:
218+
# primary indexing already succeeded above.
219+
x2text_config_hash = cb.get("x2text_config_hash", "")
220+
enable_highlight = cb.get("enable_highlight", False)
221+
if x2text_config_hash and profile_manager_id:
222+
try:
223+
api.mark_extraction_status(
224+
document_id=document_id,
225+
profile_manager_id=profile_manager_id,
226+
x2text_config_hash=x2text_config_hash,
227+
enable_highlight=enable_highlight,
228+
organization_id=org_id,
229+
)
230+
except Exception:
231+
logger.warning(
232+
"Failed to mark extraction_status for document %s "
233+
"profile %s; primary indexing succeeded.",
234+
document_id,
235+
profile_manager_id,
236+
exc_info=True,
237+
)
238+
214239
# Handle summary index tracking via backend endpoint
215240
# (requires PromptIdeBaseTool + IndexingUtils which need Django ORM)
216241
summary_profile_id = cb.get("summary_profile_id", "")

workers/shared/clients/prompt_studio_client.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
_OUTPUT_ENDPOINT = "v1/prompt-studio/output/"
1616
_INDEX_ENDPOINT = "v1/prompt-studio/index/"
1717
_INDEXING_STATUS_ENDPOINT = "v1/prompt-studio/indexing-status/"
18+
_EXTRACTION_STATUS_ENDPOINT = "v1/prompt-studio/extraction-status/"
1819
_PROFILE_ENDPOINT = "v1/prompt-studio/profile/{profile_id}/"
1920
_HUBSPOT_ENDPOINT = "v1/prompt-studio/hubspot-notify/"
2021
_SUMMARY_INDEX_KEY_ENDPOINT = "v1/prompt-studio/summary-index-key/"
@@ -71,6 +72,33 @@ def update_index_manager(
7172
}
7273
return self.post(_INDEX_ENDPOINT, data=payload, organization_id=organization_id)
7374

75+
def mark_extraction_status(
    self,
    document_id: str,
    profile_manager_id: str,
    x2text_config_hash: str,
    enable_highlight: bool,
    organization_id: str | None = None,
    extracted: bool = True,
    error_message: str | None = None,
) -> dict[str, Any]:
    """Record the extraction outcome for a document+profile pair.

    Called from the ide_index_complete callback so that subsequent
    Answer Prompt dispatches can short-circuit re-extraction.

    Builds the JSON payload from the arguments and POSTs it to the
    extraction-status endpoint; ``organization_id`` is forwarded to
    ``self.post`` rather than placed in the payload. Returns whatever
    ``self.post`` returns.
    """
    request_body = dict(
        document_id=document_id,
        profile_manager_id=profile_manager_id,
        x2text_config_hash=x2text_config_hash,
        enable_highlight=enable_highlight,
        extracted=extracted,
        error_message=error_message,
    )
    return self.post(
        _EXTRACTION_STATUS_ENDPOINT,
        data=request_body,
        organization_id=organization_id,
    )
101+
74102
def mark_document_indexed(
75103
self,
76104
org_id: str,

workers/tests/test_legacy_executor_index.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,38 @@ def test_reindex_passed_through(self, mock_get_fs, mock_indexing_deps):
220220
assert result.success is True
221221
init_call = mock_index_cls.call_args
222222
assert init_call.kwargs["processing_options"].reindex is True
223+
# reindex=True with already-indexed doc must still call perform_indexing
224+
mock_index_cls.return_value.perform_indexing.assert_called_once()
225+
226+
@patch(_PATCH_FS)
def test_already_indexed_no_reindex_short_circuits(
    self, mock_get_fs, mock_indexing_deps
):
    """Already-indexed doc with reindex=False must skip perform_indexing.

    Defense-in-depth guard for the IDE re-indexing fix: even if the Redis
    cache misses and Answer Prompt re-dispatches index, the executor must
    not re-write the same chunks into the vector store.
    """
    index_cls, embedding_cls, vector_db_cls = mock_indexing_deps
    _register_legacy()
    legacy_executor = ExecutorRegistry.get("legacy")

    # Index mock reports the document as already present in the vector DB.
    index_mock = _setup_mock_index(index_cls, "doc-already-indexed")
    index_mock.is_document_indexed.return_value = True
    for factory in (embedding_cls, vector_db_cls, mock_get_fs):
        factory.return_value = MagicMock()

    # The default index context leaves reindex at False.
    outcome = legacy_executor.execute(_make_index_context())

    assert outcome.success is True
    assert outcome.data[IKeys.DOC_ID] == "doc-already-indexed"
    index_mock.is_document_indexed.assert_called_once()
    index_mock.perform_indexing.assert_not_called()
223255

224256

225257
# --- 5. VectorDB.close() always called ---

0 commit comments

Comments
 (0)