Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 0ecedd9

Browse files
Add Subscribe Side OpenTelemetry sample
1 parent 1f7520d commit 0ecedd9

2 files changed

Lines changed: 133 additions & 16 deletions

File tree

samples/snippets/subscriber.py

Lines changed: 106 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,30 +68,120 @@ def list_subscriptions_in_project(project_id: str) -> None:
6868
# [END pubsub_list_subscriptions]
6969

7070

71-
def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> None:
72-
"""Create a new pull subscription on the given topic."""
73-
# [START pubsub_create_pull_subscription]
71+
def pubsub_subscribe_otel_tracing(
72+
subscription_project_id: str,
73+
cloud_trace_project_id: str,
74+
subscription_id: str,
75+
timeout: Optional[float] = None,
76+
) -> None:
77+
"""
78+
Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled.
79+
Export the OpenTelemetry traces to Google Cloud Trace in project
80+
`trace_project_id`
81+
Args:
82+
subscription_project_id: project ID of the subscription.
83+
cloud_trace_project_id: project ID to export Cloud Trace to.
84+
subscription_id: subscription ID to subscribe from.
85+
timeout: time until which to subscribe to.
86+
Returns:
87+
None
88+
"""
89+
# [START pubsub_subscribe_otel_tracing]
90+
from opentelemetry import trace
91+
from opentelemetry.sdk.trace import TracerProvider
92+
from opentelemetry.sdk.trace.export import (
93+
BatchSpanProcessor,
94+
)
95+
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
96+
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
97+
7498
from google.cloud import pubsub_v1
99+
from google.cloud.pubsub_v1 import SubscriberClient
100+
from google.cloud.pubsub_v1.types import SubscriberOptions
75101

76102
# TODO(developer)
77-
# project_id = "your-project-id"
78-
# topic_id = "your-topic-id"
103+
# subscription_project_id = "your-subscription-project-id"
79104
# subscription_id = "your-subscription-id"
105+
# cloud_trace_project_id = "your-cloud-trace-project-id"
106+
# timeout = 300.0
80107

81-
publisher = pubsub_v1.PublisherClient()
82-
subscriber = pubsub_v1.SubscriberClient()
83-
topic_path = publisher.topic_path(project_id, topic_id)
84-
subscription_path = subscriber.subscription_path(project_id, subscription_id)
108+
# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
109+
# traces: https://cloud.google.com/trace/docs/setup/python-ot
110+
# Choose and configure the exporter for your set up accordingly.
85111

86-
# Wrap the subscriber in a 'with' block to automatically call close() to
87-
# close the underlying gRPC channel when done.
112+
# Sample 1 in every 100 traces
113+
sampler = ParentBased(root=TraceIdRatioBased(1 / 100))
114+
trace.set_tracer_provider(TracerProvider(sampler=sampler))
115+
116+
# Export to Google Trace
117+
cloud_trace_exporter = CloudTraceSpanExporter(
118+
project_id=cloud_trace_project_id,
119+
)
120+
trace.get_tracer_provider().add_span_processor(
121+
BatchSpanProcessor(cloud_trace_exporter)
122+
)
123+
# Set the `enable_open_telemetry_tracing` option to True when creating
124+
# the subscriber client. This in itself is necessary and sufficient for
125+
# the library to export OpenTelemetry traces. However, where the traces
126+
# must be exported to needs to be configured based on your OpenTelemetry
127+
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
128+
subscriber = SubscriberClient(
129+
subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
130+
)
131+
132+
# The `subscription_path` method creates a fully qualified identifier
133+
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
134+
subscription_path = subscriber.subscription_path(
135+
subscription_project_id, subscription_id
136+
)
137+
138+
# Define callback to be called when a message is received.
139+
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
140+
# Ack message after processing it.
141+
print(message.data)
142+
message.ack()
143+
144+
# Wrap subscriber in a 'with' block to automatically call close() when done.
88145
with subscriber:
89-
subscription = subscriber.create_subscription(
90-
request={"name": subscription_path, "topic": topic_path}
91-
)
146+
try:
147+
# Optimistically subscribe to messages on the subscription.
148+
streaming_pull_future = subscriber.subscribe(
149+
subscription_path, callback=callback
150+
)
151+
streaming_pull_future.result(timeout=timeout)
152+
except TimeoutError:
153+
print("Successfully subscribed until the timeout passed.")
154+
streaming_pull_future.cancel() # Trigger the shutdown.
155+
streaming_pull_future.result() # Block until the shutdown is complete.
156+
157+
# [END pubsub_subscribe_otel_tracing]
158+
159+
def create_subscription(
160+
project_id: str, topic_id: str, subscription_id: str
161+
) -> None:
162+
"""Create a new pull subscription on the given topic."""
163+
# [START pubsub_create_pull_subscription]
164+
from google.cloud import pubsub_v1
165+
166+
# TODO(developer)
167+
# project_id = "your-project-id"
168+
# topic_id = "your-topic-id"
169+
# subscription_id = "your-subscription-id"
170+
171+
publisher = pubsub_v1.PublisherClient()
172+
subscriber = pubsub_v1.SubscriberClient()
173+
topic_path = publisher.topic_path(project_id, topic_id)
174+
subscription_path = subscriber.subscription_path(project_id, subscription_id)
175+
176+
# Wrap the subscriber in a 'with' block to automatically call close() to
177+
# close the underlying gRPC channel when done.
178+
with subscriber:
179+
subscription = subscriber.create_subscription(
180+
request={"name": subscription_path, "topic": topic_path}
181+
)
92182

93-
print(f"Subscription created: {subscription}")
94-
# [END pubsub_create_pull_subscription]
183+
print(f"Subscription created: {subscription}")
184+
# [END pubsub_create_pull_subscription]
95185

96186

97187
def optimistic_subscribe(

samples/snippets/subscriber_test.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,33 @@ def test_create_subscription(
234234
subscriber_client.delete_subscription(request={"subscription": subscription_path})
235235

236236

237+
def test_pubsub_subscribe_otel_tracing(
238+
subscriber_client: pubsub_v1.SubscriberClient, capsys: CaptureFixture[str]
239+
) -> None:
240+
subscription = f"pubsub_subscribe_otel_tracing-{PY_VERSION}-{UUID}"
241+
242+
subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription)
243+
244+
try:
245+
subscriber_client.get_subscription(request={"subscription": subscription_path})
246+
except NotFound:
247+
subscriber_client.create_subscription(
248+
request={"name": subscription_path, "topic": topic}
249+
)
250+
251+
_ = _publish_messages(publisher_client, topic)
252+
253+
subscriber.pubsub_subscribe_otel_tracing(PROJECT_ID, PROJECT_ID, subscription, 5)
254+
255+
out, _ = capsys.readouterr()
256+
assert "Listening" in out
257+
assert subscription in out
258+
assert "message" in out
259+
260+
# Clean up.
261+
subscriber_client.delete_subscription(request={"subscription": subscription_path})
262+
263+
237264
def test_optimistic_subscribe(
238265
subscriber_client: pubsub_v1.SubscriberClient,
239266
topic: str,

0 commit comments

Comments
 (0)