|
| 1 | +from dataclasses import asdict, is_dataclass |
| 2 | +from typing import Any, Optional, Type, Union |
| 3 | + |
| 4 | +import sentry_sdk |
| 5 | +from temporalio import activity, workflow |
| 6 | +from temporalio.worker import ( |
| 7 | + ActivityInboundInterceptor, |
| 8 | + ExecuteActivityInput, |
| 9 | + ExecuteWorkflowInput, |
| 10 | + Interceptor, |
| 11 | + WorkflowInboundInterceptor, |
| 12 | + WorkflowInterceptorClassInput, |
| 13 | +) |
| 14 | + |
| 15 | + |
| 16 | +def _set_common_workflow_tags( |
| 17 | + info: Union[workflow.Info, activity.Info], scope: sentry_sdk.Scope |
| 18 | +): |
| 19 | + scope.set_tag("temporal.workflow.type", info.workflow_type) |
| 20 | + scope.set_tag("temporal.workflow.id", info.workflow_id) |
| 21 | + |
| 22 | + |
| 23 | +class _SentryActivityInboundInterceptor(ActivityInboundInterceptor): |
| 24 | + async def execute_activity(self, input: ExecuteActivityInput) -> Any: |
| 25 | + transaction_name = input.fn.__module__ + "." + input.fn.__qualname__ |
| 26 | + scope_ctx_manager = sentry_sdk.configure_scope() |
| 27 | + with scope_ctx_manager as scope, sentry_sdk.start_transaction( |
| 28 | + name=transaction_name |
| 29 | + ): |
| 30 | + scope.set_tag("temporal.execution_type", "activity") |
| 31 | + activity_info = activity.info() |
| 32 | + _set_common_workflow_tags(activity_info, scope) |
| 33 | + scope.set_tag("temporal.activity.id", activity_info.activity_id) |
| 34 | + scope.set_tag("temporal.activity.type", activity_info.activity_type) |
| 35 | + scope.set_tag("temporal.activity.task_queue", activity_info.task_queue) |
| 36 | + scope.set_tag( |
| 37 | + "temporal.workflow.namespace", activity_info.workflow_namespace |
| 38 | + ) |
| 39 | + scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id) |
| 40 | + try: |
| 41 | + return await super().execute_activity(input) |
| 42 | + except Exception as e: |
| 43 | + if len(input.args) == 1 and is_dataclass(input.args[0]): |
| 44 | + scope.set_context("temporal.activity.input", asdict(input.args[0])) |
| 45 | + scope.set_context("temporal.activity.info", activity.info().__dict__) |
| 46 | + sentry_sdk.capture_exception(e) |
| 47 | + raise e |
| 48 | + finally: |
| 49 | + scope.clear() |
| 50 | + |
| 51 | + |
| 52 | +class _SentryWorkflowInterceptor(WorkflowInboundInterceptor): |
| 53 | + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: |
| 54 | + transaction_name = input.run_fn.__module__ + "." + input.run_fn.__qualname__ |
| 55 | + scope_ctx_manager = sentry_sdk.configure_scope() |
| 56 | + with scope_ctx_manager as scope, sentry_sdk.start_transaction( |
| 57 | + name=transaction_name |
| 58 | + ): |
| 59 | + scope.set_tag("temporal.execution_type", "workflow") |
| 60 | + workflow_info = workflow.info() |
| 61 | + _set_common_workflow_tags(workflow_info, scope) |
| 62 | + scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue) |
| 63 | + scope.set_tag("temporal.workflow.namespace", workflow_info.namespace) |
| 64 | + scope.set_tag("temporal.workflow.run_id", workflow_info.run_id) |
| 65 | + try: |
| 66 | + return await super().execute_workflow(input) |
| 67 | + except Exception as e: |
| 68 | + if len(input.args) == 1 and is_dataclass(input.args[0]): |
| 69 | + scope.set_context("temporal.workflow.input", asdict(input.args[0])) |
| 70 | + scope.set_context("temporal.workflow.info", workflow.info().__dict__) |
| 71 | + sentry_sdk.capture_exception(e) |
| 72 | + raise e |
| 73 | + finally: |
| 74 | + scope.clear() |
| 75 | + |
| 76 | + |
| 77 | +class SentryInterceptor(Interceptor): |
| 78 | + """Temporal Interceptor class which will report workflow & activity exceptions to Sentry""" |
| 79 | + |
| 80 | + def intercept_activity( |
| 81 | + self, next: ActivityInboundInterceptor |
| 82 | + ) -> ActivityInboundInterceptor: |
| 83 | + """Implementation of |
| 84 | + :py:meth:`temporalio.worker.Interceptor.intercept_activity`. |
| 85 | + """ |
| 86 | + return _SentryActivityInboundInterceptor(super().intercept_activity(next)) |
| 87 | + |
| 88 | + def workflow_interceptor_class( |
| 89 | + self, input: WorkflowInterceptorClassInput |
| 90 | + ) -> Optional[Type[WorkflowInboundInterceptor]]: |
| 91 | + return _SentryWorkflowInterceptor |
0 commit comments