From 17feea5783f3a878b4dcfb3a8570585f7637378f Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Date: Wed, 5 Oct 2022 21:40:12 -0400 Subject: [PATCH 1/7] Fix: Silence invalid_ack_id warnings for receipt modacks (#798) --- .../_protocol/streaming_pull_manager.py | 20 +++-- .../subscriber/test_streaming_pull_manager.py | 85 ++++++++++++++++++- 2 files changed, 97 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 21c1bab7b..89dc93e74 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -989,7 +989,7 @@ def _get_initial_request( return request def _send_lease_modacks( - self, ack_ids: Iterable[str], ack_deadline: float + self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True ) -> List[str]: exactly_once_enabled = False with self._exactly_once_enabled_lock: @@ -1010,10 +1010,14 @@ def _send_lease_modacks( assert req.future is not None req.future.result() except AcknowledgeError as ack_error: - _LOGGER.warning( - "AcknowledgeError when lease-modacking a message.", - exc_info=True, - ) + if ( + ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID + or warn_on_invalid + ): + _LOGGER.warning( + "AcknowledgeError when lease-modacking a message.", + exc_info=True, + ) if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: expired_ack_ids.append(req.ack_id) return expired_ack_ids @@ -1078,7 +1082,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # modack the messages we received, as this tells the server that we've # received them. ack_id_gen = (message.ack_id for message in received_messages) - expired_ack_ids = set(self._send_lease_modacks(ack_id_gen, self.ack_deadline)) + expired_ack_ids = set( + self._send_lease_modacks( + ack_id_gen, self.ack_deadline, warn_on_invalid=False + ) + ) with self._pause_resume_lock: assert self._scheduler is not None diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index b4f76f20b..1f28b3f40 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1846,7 +1846,7 @@ def test__on_response_disable_exactly_once(): assert manager._stream_ack_deadline == 60 -def test__on_response_exactly_once_immediate_modacks_fail(): +def test__on_response_exactly_once_immediate_modacks_fail(caplog): manager, _, dispatcher, leaser, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback @@ -1890,7 +1890,8 @@ def complete_futures_with_error(*args, **kwargs): fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10) - manager._on_response(response) + with caplog.at_level(logging.WARNING): + manager._on_response(response) # The second messages should be scheduled, and not the first. @@ -1902,6 +1903,14 @@ def complete_futures_with_error(*args, **kwargs): assert call_args[1].message_id == "2" assert manager._messages_on_hold.size == 0 + + expected_warnings = [ + record.message.lower() + for record in caplog.records + if "AcknowledgeError when lease-modacking a message." in record.message + ] + assert len(expected_warnings) == 1 + # No messages available assert manager._messages_on_hold.get() is None @@ -1909,6 +1918,78 @@ def complete_futures_with_error(*args, **kwargs): assert manager.load == 0.001 +def test__on_response_exactly_once_immediate_modacks_fail_non_invalid(caplog): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + def complete_futures_with_error(*args, **kwargs): + modack_requests = args[0] + for req in modack_requests: + if req.ack_id == "fack": + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.OTHER, None + ) + ) + else: + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.SUCCESS, None + ) + ) + + dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error + + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="fack", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="good", + message=gapic_types.PubsubMessage(data=b"foo", message_id="2"), + ), + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=True + ), + ) + + # Actually run the method and prove that modack and schedule are called in + # the expected way. + + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10) + + with caplog.at_level(logging.WARNING): + manager._on_response(response) + + # The second messages should be scheduled, and not the first. + + schedule_calls = scheduler.schedule.mock_calls + assert len(schedule_calls) == 2 + call_args = schedule_calls[0][1] + assert call_args[0] == mock.sentinel.callback + assert isinstance(call_args[1], message.Message) + assert call_args[1].message_id == "1" + + assert manager._messages_on_hold.size == 0 + + expected_warnings = [ + record.message.lower() + for record in caplog.records + if "AcknowledgeError when lease-modacking a message." in record.message + ] + assert len(expected_warnings) == 2 + + # No messages available + assert manager._messages_on_hold.get() is None + + # do not add message + assert manager.load == 0.002 + + def test__should_recover_true(): manager = make_manager() From 9c79a1f93be08e4687f4afc5be9f9f19c7640514 Mon Sep 17 00:00:00 2001 From: WhiteSource Renovate Date: Thu, 6 Oct 2022 15:50:13 +0200 Subject: [PATCH 2/7] chore(deps): update dependency backoff to v2.2.1 (#797) Co-authored-by: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> --- samples/snippets/requirements-test.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index 7f05516b7..f98cebbb5 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -1,4 +1,4 @@ -backoff==2.1.2 +backoff==2.2.1 pytest==7.1.3 mock==4.0.3 flaky==3.7.0 From fa235033481783c2ec378b2a26b223bdff206461 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Fri, 7 Oct 2022 23:40:05 -0400 Subject: [PATCH 3/7] fix(deps): allow protobuf 3.19.5 (#801) * fix(deps): allow protobuf 3.19.5 * explicitly exclude protobuf 4.21.0 --- setup.py | 2 +- testing/constraints-3.7.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 19f092686..9fb99c111 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ "grpcio >= 1.38.1, < 2.0dev", # https://github.com/googleapis/python-pubsub/issues/414 "google-api-core[grpc] >= 1.32.0, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*", "proto-plus >= 1.22.0, <2.0.0dev", - "protobuf >= 3.20.2, <5.0.0dev", + "protobuf>=3.19.5,<5.0.0dev,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", "grpc-google-iam-v1 >=0.12.4, <1.0.0dev", "grpcio-status >= 1.16.0", ] diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 22fedb9e7..08b242a12 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -9,4 +9,4 @@ google-api-core==1.32.0 libcst==0.3.10 proto-plus==1.22.0 grpc-google-iam-v1==0.12.4 -protobuf==3.20.2 +protobuf==3.19.5 From 4361e6735004a5600ee73979b99e6b9dd587c49b Mon Sep 17 00:00:00 2001 From: Jaume Marhuenda Date: Sat, 8 Oct 2022 14:26:38 -0400 Subject: [PATCH 4/7] fix: batch at most 1,000 ack ids per request (#802) Co-authored-by: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Co-authored-by: Owl Bot --- .../cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 10 +--------- tests/unit/pubsub_v1/subscriber/test_dispatcher.py | 4 ++-- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index c6dbf067f..ed2f5d217 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -59,16 +59,8 @@ """The maximum amount of time in seconds to wait for additional request items before processing the next batch of requests.""" -_ACK_IDS_BATCH_SIZE = 2500 +_ACK_IDS_BATCH_SIZE = 1000 """The maximum number of ACK IDs to send in a single StreamingPullRequest. - -The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per -acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164 -bytes, thus we cannot send more than o 524288/176 ~= 2979 ACK IDs in a single -StreamingPullRequest message. - -Accounting for some overhead, we should thus only send a maximum of 2500 ACK -IDs at a time. """ _MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS = 1 diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 91ee2a66d..a5107fe7b 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -437,7 +437,7 @@ def test_ack_splitting_large_payload(): dispatcher_.ack(items) calls = manager.send_unary_ack.call_args_list - assert len(calls) == 3 + assert len(calls) == 6 all_ack_ids = {item.ack_id for item in items} sent_ack_ids = collections.Counter() @@ -689,7 +689,7 @@ def test_modify_ack_deadline_splitting_large_payload(): dispatcher_.modify_ack_deadline(items) calls = manager.send_unary_modack.call_args_list - assert len(calls) == 3 + assert len(calls) == 6 all_ack_ids = {item.ack_id for item in items} sent_ack_ids = collections.Counter() From 9cde916d7c32a33682336591f7e9423d518b3431 Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Mon, 10 Oct 2022 15:30:17 -0400 Subject: [PATCH 5/7] chore: release as 2.3.10 (#803) Release-As: 2.3.10 We're going to use 2.3.9 for a break-fix release which contains an important fix, for which we have freeze exception approval. From 34f022b4ee62d53a193bc2babafad508e2f2540b Mon Sep 17 00:00:00 2001 From: Anthonios Partheniou Date: Mon, 10 Oct 2022 16:15:40 -0400 Subject: [PATCH 6/7] chore: release as 2.3.10 (#805) * chore: release 2.3.10 Release-As: 2.3.10 * mark 2.13.9 as taken --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9fb99c111..51c30a85b 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ name = "google-cloud-pubsub" description = "Google Cloud Pub/Sub API client library" -version = "2.13.8" +version = "2.13.9" # Should be one of: # 'Development Status :: 3 - Alpha' # 'Development Status :: 4 - Beta' From 523882f8fe0dbc93191575bcc26cc9007655f630 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Fri, 14 Oct 2022 11:15:53 -0400 Subject: [PATCH 7/7] chore(main): release 2.13.10 (#799) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 14 ++++++++++++++ setup.py | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a7db66da..76c13f4f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ [1]: https://pypi.org/project/google-cloud-pubsub/#history +## [2.13.10](https://github.com/googleapis/python-pubsub/compare/v2.13.8...v2.13.10) (2022-10-14) + + +### Bug Fixes + +* Batch at most 1,000 ack ids per request ([#802](https://github.com/googleapis/python-pubsub/issues/802)) ([4361e67](https://github.com/googleapis/python-pubsub/commit/4361e6735004a5600ee73979b99e6b9dd587c49b)) +* **deps:** Allow protobuf 3.19.5 ([#801](https://github.com/googleapis/python-pubsub/issues/801)) ([fa23503](https://github.com/googleapis/python-pubsub/commit/fa235033481783c2ec378b2a26b223bdff206461)) +* Silence invalid_ack_id warnings for receipt modacks ([#798](https://github.com/googleapis/python-pubsub/issues/798)) ([17feea5](https://github.com/googleapis/python-pubsub/commit/17feea5783f3a878b4dcfb3a8570585f7637378f)) + + +### Miscellaneous Chores + +* release as 2.13.10 ([34f022b](https://github.com/googleapis/python-pubsub/commit/34f022b4ee62d53a193bc2babafad508e2f2540b)) + ## [2.13.8](https://github.com/googleapis/python-pubsub/compare/v2.13.7...v2.13.8) (2022-10-03) diff --git a/setup.py b/setup.py index 51c30a85b..134b4d30e 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ name = "google-cloud-pubsub" description = "Google Cloud Pub/Sub API client library" -version = "2.13.9" +version = "2.13.10" # Should be one of: # 'Development Status :: 3 - Alpha' # 'Development Status :: 4 - Beta'