forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_interceptor.py
More file actions
164 lines (138 loc) · 5.49 KB
/
test_interceptor.py
File metadata and controls
164 lines (138 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import sys
import unittest.mock
from collections import abc
import pytest
if sys.version_info >= (3, 14):
    # Skip the whole module on Python 3.14+; this guard sits ahead of the
    # sentry_sdk import so that import is never attempted there.
    pytest.skip("Sentry does not support Python 3.14 yet.", allow_module_level=True)
import sentry_sdk
import temporalio.activity
import temporalio.workflow
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
SandboxedWorkflowRunner,
SandboxRestrictions,
)
from sentry.activity import broken_activity, working_activity
from sentry.interceptor import SentryInterceptor
from sentry.workflow import SentryExampleWorkflow, SentryExampleWorkflowInput
from tests.sentry.fake_sentry_transport import FakeSentryTransport
@pytest.fixture
def transport() -> FakeSentryTransport:
    """Return a new ``FakeSentryTransport`` instance for the requesting test."""
    fake_transport = FakeSentryTransport()
    return fake_transport
@pytest.fixture(autouse=True)
def sentry_init(transport: FakeSentryTransport) -> None:
    """Call ``sentry_sdk.init`` with the fake transport before every test.

    Declared ``autouse`` so each test in this module gets it without
    requesting it. The SDK is given the ``transport`` fixture and the
    asyncio integration only.
    """
    enabled_integrations = [AsyncioIntegration()]
    sentry_sdk.init(transport=transport, integrations=enabled_integrations)
@pytest.fixture
async def worker(client: Client) -> abc.AsyncIterator[Worker]:
    """Yield a ``Worker`` set up for the Sentry example workflow.

    The worker listens on ``"sentry-task-queue"``, registers
    ``SentryExampleWorkflow`` plus the working and broken activities, and
    installs a ``SentryInterceptor``. The test body runs while the
    ``async with`` block is open; the block is exited after the test.
    """
    sentry_interceptors = [SentryInterceptor()]
    # The sandbox restrictions pass the "sentry_sdk" module through.
    sandbox_runner = SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_modules("sentry_sdk"),
    )
    async with Worker(
        client,
        task_queue="sentry-task-queue",
        workflows=[SentryExampleWorkflow],
        activities=[broken_activity, working_activity],
        interceptors=sentry_interceptors,
        workflow_runner=sandbox_runner,
    ) as running_worker:
        yield running_worker
async def test_sentry_interceptor_reports_no_errors_when_workflow_succeeds(
    client: Client, worker: Worker, transport: FakeSentryTransport
) -> None:
    """Test that Sentry interceptor reports no errors when workflow succeeds.

    Runs ``SentryExampleWorkflow`` with the ``"working"`` option on the
    worker's task queue and then checks that the fake transport recorded
    no events.
    """
    # WHEN
    # Deliberately not wrapped in try/except: if the workflow raises, pytest
    # fails the test and reports the original exception and traceback.
    await client.execute_workflow(
        SentryExampleWorkflow.run,
        SentryExampleWorkflowInput(option="working"),
        id="sentry-workflow-id",
        task_queue=worker.task_queue,
    )

    # THEN
    assert len(transport.events) == 0, "No events should be captured"
async def test_sentry_interceptor_captures_errors(
    client: Client, worker: Worker, transport: FakeSentryTransport
) -> None:
    """Test that errors are captured with correct Sentry metadata.

    Runs ``SentryExampleWorkflow`` with the ``"broken"`` option, expects the
    client call to fail, and then inspects the two events recorded by the
    fake transport: the activity failure first, the workflow failure second.
    """
    # Local import keeps this test self-contained; the module-level imports
    # only bring in ``Client`` from ``temporalio.client``.
    from temporalio.client import WorkflowFailureError

    # WHEN
    # Only a workflow failure counts as the expected outcome; any other
    # exception (or no exception at all) fails the test.
    with pytest.raises(WorkflowFailureError):
        await client.execute_workflow(
            SentryExampleWorkflow.run,
            SentryExampleWorkflowInput(option="broken"),
            id="sentry-workflow-id",
            task_queue=worker.task_queue,
        )

    # THEN
    # there should be two events: one for the failed activity and one for the failed workflow
    assert len(transport.events) == 2, "Two events should be captured"

    # Check the first event - should be the activity exception
    # --------------------------------------------------------
    event = transport.events[0]

    # Check exception was captured
    captured = event["exception"]["values"][0]
    assert captured["type"] == "Exception"
    assert captured["value"] == "Activity failed!"

    # Check useful metadata was captured as tags
    assert event["tags"] == {
        "temporal.execution_type": "activity",
        "module": "sentry.activity.broken_activity",
        "temporal.workflow.type": "SentryExampleWorkflow",
        "temporal.workflow.id": "sentry-workflow-id",
        "temporal.activity.id": "1",
        "temporal.activity.type": "broken_activity",
        "temporal.activity.task_queue": "sentry-task-queue",
        "temporal.workflow.namespace": "default",
        # The run id is not known in advance, so any value is accepted.
        "temporal.workflow.run_id": unittest.mock.ANY,
    }

    # Check activity input was captured as context
    assert event["contexts"]["temporal.activity.input"] == {
        "message": "Hello, Temporal!",
    }

    # Check activity info was captured as context: the stored mapping must be
    # usable as keyword arguments for ``temporalio.activity.Info``.
    activity_info = temporalio.activity.Info(
        **event["contexts"]["temporal.activity.info"]  # type: ignore
    )
    assert activity_info.activity_type == "broken_activity"

    # Check the second event - should be the workflow exception
    # ---------------------------------------------------------
    event = transport.events[1]

    # Check exception was captured
    captured = event["exception"]["values"][0]
    assert captured["type"] == "ApplicationError"
    assert captured["value"] == "Activity failed!"

    # Check useful metadata was captured as tags
    assert event["tags"] == {
        "temporal.execution_type": "workflow",
        "module": "sentry.workflow.SentryExampleWorkflow.run",
        "temporal.workflow.type": "SentryExampleWorkflow",
        "temporal.workflow.id": "sentry-workflow-id",
        "temporal.workflow.task_queue": "sentry-task-queue",
        "temporal.workflow.namespace": "default",
        # The run id is not known in advance, so any value is accepted.
        "temporal.workflow.run_id": unittest.mock.ANY,
    }

    # Check workflow input was captured as context
    assert event["contexts"]["temporal.workflow.input"] == {
        "option": "broken",
    }

    # Check workflow info was captured as context: the stored mapping must be
    # usable as keyword arguments for ``temporalio.workflow.Info``.
    workflow_info = temporalio.workflow.Info(
        **event["contexts"]["temporal.workflow.info"]  # type: ignore
    )
    assert workflow_info.workflow_type == "SentryExampleWorkflow"