Skip to content

Commit 8e753f4

Browse files
authored
Sentry interceptor example (#23)
1 parent 8407dca commit 8e753f4

8 files changed

Lines changed: 274 additions & 5 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Some examples require extra dependencies. See each sample's directory for specif
5858
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
5959
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
6060
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
61+
* [sentry](sentry) - Report errors to Sentry.
6162

6263
## Test
6364

poetry.lock

Lines changed: 71 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ optional = true
3939
temporalio = { version = "*", extras = ["opentelemetry"] }
4040
opentelemetry-exporter-jaeger-thrift = "^1.13.0"
4141

42+
[tool.poetry.group.sentry]
43+
optional = true
44+
dependencies = { sentry-sdk = "^1.11.0" }
45+
4246
[tool.poe.tasks]
4347
format = [{cmd = "black ."}, {cmd = "isort ."}]
4448
lint = [{cmd = "black --check ."}, {cmd = "isort --check-only ."}, {ref = "lint-types" }]

sentry/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Sentry Sample
2+
3+
This sample shows how to configure [Sentry](https://sentry.io) to intercept and capture errors from the Temporal SDK.
4+
5+
For this sample, the optional `sentry` dependency group must be included. To include, run:
6+
7+
poetry install --with sentry
8+
9+
To run, first see [README.md](../README.md) for prerequisites. Set `SENTRY_DSN` environment variable to the Sentry DSN.
10+
Then, run the following from this directory to start the worker:
11+
12+
poetry run python worker.py
13+
14+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
15+
16+
poetry run python starter.py
17+
18+
The workflow should complete with the hello result. If you alter the workflow or the activity to raise an
19+
`ApplicationError` instead, it should appear in Sentry.

sentry/__init__.py

Whitespace-only changes.

sentry/interceptor.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from dataclasses import asdict, is_dataclass
2+
from typing import Any, Optional, Type, Union
3+
4+
import sentry_sdk
5+
from temporalio import activity, workflow
6+
from temporalio.worker import (
7+
ActivityInboundInterceptor,
8+
ExecuteActivityInput,
9+
ExecuteWorkflowInput,
10+
Interceptor,
11+
WorkflowInboundInterceptor,
12+
WorkflowInterceptorClassInput,
13+
)
14+
15+
16+
def _set_common_workflow_tags(
17+
info: Union[workflow.Info, activity.Info], scope: sentry_sdk.Scope
18+
):
19+
scope.set_tag("temporal.workflow.type", info.workflow_type)
20+
scope.set_tag("temporal.workflow.id", info.workflow_id)
21+
22+
23+
class _SentryActivityInboundInterceptor(ActivityInboundInterceptor):
24+
async def execute_activity(self, input: ExecuteActivityInput) -> Any:
25+
transaction_name = input.fn.__module__ + "." + input.fn.__qualname__
26+
scope_ctx_manager = sentry_sdk.configure_scope()
27+
with scope_ctx_manager as scope, sentry_sdk.start_transaction(
28+
name=transaction_name
29+
):
30+
scope.set_tag("temporal.execution_type", "activity")
31+
activity_info = activity.info()
32+
_set_common_workflow_tags(activity_info, scope)
33+
scope.set_tag("temporal.activity.id", activity_info.activity_id)
34+
scope.set_tag("temporal.activity.type", activity_info.activity_type)
35+
scope.set_tag("temporal.activity.task_queue", activity_info.task_queue)
36+
scope.set_tag(
37+
"temporal.workflow.namespace", activity_info.workflow_namespace
38+
)
39+
scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id)
40+
try:
41+
return await super().execute_activity(input)
42+
except Exception as e:
43+
if len(input.args) == 1 and is_dataclass(input.args[0]):
44+
scope.set_context("temporal.activity.input", asdict(input.args[0]))
45+
scope.set_context("temporal.activity.info", activity.info().__dict__)
46+
sentry_sdk.capture_exception(e)
47+
raise e
48+
finally:
49+
scope.clear()
50+
51+
52+
class _SentryWorkflowInterceptor(WorkflowInboundInterceptor):
53+
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
54+
transaction_name = input.run_fn.__module__ + "." + input.run_fn.__qualname__
55+
scope_ctx_manager = sentry_sdk.configure_scope()
56+
with scope_ctx_manager as scope, sentry_sdk.start_transaction(
57+
name=transaction_name
58+
):
59+
scope.set_tag("temporal.execution_type", "workflow")
60+
workflow_info = workflow.info()
61+
_set_common_workflow_tags(workflow_info, scope)
62+
scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue)
63+
scope.set_tag("temporal.workflow.namespace", workflow_info.namespace)
64+
scope.set_tag("temporal.workflow.run_id", workflow_info.run_id)
65+
try:
66+
return await super().execute_workflow(input)
67+
except Exception as e:
68+
if len(input.args) == 1 and is_dataclass(input.args[0]):
69+
scope.set_context("temporal.workflow.input", asdict(input.args[0]))
70+
scope.set_context("temporal.workflow.info", workflow.info().__dict__)
71+
sentry_sdk.capture_exception(e)
72+
raise e
73+
finally:
74+
scope.clear()
75+
76+
77+
class SentryInterceptor(Interceptor):
78+
"""Temporal Interceptor class which will report workflow & activity exceptions to Sentry"""
79+
80+
def intercept_activity(
81+
self, next: ActivityInboundInterceptor
82+
) -> ActivityInboundInterceptor:
83+
"""Implementation of
84+
:py:meth:`temporalio.worker.Interceptor.intercept_activity`.
85+
"""
86+
return _SentryActivityInboundInterceptor(super().intercept_activity(next))
87+
88+
def workflow_interceptor_class(
89+
self, input: WorkflowInterceptorClassInput
90+
) -> Optional[Type[WorkflowInboundInterceptor]]:
91+
return _SentryWorkflowInterceptor

sentry/starter.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import asyncio
2+
import os
3+
4+
from temporalio.client import Client
5+
6+
from sentry.worker import GreetingWorkflow
7+
8+
9+
async def main():
10+
# Connect client
11+
client = await Client.connect("localhost:7233")
12+
13+
# Run workflow
14+
result = await client.execute_workflow(
15+
GreetingWorkflow.run,
16+
"World",
17+
id="sentry-workflow-id",
18+
task_queue="sentry-task-queue",
19+
)
20+
print(f"Workflow result: {result}")
21+
22+
23+
if __name__ == "__main__":
24+
asyncio.run(main())

sentry/worker.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import asyncio
2+
import logging
3+
import os
4+
from dataclasses import dataclass
5+
from datetime import timedelta
6+
7+
import sentry_sdk
8+
from temporalio import activity, workflow
9+
from temporalio.client import Client
10+
from temporalio.worker import Worker
11+
12+
from sentry.interceptor import SentryInterceptor
13+
14+
15+
@dataclass
16+
class ComposeGreetingInput:
17+
greeting: str
18+
name: str
19+
20+
21+
@activity.defn
22+
async def compose_greeting(input: ComposeGreetingInput) -> str:
23+
activity.logger.info("Running activity with parameter %s" % input)
24+
return f"{input.greeting}, {input.name}!"
25+
26+
27+
@workflow.defn
28+
class GreetingWorkflow:
29+
@workflow.run
30+
async def run(self, name: str) -> str:
31+
workflow.logger.info("Running workflow with parameter %s" % name)
32+
return await workflow.execute_activity(
33+
compose_greeting,
34+
ComposeGreetingInput("Hello", name),
35+
start_to_close_timeout=timedelta(seconds=10),
36+
)
37+
38+
39+
async def main():
40+
# Uncomment the line below to see logging
41+
# logging.basicConfig(level=logging.INFO)
42+
43+
# Initialize the Sentry SDK
44+
sentry_sdk.init(
45+
dsn=os.environ.get("SENTRY_DSN"),
46+
)
47+
48+
# Start client
49+
client = await Client.connect("localhost:7233")
50+
51+
# Run a worker for the workflow
52+
worker = Worker(
53+
client,
54+
task_queue="sentry-task-queue",
55+
workflows=[GreetingWorkflow],
56+
activities=[compose_greeting],
57+
interceptors=[SentryInterceptor()], # Use SentryInterceptor for error reporting
58+
)
59+
60+
await worker.run()
61+
62+
63+
if __name__ == "__main__":
64+
asyncio.run(main())

0 commit comments

Comments
 (0)