Skip to content

Commit e00f6b3

Browse files
authored
Fix pubsub Streaming Pull shutdown on RetryError (googleapis#7863)
If a RetryError occurs, it is time to stop waiting for the underlying gRPC channel to recover from a transient failure, and a clean shutdown needs to be triggered. This commit assures that this indeed happens (it used to happen on terminal channel errors only).
1 parent fc3b22e commit e00f6b3

2 files changed

Lines changed: 35 additions & 2 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,11 @@ def _send_unary_request(self, request):
262262
_LOGGER.debug("Sent request(s) over unary RPC.")
263263

264264
def send(self, request):
265-
"""Queue a request to be sent to the RPC."""
265+
"""Queue a request to be sent to the RPC.
266+
267+
If a RetryError occurs, the manager shutdown is triggered, and the
268+
error is re-raised.
269+
"""
266270
if self._UNARY_REQUESTS:
267271
try:
268272
self._send_unary_request(request)
@@ -272,6 +276,17 @@ def send(self, request):
272276
"non-fatal as stream requests are best-effort.",
273277
exc_info=True,
274278
)
279+
except exceptions.RetryError as exc:
280+
_LOGGER.debug(
281+
"RetryError while sending unary RPC. Waiting on a transient "
282+
"error resolution for too long, will now trigger shutdown.",
283+
exc_info=False,
284+
)
285+
# The underlying channel has been suffering from a retryable error
286+
# for too long, time to give up and shut the streaming pull down.
287+
self._on_rpc_done(exc)
288+
raise
289+
275290
else:
276291
self._rpc.send(request)
277292

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ def test_send_unary_empty():
245245
manager._client.modify_ack_deadline.assert_not_called()
246246

247247

248-
def test_send_unary_error(caplog):
248+
def test_send_unary_api_call_error(caplog):
249249
caplog.set_level(logging.DEBUG)
250250

251251
manager = make_manager()
@@ -259,6 +259,24 @@ def test_send_unary_error(caplog):
259259
assert "The front fell off" in caplog.text
260260

261261

262+
def test_send_unary_retry_error(caplog):
263+
caplog.set_level(logging.DEBUG)
264+
265+
manager, _, _, _, _, _ = make_running_manager()
266+
manager._UNARY_REQUESTS = True
267+
268+
error = exceptions.RetryError(
269+
"Too long a transient error", cause=Exception("Out of time!")
270+
)
271+
manager._client.acknowledge.side_effect = error
272+
273+
with pytest.raises(exceptions.RetryError):
274+
manager.send(types.StreamingPullRequest(ack_ids=["ack_id1", "ack_id2"]))
275+
276+
assert "RetryError while sending unary RPC" in caplog.text
277+
assert "signaled streaming pull manager shutdown" in caplog.text
278+
279+
262280
def test_send_streaming():
263281
manager = make_manager()
264282
manager._UNARY_REQUESTS = False

0 commit comments

Comments
 (0)