From 01f7ff4319508bc570dd8a335ffe2968102157b7 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 28 Oct 2022 18:14:28 -0400 Subject: [PATCH 01/19] fix: subtract time spent leasing from max snooze value --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 508f4d7ce..2d9a360f5 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -147,6 +147,7 @@ def maintain_leases(self) -> None: repeats. """ while not self._stop_event.is_set(): + start_time = time.time() # Determine the appropriate duration for the lease. This is # based off of how long previous messages have taken to ack, with # a sensible default and within the ranges allowed by Pub/Sub. @@ -204,11 +205,14 @@ def maintain_leases(self) -> None: # We determine the appropriate period of time based on a random # period between: # minimum: MAX_BATCH_LATENCY (to prevent duplicate modacks being created in one batch) - # maximum: 90% of the deadline + # maximum: 90% of the deadline, + # minus the time spent since the start of this while loop. # This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server. # This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases # where there are many clients. - snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9) + snooze = random.uniform( + _MAX_BATCH_LATENCY, (deadline * 0.9) - (start_time - time.time()) + ) _LOGGER.debug("Snoozing lease management for %f seconds.", snooze) self._stop_event.wait(timeout=snooze) From c764604de2da9700485dc7e167460575bd36c36e Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Tue, 1 Nov 2022 13:34:31 -0400 Subject: [PATCH 02/19] Revert "fix: subtract time spent leasing from max snooze value" This reverts commit 01f7ff4319508bc570dd8a335ffe2968102157b7. --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 2d9a360f5..508f4d7ce 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -147,7 +147,6 @@ def maintain_leases(self) -> None: repeats. """ while not self._stop_event.is_set(): - start_time = time.time() # Determine the appropriate duration for the lease. This is # based off of how long previous messages have taken to ack, with # a sensible default and within the ranges allowed by Pub/Sub. @@ -205,14 +204,11 @@ def maintain_leases(self) -> None: # We determine the appropriate period of time based on a random # period between: # minimum: MAX_BATCH_LATENCY (to prevent duplicate modacks being created in one batch) - # maximum: 90% of the deadline, - # minus the time spent since the start of this while loop. + # maximum: 90% of the deadline # This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server. # This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases # where there are many clients. - snooze = random.uniform( - _MAX_BATCH_LATENCY, (deadline * 0.9) - (start_time - time.time()) - ) + snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9) _LOGGER.debug("Snoozing lease management for %f seconds.", snooze) self._stop_event.wait(timeout=snooze) From 5a133311d20142f406c2cb0957d2df7386a44dcf Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 3 Nov 2022 14:10:16 -0400 Subject: [PATCH 03/19] fix: remove suboptimal list operations in leasing --- .../subscriber/_protocol/dispatcher.py | 24 ++++--- .../_protocol/streaming_pull_manager.py | 64 +++++++++++-------- .../pubsub_v1/subscriber/test_dispatcher.py | 63 ++++++++++++++---- .../subscriber/test_streaming_pull_manager.py | 41 ++++++++++-- 4 files changed, 141 insertions(+), 51 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index ed2f5d217..253cfb0ff 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -319,7 +319,11 @@ def lease(self, items: Sequence[requests.LeaseRequest]) -> None: self._manager.leaser.add(items) self._manager.maybe_pause_consumer() - def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: + def modify_ack_deadline( + self, + items: Sequence[requests.ModAckRequest], + default_deadline: Optional[float] = None, + ) -> None: """Modify the ack deadline for the given messages. Args: @@ -329,7 +333,9 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: # to avoid the server-side max request size limit. items_gen = iter(items) ack_ids_gen = (item.ack_id for item in items) - deadline_seconds_gen = (item.seconds for item in items) + deadline_seconds_gen = None + if default_deadline is None: + deadline_seconds_gen = (item.seconds for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): @@ -338,13 +344,15 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } # no further work needs to be done for `requests_to_retry` - requests_completed, requests_to_retry = self._manager.send_unary_modack( - modify_deadline_ack_ids=list( - itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE) - ), - modify_deadline_seconds=list( - itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE) + _, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=itertools.islice( + ack_ids_gen, _ACK_IDS_BATCH_SIZE ), + modify_deadline_seconds=itertools.islice( + deadline_seconds_gen, _ACK_IDS_BATCH_SIZE + ) + if default_deadline is None + else [default_deadline], ack_reqs_dict=ack_reqs_dict, ) assert ( diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 89dc93e74..ee3097eb7 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple +from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple import uuid import grpc # type: ignore @@ -686,7 +686,10 @@ def send_unary_ack( return requests_completed, requests_to_retry def send_unary_modack( - self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict + self, + modify_deadline_ack_ids, + modify_deadline_seconds: List[float] | itertools.islice, + ack_reqs_dict, ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: """Send a request using a separate unary request instead of over the stream. @@ -694,22 +697,32 @@ def send_unary_modack( error is re-raised. """ assert modify_deadline_ack_ids + assert len(modify_deadline_seconds) > 0 error_status = None modack_errors_dict = None try: - # Send ack_ids with the same deadline seconds together. - deadline_to_ack_ids = collections.defaultdict(list) - - for n, ack_id in enumerate(modify_deadline_ack_ids): - deadline = modify_deadline_seconds[n] - deadline_to_ack_ids[deadline].append(ack_id) - - for deadline, ack_ids in deadline_to_ack_ids.items(): + if len(modify_deadline_seconds) > 1: + # Send ack_ids with the same deadline seconds together. + deadline_to_ack_ids = collections.defaultdict(list) + + for n, ack_id in enumerate(modify_deadline_ack_ids): + deadline = modify_deadline_seconds[n] + deadline_to_ack_ids[deadline].append(ack_id) + + for deadline, ack_ids in deadline_to_ack_ids.items(): + self._client.modify_ack_deadline( + subscription=self._subscription, + ack_ids=ack_ids, + ack_deadline_seconds=deadline, + ) + # If modify_deadline_seconds is only one element, + # all requests have the same deadline. + else: self._client.modify_ack_deadline( subscription=self._subscription, - ack_ids=ack_ids, - ack_deadline_seconds=deadline, + ack_ids=modify_deadline_ack_ids, + ack_deadline_seconds=modify_deadline_seconds[0], ) except exceptions.GoogleAPICallError as exc: _LOGGER.debug( @@ -990,21 +1003,20 @@ def _get_initial_request( def _send_lease_modacks( self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True - ) -> List[str]: + ) -> Set[str]: exactly_once_enabled = False with self._exactly_once_enabled_lock: exactly_once_enabled = self._exactly_once_enabled if exactly_once_enabled: - items = [] - for ack_id in ack_ids: - future = futures.Future() - request = requests.ModAckRequest(ack_id, ack_deadline, future) - items.append(request) + items = [ + requests.ModAckRequest(ack_id, ack_deadline, futures.Future()) + for ack_id in ack_ids + ] assert self._dispatcher is not None - self._dispatcher.modify_ack_deadline(items) + self._dispatcher.modify_ack_deadline(items, ack_deadline) - expired_ack_ids = [] + expired_ack_ids = set() for req in items: try: assert req.future is not None @@ -1019,7 +1031,7 @@ def _send_lease_modacks( exc_info=True, ) if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: - expired_ack_ids.append(req.ack_id) + expired_ack_ids.add(req.ack_id) return expired_ack_ids else: items = [ @@ -1027,8 +1039,8 @@ def _send_lease_modacks( for ack_id in ack_ids ] assert self._dispatcher is not None - self._dispatcher.modify_ack_deadline(items) - return [] + self._dispatcher.modify_ack_deadline(items, ack_deadline) + return set() def _exactly_once_delivery_enabled(self) -> bool: """Whether exactly-once delivery is enabled for the subscription.""" @@ -1082,10 +1094,8 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # modack the messages we received, as this tells the server that we've # received them. ack_id_gen = (message.ack_id for message in received_messages) - expired_ack_ids = set( - self._send_lease_modacks( - ack_id_gen, self.ack_deadline, warn_on_invalid=False - ) + expired_ack_ids = self._send_lease_modacks( + ack_id_gen, self.ack_deadline, warn_on_invalid=False ) with self._pause_resume_lock: diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index a5107fe7b..806b99cff 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -645,16 +645,20 @@ def test_nack(): ] manager.send_unary_modack.return_value = (items, []) dispatcher_.nack(items) + calls = manager.send_unary_modack.call_args_list + assert len(calls) == 1 - manager.send_unary_modack.assert_called_once_with( - modify_deadline_ack_ids=["ack_id_string"], - modify_deadline_seconds=[0], - ack_reqs_dict={ + for call in calls: + modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"] + assert list(modify_deadline_ack_ids) == ["ack_id_string"] + modify_deadline_seconds = call[1]["modify_deadline_seconds"] + assert list(modify_deadline_seconds) == [0] + ack_reqs_dict = call[1]["ack_reqs_dict"] + assert ack_reqs_dict == { "ack_id_string": requests.ModAckRequest( ack_id="ack_id_string", seconds=0, future=None ) - }, - ) + } def test_modify_ack_deadline(): @@ -666,12 +670,16 @@ def test_modify_ack_deadline(): items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=60, future=None)] manager.send_unary_modack.return_value = (items, []) dispatcher_.modify_ack_deadline(items) + calls = manager.send_unary_modack.call_args_list + assert len(calls) == 1 - manager.send_unary_modack.assert_called_once_with( - modify_deadline_ack_ids=["ack_id_string"], - modify_deadline_seconds=[60], - ack_reqs_dict={"ack_id_string": items[0]}, - ) + for call in calls: + modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"] + assert list(modify_deadline_ack_ids) == ["ack_id_string"] + modify_deadline_seconds = call[1]["modify_deadline_seconds"] + assert list(modify_deadline_seconds) == [60] + ack_reqs_dict = call[1]["ack_reqs_dict"] + assert ack_reqs_dict == {"ack_id_string": items[0]} def test_modify_ack_deadline_splitting_large_payload(): @@ -695,7 +703,7 @@ def test_modify_ack_deadline_splitting_large_payload(): sent_ack_ids = collections.Counter() for call in calls: - modack_ackids = call[1]["modify_deadline_ack_ids"] + modack_ackids = list(call[1]["modify_deadline_ack_ids"]) assert len(modack_ackids) <= dispatcher._ACK_IDS_BATCH_SIZE sent_ack_ids.update(modack_ackids) @@ -703,6 +711,37 @@ def test_modify_ack_deadline_splitting_large_payload(): assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once +def test_modify_ack_deadline_splitting_large_payload_with_default_deadline(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [ + # use realistic lengths for ACK IDs (max 176 bytes) + requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60, future=None) + for i in range(5001) + ] + manager.send_unary_modack.return_value = (items, []) + dispatcher_.modify_ack_deadline(items, 60) + + calls = manager.send_unary_modack.call_args_list + assert len(calls) == 6 + + all_ack_ids = {item.ack_id for item in items} + sent_ack_ids = collections.Counter() + + for call in calls: + modack_ackids = list(call[1]["modify_deadline_ack_ids"]) + modack_deadline_seconds = call[1]["modify_deadline_seconds"] + assert len(list(modack_ackids)) <= dispatcher._ACK_IDS_BATCH_SIZE + assert len(list(modack_deadline_seconds)) == 1 + sent_ack_ids.update(modack_ackids) + + assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed + assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once + + @mock.patch("threading.Thread", autospec=True) def test_start(thread): manager = mock.create_autospec( diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 1f28b3f40..0284a2bb4 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -674,6 +674,32 @@ def test_send_unary_modack(): ) +def test_send_unary_modack_default_deadline(): + manager = make_manager() + + ack_reqs_dict = { + "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=None), + "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=None), + "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=None), + } + manager.send_unary_modack( + modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], + modify_deadline_seconds=[10], + ack_reqs_dict=ack_reqs_dict, + ) + + manager._client.modify_ack_deadline.assert_has_calls( + [ + mock.call( + subscription=manager._subscription, + ack_ids=["ack_id3", "ack_id4", "ack_id5"], + ack_deadline_seconds=10, + ), + ], + any_order=True, + ) + + def test_send_unary_modack_exactly_once_enabled_with_futures(): manager = make_manager() manager._exactly_once_enabled = True @@ -1460,7 +1486,8 @@ def test__on_response_modifies_ack_deadline(): [ requests.ModAckRequest("ack_1", 18, None), requests.ModAckRequest("ack_2", 18, None), - ] + ], + 18, ) @@ -1521,6 +1548,7 @@ def test__on_response_modifies_ack_deadline_with_exactly_once_min_lease(): requests.ModAckRequest("ack_1", 10, None), requests.ModAckRequest("ack_2", 10, None), ] + assert call.args[1] == 10 # exactly_once should be enabled after this request b/c subscription_properties says so manager._on_response(response2) @@ -1534,6 +1562,8 @@ def test__on_response_modifies_ack_deadline_with_exactly_once_min_lease(): assert modack_reqs[0].seconds == 60 assert modack_reqs[1].ack_id == "ack_4" assert modack_reqs[1].seconds == 60 + modack_deadline = call.args[1] + assert modack_deadline == 60 def test__on_response_send_ack_deadline_after_enabling_exactly_once(): @@ -1610,7 +1640,8 @@ def test__on_response_no_leaser_overload(): [ requests.ModAckRequest("fack", 10, None), requests.ModAckRequest("back", 10, None), - ] + ], + 10, ) schedule_calls = scheduler.schedule.mock_calls @@ -1660,7 +1691,8 @@ def test__on_response_with_leaser_overload(): requests.ModAckRequest("fack", 10, None), requests.ModAckRequest("back", 10, None), requests.ModAckRequest("zack", 10, None), - ] + ], + 10, ) # one message should be scheduled, the flow control limits allow for it @@ -1740,7 +1772,8 @@ def test__on_response_with_ordering_keys(): requests.ModAckRequest("fack", 10, None), requests.ModAckRequest("back", 10, None), requests.ModAckRequest("zack", 10, None), - ] + ], + 10, ) # The first two messages should be scheduled, The third should be put on From 250cca1e90015e570abedf528279af443262f862 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 3 Nov 2022 14:14:33 -0400 Subject: [PATCH 04/19] remove typing --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index ee3097eb7..344406e90 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -688,7 +688,7 @@ def send_unary_ack( def send_unary_modack( self, modify_deadline_ack_ids, - modify_deadline_seconds: List[float] | itertools.islice, + modify_deadline_seconds, ack_reqs_dict, ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: """Send a request using a separate unary request instead of over the stream. From be5b9aec005e119886b4ea28a3fa0bc6fb84ed6e Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 3 Nov 2022 14:40:42 -0400 Subject: [PATCH 05/19] add default_deadline as separate argument to send_unary_modack --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 4 +++- .../subscriber/_protocol/streaming_pull_manager.py | 6 +++--- tests/unit/pubsub_v1/subscriber/test_dispatcher.py | 4 +++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 253cfb0ff..812af6f3d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -14,6 +14,7 @@ from __future__ import absolute_import from __future__ import division +from email.policy import default import functools import itertools @@ -352,8 +353,9 @@ def modify_ack_deadline( deadline_seconds_gen, _ACK_IDS_BATCH_SIZE ) if default_deadline is None - else [default_deadline], + else None, ack_reqs_dict=ack_reqs_dict, + default_deadline=default_deadline, ) assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 344406e90..a7de71c3c 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -690,6 +690,7 @@ def send_unary_modack( modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict, + default_deadline=None, ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: """Send a request using a separate unary request instead of over the stream. @@ -697,12 +698,11 @@ def send_unary_modack( error is re-raised. """ assert modify_deadline_ack_ids - assert len(modify_deadline_seconds) > 0 error_status = None modack_errors_dict = None try: - if len(modify_deadline_seconds) > 1: + if default_deadline is None: # Send ack_ids with the same deadline seconds together. deadline_to_ack_ids = collections.defaultdict(list) @@ -722,7 +722,7 @@ def send_unary_modack( self._client.modify_ack_deadline( subscription=self._subscription, ack_ids=modify_deadline_ack_ids, - ack_deadline_seconds=modify_deadline_seconds[0], + ack_deadline_seconds=default_deadline, ) except exceptions.GoogleAPICallError as exc: _LOGGER.debug( diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 806b99cff..d4813911c 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -734,8 +734,10 @@ def test_modify_ack_deadline_splitting_large_payload_with_default_deadline(): for call in calls: modack_ackids = list(call[1]["modify_deadline_ack_ids"]) modack_deadline_seconds = call[1]["modify_deadline_seconds"] + default_deadline = call[1]["default_deadline"] assert len(list(modack_ackids)) <= dispatcher._ACK_IDS_BATCH_SIZE - assert len(list(modack_deadline_seconds)) == 1 + assert modack_deadline_seconds is None + assert default_deadline == 60 sent_ack_ids.update(modack_ackids) assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed From 7a3d5103df16e4a727ca39ffd2ac7219c1ec4320 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 3 Nov 2022 14:42:24 -0400 Subject: [PATCH 06/19] remove unused import --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 812af6f3d..389ffa1ac 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -14,7 +14,6 @@ from __future__ import absolute_import from __future__ import division -from email.policy import default import functools import itertools From 5377bd28fc1d03718a67037c4ebde7de678fffe9 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 3 Nov 2022 14:50:27 -0400 Subject: [PATCH 07/19] fix test_streaming_pull_manager --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 0284a2bb4..9dff7461c 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -684,8 +684,9 @@ def test_send_unary_modack_default_deadline(): } manager.send_unary_modack( modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], - modify_deadline_seconds=[10], + modify_deadline_seconds=None, ack_reqs_dict=ack_reqs_dict, + default_deadline=10 ) manager._client.modify_ack_deadline.assert_has_calls( From cd0b9967816fe3c4448178bb147ade071d1dc774 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 3 Nov 2022 14:54:28 -0400 Subject: [PATCH 08/19] fix test_streaming_pull_manager lint --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 9dff7461c..e01299ef9 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -686,7 +686,7 @@ def test_send_unary_modack_default_deadline(): modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], modify_deadline_seconds=None, ack_reqs_dict=ack_reqs_dict, - default_deadline=10 + default_deadline=10, ) manager._client.modify_ack_deadline.assert_has_calls( From 00122b5d6a714f5136e75e0ff1fe919996869e01 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 4 Nov 2022 17:57:01 -0400 Subject: [PATCH 09/19] drop expired_ack_ids from lease management --- .../pubsub_v1/subscriber/_protocol/leaser.py | 24 +++++++++- .../unit/pubsub_v1/subscriber/test_leaser.py | 45 +++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 508f4d7ce..1186fc61c 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -17,6 +17,7 @@ import copy import logging import random +from re import L import threading import time import typing @@ -187,6 +188,7 @@ def maintain_leases(self) -> None: # We do not actually call `modify_ack_deadline` over and over # because it is more efficient to make a single request. ack_ids = leased_messages.keys() + expired_ack_ids = set() if ack_ids: _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids)) @@ -197,8 +199,23 @@ def maintain_leases(self) -> None: # is inactive. assert self._manager.dispatcher is not None ack_id_gen = (ack_id for ack_id in ack_ids) - self._manager._send_lease_modacks(ack_id_gen, deadline) + expired_ack_ids = self._manager._send_lease_modacks( + ack_id_gen, deadline + ) + start_time = time.time() + # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management. + if self._manager._exactly_once_delivery_enabled(): + for ack_id in expired_ack_ids: + self._manager.dispatcher.drop( + [ + requests.DropRequest( + ack_id, + leased_messages.get(ack_id).size, + leased_messages.get(ack_id).ordering_key, + ) + ] + ) # Now wait an appropriate period of time and do this again. # # We determine the appropriate period of time based on a random @@ -208,7 +225,10 @@ def maintain_leases(self) -> None: # This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server. # This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases # where there are many clients. - snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9) + # If we spent any time iterating over expired acks, we should subtract this from the deadline. + snooze = random.uniform( + _MAX_BATCH_LATENCY, (deadline * 0.9 - (time.time() - start_time)) + ) _LOGGER.debug("Snoozing lease management for %f seconds.", snooze) self._stop_event.wait(timeout=snooze) diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index 7e11e3ccb..f38d4dace 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -151,6 +151,51 @@ def test_maintain_leases_ack_ids(): assert call.args[1] == 10 +def test_maintain_leases_expired_ack_ids_ignored(): + manager = create_manager() + leaser_ = leaser.Leaser(manager) + make_sleep_mark_event_as_done(leaser_) + leaser_.add( + [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] + ) + manager._exactly_once_delivery_enabled.return_value = False + manager._send_lease_modacks.return_value = set(["my ack id"]) + leaser_.maintain_leases() + + assert len(manager._send_lease_modacks.mock_calls) == 1 + + call = manager._send_lease_modacks.mock_calls[0] + ack_ids = list(call.args[0]) + assert ack_ids == ["my ack id"] + assert call.args[1] == 10 + + +def test_maintain_leases_expired_ack_ids_exactly_once(): + manager = create_manager() + leaser_ = leaser.Leaser(manager) + make_sleep_mark_event_as_done(leaser_) + leaser_.add( + [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] + ) + manager._exactly_once_delivery_enabled.return_value = True + manager._send_lease_modacks.return_value = set(["my ack id"]) + leaser_.maintain_leases() + + assert len(manager._send_lease_modacks.mock_calls) == 1 + + call = manager._send_lease_modacks.mock_calls[0] + ack_ids = list(call.args[0]) + assert ack_ids == ["my ack id"] + assert call.args[1] == 10 + + assert len(manager.dispatcher.drop.mock_calls) == 1 + call = manager.dispatcher.drop.mock_calls[0] + drop_requests = list(call.args[0]) + assert drop_requests[0].ack_id == "my ack id" + assert drop_requests[0].byte_size == 50 + assert drop_requests[0].ordering_key == "" + + def test_maintain_leases_no_ack_ids(): manager = create_manager() leaser_ = leaser.Leaser(manager) From 5d3320fdf0c1fc2cd3ded7b0cf6b415b77cc350c Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 7 Nov 2022 11:09:42 -0500 Subject: [PATCH 10/19] add return value to _send_lease_modacks in unit tests --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 1 + tests/unit/pubsub_v1/subscriber/test_leaser.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 1186fc61c..690714956 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -199,6 +199,7 @@ def maintain_leases(self) -> None: # is inactive. assert self._manager.dispatcher is not None ack_id_gen = (ack_id for ack_id in ack_ids) + _LOGGER.warning("calling send_lease_modacks with: ") expired_ack_ids = self._manager._send_lease_modacks( ack_id_gen, deadline ) diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index f38d4dace..f38717c6f 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -105,6 +105,7 @@ def test_maintain_leases_inactive_manager(caplog): [requests.LeaseRequest(ack_id="my_ack_ID", byte_size=42, ordering_key="")] ) + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() # Leases should still be maintained even if the manager is inactive. @@ -119,6 +120,7 @@ def test_maintain_leases_stopped(caplog): leaser_ = leaser.Leaser(manager) leaser_.stop() + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() assert "exiting" in caplog.text @@ -142,6 +144,7 @@ def test_maintain_leases_ack_ids(): [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] ) + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() assert len(manager._send_lease_modacks.mock_calls) == 1 @@ -232,6 +235,7 @@ def test_maintain_leases_outdated_items(time): # Now make sure time reports that we are past the end of our timeline. time.return_value = manager.flow_control.max_lease_duration + 1 + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() # ack2, ack3, and ack4 should be renewed. ack1 should've been dropped From eaec4e6b1fd315139696364eb82b71d732106c6a Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 7 Nov 2022 11:12:03 -0500 Subject: [PATCH 11/19] remove unused import --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 690714956..f7523afcb 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -17,7 +17,6 @@ import copy import logging import random -from re import L import threading import time import typing From 5fe67cca6d8772be6bff65ca64fd91849c88261a Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Tue, 8 Nov 2022 18:47:16 -0500 Subject: [PATCH 12/19] addressing comments --- .../subscriber/_protocol/dispatcher.py | 36 +++++++++++-------- .../pubsub_v1/subscriber/_protocol/leaser.py | 23 ++++++------ .../_protocol/streaming_pull_manager.py | 2 ++ 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 389ffa1ac..ab1d1e832 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -333,9 +333,6 @@ def modify_ack_deadline( # to avoid the server-side max request size limit. items_gen = iter(items) ack_ids_gen = (item.ack_id for item in items) - deadline_seconds_gen = None - if default_deadline is None: - deadline_seconds_gen = (item.seconds for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): @@ -343,19 +340,28 @@ def modify_ack_deadline( req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } - # no further work needs to be done for `requests_to_retry` - _, requests_to_retry = self._manager.send_unary_modack( - modify_deadline_ack_ids=itertools.islice( - ack_ids_gen, _ACK_IDS_BATCH_SIZE - ), - modify_deadline_seconds=itertools.islice( - deadline_seconds_gen, _ACK_IDS_BATCH_SIZE + requests_to_retry: List[requests.ModAckRequest] + if default_deadline is None: + # no further work needs to be done for `requests_to_retry` + _, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=itertools.islice( + ack_ids_gen, _ACK_IDS_BATCH_SIZE + ), + modify_deadline_seconds=itertools.islice( + (item.seconds for item in items), _ACK_IDS_BATCH_SIZE + ), + ack_reqs_dict=ack_reqs_dict, + default_deadline=None, + ) + else: + _, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=itertools.islice( + ack_ids_gen, _ACK_IDS_BATCH_SIZE + ), + modify_deadline_seconds=None, + ack_reqs_dict=ack_reqs_dict, + default_deadline=default_deadline, ) - if default_deadline is None - else None, - ack_reqs_dict=ack_reqs_dict, - default_deadline=default_deadline, - ) assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE ), "Too many requests to be retried." diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index f7523afcb..fccbcc25e 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -198,24 +198,23 @@ def maintain_leases(self) -> None: # is inactive. assert self._manager.dispatcher is not None ack_id_gen = (ack_id for ack_id in ack_ids) - _LOGGER.warning("calling send_lease_modacks with: ") expired_ack_ids = self._manager._send_lease_modacks( ack_id_gen, deadline ) start_time = time.time() # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management. - if self._manager._exactly_once_delivery_enabled(): - for ack_id in expired_ack_ids: - self._manager.dispatcher.drop( - [ - requests.DropRequest( - ack_id, - leased_messages.get(ack_id).size, - leased_messages.get(ack_id).ordering_key, - ) - ] - ) + if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids): + self._manager.dispatcher.drop( + [ + requests.DropRequest( + ack_id, + leased_messages.get(ack_id).size, + leased_messages.get(ack_id).ordering_key, + ) + for ack_id in expired_ack_ids + ] + ) # Now wait an appropriate period of time and do this again. # # We determine the appropriate period of time based on a random diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index a7de71c3c..75e9aee02 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -698,6 +698,8 @@ def send_unary_modack( error is re-raised. """ assert modify_deadline_ack_ids + # Either we have a generator or a single deadline. + assert modify_deadline_seconds is None or default_deadline is None error_status = None modack_errors_dict = None From eb3afe390ac61dd8a4b6e9c2fdd63ece6a7f2d01 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Tue, 8 Nov 2022 18:50:05 -0500 Subject: [PATCH 13/19] fix comment --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 75e9aee02..13974ebe4 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -718,9 +718,8 @@ def send_unary_modack( ack_ids=ack_ids, ack_deadline_seconds=deadline, ) - # If modify_deadline_seconds is only one element, - # all requests have the same deadline. else: + # We can send all requests with the default deadline. self._client.modify_ack_deadline( subscription=self._subscription, ack_ids=modify_deadline_ack_ids, From f5c5f5e9b69c7b2a77da2b4b36f831a9f39a9c5b Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 9 Nov 2022 19:04:16 -0500 Subject: [PATCH 14/19] fix modify_deadline_seconds generator --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index ab1d1e832..9d3650dfd 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -333,6 +333,7 @@ def modify_ack_deadline( # to avoid the server-side max request size limit. items_gen = iter(items) ack_ids_gen = (item.ack_id for item in items) + modify_deadline_seconds_gen = (item.seconds for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): @@ -348,7 +349,7 @@ def modify_ack_deadline( ack_ids_gen, _ACK_IDS_BATCH_SIZE ), modify_deadline_seconds=itertools.islice( - (item.seconds for item in items), _ACK_IDS_BATCH_SIZE + modify_deadline_seconds_gen, _ACK_IDS_BATCH_SIZE ), ack_reqs_dict=ack_reqs_dict, default_deadline=None, From 13d81853d3d7f35a824a697b5cc49bf0ab5c565f Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 9 Nov 2022 19:11:57 -0500 Subject: [PATCH 15/19] fix modify_deadline_seconds generator --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 9d3650dfd..fd1e2fe6b 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -333,7 +333,7 @@ def modify_ack_deadline( # to avoid the server-side max request size limit. items_gen = iter(items) ack_ids_gen = (item.ack_id for item in items) - modify_deadline_seconds_gen = (item.seconds for item in items) + deadline_seconds_gen = (item.seconds for item in items) total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) for _ in range(total_chunks): @@ -349,7 +349,7 @@ def modify_ack_deadline( ack_ids_gen, _ACK_IDS_BATCH_SIZE ), modify_deadline_seconds=itertools.islice( - modify_deadline_seconds_gen, _ACK_IDS_BATCH_SIZE + deadline_seconds_gen, _ACK_IDS_BATCH_SIZE ), ack_reqs_dict=ack_reqs_dict, default_deadline=None, From 14586a53e9d5c1ea27d60c55efc27cb6b1aa0b51 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 9 Nov 2022 19:40:11 -0500 Subject: [PATCH 16/19] fix subscripting in streaming_pull_manager --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index fd1e2fe6b..15ad4abb3 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -345,11 +345,11 @@ def modify_ack_deadline( if default_deadline is None: # no further work needs to be done for `requests_to_retry` _, requests_to_retry = self._manager.send_unary_modack( - modify_deadline_ack_ids=itertools.islice( - ack_ids_gen, _ACK_IDS_BATCH_SIZE + modify_deadline_ack_ids=list( + itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE) ), - modify_deadline_seconds=itertools.islice( - deadline_seconds_gen, _ACK_IDS_BATCH_SIZE + modify_deadline_seconds=list( + itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE) ), ack_reqs_dict=ack_reqs_dict, default_deadline=None, From 87dd40a093a88ad1fb92106747d1ac9f36f0cfd3 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 10 Nov 2022 12:17:21 -0500 Subject: [PATCH 17/19] fix mypy checks --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index fccbcc25e..ce7065dc0 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -205,6 +205,7 @@ def maintain_leases(self) -> None: start_time = time.time() # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management. if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids): + assert self._manager.dispatcher is not None self._manager.dispatcher.drop( [ requests.DropRequest( @@ -213,6 +214,7 @@ def maintain_leases(self) -> None: leased_messages.get(ack_id).ordering_key, ) for ack_id in expired_ack_ids + if ack_id in leased_messages ] ) # Now wait an appropriate period of time and do this again. From dc8662f92ea39b73c6ff562e6d3457958f232c5a Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 10 Nov 2022 12:45:52 -0500 Subject: [PATCH 18/19] fix mypy checks --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index ce7065dc0..ed831839b 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -210,8 +210,8 @@ def maintain_leases(self) -> None: [ requests.DropRequest( ack_id, - leased_messages.get(ack_id).size, - leased_messages.get(ack_id).ordering_key, + leased_messages.get(ack_id).size, # type: ignore + leased_messages.get(ack_id).ordering_key, # type: ignore ) for ack_id in expired_ack_ids if ack_id in leased_messages From f1a7f632529d2e417c8e636ff158b9ec82c933cc Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 10 Nov 2022 12:46:19 -0500 Subject: [PATCH 19/19] fix lint --- google/cloud/pubsub_v1/subscriber/_protocol/leaser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index ed831839b..16018e384 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -210,8 +210,8 @@ def maintain_leases(self) -> None: [ requests.DropRequest( ack_id, - leased_messages.get(ack_id).size, # type: ignore - leased_messages.get(ack_id).ordering_key, # type: ignore + leased_messages.get(ack_id).size, # type: ignore + leased_messages.get(ack_id).ordering_key, # type: ignore ) for ack_id in expired_ack_ids if ack_id in leased_messages