Skip to content

Commit b8e1913

Browse files
committed
fix(pubsub): streaming pull shouldn't need subscriptions.get permission
Pulling the mesages with the streaming pull should work with the default pubsub.subscriber role. This commit removes the call to fetch a subscription, and replaces the subscription's ACK deadline with a fixed deadline of 60 seconds. That *will* re-introduce the issue #9252, but at least in a less severe manner.
1 parent 1026350 commit b8e1913

File tree

3 files changed

+32
-14
lines changed

3 files changed

+32
-14
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@
5050
_RESUME_THRESHOLD = 0.8
5151
"""The load threshold below which to resume the incoming message stream."""
5252

53+
_DEFAULT_STREAM_ACK_DEADLINE = 60
54+
"""The default message acknowledge deadline in seconds for incoming message stream.
55+
56+
This default deadline is dynamically modified for the messages that are added
57+
to the lease management.
58+
"""
59+
5360

5461
def _maybe_wrap_exception(exception):
5562
"""Wraps a gRPC exception class, if needed."""
@@ -384,8 +391,17 @@ def open(self, callback, on_callback_error):
384391
)
385392

386393
# Create the RPC
387-
subscription = self._client.api.get_subscription(self._subscription)
388-
stream_ack_deadline_seconds = subscription.ack_deadline_seconds
394+
395+
# We must use a fixed value for the ACK deadline, as we cannot read it
396+
# from the subscription. The latter would require `pubsub.subscriptions.get`
397+
# permission, which is not granted to the default subscriber role
398+
# `roles/pubsub.subscriber`.
399+
# See also https://github.com/googleapis/google-cloud-python/issues/9339
400+
#
401+
# When dynamic lease management is enabled for the "on hold" messages,
402+
# the default stream ACK deadline should again be set based on the
403+
# historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
404+
stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
389405

390406
get_initial_request = functools.partial(
391407
self._get_initial_request, stream_ack_deadline_seconds

pubsub/tests/system.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,10 @@ class CallbackError(Exception):
381381
with pytest.raises(CallbackError):
382382
future.result(timeout=30)
383383

384+
@pytest.mark.xfail(
385+
reason="The default stream ACK deadline is static and received messages "
386+
"exceeding FlowControl.max_messages are currently not lease managed."
387+
)
384388
def test_streaming_pull_ack_deadline(
385389
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
386390
):
@@ -395,29 +399,29 @@ def test_streaming_pull_ack_deadline(
395399
# Subscribe to the topic. This must happen before the messages
396400
# are published.
397401
subscriber.create_subscription(
398-
subscription_path, topic_path, ack_deadline_seconds=60
402+
subscription_path, topic_path, ack_deadline_seconds=240
399403
)
400404

401405
# publish some messages and wait for completion
402406
self._publish_messages(publisher, topic_path, batch_sizes=[2])
403407

404408
# subscribe to the topic
405409
callback = StreamingPullCallback(
406-
processing_time=15, # more than the default ACK deadline of 10 seconds
410+
processing_time=70, # more than the default stream ACK deadline (60s)
407411
resolve_at_msg_count=3, # one more than the published messages count
408412
)
409413
flow_control = types.FlowControl(max_messages=1)
410414
subscription_future = subscriber.subscribe(
411415
subscription_path, callback, flow_control=flow_control
412416
)
413417

414-
# We expect to process the first two messages in 2 * 15 seconds, and
418+
# We expect to process the first two messages in 2 * 70 seconds, and
415419
# any duplicate message that is re-sent by the backend in additional
416-
# 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
417-
# no duplicates in 60 seconds, we can reasonably assume that there
420+
# 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
421+
# no duplicates in 240 seconds, we can reasonably assume that there
418422
# won't be any.
419423
try:
420-
callback.done_future.result(timeout=60)
424+
callback.done_future.result(timeout=240)
421425
except exceptions.TimeoutError:
422426
# future timed out, because we received no excessive messages
423427
assert sorted(callback.seen_message_ids) == [1, 2]

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,9 @@ def test_heartbeat_inactive():
404404
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
405405
)
406406
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
407+
stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE
408+
407409
manager = make_manager()
408-
manager._client.api.get_subscription.return_value = types.Subscription(
409-
name="projects/foo/subscriptions/bar",
410-
topic="projects/foo/topics/baz",
411-
ack_deadline_seconds=123,
412-
)
413410

414411
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
415412

@@ -437,7 +434,8 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
437434
)
438435
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
439436
assert initial_request_arg.func == manager._get_initial_request
440-
assert initial_request_arg.args[0] == 123
437+
assert initial_request_arg.args[0] == stream_ack_deadline
438+
assert not manager._client.api.get_subscription.called
441439

442440
resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
443441
manager._on_rpc_done

0 commit comments

Comments
 (0)