Skip to content

Commit bbd09fd

Browse files
authored
fix(pubsub): add 'StreamingPullManager._should_terminate' (googleapis#9335)
Toward clean shutdown of the subscriber's background thread. See: googleapis#8616.
1 parent 63b70d4 commit bbd09fd

File tree

2 files changed

+40
-0
lines changed

2 files changed

+40
-0
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
exceptions.GatewayTimeout,
4545
exceptions.Aborted,
4646
)
47+
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
4748
_MAX_LOAD = 1.0
4849
"""The load threshold above which to pause the incoming message stream."""
4950

@@ -410,6 +411,7 @@ def open(self, callback, on_callback_error):
410411
start_rpc=self._client.api.streaming_pull,
411412
initial_request=get_initial_request,
412413
should_recover=self._should_recover,
414+
should_terminate=self._should_terminate,
413415
throttle_reopen=True,
414416
)
415417
self._rpc.add_done_callback(self._on_rpc_done)
@@ -598,6 +600,26 @@ def _should_recover(self, exception):
598600
_LOGGER.info("Observed non-recoverable stream error %s", exception)
599601
return False
600602

603+
def _should_terminate(self, exception):
604+
"""Determine if an error on the RPC stream should be terminated.
605+
606+
If the exception is one of the terminating exceptions, this will signal
607+
to the consumer thread that it should terminate.
608+
609+
This will cause the stream to exit when it returns :data:`True`.
610+
611+
Returns:
612+
bool: Indicates if the caller should terminate or attempt recovery.
613+
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
614+
in a list of terminating exceptions.
615+
"""
616+
exception = _maybe_wrap_exception(exception)
617+
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
618+
_LOGGER.info("Observed terminating stream error %s", exception)
619+
return True
620+
_LOGGER.info("Observed non-terminating stream error %s", exception)
621+
return False
622+
601623
def _on_rpc_done(self, future):
602624
"""Triggered whenever the underlying RPC terminates without recovery.
603625

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
430430
start_rpc=manager._client.api.streaming_pull,
431431
initial_request=mock.ANY,
432432
should_recover=manager._should_recover,
433+
should_terminate=manager._should_terminate,
433434
throttle_reopen=True,
434435
)
435436
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
@@ -724,6 +725,23 @@ def test__should_recover_false():
724725
assert manager._should_recover(exc) is False
725726

726727

728+
def test__should_terminate_true():
729+
manager = make_manager()
730+
731+
details = "Cancelled. Go away, before I taunt you a second time."
732+
exc = exceptions.Cancelled(details)
733+
734+
assert manager._should_terminate(exc) is True
735+
736+
737+
def test__should_terminate_false():
738+
manager = make_manager()
739+
740+
exc = TypeError("wahhhhhh")
741+
742+
assert manager._should_terminate(exc) is False
743+
744+
727745
@mock.patch("threading.Thread", autospec=True)
728746
def test__on_rpc_done(thread):
729747
manager = make_manager()

0 commit comments

Comments
 (0)