Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit a91bed8

Browse files
authored
feat: retry temporary GRPC statuses for ack/modack/nack when exactly-once delivery is enabled (#607)
We need to do this because [only UNAVAILABLE](https://github.com/googleapis/googleapis/blob/eb0700c6f29ca94f460307f201eb605744f055cb/google/pubsub/v1/pubsub_grpc_service_config.json#L221) is retried for acks/modacks/nacks at the GRPC level. With this CL, we extend the higher-level, manual retry mechanism for these RPCs to all the ones considered temporary for the Publish RPC. The new list of retriable codes is for these RPCs when exactly-once delivery is enabled is: DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, ABORTED, INTERNAL, UNAVAILABLE.
1 parent de0bbce commit a91bed8

File tree

2 files changed

+58
-5
lines changed

2 files changed

+58
-5
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@
7575
a subscription. We do this to reduce premature ack expiration.
7676
"""
7777

78+
_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
79+
code_pb2.DEADLINE_EXCEEDED,
80+
code_pb2.RESOURCE_EXHAUSTED,
81+
code_pb2.ABORTED,
82+
code_pb2.INTERNAL,
83+
code_pb2.UNAVAILABLE,
84+
}
85+
7886

7987
def _wrap_as_exception(maybe_exception: Any) -> BaseException:
8088
"""Wrap an object as a Python exception, if needed.
@@ -163,6 +171,8 @@ def _process_requests(
163171
requests_completed = []
164172
requests_to_retry = []
165173
for ack_id in ack_reqs_dict:
174+
# Handle special errors returned for ack/modack RPCs via the ErrorInfo
175+
# sidecar metadata when exactly-once delivery is enabled.
166176
if errors_dict and ack_id in errors_dict:
167177
exactly_once_error = errors_dict[ack_id]
168178
if exactly_once_error.startswith("TRANSIENT_"):
@@ -176,9 +186,14 @@ def _process_requests(
176186
future = ack_reqs_dict[ack_id].future
177187
future.set_exception(exc)
178188
requests_completed.append(ack_reqs_dict[ack_id])
189+
# Temporary GRPC errors are retried
190+
elif (
191+
error_status
192+
and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
193+
):
194+
requests_to_retry.append(ack_reqs_dict[ack_id])
195+
# Other GRPC errors are NOT retried
179196
elif error_status:
180-
# Only permanent errors are expected here b/c retriable errors are
181-
# retried at the lower, GRPC level.
182197
if error_status.code == code_pb2.PERMISSION_DENIED:
183198
exc = AcknowledgeError(AcknowledgeStatus.PERMISSION_DENIED, info=None)
184199
elif error_status.code == code_pb2.FAILED_PRECONDITION:
@@ -188,11 +203,13 @@ def _process_requests(
188203
future = ack_reqs_dict[ack_id].future
189204
future.set_exception(exc)
190205
requests_completed.append(ack_reqs_dict[ack_id])
206+
# Since no error occurred, requests with futures are completed successfully.
191207
elif ack_reqs_dict[ack_id].future:
192208
future = ack_reqs_dict[ack_id].future
193209
# success
194210
future.set_result(AcknowledgeStatus.SUCCESS)
195211
requests_completed.append(ack_reqs_dict[ack_id])
212+
# All other requests are considered completed.
196213
else:
197214
requests_completed.append(ack_reqs_dict[ack_id])
198215

@@ -580,7 +597,9 @@ def send_unary_ack(
580597
ack_errors_dict = _get_ack_errors(exc)
581598
except exceptions.RetryError as exc:
582599
status = status_pb2.Status()
583-
status.code = code_pb2.DEADLINE_EXCEEDED
600+
# Choose a non-retriable error code so the futures fail with
601+
# exceptions.
602+
status.code = code_pb2.UNKNOWN
584603
# Makes sure to complete futures so they don't block forever.
585604
_process_requests(status, ack_reqs_dict, None)
586605
_LOGGER.debug(
@@ -634,7 +653,9 @@ def send_unary_modack(
634653
modack_errors_dict = _get_ack_errors(exc)
635654
except exceptions.RetryError as exc:
636655
status = status_pb2.Status()
637-
status.code = code_pb2.DEADLINE_EXCEEDED
656+
# Choose a non-retriable error code so the futures fail with
657+
# exceptions.
658+
status.code = code_pb2.UNKNOWN
638659
# Makes sure to complete futures so they don't block forever.
639660
_process_requests(status, ack_reqs_dict, None)
640661
_LOGGER.debug(

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1735,7 +1735,7 @@ def test_process_requests_permanent_error_raises_exception():
17351735
assert not requests_to_retry
17361736

17371737

1738-
def test_process_requests_transient_error_returns_request():
1738+
def test_process_requests_transient_error_returns_request_for_retrying():
17391739
# a transient error returns the request in `requests_to_retry`
17401740
future = futures.Future()
17411741
ack_reqs_dict = {
@@ -1772,6 +1772,38 @@ def test_process_requests_unknown_error_raises_exception():
17721772
assert not requests_to_retry
17731773

17741774

1775+
def test_process_requests_retriable_error_status_returns_request_for_retrying():
1776+
# a retriable error status returns the request in `requests_to_retry`
1777+
retriable_errors = [
1778+
code_pb2.DEADLINE_EXCEEDED,
1779+
code_pb2.RESOURCE_EXHAUSTED,
1780+
code_pb2.ABORTED,
1781+
code_pb2.INTERNAL,
1782+
code_pb2.UNAVAILABLE,
1783+
]
1784+
1785+
for retriable_error in retriable_errors:
1786+
future = futures.Future()
1787+
ack_reqs_dict = {
1788+
"ackid1": requests.AckRequest(
1789+
ack_id="ackid1",
1790+
byte_size=0,
1791+
time_to_ack=20,
1792+
ordering_key="",
1793+
future=future,
1794+
)
1795+
}
1796+
st = status_pb2.Status()
1797+
st.code = retriable_error
1798+
(
1799+
requests_completed,
1800+
requests_to_retry,
1801+
) = streaming_pull_manager._process_requests(st, ack_reqs_dict, None)
1802+
assert not requests_completed
1803+
assert requests_to_retry[0].ack_id == "ackid1"
1804+
assert not future.done()
1805+
1806+
17751807
def test_process_requests_permission_denied_error_status_raises_exception():
17761808
# a permission-denied error status raises an exception
17771809
future = futures.Future()

0 commit comments

Comments
 (0)