UN-2733 [FIX] Allow file processing to continue on cleanup related errors#1503
Conversation
Summary by CodeRabbit
WalkthroughAdds defensive checks to avoid accessing Changes
Sequence Diagram(s)sequenceDiagram
participant W as WorkflowManager
participant P as _process_file
participant R as _build_final_result
participant FS as FileSystem
W->>P: Start file execution
alt processing succeeds
P->>R: build final result (uses destination.is_api)
R->>FS: delete execution directory
alt deletion error (new handling)
FS--xR: error
R->>R: log non-fatal warning (Execution ID, File Execution ID)
else deletion succeeds
FS-->>R: ok
end
R-->>P: return final result
else exception during processing
P->>R: build final result (use destination.is_api if destination else false)
note right of P: access guarded when destination is None
R-->>P: return final result
end
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. 📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro Cache: Disabled due to Reviews > Disable Cache setting Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
backend/workflow_manager/workflow_v2/file_execution_tasks.py (3)
554-563: Fix UnboundLocalError risk: 'destination' may be undefined in except pathsIn both except blocks you reference and pass
destinationeven when it might never have been assigned (e.g., failures beforeDestinationConnector.from_config). Usingdestination.is_api if destination else Falsedoes not prevent an UnboundLocalError ifdestinationis not defined at all.Minimal, safe fix: initialize
destinationtoNonebefore thetry:.@@ def _process_file(...): - """ - try: + """ + destination: DestinationConnector | None = None + try: logger.info(Optionally, mirror this for
sourcefor symmetry, though it’s not used in the except blocks.I can open a quick follow-up commit if you’d like.
Also applies to: 575-583
1121-1129: _build_final_result crashes when workflow_file_execution is None
workflow_file_executionis annotated as optional but used unconditionally:
file_execution_id=str(workflow_file_execution.id)will raise ifworkflow_file_executionis None (this happens in the ExecutionContextInitializationError path where you explicitly pass None).- Similar direct
.idusages occur later when updating trackers and logs.This breaks the PR’s goal to “allow processing to continue on cleanup related errors.”
Harden this function to tolerate
workflow_file_execution=None:@@ def _build_final_result(...): - final_result = FileExecutionResult( - file=file_hash.file_name, - file_execution_id=str(workflow_file_execution.id), - error=error, - result=result.output, - metadata=result.metadata, - ) + file_exec_id = str(workflow_file_execution.id) if workflow_file_execution else "" + final_result = FileExecutionResult( + file=file_hash.file_name, + file_execution_id=file_exec_id, + error=error, + result=result.output, + metadata=result.metadata, + )And guard tracker/log updates below (see separate comment). I can push a patch if preferred.
1156-1169: Skip tracker updates when there is no WorkflowFileExecutionIf
workflow_file_executionis None, the following log and tracker operations will raise. Guard them and log a concise note.- logger.info( - f"Marking file execution tracker completed status for execution_id: {workflow_execution.id}, file_execution_id: {workflow_file_execution.id}, stage: {FileExecutionStage.COMPLETED.value}, status: {status.value}, error: {error}" - ) - cls._update_file_execution_tracker( - execution_id=str(workflow_execution.id), - file_execution_id=str(workflow_file_execution.id), - stage=FileExecutionStage.COMPLETED, - status=status, - error=error, - ) - cls.delete_tool_execution_tracker( - execution_id=str(workflow_execution.id), - file_execution_id=str(workflow_file_execution.id), - ) + if workflow_file_execution: + file_exec_id = str(workflow_file_execution.id) + logger.info( + "Marking file execution tracker completed status for execution_id: %s, file_execution_id: %s, stage: %s, status: %s, error: %s", + workflow_execution.id, + file_exec_id, + FileExecutionStage.COMPLETED.value, + status.value, + bool(error), + ) + cls._update_file_execution_tracker( + execution_id=str(workflow_execution.id), + file_execution_id=file_exec_id, + stage=FileExecutionStage.COMPLETED, + status=status, + error=error, + ) + cls.delete_tool_execution_tracker( + execution_id=str(workflow_execution.id), + file_execution_id=file_exec_id, + ) + else: + logger.info( + "Skipping tracker cleanup: workflow_file_execution is unavailable (likely initialization failure)." + )
🧹 Nitpick comments (4)
unstract/core/src/unstract/core/file_execution_tracker.py (2)
191-201: Use Redis pipeline for atomic HSET+EXPIRE and reduce INFO-level verbosity
- Calling HSET and EXPIRE separately can leave keys without TTL if the process dies between calls. A pipeline ensures both operations are applied together.
- Logging the full FileExecutionData at INFO may be noisy and can include identifiers that we may not want in default logs. Consider DEBUG for the full object and INFO for key summaries.
Apply:
- logger.info(f"Setting file execution data for '{key}': {data}") - self.redis_client.hset(key, mapping=data.to_serializable()) - ttl = ttl_in_second or self.CACHE_TTL_IN_SECOND - logger.info( - f"Setting file execution data for '{key}' to expire in '{ttl}' seconds" - ) - self.redis_client.expire(key, ttl) + logger.debug(f"Setting file execution data for '{key}': {data}") + ttl = ttl_in_second or self.CACHE_TTL_IN_SECOND + logger.info(f"Setting file execution data for '{key}' to expire in '{ttl}' seconds") + with self.redis_client.pipeline() as pipe: + pipe.hset(key, mapping=data.to_serializable()) + pipe.expire(key, ttl) + pipe.execute()
230-235: Downgrade "Existing data" log to DEBUG or summarize to avoid log bloat
existing_datacan be large (status_history, serialized file_hash). Emitting the entire object at INFO can flood logs. Prefer DEBUG or log a concise summary (stage, status, ids).Example:
- logger.info(f"Existing data for '{key}': {existing_data}") + logger.debug( + "Existing data for '%s': stage=%s status=%s error=%s", + key, + existing_data.stage_status.stage.value, + existing_data.stage_status.status.value, + bool(existing_data.stage_status.error), + )backend/workflow_manager/workflow_v2/file_execution_tasks.py (2)
1139-1149: Keep stack trace for cleanup failures and avoid None deref in warningGreat call to make cleanup non-fatal. Two small tweaks improve debuggability and safety:
- Include
exc_info=Trueso the suppressed failure has a stack trace in logs.- Avoid
workflow_file_execution.idwhen it’s None.- if destination: - try: - logger.info( - f"Deleting file execution directory for file: '{file_hash.file_name}'" - ) - destination.delete_file_execution_directory() - except Exception as cleanup_error: - logger.warning( - f"[Execution ID: {workflow_execution.id}, File Execution ID: {workflow_file_execution.id}] " - f"Failed to delete file execution directory for '{file_hash.file_name}': {cleanup_error}. " - f"This is non-critical and needs to be cleaned up later." - ) + if destination: + try: + logger.info("Deleting file execution directory for file: '%s'", file_hash.file_name) + destination.delete_file_execution_directory() + except Exception as cleanup_error: + safe_file_exec_id = str(getattr(workflow_file_execution, "id", "")) + logger.warning( + "[Execution ID: %s, File Execution ID: %s] Failed to delete file execution directory for '%s': %s. " + "This is non-critical and needs to be cleaned up later.", + workflow_execution.id, + safe_file_exec_id, + file_hash.file_name, + cleanup_error, + exc_info=True, + )
356-359: Align batch-level directory cleanup with new non-fatal cleanup policyYou suppress cleanup errors at per-file finalization but not here. A failure in
delete_execution_and_api_storage_dirwill currently bubble up from the callback.- DestinationConnector.delete_execution_and_api_storage_dir( - workflow_id=workflow.id, execution_id=execution_id - ) + try: + DestinationConnector.delete_execution_and_api_storage_dir( + workflow_id=workflow.id, execution_id=execution_id + ) + except Exception as e: + logger.warning( + "Non-fatal: failed to delete execution/api storage dirs for execution_id=%s: %s", + execution_id, + e, + exc_info=True, + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting
📒 Files selected for processing (2)
backend/workflow_manager/workflow_v2/file_execution_tasks.py(3 hunks)unstract/core/src/unstract/core/file_execution_tracker.py(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (1)
backend/workflow_manager/workflow_v2/file_execution_tasks.py (1)
990-1013: All good:file_path/file_historyparameters are optional and safely handledI verified that:
FileHistoryHelper.get_file_historyis invoked elsewhere without afile_pathargument (e.g. inendpoint_v2/source.py, lines 670–672), implying it has an optionalfile_pathparameter and toleratesNone.Destination.get_metadata(self, file_history: FileHistory | None = None)explicitly defaultsfile_historytoNoneand returnsdict | None, so passingNoneis safe .No further guards are needed; both downstream paths already handle
Noneas intended.
ritwik-g
left a comment
There was a problem hiding this comment.
@chandrasekharan-zipstack I have 2 minor comments. Other than that it looks good
gaya3-zipstack
left a comment
There was a problem hiding this comment.
@chandrasekharan-zipstack I understand that this PR is handling exceptions while unable to access Minio for cleanup. What is our error handling whan for example minio becomes inaccessible during other stages? Like trying to access a file for reading/writing? In such cases also, will we run into a similar state where the file with the error is in PENDING state forever and other files need another round to be picked up?
Yes, this should handle all such cases too as long as it happens within a file's execution. Basically the exception handler will catch it and mark the status appropriately. This particular issue was because the exception handling code, in turn raised an exception which will no longer be the case |
|
|



What
Why
How
Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
Related Issues or PRs
Notes on Testing
Checklist
I have read and understood the Contribution Guidelines.