Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Stream
##
# @private exactly_once_delivery_enabled.
attr_reader :exactly_once_delivery_enabled
attr_accessor :keepalive_interval, :pong_deadline

##
# @private Create an empty Subscriber::Stream object.
Expand All @@ -68,24 +69,51 @@ def initialize subscriber

@callback_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @subscriber.callback_threads

@keepalive_interval = Float(ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] || 30)
@pong_deadline = Float(ENV["PUBSUB_TEST_PONG_DEADLINE"] || 15)
@last_ping_at = nil
@last_pong_at = nil
@stream_opened = false

@stream_keepalive_task = Concurrent::TimerTask.new(
execution_interval: 30
execution_interval: @keepalive_interval
) do
synchronize do
if @stream_opened && !@stopped && @request_queue
subscriber.service.logger.log :info, "subscriber-streams" do
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
end
@last_ping_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @last_pong_at >= @last_ping_at
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
end
end
end

@pong_monitor_task = Concurrent::TimerTask.new(
execution_interval: [@keepalive_interval / 5.0, 0.01].max
) do
# push empty request every 30 seconds to keep stream alive
unless inventory.empty?
subscriber.service.logger.log :info, "subscriber-streams" do
"sending keepAlive to stream for subscription #{@subscriber.subscription_name}"
synchronize do
if @stream_opened && @last_ping_at && @last_pong_at && !@stopped
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
if now - @last_ping_at >= @pong_deadline && @last_pong_at < @last_ping_at
subscriber.service.logger.log :error, "subscriber-streams" do
"Keep-alive pong not received within #{@pong_deadline}s; restarting stream."
end
@stream_opened = false
@background_thread&.raise RestartStream
end
end
push Google::Cloud::PubSub::V1::StreamingPullRequest.new
end
end.execute
end
end

def start
synchronize do
break if @background_thread

@inventory.start
@stream_keepalive_task.execute
@pong_monitor_task.execute

start_streaming!
end
Expand All @@ -108,6 +136,9 @@ def stop
@stopped = true
@pause_cond.broadcast

@stream_keepalive_task.shutdown
@pong_monitor_task.shutdown

# Now that the reception thread is stopped, immediately stop the
# callback thread pool. All queued callbacks will see the stream
# is stopped and perform a noop.
Expand Down Expand Up @@ -245,11 +276,21 @@ def background_run

# Call the StreamingPull API to get the response enumerator
options = { :"metadata" => { :"x-goog-request-params" => @subscriber.subscription_name } }
synchronize do
@stream_opened = false
end
enum = @subscriber.service.streaming_pull @request_queue.each, options
subscriber.service.logger.log :info, "subscriber-streams" do
"rpc: streamingPull, subscription: #{@subscriber.subscription_name}, stream opened"
end

synchronize do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@last_ping_at = now
@last_pong_at = now
@stream_opened = true
end

loop do
synchronize do
if @paused && !@stopped
Expand All @@ -264,6 +305,9 @@ def background_run
begin
# Cannot synchronize the enumerator, causes deadlock
response = enum.next
synchronize do
@last_pong_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
new_exactly_once_delivery_enabled = response&.subscription_properties&.exactly_once_delivery_enabled
received_messages = response.received_messages

Expand Down Expand Up @@ -443,6 +487,7 @@ def initial_input_request
req.client_id = @subscriber.service.client_id
req.max_outstanding_messages = @inventory.limit
req.max_outstanding_bytes = @inventory.bytesize
req.protocol_version = 1
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -132,7 +133,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -141,7 +142,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "helper"

describe Google::Cloud::PubSub::MessageListener, :keepalive, :mock_pubsub do
let(:topic_name) { "topic-name-goes-here" }
let(:sub_name) { "subscription-name-goes-here" }
let(:sub_hash) { subscription_hash topic_name, sub_name }
let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
let(:subscriber) { Google::Cloud::PubSub::Subscriber.from_grpc sub_grpc, pubsub.service }
let(:rec_msg1_grpc) { Google::Cloud::PubSub::V1::ReceivedMessage.new \
rec_message_hash("rec_message1-msg-goes-here", 1111) }

before do
ENV["PUBSUB_TEST_KEEPALIVE_INTERVAL"] = "0.05"
ENV["PUBSUB_TEST_PONG_DEADLINE"] = "0.05"
end

after do
ENV.delete "PUBSUB_TEST_KEEPALIVE_INTERVAL"
ENV.delete "PUBSUB_TEST_PONG_DEADLINE"
end

it "sends protocol_version = 1 in initial streaming pull request" do
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
stub = StreamingPullStub.new [[pull_res1]]
subscriber.service.mocked_subscription_admin = stub

called = false
listener = subscriber.listen streams: 1 do |msg|
called = true
end
listener.start

listener_retries = 0
until called
fail "callback was not called" if listener_retries > 100
listener_retries += 1
sleep 0.01
end

listener.stop
listener.wait!

initial_req = stub.requests.first.to_a.first
_(initial_req.protocol_version).must_equal 1
end

it "sends keep-alive pings periodically even when inventory is empty" do
q = StreamingPullStub::RaisableEnumeratorQueue.new
stub = StreamingPullStub.new [[]]
def stub.streaming_pull_internal req, opt = nil
@requests << req
@my_q.each
end
stub.instance_variable_set(:@my_q, q)
subscriber.service.mocked_subscription_admin = stub

listener = subscriber.listen streams: 1 do |msg|
end
listener.start

pong_thread = Thread.new do
10.times do
sleep 0.02
q.push Google::Cloud::PubSub::V1::StreamingPullResponse.new(received_messages: [])
end
end

sleep 0.18
pong_thread.join

listener.stop
listener.wait!

reqs = stub.requests.first.to_a
_(reqs.count).must_be :>=, 2
end

it "restarts stream when keep-alive pong deadline is exceeded" do
pull_res2 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
stub = StreamingPullStub.new [[], [pull_res2]]
subscriber.service.mocked_subscription_admin = stub

called = false
listener = subscriber.listen streams: 1 do |msg|
called = true
end
listener.start

listener_retries = 0
until called
fail "stream did not restart and deliver message" if listener_retries > 200
listener_retries += 1
sleep 0.01
end

listener.stop
listener.wait!

_(stub.requests.count).must_equal 2
end

it "does not restart stream when actively receiving keep-alive pongs" do
q = StreamingPullStub::RaisableEnumeratorQueue.new
stub = StreamingPullStub.new [[]]
def stub.streaming_pull_internal req, opt = nil
@requests << req
@my_q.each
end
stub.instance_variable_set(:@my_q, q)
subscriber.service.mocked_subscription_admin = stub

listener = subscriber.listen streams: 1 do |msg|
end
listener.start

pong_sender = Thread.new do
8.times do
sleep 0.02
empty_pong = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: []
q.push empty_pong
end
end

sleep 0.15
pong_sender.join

listener.stop
listener.wait!

_(stub.requests.count).must_equal 1
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -126,7 +127,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down Expand Up @@ -126,7 +127,8 @@
subscription: sub_path,
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_bytes: 100 * 1000 * 1000
max_outstanding_bytes: 100 * 1000 * 1000,
protocol_version: 1
)]
]

Expand Down
Loading