From b1acc8a2c0fcf5a6dc0abbae39d402337a1d06a8 Mon Sep 17 00:00:00 2001 From: Yoshi Automation Bot Date: Mon, 22 Jun 2026 23:30:07 +0000 Subject: [PATCH] feat(pubsub): implement streaming keep-alive logic --- .../cloud/pubsub/message_listener/stream.rb | 71 ++++++++- .../message_listener/acknowledge_test.rb | 6 +- .../pubsub/message_listener/inventory_test.rb | 6 +- .../pubsub/message_listener/keepalive_test.rb | 146 ++++++++++++++++++ .../modify_ack_deadline_test.rb | 6 +- .../pubsub/message_listener/nack_test.rb | 6 +- 6 files changed, 226 insertions(+), 15 deletions(-) create mode 100644 google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index fa563dd17f57..f06e2e3490db 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -48,6 +48,7 @@ class Stream ## # @private exactly_once_delivery_enabled. attr_reader :exactly_once_delivery_enabled + attr_accessor :keepalive_interval, :pong_deadline ## # @private Create an empty Subscriber::Stream object. @@ -68,17 +69,43 @@ def initialize subscriber @callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads + @keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30) + @pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15) + @last_ping_at = nil + @last_pong_at = nil + @stream_opened = false + @reconnect_delay = nil + @stream_keepalive_task = Concurrent::TimerTask.new( - execution_interval: 30 + execution_interval: @keepalive_interval + ) do + synchronize do + if @stream_opened && !@stopped && @request_queue + subscriber.service.logger.log :info, "subscriber-streams" do + "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + end + @last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at + push Google::Cloud::PubSub::V1::StreamingPullRequest.new + end + end + end + + @pong_monitor_task = Concurrent::TimerTask.new( + execution_interval: [@keepalive_interval / 5.0, 0.01].max ) do - # push empty request every 30 seconds to keep stream alive - unless inventory.empty? - subscriber.service.logger.log :info, "subscriber-streams" do - "sending keepAlive to stream for subscription #{@subscriber.subscription_name}" + synchronize do + if @stream_opened && @last_ping_at && @last_pong_at && !@stopped + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at + subscriber.service.logger.log :error, "subscriber-streams" do + "Keep-alive pong not received within #{@pong_deadline}s; restarting stream." + end + @stream_opened = false + @background_thread&.raise RestartStream + end end - push Google::Cloud::PubSub::V1::StreamingPullRequest.new end - end.execute + end end def start @@ -86,6 +113,8 @@ def start break if @background_thread @inventory.start + @stream_keepalive_task.execute + @pong_monitor_task.execute start_streaming! end @@ -108,6 +137,9 @@ def stop @stopped = true @pause_cond.broadcast + @stream_keepalive_task.shutdown + @pong_monitor_task.shutdown + # Now that the reception thread is stopped, immediately stop the # callback thread pool. All queued callbacks will see the stream # is stopped and perform a noop. @@ -219,6 +251,13 @@ class RestartStream < StandardError; end # rubocop:disable all + def backoff_and_wait! + @reconnect_delay = @reconnect_delay ? [@reconnect_delay * 1.5, 60.0].min : 1.0 + synchronize do + @pause_cond.wait(@reconnect_delay + rand(0.0..0.5)) unless @stopped + end + end + def background_run synchronize do # Don't allow a stream to restart if already stopped @@ -245,11 +284,22 @@ def background_run # Call the StreamingPull API to get the response enumerator options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } } + synchronize do + @stream_opened = false + end enum = @subscriber.service.streaming_pull @request_queue.each, options subscriber.service.logger.log :info, "subscriber-streams" do "rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened" end + synchronize do + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @last_ping_at = now + @last_pong_at = now + @stream_opened = true + @reconnect_delay = nil + end + loop do synchronize do if @paused && !@stopped @@ -264,6 +314,9 @@ def background_run begin # Cannot synchronize the enumerator, causes deadlock response = enum.next + synchronize do + @last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled received_messages = response.received_messages @@ -310,11 +363,13 @@ def background_run "#{status_code}; will be retried." end # Restart the stream with an incremental back for a retriable error. + backoff_and_wait! retry rescue RestartStream subscriber.service.logger.log :info, "subscriber-streams" do "Subscriber stream for subscription #{@subscriber.subscription_name} has ended; will be retried." end + backoff_and_wait! retry rescue StandardError => e subscriber.service.logger.log :error, "subscriber-streams" do @@ -322,6 +377,7 @@ def background_run end @subscriber.error! e + backoff_and_wait! retry end @@ -443,6 +499,7 @@ def initial_input_request req.client_id = @subscriber.service.client_id req.max_outstanding_messages = @inventory.limit req.max_outstanding_bytes = @inventory.bytesize + req.protocol_version = 1 end end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb index 05b8eeb4ccb8..4f282be9dac1 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/acknowledge_test.rb @@ -71,7 +71,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -132,7 +133,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb index 2d16c67b0364..bdb3a202c879 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/inventory_test.rb @@ -76,7 +76,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -141,7 +142,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb new file mode 100644 index 000000000000..ccdef5193081 --- /dev/null +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/keepalive_test.rb @@ -0,0 +1,146 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "helper" + +describe Google::Cloud::PubSub::MessageListener, :keepalive, :mock_pubsub do + let(:topic_name) { "topic-name-goes-here" } + let(:sub_name) { "subscription-name-goes-here" } + let(:sub_hash) { subscription_hash topic_name, sub_name } + let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) } + let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service } + let(:rec_msg1_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \ + rec_message_hash("rec_message1-msg-goes-here", 1111) } + + before do + ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05" + ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05" + end + + after do + ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL" + ENV.delete "PUBSUB_TEST_PONG_DEADLINE" + end + + it "sends protocol_version = 1 in initial streaming pull request" do + pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + stub = StreamingPullStub.new [[pull_res1]] + subscriber.service.mocked_subscription_admin = stub + + called = false + listener = subscriber.listen streams: 1 do |msg| + called = true + end + listener.start + + listener_retries = 0 + until called + fail "callback was not called" if listener_retries > 100 + listener_retries += 1 + sleep 0.01 + end + + listener.stop + listener.wait! + + initial_req = stub.requests.first.to_a.first + _(initial_req.protocol_version).must_equal 1 + end + + it "sends keep-alive pings periodically even when inventory is empty" do + q = StreamingPullStub::RaisableEnumeratorQueue.new + stub = StreamingPullStub.new [[]] + def stub.streaming_pull_internal req, opt = nil + @requests << req + @my_q.each + end + stub.instance_variable_set(:@my_q, q) + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + + pong_thread = Thread.new do + 10.times do + sleep 0.02 + q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: []) + end + end + + sleep 0.18 + pong_thread.join + + listener.stop + listener.wait! + + reqs = stub.requests.first.to_a + _(reqs.count).must_be :>=, 2 + end + + it "restarts stream when keep-alive pong deadline is exceeded" do + pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + stub = StreamingPullStub.new [[], [pull_res2]] + subscriber.service.mocked_subscription_admin = stub + + called = false + listener = subscriber.listen streams: 1 do |msg| + called = true + end + listener.start + + listener_retries = 0 + until called + fail "stream did not restart and deliver message" if listener_retries > 200 + listener_retries += 1 + sleep 0.01 + end + + listener.stop + listener.wait! + + _(stub.requests.count).must_equal 2 + end + + it "does not restart stream when actively receiving keep-alive pongs" do + q = StreamingPullStub::RaisableEnumeratorQueue.new + stub = StreamingPullStub.new [[]] + def stub.streaming_pull_internal req, opt = nil + @requests << req + @my_q.each + end + stub.instance_variable_set(:@my_q, q) + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + end + listener.start + + pong_sender = Thread.new do + 8.times do + sleep 0.02 + empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [] + q.push empty_pong + end + end + + sleep 0.15 + pong_sender.join + + listener.stop + listener.wait! + + _(stub.requests.count).must_equal 1 + end +end diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb index 190bbe8d6a78..acd1476f81cc 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/modify_ack_deadline_test.rb @@ -70,7 +70,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -126,7 +127,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb index 5b0744ef97de..9f6200a0126a 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/nack_test.rb @@ -70,7 +70,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ] @@ -126,7 +127,8 @@ subscription: sub_path, stream_ack_deadline_seconds: 60, max_outstanding_messages: 1000, - max_outstanding_bytes: 100 * 1000 * 1000 + max_outstanding_bytes: 100 * 1000 * 1000, + protocol_version: 1 )] ]