@@ -68,30 +68,120 @@ def list_subscriptions_in_project(project_id: str) -> None:
6868 # [END pubsub_list_subscriptions]
6969
7070
71- def create_subscription (project_id : str , topic_id : str , subscription_id : str ) -> None :
72- """Create a new pull subscription on the given topic."""
73- # [START pubsub_create_pull_subscription]
71+ def pubsub_subscribe_otel_tracing (
72+ subscription_project_id : str ,
73+ cloud_trace_project_id : str ,
74+ subscription_id : str ,
75+ timeout : Optional [float ] = None ,
76+ ) -> None :
77+ """
78+ Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled.
79+ Export the OpenTelemetry traces to Google Cloud Trace in project
80+ `trace_project_id`
81+ Args:
82+ subscription_project_id: project ID of the subscription.
83+ cloud_trace_project_id: project ID to export Cloud Trace to.
84+ subscription_id: subscription ID to subscribe from.
85+ timeout: time until which to subscribe to.
86+ Returns:
87+ None
88+ """
89+ # [START pubsub_subscribe_otel_tracing]
90+ from opentelemetry import trace
91+ from opentelemetry .sdk .trace import TracerProvider
92+ from opentelemetry .sdk .trace .export import (
93+ BatchSpanProcessor ,
94+ )
95+ from opentelemetry .exporter .cloud_trace import CloudTraceSpanExporter
96+ from opentelemetry .sdk .trace .sampling import TraceIdRatioBased , ParentBased
97+
7498 from google .cloud import pubsub_v1
99+ from google .cloud .pubsub_v1 import SubscriberClient
100+ from google .cloud .pubsub_v1 .types import SubscriberOptions
75101
76102 # TODO(developer)
77- # project_id = "your-project-id"
78- # topic_id = "your-topic-id"
103+ # subscription_project_id = "your-subscription-project-id"
79104 # subscription_id = "your-subscription-id"
105+ # cloud_trace_project_id = "your-cloud-trace-project-id"
106+ # timeout = 300.0
80107
81- publisher = pubsub_v1 .PublisherClient ()
82- subscriber = pubsub_v1 .SubscriberClient ()
83- topic_path = publisher .topic_path (project_id , topic_id )
84- subscription_path = subscriber .subscription_path (project_id , subscription_id )
108+ # In this sample, we use a Google Cloud Trace to export the OpenTelemetry
109+ # traces: https://cloud.google.com/trace/docs/setup/python-ot
110+ # Choose and configure the exporter for your set up accordingly.
85111
86- # Wrap the subscriber in a 'with' block to automatically call close() to
87- # close the underlying gRPC channel when done.
112+ # Sample 1 in every 100 traces
113+ sampler = ParentBased (root = TraceIdRatioBased (1 / 100 ))
114+ trace .set_tracer_provider (TracerProvider (sampler = sampler ))
115+
116+ # Export to Google Trace
117+ cloud_trace_exporter = CloudTraceSpanExporter (
118+ project_id = cloud_trace_project_id ,
119+ )
120+ trace .get_tracer_provider ().add_span_processor (
121+ BatchSpanProcessor (cloud_trace_exporter )
122+ )
123+ # Set the `enable_open_telemetry_tracing` option to True when creating
124+ # the subscriber client. This in itself is necessary and sufficient for
125+ # the library to export OpenTelemetry traces. However, where the traces
126+ # must be exported to needs to be configured based on your OpenTelemetry
127+ # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
128+ subscriber = SubscriberClient (
129+ subscriber_options = SubscriberOptions (enable_open_telemetry_tracing = True )
130+ )
131+
132+ # The `subscription_path` method creates a fully qualified identifier
133+ # in the form `projects/{project_id}/subscriptions/{subscription_id}`
134+ subscription_path = subscriber .subscription_path (
135+ subscription_project_id , subscription_id
136+ )
137+
138+ # Define callback to be called when a message is received.
139+ def callback (message : pubsub_v1 .subscriber .message .Message ) -> None :
140+ # Ack message after processing it.
141+ print (message .data )
142+ message .ack ()
143+
144+ # Wrap subscriber in a 'with' block to automatically call close() when done.
88145 with subscriber :
89- subscription = subscriber .create_subscription (
90- request = {"name" : subscription_path , "topic" : topic_path }
91- )
146+ try :
147+ # Optimistically subscribe to messages on the subscription.
148+ streaming_pull_future = subscriber .subscribe (
149+ subscription_path , callback = callback
150+ )
151+ streaming_pull_future .result (timeout = timeout )
152+ except TimeoutError :
153+ print ("Successfully subscribed until the timeout passed." )
154+ streaming_pull_future .cancel () # Trigger the shutdown.
155+ streaming_pull_future .result () # Block until the shutdown is complete.
156+
157+ # [END pubsub_subscribe_otel_tracing]
158+
159+ def create_subscription (
160+ project_id : str , topic_id : str , subscription_id : str
161+ ) -> None :
162+ """Create a new pull subscription on the given topic."""
163+ # [START pubsub_create_pull_subscription]
164+ from google .cloud import pubsub_v1
165+
166+ # TODO(developer)
167+ # project_id = "your-project-id"
168+ # topic_id = "your-topic-id"
169+ # subscription_id = "your-subscription-id"
170+
171+ publisher = pubsub_v1 .PublisherClient ()
172+ subscriber = pubsub_v1 .SubscriberClient ()
173+ topic_path = publisher .topic_path (project_id , topic_id )
174+ subscription_path = subscriber .subscription_path (project_id , subscription_id )
175+
176+ # Wrap the subscriber in a 'with' block to automatically call close() to
177+ # close the underlying gRPC channel when done.
178+ with subscriber :
179+ subscription = subscriber .create_subscription (
180+ request = {"name" : subscription_path , "topic" : topic_path }
181+ )
92182
93- print (f"Subscription created: { subscription } " )
94- # [END pubsub_create_pull_subscription]
183+ print (f"Subscription created: { subscription } " )
184+ # [END pubsub_create_pull_subscription]
95185
96186
97187def optimistic_subscribe (
0 commit comments