Skip to content

Commit a032813

Browse files
UN-2733 [FIX] Allow file processing to continue on cleanup related errors (Zipstack#1503)
* UN-2733 Error handling fix for file processing related celery task * Minor improvement in logging traceback too
1 parent 9b0fbec commit a032813

2 files changed

Lines changed: 18 additions & 9 deletions

File tree

backend/workflow_manager/workflow_v2/file_execution_tasks.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ def _process_file(
557557
result=result,
558558
workflow_file_execution=None,
559559
error=error_msg,
560-
is_api=destination.is_api,
560+
is_api=destination.is_api if destination else False,
561561
destination=destination,
562562
)
563563
except Exception as error:
@@ -578,7 +578,7 @@ def _process_file(
578578
result=result,
579579
workflow_file_execution=workflow_file_execution,
580580
error=error_msg,
581-
is_api=destination.is_api,
581+
is_api=destination.is_api if destination else False,
582582
destination=destination,
583583
)
584584

@@ -1136,10 +1136,17 @@ def _build_final_result(
11361136
)
11371137

11381138
if destination:
1139-
logger.info(
1140-
f"Deleting file execution directory for file: '{file_hash.file_name}'"
1141-
)
1142-
destination.delete_file_execution_directory()
1139+
try:
1140+
logger.info(
1141+
f"Deleting file execution directory for file: '{file_hash.file_name}'"
1142+
)
1143+
destination.delete_file_execution_directory()
1144+
except Exception as cleanup_error:
1145+
logger.exception(
1146+
f"[Execution ID: {workflow_execution.id}, File Execution ID: {workflow_file_execution.id}] "
1147+
f"Failed to delete file execution directory for '{file_hash.file_name}': {cleanup_error}. "
1148+
f"This is non-critical and needs to be cleaned up later."
1149+
)
11431150

11441151
status = (
11451152
FileExecutionStageStatus.SUCCESS

unstract/core/src/unstract/core/file_execution_tracker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,12 @@ def get_cache_key(self, execution_id: str, file_execution_id: str) -> str:
191191
def set_data(self, data: FileExecutionData, ttl_in_second: int | None = None) -> None:
192192
data.validate()
193193
key = self.get_cache_key(data.execution_id, data.file_execution_id)
194-
logger.info(f"Setting file execution data for {key}: {data}")
194+
logger.info(f"Setting file execution data for '{key}': {data}")
195195
self.redis_client.hset(key, mapping=data.to_serializable())
196196
ttl = ttl_in_second or self.CACHE_TTL_IN_SECOND
197-
logger.info(f"Setting file execution data for {key} to expire in {ttl} seconds")
197+
logger.info(
198+
f"Setting file execution data for '{key}' to expire in '{ttl}' seconds"
199+
)
198200
self.redis_client.expire(key, ttl)
199201

200202
def exists(self, execution_id: str, file_execution_id: str) -> bool:
@@ -229,7 +231,7 @@ def update_stage_status(
229231
execution_id=execution_id,
230232
file_execution_id=file_execution_id,
231233
)
232-
logger.info(f"Existing data for {key}: {existing_data}")
234+
logger.info(f"Existing data for '{key}': {existing_data}")
233235
if not existing_data:
234236
raise FileExecutionTrackerNotFound()
235237

0 commit comments

Comments
 (0)