From e3da9d86af78d8ddd21d1c99023676a3edaaa3b3 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Mon, 25 Aug 2025 13:06:01 +0530 Subject: [PATCH 1/2] UN-2733 Error handling fix for file processing related celery task --- .../workflow_v2/file_execution_tasks.py | 19 +++++++++++++------ .../unstract/core/file_execution_tracker.py | 8 +++++--- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/backend/workflow_manager/workflow_v2/file_execution_tasks.py b/backend/workflow_manager/workflow_v2/file_execution_tasks.py index 463212a7ce..710fa194dc 100644 --- a/backend/workflow_manager/workflow_v2/file_execution_tasks.py +++ b/backend/workflow_manager/workflow_v2/file_execution_tasks.py @@ -557,7 +557,7 @@ def _process_file( result=result, workflow_file_execution=None, error=error_msg, - is_api=destination.is_api, + is_api=destination.is_api if destination else False, destination=destination, ) except Exception as error: @@ -578,7 +578,7 @@ def _process_file( result=result, workflow_file_execution=workflow_file_execution, error=error_msg, - is_api=destination.is_api, + is_api=destination.is_api if destination else False, destination=destination, ) @@ -1136,10 +1136,17 @@ def _build_final_result( ) if destination: - logger.info( - f"Deleting file execution directory for file: '{file_hash.file_name}'" - ) - destination.delete_file_execution_directory() + try: + logger.info( + f"Deleting file execution directory for file: '{file_hash.file_name}'" + ) + destination.delete_file_execution_directory() + except Exception as cleanup_error: + logger.warning( + f"[Execution ID: {workflow_execution.id}, File Execution ID: {workflow_file_execution.id}] " + f"Failed to delete file execution directory for '{file_hash.file_name}': {cleanup_error}. " + f"This is non-critical and needs to be cleaned up later." + ) status = ( FileExecutionStageStatus.SUCCESS diff --git a/unstract/core/src/unstract/core/file_execution_tracker.py b/unstract/core/src/unstract/core/file_execution_tracker.py index e6eb01e430..3f914f11ca 100644 --- a/unstract/core/src/unstract/core/file_execution_tracker.py +++ b/unstract/core/src/unstract/core/file_execution_tracker.py @@ -191,10 +191,12 @@ def get_cache_key(self, execution_id: str, file_execution_id: str) -> str: def set_data(self, data: FileExecutionData, ttl_in_second: int | None = None) -> None: data.validate() key = self.get_cache_key(data.execution_id, data.file_execution_id) - logger.info(f"Setting file execution data for {key}: {data}") + logger.info(f"Setting file execution data for '{key}': {data}") self.redis_client.hset(key, mapping=data.to_serializable()) ttl = ttl_in_second or self.CACHE_TTL_IN_SECOND - logger.info(f"Setting file execution data for {key} to expire in {ttl} seconds") + logger.info( + f"Setting file execution data for '{key}' to expire in '{ttl}' seconds" + ) self.redis_client.expire(key, ttl) def exists(self, execution_id: str, file_execution_id: str) -> bool: @@ -229,7 +231,7 @@ def update_stage_status( execution_id=execution_id, file_execution_id=file_execution_id, ) - logger.info(f"Existing data for {key}: {existing_data}") + logger.info(f"Existing data for '{key}': {existing_data}") if not existing_data: raise FileExecutionTrackerNotFound() From d919d0400d9ecd581d0d8d17e01936ce0fc0b162 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Mon, 25 Aug 2025 14:29:49 +0530 Subject: [PATCH 2/2] Minor improvement in logging traceback too --- backend/workflow_manager/workflow_v2/file_execution_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/workflow_manager/workflow_v2/file_execution_tasks.py b/backend/workflow_manager/workflow_v2/file_execution_tasks.py index 710fa194dc..36ea1cc005 100644 --- a/backend/workflow_manager/workflow_v2/file_execution_tasks.py +++ b/backend/workflow_manager/workflow_v2/file_execution_tasks.py @@ -1142,7 +1142,7 @@ def _build_final_result( ) destination.delete_file_execution_directory() except Exception as cleanup_error: - logger.warning( + logger.exception( f"[Execution ID: {workflow_execution.id}, File Execution ID: {workflow_file_execution.id}] " f"Failed to delete file execution directory for '{file_hash.file_name}': {cleanup_error}. " f"This is non-critical and needs to be cleaned up later."