|
1 | 1 | from dataclasses import asdict, is_dataclass |
2 | 2 | from typing import Any, Optional, Type, Union |
3 | 3 |
|
4 | | -import sentry_sdk |
5 | 4 | from temporalio import activity, workflow |
6 | 5 | from temporalio.worker import ( |
7 | 6 | ActivityInboundInterceptor, |
|
12 | 11 | WorkflowInterceptorClassInput, |
13 | 12 | ) |
14 | 13 |
|
| 14 | +with workflow.unsafe.imports_passed_through(): |
| 15 | + from sentry_sdk import Hub, capture_exception, set_context, set_tag |
15 | 16 |
|
16 | | -def _set_common_workflow_tags( |
17 | | - info: Union[workflow.Info, activity.Info], scope: sentry_sdk.Scope |
18 | | -): |
19 | | - scope.set_tag("temporal.workflow.type", info.workflow_type) |
20 | | - scope.set_tag("temporal.workflow.id", info.workflow_id) |
| 17 | + |
| 18 | +def _set_common_workflow_tags(info: Union[workflow.Info, activity.Info]): |
| 19 | + set_tag("temporal.workflow.type", info.workflow_type) |
| 20 | + set_tag("temporal.workflow.id", info.workflow_id) |
21 | 21 |
|
22 | 22 |
|
23 | 23 | class _SentryActivityInboundInterceptor(ActivityInboundInterceptor): |
24 | 24 | async def execute_activity(self, input: ExecuteActivityInput) -> Any: |
25 | | - transaction_name = input.fn.__module__ + "." + input.fn.__qualname__ |
26 | | - scope_ctx_manager = sentry_sdk.configure_scope() |
27 | | - with scope_ctx_manager as scope, sentry_sdk.start_transaction( |
28 | | - name=transaction_name |
29 | | - ): |
30 | | - scope.set_tag("temporal.execution_type", "activity") |
| 25 | + # https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues |
| 26 | + with Hub(Hub.current): |
| 27 | + set_tag("temporal.execution_type", "activity") |
| 28 | + set_tag("module", input.fn.__module__ + "." + input.fn.__qualname__) |
| 29 | + |
31 | 30 | activity_info = activity.info() |
32 | | - _set_common_workflow_tags(activity_info, scope) |
33 | | - scope.set_tag("temporal.activity.id", activity_info.activity_id) |
34 | | - scope.set_tag("temporal.activity.type", activity_info.activity_type) |
35 | | - scope.set_tag("temporal.activity.task_queue", activity_info.task_queue) |
36 | | - scope.set_tag( |
37 | | - "temporal.workflow.namespace", activity_info.workflow_namespace |
38 | | - ) |
39 | | - scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id) |
| 31 | + _set_common_workflow_tags(activity_info) |
| 32 | + set_tag("temporal.activity.id", activity_info.activity_id) |
| 33 | + set_tag("temporal.activity.type", activity_info.activity_type) |
| 34 | + set_tag("temporal.activity.task_queue", activity_info.task_queue) |
| 35 | + set_tag("temporal.workflow.namespace", activity_info.workflow_namespace) |
| 36 | + set_tag("temporal.workflow.run_id", activity_info.workflow_run_id) |
40 | 37 | try: |
41 | 38 | return await super().execute_activity(input) |
42 | 39 | except Exception as e: |
43 | 40 | if len(input.args) == 1 and is_dataclass(input.args[0]): |
44 | | - scope.set_context("temporal.activity.input", asdict(input.args[0])) |
45 | | - scope.set_context("temporal.activity.info", activity.info().__dict__) |
46 | | - sentry_sdk.capture_exception(e) |
| 41 | + set_context("temporal.activity.input", asdict(input.args[0])) |
| 42 | + set_context("temporal.activity.info", activity.info().__dict__) |
| 43 | + capture_exception() |
47 | 44 | raise e |
48 | | - finally: |
49 | | - scope.clear() |
50 | 45 |
|
51 | 46 |
|
52 | 47 | class _SentryWorkflowInterceptor(WorkflowInboundInterceptor): |
53 | 48 | async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: |
54 | | - transaction_name = input.run_fn.__module__ + "." + input.run_fn.__qualname__ |
55 | | - scope_ctx_manager = sentry_sdk.configure_scope() |
56 | | - with scope_ctx_manager as scope, sentry_sdk.start_transaction( |
57 | | - name=transaction_name |
58 | | - ): |
59 | | - scope.set_tag("temporal.execution_type", "workflow") |
| 49 | + # https://docs.sentry.io/platforms/python/troubleshooting/#addressing-concurrency-issues |
| 50 | + with Hub(Hub.current): |
| 51 | + set_tag("temporal.execution_type", "workflow") |
| 52 | + set_tag("module", input.run_fn.__module__ + "." + input.run_fn.__qualname__) |
60 | 53 | workflow_info = workflow.info() |
61 | | - _set_common_workflow_tags(workflow_info, scope) |
62 | | - scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue) |
63 | | - scope.set_tag("temporal.workflow.namespace", workflow_info.namespace) |
64 | | - scope.set_tag("temporal.workflow.run_id", workflow_info.run_id) |
| 54 | + _set_common_workflow_tags(workflow_info) |
| 55 | + set_tag("temporal.workflow.task_queue", workflow_info.task_queue) |
| 56 | + set_tag("temporal.workflow.namespace", workflow_info.namespace) |
| 57 | + set_tag("temporal.workflow.run_id", workflow_info.run_id) |
65 | 58 | try: |
66 | 59 | return await super().execute_workflow(input) |
67 | 60 | except Exception as e: |
68 | 61 | if len(input.args) == 1 and is_dataclass(input.args[0]): |
69 | | - scope.set_context("temporal.workflow.input", asdict(input.args[0])) |
70 | | - scope.set_context("temporal.workflow.info", workflow.info().__dict__) |
71 | | - sentry_sdk.capture_exception(e) |
| 62 | + set_context("temporal.workflow.input", asdict(input.args[0])) |
| 63 | + set_context("temporal.workflow.info", workflow.info().__dict__) |
| 64 | + |
| 65 | + if not workflow.unsafe.is_replaying(): |
| 66 | + capture_exception() |
72 | 67 | raise e |
73 | | - finally: |
74 | | - scope.clear() |
75 | 68 |
|
76 | 69 |
|
77 | 70 | class SentryInterceptor(Interceptor): |
|
0 commit comments