@@ -170,6 +170,87 @@ def delete_topic(project_id: str, topic_id: str) -> None:
170170 # [END pubsub_delete_topic]
171171
172172
173+ def pubsub_publish_otel_tracing (
174+ topic_project_id : str , trace_project_id : str , topic_id : str
175+ ) -> None :
176+ """
177+ Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled.
178+ Export the OpenTelemetry traces to Google Cloud Trace in project
179+ `trace_project_id`
180+
181+ Args:
182+ topic_project_id: project ID of the topic to publish to.
183+ trace_project_id: project ID to export Cloud Trace to.
184+ topic_id: topic ID to publish to.
185+
186+ Returns:
187+ None
188+ """
189+ # [START pubsub_publish_otel_tracing]
190+
191+ from opentelemetry import trace
192+ from opentelemetry .sdk .trace import TracerProvider
193+ from opentelemetry .sdk .trace .export import (
194+ BatchSpanProcessor ,
195+ )
196+ from opentelemetry .exporter .cloud_trace import CloudTraceSpanExporter
197+ from opentelemetry .sdk .trace .sampling import TraceIdRatioBased , ParentBased
198+
199+ from google .cloud .pubsub_v1 import PublisherClient
200+ from google .cloud .pubsub_v1 .types import PublisherOptions
201+
202+ # TODO(developer)
203+ # topic_project_id = "your-topic-project-id"
204+ # trace_project_id = "your-trace-project-id"
205+ # topic_id = "your-topic-id"
206+
207+ # In this sample, we use a Google Cloud Trace to export the OpenTelemetry
208+ # traces: https://cloud.google.com/trace/docs/setup/python-ot
209+ # Choose and configure the exporter for your set up accordingly.
210+
211+ # Sample 1 in every 1000 traces.
212+ sampler = ParentBased (root = TraceIdRatioBased (1 / 1000 ))
213+ trace .set_tracer_provider (TracerProvider (sampler = sampler ))
214+
215+ # Export to Google Trace.
216+ cloud_trace_exporter = CloudTraceSpanExporter (
217+ project_id = trace_project_id ,
218+ )
219+ trace .get_tracer_provider ().add_span_processor (
220+ BatchSpanProcessor (cloud_trace_exporter )
221+ )
222+
223+ # Set the `enable_open_telemetry_tracing` option to True when creating
224+ # the publisher client. This in itself is necessary and sufficient for
225+ # the library to export OpenTelemetry traces. However, where the traces
226+ # must be exported to needs to be configured based on your OpenTelemetry
227+ # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
228+ publisher = PublisherClient (
229+ publisher_options = PublisherOptions (
230+ enable_open_telemetry_tracing = True ,
231+ ),
232+ )
233+
234+ # The `topic_path` method creates a fully qualified identifier
235+ # in the form `projects/{project_id}/topics/{topic_id}`
236+ topic_path = publisher .topic_path (topic_project_id , topic_id )
237+
238+ tracer = trace .get_tracer ("google.cloud.pubsub_v1.publisher" )
239+ with tracer .start_as_current_span ("parent cloud trace span" ):
240+ # Publish messages.
241+ for n in range (1 , 10000 ):
242+ data_str = f"Message number { n } "
243+ # Data must be a bytestring
244+ data = data_str .encode ("utf-8" )
245+ # When you publish a message, the client returns a future.
246+ future = publisher .publish (topic_path , data )
247+ print (future .result ())
248+
249+ print (f"Published messages to { topic_path } ." )
250+
251+ # [END pubsub_publish_otel_tracing]
252+
253+
173254def publish_messages (project_id : str , topic_id : str ) -> None :
174255 """Publishes multiple messages to a Pub/Sub topic."""
175256 # [START pubsub_quickstart_publisher]
@@ -522,6 +603,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
522603 create_parser = subparsers .add_parser ("create" , help = create_topic .__doc__ )
523604 create_parser .add_argument ("topic_id" )
524605
606+ pubsub_publish_otel_tracing_parser = subparsers .add_parser (
607+ "pubsub-publish-otel-tracing" , help = pubsub_publish_otel_tracing .__doc__
608+ )
609+ pubsub_publish_otel_tracing_parser .add_argument ("topic_project_id" )
610+ pubsub_publish_otel_tracing_parser .add_argument ("trace_project_id" )
611+ pubsub_publish_otel_tracing_parser .add_argument ("topic_id" )
612+
525613 create_topic_with_kinesis_ingestion_parser = subparsers .add_parser (
526614 "create_kinesis_ingestion" , help = create_topic_with_kinesis_ingestion .__doc__
527615 )
@@ -638,3 +726,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
638726 resume_publish_with_ordering_keys (args .project_id , args .topic_id )
639727 elif args .command == "detach-subscription" :
640728 detach_subscription (args .project_id , args .subscription_id )
729+ elif args .command == "pubsub-publish-otel-tracing" :
730+ pubsub_publish_otel_tracing (
731+ args .topic_project_id , args .trace_project_id , args .topic_id
732+ )
0 commit comments