Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 2fcd31e

Browse files
feat: Add OpenTelemetry publish sample
1 parent c31431a commit 2fcd31e

3 files changed

Lines changed: 108 additions & 0 deletions

File tree

samples/snippets/publisher.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,87 @@ def delete_topic(project_id: str, topic_id: str) -> None:
170170
# [END pubsub_delete_topic]
171171

172172

173+
def pubsub_publish_otel_tracing(
174+
topic_project_id: str, trace_project_id: str, topic_id: str
175+
) -> None:
176+
"""
177+
Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled.
178+
Export the OpenTelemetry traces to Google Cloud Trace in project
179+
`trace_project_id`
180+
181+
Args:
182+
topic_project_id: project ID of the topic to publish to.
183+
trace_project_id: project ID to export Cloud Trace to.
184+
topic_id: topic ID to publish to.
185+
186+
Returns:
187+
None
188+
"""
189+
# [START pubsub_publish_otel_tracing]
190+
191+
from opentelemetry import trace
192+
from opentelemetry.sdk.trace import TracerProvider
193+
from opentelemetry.sdk.trace.export import (
194+
BatchSpanProcessor,
195+
)
196+
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
197+
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
198+
199+
from google.cloud.pubsub_v1 import PublisherClient
200+
from google.cloud.pubsub_v1.types import PublisherOptions
201+
202+
# TODO(developer)
203+
# topic_project_id = "your-topic-project-id"
204+
# trace_project_id = "your-trace-project-id"
205+
# topic_id = "your-topic-id"
206+
207+
# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
208+
# traces: https://cloud.google.com/trace/docs/setup/python-ot
209+
# Choose and configure the exporter for your set up accordingly.
210+
211+
# Sample 1 in every 1000 traces.
212+
sampler = ParentBased(root=TraceIdRatioBased(1 / 1000))
213+
trace.set_tracer_provider(TracerProvider(sampler=sampler))
214+
215+
# Export to Google Trace.
216+
cloud_trace_exporter = CloudTraceSpanExporter(
217+
project_id=trace_project_id,
218+
)
219+
trace.get_tracer_provider().add_span_processor(
220+
BatchSpanProcessor(cloud_trace_exporter)
221+
)
222+
223+
# Set the `enable_open_telemetry_tracing` option to True when creating
224+
# the publisher client. This in itself is necessary and sufficient for
225+
# the library to export OpenTelemetry traces. However, where the traces
226+
# must be exported to needs to be configured based on your OpenTelemetry
227+
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
228+
publisher = PublisherClient(
229+
publisher_options=PublisherOptions(
230+
enable_open_telemetry_tracing=True,
231+
),
232+
)
233+
234+
# The `topic_path` method creates a fully qualified identifier
235+
# in the form `projects/{project_id}/topics/{topic_id}`
236+
topic_path = publisher.topic_path(topic_project_id, topic_id)
237+
238+
tracer = trace.get_tracer("google.cloud.pubsub_v1.publisher")
239+
with tracer.start_as_current_span("parent cloud trace span"):
240+
# Publish messages.
241+
for n in range(1, 10000):
242+
data_str = f"Message number {n}"
243+
# Data must be a bytestring
244+
data = data_str.encode("utf-8")
245+
# When you publish a message, the client returns a future.
246+
future = publisher.publish(topic_path, data)
247+
print(future.result())
248+
249+
print(f"Published messages to {topic_path}.")
250+
251+
# [END pubsub_publish_otel_tracing]
252+
253+
173254
def publish_messages(project_id: str, topic_id: str) -> None:
174255
"""Publishes multiple messages to a Pub/Sub topic."""
175256
# [START pubsub_quickstart_publisher]
@@ -522,6 +603,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
522603
create_parser = subparsers.add_parser("create", help=create_topic.__doc__)
523604
create_parser.add_argument("topic_id")
524605

606+
pubsub_publish_otel_tracing_parser = subparsers.add_parser(
607+
"pubsub-publish-otel-tracing", help=pubsub_publish_otel_tracing.__doc__
608+
)
609+
pubsub_publish_otel_tracing_parser.add_argument("topic_project_id")
610+
pubsub_publish_otel_tracing_parser.add_argument("trace_project_id")
611+
pubsub_publish_otel_tracing_parser.add_argument("topic_id")
612+
525613
create_topic_with_kinesis_ingestion_parser = subparsers.add_parser(
526614
"create_kinesis_ingestion", help=create_topic_with_kinesis_ingestion.__doc__
527615
)
@@ -638,3 +726,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
638726
resume_publish_with_ordering_keys(args.project_id, args.topic_id)
639727
elif args.command == "detach-subscription":
640728
detach_subscription(args.project_id, args.subscription_id)
729+
elif args.command == "pubsub-publish-otel-tracing":
730+
pubsub_publish_otel_tracing(
731+
args.topic_project_id, args.trace_project_id, args.topic_id
732+
)

samples/snippets/publisher_test.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import typing
1818
from typing import Any, Callable, cast, Iterator, TypeVar, Union
1919
import uuid
20+
import sys
2021

2122
from _pytest.capture import CaptureFixture
2223
import backoff
@@ -209,6 +210,18 @@ def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None:
209210
assert topic_path in out
210211

211212

213+
@pytest.mark.skipif(
214+
sys.version_info < (3, 8),
215+
reason="Open Telemetry not supported below Python version 3.8",
216+
)
217+
def test_pubsub_publish_otel_tracing(
218+
capsys: CaptureFixture[str],
219+
) -> None:
220+
publisher.pubsub_publish_otel_tracing(PROJECT_ID, PROJECT_ID, TOPIC_ID)
221+
out, _ = capsys.readouterr()
222+
assert f"Published messages to {topic_path}." in out
223+
224+
212225
def test_publish(topic_path: str, capsys: CaptureFixture[str]) -> None:
213226
publisher.publish_messages(PROJECT_ID, TOPIC_ID)
214227

samples/snippets/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@ avro==1.12.0
33
protobuf===4.24.4; python_version == '3.7'
44
protobuf==5.28.0; python_version >= '3.8'
55
avro==1.12.0
6+
opentelemetry-api==1.22.0
7+
opentelemetry-sdk==1.22.0
8+
opentelemetry-exporter-gcp-trace==1.7.0

0 commit comments

Comments
 (0)