Skip to content

Commit 843376d

Browse files
committed
samples(pubsub): pubsub_optimistic_subscribe sample
1 parent 29f5d7c commit 843376d

2 files changed

Lines changed: 73 additions & 0 deletions

File tree

google-cloud-pubsub/samples/acceptance/subscriptions_test.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
require_relative "../pubsub_subscriber_exactly_once_delivery.rb"
1818
require_relative "../pubsub_create_subscription_with_exactly_once_delivery.rb"
1919
require_relative "../pubsub_create_bigquery_subscription.rb"
20+
require_relative "../pubsub_optimistic_subscribe.rb"
2021
require_relative "../pubsub_subscriber_async_pull_custom_attributes.rb"
2122
require_relative "../pubsub_subscriber_sync_pull.rb"
2223
require_relative "../pubsub_subscriber_flow_settings.rb"
@@ -281,4 +282,27 @@
281282
)
282283
end
283284
end
285+
286+
it "supports pubsub_optimistic_subscribe" do
287+
topic_id = @topic.name
288+
subscription_id = random_subscription_id
289+
publisher = pubsub.publisher @topic.name
290+
291+
out, _err = capture_io do
292+
optimistic_subscribe topic_id: topic_id, subscription_id: subscription_id
293+
end
294+
295+
assert_includes out, "Subscription #{subscription_id} does not exist."
296+
assert_includes out, "Subscription #{subscription_id} created."
297+
298+
@created_subscriptions << subscription_id
299+
publisher.publish "This is a test message."
300+
sleep 5
301+
302+
out, _err = capture_io do
303+
optimistic_subscribe topic_id: topic_id, subscription_id: subscription_id
304+
end
305+
306+
assert_includes out, "Received message: This is a test message."
307+
end
284308
end
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require "google/cloud/pubsub"
16+
17+
def optimistic_subscribe topic_id:, subscription_id:
18+
# [START pubsub_optimistic_subscribe]
19+
# topic_id = "your-topic-id"
20+
# subscription_id = "your-subscription-id"
21+
22+
pubsub = Google::Cloud::Pubsub.new
23+
24+
# Propagate expection from child threads to the main thread as soon as it is
25+
# raised. Exceptions happened in the callback thread are collected in the
26+
# callback thread pool and do not propagate to the main thread
27+
Thread.abort_on_exception = true
28+
29+
begin
30+
subscriber = pubsub.subscriber subscription_id
31+
listener = subscriber.listen do |received_message|
32+
puts "Received message: #{received_message.data}"
33+
received_message.acknowledge!
34+
end
35+
listener.start
36+
# Let the main thread sleep for 60 seconds so the thread for listening
37+
# messages does not quit
38+
sleep 60
39+
listener.stop.wait!
40+
rescue Google::Cloud::NotFoundError => e
41+
puts "Subscription #{subscription_id} does not exist."
42+
subscription_admin = pubsub.subscription_admin
43+
44+
subscription = subscription_admin.create_subscription name: pubsub.subscription_path(subscription_id),
45+
topic: pubsub.topic_path(topic_id)
46+
puts "Subscription #{subscription_id} created."
47+
end
48+
# [END pubsub_optimistic_subscribe]
49+
end

0 commit comments

Comments
 (0)