From cf0554467d0313de4d1f320a65d556e670cd3d08 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 7 May 2019 18:46:22 +0200 Subject: [PATCH 1/5] Allow skipping Pub/Sub Message autolease In certain cases automatically leasing Message instances upon creation might not be desired, thus an optional parameter is added to Message initializer that allows skipping that. The default behavior is not changed, new Message instances *are* automatically leased upon creation. --- .../google/cloud/pubsub_v1/subscriber/message.py | 14 ++++++++++---- .../unit/pubsub_v1/subscriber/test_message.py | 13 +++++++++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/message.py b/pubsub/google/cloud/pubsub_v1/subscriber/message.py index 56dde9a7f6b8..b62a28ff6cb6 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/message.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/message.py @@ -70,7 +70,7 @@ class Message(object): published. """ - def __init__(self, message, ack_id, request_queue): + def __init__(self, message, ack_id, request_queue, autolease=True): """Construct the Message. .. note:: @@ -85,6 +85,9 @@ def __init__(self, message, ack_id, request_queue): request_queue (queue.Queue): A queue provided by the policy that can accept requests; the policy is responsible for handling those requests. + autolease (bool): An optional flag determining whether a new Message + instance should automatically lease itself upon creation. + Defaults to :data:`True`. """ self._message = message self._ack_id = ack_id @@ -98,7 +101,8 @@ def __init__(self, message, ack_id, request_queue): # The policy should lease this message, telling PubSub that it has # it until it is acked or otherwise dropped. - self.lease() + if autolease: + self.lease() def __repr__(self): # Get an abbreviated version of the data. @@ -208,8 +212,10 @@ def lease(self): """Inform the policy to lease this message continually. .. note:: - This method is called by the constructor, and you should never - need to call it manually. + By default this method is called by the constructor, and you should + never need to call it manually, unless the + :class:`~.pubsub_v1.subscriber.message.Message` instance was + created with ``autolease=False``. """ self._request_queue.put( requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py index 98a946ae75c6..8c22992f7a2b 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py @@ -33,7 +33,7 @@ PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000 -def create_message(data, ack_id="ACKID", **attrs): +def create_message(data, ack_id="ACKID", autolease=True, **attrs): with mock.patch.object(message.Message, "lease") as lease: with mock.patch.object(time, "time") as time_: time_.return_value = RECEIVED_SECONDS @@ -48,8 +48,12 @@ def create_message(data, ack_id="ACKID", **attrs): ), ack_id, queue.Queue(), + autolease=autolease, ) - lease.assert_called_once_with() + if autolease: + lease.assert_called_once_with() + else: + lease.assert_not_called() return msg @@ -79,6 +83,11 @@ def test_publish_time(): assert msg.publish_time == PUBLISHED +def test_disable_autolease_on_creation(): + # the create_message() helper does the actual assertion + create_message(b"foo", autolease=False) + + def check_call_types(mock, *args, **kwargs): """Checks a mock's call types. From ffbaad45acbe54a402f3282364eab3644962a4fc Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Tue, 7 May 2019 20:22:08 +0200 Subject: [PATCH 2/5] Directly lease received Messages w/o request queue Leasing messages through a request queue in dispatcher causes a race condition with the ConsumeBidirectionalStream thread. A request to pause the background consumer can arrive when the Bidi consumer is just about to fetch the the next batch of messages, and thus the latter gets paused only *after* fetching those messages. This commit synchronously leases received messages in the streaming pull manager callback. If that hits the lease management load limit, the background consumer is paused synchronously, and will correctly pause *before* pulling another batch of messages. --- .../_protocol/streaming_pull_manager.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 650e2f661915..734be555a48f 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -443,13 +443,28 @@ def _on_response(self, response): for message in response.received_messages ] self._dispatcher.modify_ack_deadline(items) + + lease_requests = [] + for received_message in response.received_messages: message = google.cloud.pubsub_v1.subscriber.message.Message( - received_message.message, received_message.ack_id, self._scheduler.queue + received_message.message, + received_message.ack_id, + self._scheduler.queue, + autolease=False, + ) + lease_requests.append( + requests.LeaseRequest(ack_id=message.ack_id, byte_size=message.size) ) - # TODO: Immediately lease instead of using the callback queue. self._scheduler.schedule(self._callback, message) + # TODO: Since the number of received messages can cause an overflow of + # the leaser, we need to assure that only a portion of them are actually + # leased (and their callbacks scheduled). The rest need to be kept in an + # internal buffer until the leaser again has enough room to accept them. + self.leaser.add(lease_requests) + self.maybe_pause_consumer() + def _should_recover(self, exception): """Determine if an error on the RPC stream should be recovered. From 2dd5bd23db2cbb67eafbb763eb45b00fbc69bc28 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 8 May 2019 18:10:32 +0200 Subject: [PATCH 3/5] Add streaming pull message holding buffer If the PubSub backend sends too many messages in a single response that would cause the leaser overload should all these messeges were added to it, the StreamingPullManager now puts excessive messages into an internal holding buffer. The messages are released from the buffer when the leaser again has enough capacity (as defined by the FlowControl settings), and the message received callback is invoked then as well. --- .../_protocol/streaming_pull_manager.py | 87 +++++++++-- .../subscriber/test_streaming_pull_manager.py | 144 +++++++++++++++++- 2 files changed, 213 insertions(+), 18 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 734be555a48f..cfa810fa7984 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -21,6 +21,7 @@ import grpc import six +from six.moves import queue from google.api_core import bidi from google.api_core import exceptions @@ -116,6 +117,11 @@ def __init__( else: self._scheduler = scheduler + # A FIFO queue for the messages that have been received from the server, + # but not yet added to the lease management (and not sent to user callback), + # because the FlowControl limits have been hit. + self._messages_on_hold = queue.Queue() + # The threads created in ``.open()``. self._dispatcher = None self._leaser = None @@ -217,7 +223,11 @@ def maybe_pause_consumer(self): self._consumer.pause() def maybe_resume_consumer(self): - """Check the current load and resume the consumer if needed.""" + """Check the load and held messages and resume the consumer if needed. + + If there are messages held internally, release those messages before + resuming the consumer. That will avoid leaser overload. + """ # If we have been paused by flow control, check and see if we are # back within our limits. # @@ -227,10 +237,48 @@ def maybe_resume_consumer(self): if self._consumer is None or not self._consumer.is_paused: return + _LOGGER.debug("Current load: %.2f", self.load) + + # Before maybe resuming the background consumer, release any messages + # currently on hold, if the current load allows for it. + self._maybe_release_messages() + if self.load < self.flow_control.resume_threshold: + _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) self._consumer.resume() else: - _LOGGER.debug("Did not resume, current load is %s", self.load) + _LOGGER.debug("Did not resume, current load is %.2f.", self.load) + + def _maybe_release_messages(self): + """Release (some of) the held messages if the current load allows for it. + + The method tries to release as many messages as the current leaser load + would allow. Each released message is added to the lease management, + and the user callback is scheduled for it. + + If there are currently no messageges on hold, or if the leaser is + already overloaded, this method is effectively a no-op. + + The method assumes the caller has acquired the ``_pause_resume_lock``. + """ + while True: + if self.load >= 1.0: + break # already overloaded + + try: + msg = self._messages_on_hold.get_nowait() + except queue.Empty: + break + + self.leaser.add( + [requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)] + ) + _LOGGER.debug( + "Released held message to leaser, scheduling callback for it, " + "still on hold %s.", + self._messages_on_hold.qsize(), + ) + self._scheduler.schedule(self._callback, msg) def _send_unary_request(self, request): """Send a request using a separate unary request instead of over the @@ -431,9 +479,10 @@ def _on_response(self, response): After the messages have all had their ack deadline updated, execute the callback for each message using the executor. """ - _LOGGER.debug( - "Scheduling callbacks for %s messages.", len(response.received_messages) + "Processing %s received message(s), currenty on hold %s.", + len(response.received_messages), + self._messages_on_hold.qsize(), ) # Immediately modack the messages we received, as this tells the server @@ -444,7 +493,7 @@ def _on_response(self, response): ] self._dispatcher.modify_ack_deadline(items) - lease_requests = [] + invoke_callbacks_for = [] for received_message in response.received_messages: message = google.cloud.pubsub_v1.subscriber.message.Message( @@ -453,17 +502,23 @@ def _on_response(self, response): self._scheduler.queue, autolease=False, ) - lease_requests.append( - requests.LeaseRequest(ack_id=message.ack_id, byte_size=message.size) - ) - self._scheduler.schedule(self._callback, message) - - # TODO: Since the number of received messages can cause an overflow of - # the leaser, we need to assure that only a portion of them are actually - # leased (and their callbacks scheduled). The rest need to be kept in an - # internal buffer until the leaser again has enough room to accept them. - self.leaser.add(lease_requests) - self.maybe_pause_consumer() + if self.load < 1.0: + req = requests.LeaseRequest( + ack_id=message.ack_id, byte_size=message.size + ) + self.leaser.add([req]) + invoke_callbacks_for.append(message) + self.maybe_pause_consumer() + else: + self._messages_on_hold.put(message) + + _LOGGER.debug( + "Scheduling callbacks for %s new messages, new total on hold %s.", + len(invoke_callbacks_for), + self._messages_on_hold.qsize(), + ) + for msg in invoke_callbacks_for: + self._scheduler.schedule(self._callback, msg) def _should_recover(self, exception): """Determine if an error on the RPC stream should be recovered. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index cbd02e28ac6c..22585675a324 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -13,9 +13,11 @@ # limitations under the License. import logging +import types as stdlib_types import mock import pytest +from six.moves import queue from google.api_core import bidi from google.api_core import exceptions @@ -113,6 +115,23 @@ def make_manager(**kwargs): ) +def fake_leaser_add(leaser, init_msg_count=0, init_bytes=0): + """Add a simplified fake add() method to a leaser instance. + + The fake add() method actually increases the leaser's internal message count + by one for each message, and the total bytes by 10 for each message (hardcoded, + regardless of the actual message size). + """ + + def fake_add(self, items): + self.message_count += len(items) + self.bytes += len(items) * 10 + + leaser.message_count = init_msg_count + leaser.bytes = init_bytes + leaser.add = stdlib_types.MethodType(fake_add, leaser) + + def test_ack_deadline(): manager = make_manager() assert manager.ack_deadline == 10 @@ -208,6 +227,66 @@ def test_maybe_resume_consumer_wo_consumer_set(): manager.maybe_resume_consumer() # no raise +def test__maybe_release_messages_on_overload(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000) + ) + # Ensure load is exactly 1.0 (to verify that >= condition is used) + _leaser = manager._leaser = mock.create_autospec(leaser.Leaser) + _leaser.message_count = 10 + _leaser.bytes = 1000 + + msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11) + manager._messages_on_hold.put(msg) + + manager._maybe_release_messages() + + assert manager._messages_on_hold.qsize() == 1 + manager._leaser.add.assert_not_called() + manager._scheduler.schedule.assert_not_called() + + +def test__maybe_release_messages_below_overload(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000) + ) + manager._callback = mock.sentinel.callback + + # init leaser message count to 8 to leave room for 2 more messages + _leaser = manager._leaser = mock.create_autospec(leaser.Leaser) + fake_leaser_add(_leaser, init_msg_count=8, init_bytes=200) + _leaser.add = mock.Mock(wraps=_leaser.add) # to spy on calls + + messages = [ + mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=11), + mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=22), + mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=33), + ] + for msg in messages: + manager._messages_on_hold.put(msg) + + # the actual call of MUT + manager._maybe_release_messages() + + assert manager._messages_on_hold.qsize() == 1 + msg = manager._messages_on_hold.get_nowait() + assert msg.ack_id == "ack_baz" + + assert len(_leaser.add.mock_calls) == 2 + expected_calls = [ + mock.call([requests.LeaseRequest(ack_id="ack_foo", byte_size=11)]), + mock.call([requests.LeaseRequest(ack_id="ack_bar", byte_size=22)]), + ] + _leaser.add.assert_has_calls(expected_calls) + + schedule_calls = manager._scheduler.schedule.mock_calls + assert len(schedule_calls) == 2 + for _, call_args, _ in schedule_calls: + assert call_args[0] == mock.sentinel.callback + assert isinstance(call_args[1], message.Message) + assert call_args[1].ack_id in ("ack_foo", "ack_bar") + + def test_send_unary(): manager = make_manager() manager._UNARY_REQUESTS = True @@ -470,8 +549,8 @@ def test__get_initial_request_wo_leaser(): assert initial_request.modify_deadline_seconds == [] -def test_on_response(): - manager, _, dispatcher, _, _, scheduler = make_running_manager() +def test__on_response_no_leaser_overload(): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback # Set up the messages. @@ -486,6 +565,9 @@ def test_on_response(): ] ) + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, init_bytes=0) + # Actually run the method and prove that modack and schedule # are called in the expected way. manager._on_response(response) @@ -500,6 +582,64 @@ def test_on_response(): assert call[1][0] == mock.sentinel.callback assert isinstance(call[1][1], message.Message) + # the leaser load limit not hit, no messages had to be put on hold + assert manager._messages_on_hold.qsize() == 0 + + +def test__on_response_with_leaser_overload(): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + # Set up the messages. + response = types.StreamingPullResponse( + received_messages=[ + types.ReceivedMessage( + ack_id="fack", message=types.PubsubMessage(data=b"foo", message_id="1") + ), + types.ReceivedMessage( + ack_id="back", message=types.PubsubMessage(data=b"bar", message_id="2") + ), + types.ReceivedMessage( + ack_id="zack", message=types.PubsubMessage(data=b"baz", message_id="3") + ), + ] + ) + + # Adjust message bookkeeping in leaser. Pick 99 messages, which is just below + # the default FlowControl.max_messages limit. + fake_leaser_add(leaser, init_msg_count=99, init_bytes=990) + + # Actually run the method and prove that modack and schedule + # are called in the expected way. + manager._on_response(response) + + dispatcher.modify_ack_deadline.assert_called_once_with( + [ + requests.ModAckRequest("fack", 10), + requests.ModAckRequest("back", 10), + requests.ModAckRequest("zack", 10), + ] + ) + + # one message should be scheduled, the leaser capacity allows for it + schedule_calls = scheduler.schedule.mock_calls + assert len(schedule_calls) == 1 + call_args = schedule_calls[0][1] + assert call_args[0] == mock.sentinel.callback + assert isinstance(call_args[1], message.Message) + assert call_args[1].message_id == "1" + + # the rest of the messages should have been put on hold + assert manager._messages_on_hold.qsize() == 2 + while True: + try: + msg = manager._messages_on_hold.get_nowait() + except queue.Empty: + break + else: + assert isinstance(msg, message.Message) + assert msg.message_id in ("2", "3") + def test_retryable_stream_errors(): # Make sure the config matches our hard-coded tuple of exceptions. From b8e14c6945fb430bd8774d99981398e6f6178088 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 9 May 2019 10:52:53 +0200 Subject: [PATCH 4/5] Make a few streaming pull methods thread-safe With the StreamingPullManager._on_response() callback adding received messages to the leaser synchronously (in the background consumer thread), a race condition can happen with the dispatcher thread that can asynchronously add (remove) messages to (from) lease management, e.g. on ack() and nack() requests. The same is the case with related operations of maybe pausing/resuming the background consumer. This commit thus adds locks in key places, assuring that these operations are atomic, ant not subject to race conditions. --- .../pubsub_v1/subscriber/_protocol/leaser.py | 52 ++++++++++------- .../_protocol/streaming_pull_manager.py | 57 +++++++++++-------- 2 files changed, 63 insertions(+), 46 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index bcb73352b537..8a683e4e772d 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -36,9 +36,15 @@ class Leaser(object): def __init__(self, manager): self._thread = None - self._operational_lock = threading.Lock() self._manager = manager + # a lock used for start/stop operations, protecting the _thread attribute + self._operational_lock = threading.Lock() + + # A lock ensuring that add/remove operations are atomic and cannot be + # intertwined. Protects the _leased_messages and _bytes attributes. + self._add_remove_lock = threading.Lock() + self._leased_messages = {} """dict[str, float]: A mapping of ack IDs to the local time when the ack ID was initially leased in seconds since the epoch.""" @@ -64,30 +70,32 @@ def bytes(self): def add(self, items): """Add messages to be managed by the leaser.""" - for item in items: - # Add the ack ID to the set of managed ack IDs, and increment - # the size counter. - if item.ack_id not in self._leased_messages: - self._leased_messages[item.ack_id] = _LeasedMessage( - added_time=time.time(), size=item.byte_size - ) - self._bytes += item.byte_size - else: - _LOGGER.debug("Message %s is already lease managed", item.ack_id) + with self._add_remove_lock: + for item in items: + # Add the ack ID to the set of managed ack IDs, and increment + # the size counter. + if item.ack_id not in self._leased_messages: + self._leased_messages[item.ack_id] = _LeasedMessage( + added_time=time.time(), size=item.byte_size + ) + self._bytes += item.byte_size + else: + _LOGGER.debug("Message %s is already lease managed", item.ack_id) def remove(self, items): """Remove messages from lease management.""" - # Remove the ack ID from lease management, and decrement the - # byte counter. - for item in items: - if self._leased_messages.pop(item.ack_id, None) is not None: - self._bytes -= item.byte_size - else: - _LOGGER.debug("Item %s was not managed.", item.ack_id) - - if self._bytes < 0: - _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) - self._bytes = 0 + with self._add_remove_lock: + # Remove the ack ID from lease management, and decrement the + # byte counter. + for item in items: + if self._leased_messages.pop(item.ack_id, None) is not None: + self._bytes -= item.byte_size + else: + _LOGGER.debug("Item %s was not managed.", item.ack_id) + + if self._bytes < 0: + _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) + self._bytes = 0 def maintain_leases(self): """Maintain all of the leases being managed. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index cfa810fa7984..74008bc94fcb 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -122,6 +122,11 @@ def __init__( # because the FlowControl limits have been hit. self._messages_on_hold = queue.Queue() + # A lock ensuring that pausing / resuming the consumer are both atomic + # operations that cannot be executed concurrently. Needed for properly + # syncing these operations with the current leaser load. + self._pause_resume_lock = threading.Lock() + # The threads created in ``.open()``. self._dispatcher = None self._leaser = None @@ -217,10 +222,13 @@ def add_close_callback(self, callback): def maybe_pause_consumer(self): """Check the current load and pause the consumer if needed.""" - if self.load >= 1.0: - if self._consumer is not None and not self._consumer.is_paused: - _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load) - self._consumer.pause() + with self._pause_resume_lock: + if self.load >= 1.0: + if self._consumer is not None and not self._consumer.is_paused: + _LOGGER.debug( + "Message backlog over load at %.2f, pausing.", self.load + ) + self._consumer.pause() def maybe_resume_consumer(self): """Check the load and held messages and resume the consumer if needed. @@ -228,26 +236,27 @@ def maybe_resume_consumer(self): If there are messages held internally, release those messages before resuming the consumer. That will avoid leaser overload. """ - # If we have been paused by flow control, check and see if we are - # back within our limits. - # - # In order to not thrash too much, require us to have passed below - # the resume threshold (80% by default) of each flow control setting - # before restarting. - if self._consumer is None or not self._consumer.is_paused: - return - - _LOGGER.debug("Current load: %.2f", self.load) - - # Before maybe resuming the background consumer, release any messages - # currently on hold, if the current load allows for it. - self._maybe_release_messages() - - if self.load < self.flow_control.resume_threshold: - _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) - self._consumer.resume() - else: - _LOGGER.debug("Did not resume, current load is %.2f.", self.load) + with self._pause_resume_lock: + # If we have been paused by flow control, check and see if we are + # back within our limits. + # + # In order to not thrash too much, require us to have passed below + # the resume threshold (80% by default) of each flow control setting + # before restarting. + if self._consumer is None or not self._consumer.is_paused: + return + + _LOGGER.debug("Current load: %.2f", self.load) + + # Before maybe resuming the background consumer, release any messages + # currently on hold, if the current load allows for it. + self._maybe_release_messages() + + if self.load < self.flow_control.resume_threshold: + _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load) + self._consumer.resume() + else: + _LOGGER.debug("Did not resume, current load is %.2f.", self.load) def _maybe_release_messages(self): """Release (some of) the held messages if the current load allows for it. From ec6a21d938ce1e09b2dc583ac78dd6ab36dab8a6 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 10 May 2019 18:08:07 +0200 Subject: [PATCH 5/5] Add system test for PubSub max_messages setting --- pubsub/tests/system.py | 113 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index e8921e039164..13e81d281f42 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import datetime +import itertools import threading import time @@ -24,6 +25,9 @@ import google.auth from google.cloud import pubsub_v1 +from google.cloud.pubsub_v1 import exceptions +from google.cloud.pubsub_v1 import futures +from google.cloud.pubsub_v1 import types from test_utils.system import unique_resource_id @@ -206,6 +210,85 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) + def test_streaming_pull_max_messages( + self, publisher, topic_path, subscriber, subscription_path, cleanup + ): + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # create a topic and subscribe to it + publisher.create_topic(topic_path) + subscriber.create_subscription(subscription_path, topic_path) + + batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1) # total: 50 + self._publish_messages(publisher, topic_path, batch_sizes=batch_sizes) + + # now subscribe and do the main part, check for max pending messages + total_messages = sum(batch_sizes) + flow_control = types.FlowControl(max_messages=5) + callback = StreamingPullCallback( + processing_time=1, resolve_at_msg_count=total_messages + ) + + subscription_future = subscriber.subscribe( + subscription_path, callback, flow_control=flow_control + ) + + # Expected time to process all messages in ideal case: + # (total_messages / FlowControl.max_messages) * processing_time + # + # With total=50, max messages=5, and processing_time=1 this amounts to + # 10 seconds (+ overhead), thus a full minute should be more than enough + # for the processing to complete. If not, fail the test with a timeout. + try: + callback.done_future.result(timeout=60) + except exceptions.TimeoutError: + pytest.fail( + "Timeout: receiving/processing streamed messages took too long." + ) + + # The callback future gets resolved once total_messages have been processed, + # but we want to wait for just a little bit longer to possibly catch cases + # when the callback gets invoked *more* than total_messages times. + time.sleep(3) + + try: + # All messages should have been processed exactly once, and no more + # than max_messages simultaneously at any time. + assert callback.completed_calls == total_messages + assert sorted(callback.seen_message_ids) == list( + range(1, total_messages + 1) + ) + assert callback.max_pending_ack <= flow_control.max_messages + finally: + subscription_future.cancel() # trigger clean shutdown + + def _publish_messages(self, publisher, topic_path, batch_sizes): + """Publish ``count`` messages in batches and wait until completion.""" + publish_futures = [] + msg_counter = itertools.count(start=1) + + for batch_size in batch_sizes: + msg_batch = self._make_messages(count=batch_size) + for msg in msg_batch: + future = publisher.publish( + topic_path, msg, seq_num=str(next(msg_counter)) + ) + publish_futures.append(future) + time.sleep(0.1) + + # wait untill all messages have been successfully published + for future in publish_futures: + future.result(timeout=30) + + def _make_messages(self, count): + messages = [ + u"message {}/{}".format(i, count).encode("utf-8") + for i in range(1, count + 1) + ] + return messages + class AckCallback(object): def __init__(self): @@ -236,3 +319,33 @@ def __call__(self, message): # ``calls`` is incremented to do it. self.call_times.append(now) self.calls += 1 + + +class StreamingPullCallback(object): + def __init__(self, processing_time, resolve_at_msg_count): + self._lock = threading.Lock() + self._processing_time = processing_time + self._pending_ack = 0 + self.max_pending_ack = 0 + self.completed_calls = 0 + self.seen_message_ids = [] + + self._resolve_at_msg_count = resolve_at_msg_count + self.done_future = futures.Future() + + def __call__(self, message): + with self._lock: + self._pending_ack += 1 + self.max_pending_ack = max(self.max_pending_ack, self._pending_ack) + self.seen_message_ids.append(int(message.attributes["seq_num"])) + + time.sleep(self._processing_time) + + with self._lock: + self._pending_ack -= 1 + message.ack() + self.completed_calls += 1 + + if self.completed_calls >= self._resolve_at_msg_count: + if not self.done_future.done(): + self.done_future.set_result(None)