Skip to content

Commit 42613a0

Browse files
authored
OpenTelemetry sample (temporalio#18)
Fixes temporalio#12
1 parent d709acf commit 42613a0

File tree

10 files changed

+608
-202
lines changed

10 files changed

+608
-202
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ Some examples require extra dependencies. See each sample's directory for specif
5252
* [hello_search_attributes](hello/hello_search_attributes.py) - Start workflow with search attributes then change
5353
while running.
5454
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
55+
<!-- Keep this list in alphabetical order -->
5556
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
5657
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
5758
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
5859
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
59-
60+
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
6061

6162
## Test
6263

@@ -66,4 +67,4 @@ Running the tests requires `poe` to be installed.
6667

6768
Once you have `poe` installed you can run:
6869

69-
poe test
70+
poe test

activity_worker/__init__.py

Whitespace-only changes.

open_telemetry/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# OpenTelemetry Sample
2+
3+
This sample shows how to configure OpenTelemetry to capture workflow traces and SDK metrics.
4+
5+
For this sample, the optional `open_telemetry` dependency group must be included. To include, run:
6+
7+
poetry install --with open_telemetry
8+
9+
To run, first see [README.md](../README.md) for prerequisites. Then run the following to start a Jaeger container to
10+
view the trace results:
11+
12+
docker run -d --name jaeger \
13+
-p 16686:16686 \
14+
-p 6831:6831/udp \
15+
jaegertracing/all-in-one:latest
16+
17+
Since that is running in the background (`-d`), you can also run the metrics collector in the foreground:
18+
19+
docker run -p 4317:4317 \
20+
-v /path/to/samples-python/open_telemetry/otel-metrics-collector-config.yaml:/etc/otel-collector-config.yaml \
21+
otel/opentelemetry-collector:latest \
22+
--config=/etc/otel-collector-config.yaml
23+
24+
Replace `/path/to/samples-python` with the absolute path to the cloned samples repo.
25+
26+
Now, from this directory, start the worker in its own terminal:
27+
28+
poetry run python worker.py
29+
30+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
31+
32+
poetry run python starter.py
33+
34+
The workflow should complete with the hello result. The workflow trace can now be viewed in Jaeger at
35+
http://localhost:16686/. Under service, select `my-service` and "Find Traces". The workflow should appear and when
36+
clicked, may look something like:
37+
38+
![Jaeger Screenshot](jaeger-screenshot.png)
39+
40+
Note, in-workflow spans do not have a time associated with them. This is by intention since due to replay. In
41+
OpenTelemetry, only the process that started the span may end it. But in Temporal a span may cross workers/processes.
42+
Therefore we intentionally start-then-end in-workflow spans immediately. So while the start time and hierarchy is
43+
accurate, the duration is not.
44+
45+
The metrics should have been dumped out in the terminal where the OpenTelemetry collector container is running.
46+
47+
## OTLP gRPC
48+
49+
Currently for tracing this example uses the `opentelemetry-exporter-jaeger-thrift` exporter because the common OTLP gRPC
50+
exporter `opentelemetry-exporter-otlp-proto-grpc` uses an older, incompatible `protobuf` library. See
51+
[this issue](https://github.com/open-telemetry/opentelemetry-python/issues/2880) for more information.
52+
53+
Once OTel supports latest protobuf, the exporter can be changed and Jaeger could be run with:
54+
55+
docker run -d --name jaeger \
56+
-e COLLECTOR_OTLP_ENABLED=true \
57+
-p 16686:16686 \
58+
-p 4317:4317 \
59+
-p 4318:4318 \
60+
jaegertracing/all-in-one:latest

open_telemetry/__init__.py

Whitespace-only changes.
20.4 KB
Loading
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
receivers:
2+
otlp:
3+
protocols:
4+
grpc:
5+
exporters:
6+
logging:
7+
loglevel: debug
8+
processors:
9+
batch:
10+
service:
11+
pipelines:
12+
metrics:
13+
receivers: [otlp]
14+
exporters: [logging]
15+
processors: [batch]

open_telemetry/starter.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import asyncio
2+
3+
from temporalio.client import Client
4+
from temporalio.contrib.opentelemetry import TracingInterceptor
5+
6+
from open_telemetry.worker import GreetingWorkflow, init_opentelemetry
7+
8+
9+
async def main():
10+
init_opentelemetry()
11+
12+
# Connect client
13+
client = await Client.connect(
14+
"localhost:7233",
15+
# Use OpenTelemetry interceptor
16+
interceptors=[TracingInterceptor()],
17+
)
18+
19+
# Run workflow
20+
result = await client.execute_workflow(
21+
GreetingWorkflow.run,
22+
"Temporal",
23+
id=f"open_telemetry-workflow-id",
24+
task_queue="open_telemetry-task-queue",
25+
)
26+
print(f"Workflow result: {result}")
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())

open_telemetry/worker.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from opentelemetry import trace
5+
6+
# See note in README about why Thrift
7+
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
8+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
9+
from opentelemetry.sdk.trace import TracerProvider
10+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
11+
from temporalio import activity, workflow
12+
from temporalio.bridge import telemetry
13+
from temporalio.client import Client
14+
from temporalio.contrib.opentelemetry import TracingInterceptor
15+
from temporalio.worker import Worker
16+
17+
18+
@workflow.defn
19+
class GreetingWorkflow:
20+
@workflow.run
21+
async def run(self, name: str) -> str:
22+
return await workflow.execute_activity(
23+
compose_greeting,
24+
name,
25+
start_to_close_timeout=timedelta(seconds=10),
26+
)
27+
28+
29+
@activity.defn
30+
async def compose_greeting(name: str) -> str:
31+
return f"Hello, {name}!"
32+
33+
34+
interrupt_event = asyncio.Event()
35+
36+
37+
def init_opentelemetry() -> None:
38+
# Setup global tracer for workflow traces
39+
provider = TracerProvider(resource=Resource.create({SERVICE_NAME: "my-service"}))
40+
provider.add_span_processor(BatchSpanProcessor(JaegerExporter()))
41+
trace.set_tracer_provider(provider)
42+
43+
# Setup SDK metrics to OTel endpoint
44+
telemetry.init_telemetry(
45+
telemetry.TelemetryConfig(
46+
otel_metrics=telemetry.OtelCollectorConfig(
47+
url="http://localhost:4317", headers={}
48+
)
49+
)
50+
)
51+
52+
53+
async def main():
54+
init_opentelemetry()
55+
56+
# Connect client
57+
client = await Client.connect(
58+
"localhost:7233",
59+
# Use OpenTelemetry interceptor
60+
interceptors=[TracingInterceptor()],
61+
)
62+
63+
# Run a worker for the workflow
64+
async with Worker(
65+
client,
66+
task_queue="open_telemetry-task-queue",
67+
workflows=[GreetingWorkflow],
68+
activities=[compose_greeting],
69+
):
70+
# Wait until interrupted
71+
print("Worker started, ctrl+c to exit")
72+
await interrupt_event.wait()
73+
print("Shutting down")
74+
75+
76+
if __name__ == "__main__":
77+
loop = asyncio.new_event_loop()
78+
try:
79+
loop.run_until_complete(main())
80+
except KeyboardInterrupt:
81+
interrupt_event.set()
82+
loop.run_until_complete(loop.shutdown_asyncgens())

0 commit comments

Comments
 (0)