
UN-2733 [FIX] Allow file processing to continue on cleanup related errors #1503

Merged
chandrasekharan-zipstack merged 3 commits into main from UN-2733-fix-file-processing-err-handling on Aug 25, 2025

Conversation

@chandrasekharan-zipstack commented Aug 25, 2025


What

  • Fixed error handling logic which was meant to gracefully handle errors on file processing
  • Suppressed errors that arise while performing a cleanup

Why

  • Error handling code raised another error

How

  • Caught errors during cleanup

Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

  • No, only suppressed an error and added logs

Related Issues or PRs

Notes on Testing

  • No explicit testing done since it was a simple logical change

Checklist

I have read and understood the Contribution Guidelines.

@coderabbitai (Bot) commented Aug 25, 2025


Summary by CodeRabbit

  • Bug Fixes

    • Improved stability during file processing by safely handling cases where a destination may be unavailable.
    • Cleanup failures now log a non-fatal warning and defer deletion, avoiding interruptions to result delivery.
  • Chores

    • Refined logging messages for consistency and clarity, including quoted keys and values.
  • Reliability

    • Maintains existing result construction and status updates while adding safer cleanup paths without changing user-facing behavior.

Walkthrough

Adds defensive checks to avoid accessing destination.is_api when destination is None and wraps deletion of the file execution directory in a try/except that logs a non-fatal warning on failure; also adjusts logging message formatting for Redis key and TTL in the file execution tracker.
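
The two changes described in this walkthrough can be illustrated with a minimal sketch. It is not the code from the PR: `FakeDestination` and `finalize` are hypothetical names, and only `is_api` and `delete_file_execution_directory` are taken from the discussion. It shows attribute access guarded against a `None` destination, and a cleanup failure that is logged instead of propagated.

```python
import logging
from typing import Optional

logger = logging.getLogger("cleanup_sketch")


class FakeDestination:
    """Hypothetical stand-in for a destination connector."""

    def __init__(self, is_api: bool, fail_cleanup: bool = False) -> None:
        self.is_api = is_api
        self.fail_cleanup = fail_cleanup

    def delete_file_execution_directory(self) -> None:
        if self.fail_cleanup:
            raise OSError("storage unavailable")


def finalize(destination: Optional[FakeDestination]) -> dict:
    # Guard attribute access: destination may be None if setup failed early.
    is_api = destination.is_api if destination else False
    cleaned = False
    if destination:
        try:
            destination.delete_file_execution_directory()
            cleaned = True
        except Exception as cleanup_error:
            # The cleanup failure is logged but never propagated, so the
            # result below is still returned to the caller.
            logger.warning("Cleanup failed (non-fatal): %s", cleanup_error)
    return {"is_api": is_api, "cleaned": cleaned}
```

With this shape, a failed directory deletion changes only the `cleaned` flag; the caller still receives a result.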

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Workflow result & cleanup handling: backend/workflow_manager/workflow_v2/file_execution_tasks.py | Guard is_api access when destination may be None in _process_file exception paths; wrap directory deletion in _build_final_result with try/except and log a non-fatal warning if cleanup fails; no public API changes. |
| Logging format adjustments: unstract/core/src/unstract/core/file_execution_tracker.py | Adjust log messages to quote the Redis key and TTL (one message reformatted to multiple lines); no behavioral or API changes. |

Sequence Diagram(s)

sequenceDiagram
  participant W as WorkflowManager
  participant P as _process_file
  participant R as _build_final_result
  participant FS as FileSystem

  W->>P: Start file execution
  alt processing succeeds
    P->>R: build final result (uses destination.is_api)
    R->>FS: delete execution directory
    alt deletion error (new handling)
      FS--xR: error
      R->>R: log non-fatal warning (Execution ID, File Execution ID)
    else deletion succeeds
      FS-->>R: ok
    end
    R-->>P: return final result
  else exception during processing
    P->>R: build final result (use destination.is_api if destination else false)
    note right of P: access guarded when destination is None
    R-->>P: return final result
  end

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes



📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between e3da9d8 and d919d04.

📒 Files selected for processing (1)
  • backend/workflow_manager/workflow_v2/file_execution_tasks.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/workflow_manager/workflow_v2/file_execution_tasks.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build

@coderabbitai (Bot) left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
backend/workflow_manager/workflow_v2/file_execution_tasks.py (3)

554-563: Fix UnboundLocalError risk: 'destination' may be undefined in except paths

In both except blocks you reference and pass destination even when it might never have been assigned (e.g., failures before DestinationConnector.from_config). Using destination.is_api if destination else False does not prevent an UnboundLocalError if destination is not defined at all.

Minimal, safe fix: initialize destination to None before the try:.

@@ def _process_file(...):
-        """
-        try:
+        """
+        destination: DestinationConnector | None = None
+        try:
             logger.info(

Optionally, mirror this for source for symmetry, though it’s not used in the except blocks.

I can open a quick follow-up commit if you’d like.

Also applies to: 575-583
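
The distinction drawn in this comment can be reproduced in a few lines of plain Python. The function names below are hypothetical and unrelated to the codebase; the sketch only shows that a truthiness check does not help when the name was never bound, and that binding it to `None` before the `try` does.

```python
def without_init(fail_early: bool) -> str:
    try:
        if fail_early:
            raise RuntimeError("failed before destination was created")
        destination = "connector"
    except RuntimeError:
        # 'destination' was never bound on this path, so even a simple
        # truthiness check raises UnboundLocalError.
        try:
            return "api" if destination else "none"
        except UnboundLocalError:
            return "unbound"
    return destination


def with_init(fail_early: bool) -> str:
    destination = None  # bound before the try, so except paths can test it
    try:
        if fail_early:
            raise RuntimeError("failed before destination was created")
        destination = "connector"
    except RuntimeError:
        return "api" if destination else "none"
    return destination
```

In the first function the handler itself fails when the early error occurs; in the second the guard works as intended.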


1121-1129: _build_final_result crashes when workflow_file_execution is None

workflow_file_execution is annotated as optional but used unconditionally:

  • file_execution_id=str(workflow_file_execution.id) will raise if workflow_file_execution is None (this happens in the ExecutionContextInitializationError path where you explicitly pass None).
  • Similar direct .id usages occur later when updating trackers and logs.

This breaks the PR’s goal to “allow processing to continue on cleanup related errors.”

Harden this function to tolerate workflow_file_execution=None:

@@ def _build_final_result(...):
-        final_result = FileExecutionResult(
-            file=file_hash.file_name,
-            file_execution_id=str(workflow_file_execution.id),
-            error=error,
-            result=result.output,
-            metadata=result.metadata,
-        )
+        file_exec_id = str(workflow_file_execution.id) if workflow_file_execution else ""
+        final_result = FileExecutionResult(
+            file=file_hash.file_name,
+            file_execution_id=file_exec_id,
+            error=error,
+            result=result.output,
+            metadata=result.metadata,
+        )

And guard tracker/log updates below (see separate comment). I can push a patch if preferred.


1156-1169: Skip tracker updates when there is no WorkflowFileExecution

If workflow_file_execution is None, the following log and tracker operations will raise. Guard them and log a concise note.

-        logger.info(
-            f"Marking file execution tracker completed status for execution_id: {workflow_execution.id}, file_execution_id: {workflow_file_execution.id}, stage: {FileExecutionStage.COMPLETED.value}, status: {status.value}, error: {error}"
-        )
-        cls._update_file_execution_tracker(
-            execution_id=str(workflow_execution.id),
-            file_execution_id=str(workflow_file_execution.id),
-            stage=FileExecutionStage.COMPLETED,
-            status=status,
-            error=error,
-        )
-        cls.delete_tool_execution_tracker(
-            execution_id=str(workflow_execution.id),
-            file_execution_id=str(workflow_file_execution.id),
-        )
+        if workflow_file_execution:
+            file_exec_id = str(workflow_file_execution.id)
+            logger.info(
+                "Marking file execution tracker completed status for execution_id: %s, file_execution_id: %s, stage: %s, status: %s, error: %s",
+                workflow_execution.id,
+                file_exec_id,
+                FileExecutionStage.COMPLETED.value,
+                status.value,
+                bool(error),
+            )
+            cls._update_file_execution_tracker(
+                execution_id=str(workflow_execution.id),
+                file_execution_id=file_exec_id,
+                stage=FileExecutionStage.COMPLETED,
+                status=status,
+                error=error,
+            )
+            cls.delete_tool_execution_tracker(
+                execution_id=str(workflow_execution.id),
+                file_execution_id=file_exec_id,
+            )
+        else:
+            logger.info(
+                "Skipping tracker cleanup: workflow_file_execution is unavailable (likely initialization failure)."
+            )
🧹 Nitpick comments (4)
unstract/core/src/unstract/core/file_execution_tracker.py (2)

191-201: Use Redis pipeline for atomic HSET+EXPIRE and reduce INFO-level verbosity

  • Calling HSET and EXPIRE separately can leave keys without TTL if the process dies between calls. A pipeline ensures both operations are applied together.
  • Logging the full FileExecutionData at INFO may be noisy and can include identifiers that we may not want in default logs. Consider DEBUG for the full object and INFO for key summaries.

Apply:

-        logger.info(f"Setting file execution data for '{key}': {data}")
-        self.redis_client.hset(key, mapping=data.to_serializable())
-        ttl = ttl_in_second or self.CACHE_TTL_IN_SECOND
-        logger.info(
-            f"Setting file execution data for '{key}' to expire in '{ttl}' seconds"
-        )
-        self.redis_client.expire(key, ttl)
+        logger.debug(f"Setting file execution data for '{key}': {data}")
+        ttl = ttl_in_second or self.CACHE_TTL_IN_SECOND
+        logger.info(f"Setting file execution data for '{key}' to expire in '{ttl}' seconds")
+        with self.redis_client.pipeline() as pipe:
+            pipe.hset(key, mapping=data.to_serializable())
+            pipe.expire(key, ttl)
+            pipe.execute()
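
As a rough illustration of the pipeline suggestion, the helper below queues HSET and EXPIRE and sends them with a single `execute()`. `set_hash_with_ttl` is a hypothetical name, and `RecordingClient` is an in-memory stand-in that only records the queued commands so the sketch runs without a Redis server; with a real redis-py client, `pipeline()` defaults to a MULTI/EXEC transaction.

```python
from typing import Any, List, Mapping


def set_hash_with_ttl(client: Any, key: str, mapping: Mapping[str, str], ttl: int) -> None:
    # Both commands are queued on the pipeline and sent together by
    # execute(), so the key is not left without a TTL between two calls.
    with client.pipeline() as pipe:
        pipe.hset(key, mapping=mapping)
        pipe.expire(key, ttl)
        pipe.execute()


class _RecordingPipeline:
    """Hypothetical test double mimicking the small pipeline surface used above."""

    def __init__(self, sent: List[list]) -> None:
        self._queued: List[tuple] = []
        self._sent = sent

    def __enter__(self) -> "_RecordingPipeline":
        return self

    def __exit__(self, *exc: Any) -> bool:
        self._queued.clear()  # anything not executed is discarded
        return False

    def hset(self, key: str, mapping: Mapping[str, str]) -> "_RecordingPipeline":
        self._queued.append(("HSET", key, dict(mapping)))
        return self

    def expire(self, key: str, ttl: int) -> "_RecordingPipeline":
        self._queued.append(("EXPIRE", key, ttl))
        return self

    def execute(self) -> None:
        # Record the whole batch as one unit, then reset the queue.
        self._sent.append(list(self._queued))
        self._queued.clear()


class RecordingClient:
    def __init__(self) -> None:
        self.sent: List[list] = []

    def pipeline(self) -> _RecordingPipeline:
        return _RecordingPipeline(self.sent)
```

Passing a `redis.Redis` instance instead of `RecordingClient` should work unchanged, since only `pipeline()`, `hset`, `expire`, and `execute` are used.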

230-235: Downgrade "Existing data" log to DEBUG or summarize to avoid log bloat

existing_data can be large (status_history, serialized file_hash). Emitting the entire object at INFO can flood logs. Prefer DEBUG or log a concise summary (stage, status, ids).

Example:

-        logger.info(f"Existing data for '{key}': {existing_data}")
+        logger.debug(
+            "Existing data for '%s': stage=%s status=%s error=%s",
+            key,
+            existing_data.stage_status.stage.value,
+            existing_data.stage_status.status.value,
+            bool(existing_data.stage_status.error),
+        )
backend/workflow_manager/workflow_v2/file_execution_tasks.py (2)

1139-1149: Keep stack trace for cleanup failures and avoid None deref in warning

Great call to make cleanup non-fatal. Two small tweaks improve debuggability and safety:

  • Include exc_info=True so the suppressed failure has a stack trace in logs.
  • Avoid workflow_file_execution.id when it’s None.
-        if destination:
-            try:
-                logger.info(
-                    f"Deleting file execution directory for file: '{file_hash.file_name}'"
-                )
-                destination.delete_file_execution_directory()
-            except Exception as cleanup_error:
-                logger.warning(
-                    f"[Execution ID: {workflow_execution.id}, File Execution ID: {workflow_file_execution.id}] "
-                    f"Failed to delete file execution directory for '{file_hash.file_name}': {cleanup_error}. "
-                    f"This is non-critical and needs to be cleaned up later."
-                )
+        if destination:
+            try:
+                logger.info("Deleting file execution directory for file: '%s'", file_hash.file_name)
+                destination.delete_file_execution_directory()
+            except Exception as cleanup_error:
+                safe_file_exec_id = str(getattr(workflow_file_execution, "id", ""))
+                logger.warning(
+                    "[Execution ID: %s, File Execution ID: %s] Failed to delete file execution directory for '%s': %s. "
+                    "This is non-critical and needs to be cleaned up later.",
+                    workflow_execution.id,
+                    safe_file_exec_id,
+                    file_hash.file_name,
+                    cleanup_error,
+                    exc_info=True,
+                )
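
The effect of the two tweaks can be checked with the standard `logging` module alone. `cleanup_quietly` is a hypothetical helper, not part of the PR; it shows that `exc_info=True` attaches the traceback to the warning and that `getattr` with a default yields an empty ID when the object is `None`.

```python
import logging
from typing import Any, Callable


def cleanup_quietly(
    cleanup: Callable[[], None],
    logger: logging.Logger,
    file_execution: Any = None,
) -> bool:
    """Run cleanup; log and swallow any failure. Returns True on success."""
    try:
        cleanup()
        return True
    except Exception as cleanup_error:
        # getattr with a default avoids AttributeError when file_execution is None.
        safe_id = str(getattr(file_execution, "id", ""))
        logger.warning(
            "[File Execution ID: %s] Cleanup failed: %s",
            safe_id,
            cleanup_error,
            exc_info=True,  # keep the stack trace of the suppressed error
        )
        return False
```

Lazy `%s` arguments also mean the message is only formatted if the record is actually emitted.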

356-359: Align batch-level directory cleanup with new non-fatal cleanup policy

You suppress cleanup errors at per-file finalization but not here. A failure in delete_execution_and_api_storage_dir will currently bubble up from the callback.

-        DestinationConnector.delete_execution_and_api_storage_dir(
-            workflow_id=workflow.id, execution_id=execution_id
-        )
+        try:
+            DestinationConnector.delete_execution_and_api_storage_dir(
+                workflow_id=workflow.id, execution_id=execution_id
+            )
+        except Exception as e:
+            logger.warning(
+                "Non-fatal: failed to delete execution/api storage dirs for execution_id=%s: %s",
+                execution_id,
+                e,
+                exc_info=True,
+            )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between ad9fcde and e3da9d8.

📒 Files selected for processing (2)
  • backend/workflow_manager/workflow_v2/file_execution_tasks.py (3 hunks)
  • unstract/core/src/unstract/core/file_execution_tracker.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
backend/workflow_manager/workflow_v2/file_execution_tasks.py (1)

990-1013: All good: file_path/file_history parameters are optional and safely handled

I verified that:

  • FileHistoryHelper.get_file_history is invoked elsewhere without a file_path argument (e.g. in endpoint_v2/source.py, lines 670–672), implying it has an optional file_path parameter and tolerates None.
  • Destination.get_metadata(self, file_history: FileHistory | None = None) explicitly defaults file_history to None and returns dict | None, so passing None is safe.

No further guards are needed; both downstream paths already handle None as intended.


@ritwik-g left a comment


@chandrasekharan-zipstack I have 2 minor comments. Other than that it looks good

@Deepak-Kesavan left a comment


looks good

@gaya3-zipstack left a comment


@chandrasekharan-zipstack I understand that this PR handles exceptions raised when Minio cannot be accessed for cleanup. What is our error handling when, for example, Minio becomes inaccessible during other stages, such as trying to access a file for reading or writing? In such cases, will we also run into a similar state where the file with the error stays in the PENDING state forever and other files need another round to be picked up?

@chandrasekharan-zipstack


> @chandrasekharan-zipstack I understand that this PR handles exceptions raised when Minio cannot be accessed for cleanup. What is our error handling when, for example, Minio becomes inaccessible during other stages, such as trying to access a file for reading or writing? In such cases, will we also run into a similar state where the file with the error stays in the PENDING state forever and other files need another round to be picked up?

Yes, this should handle all such cases too, as long as the failure happens within a file's execution. The exception handler will catch it and mark the status appropriately. This particular issue arose because the exception-handling code itself raised an exception, which will no longer be the case.

@gaya3-zipstack left a comment


Looks good.

@github-actions

| filepath | function | passed | SUBTOTAL |
| --- | --- | --- | --- |
| runner/src/unstract/runner/clients/test_docker.py | test_logs | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup_skip | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_client_init | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_exists | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config_without_mount | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_run_container | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_for_sidecar | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_sidecar_container | 1 | 1 |
| TOTAL | | 11 | 11 |

@sonarqubecloud


@chandrasekharan-zipstack merged commit a032813 into main on Aug 25, 2025
7 checks passed
@chandrasekharan-zipstack deleted the UN-2733-fix-file-processing-err-handling branch on August 25, 2025 09:15