diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 7476e887b..3159ba848 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -54,11 +54,22 @@ """The load threshold below which to resume the incoming message stream.""" -def _maybe_wrap_exception(exception): - """Wraps a gRPC exception class, if needed.""" - if isinstance(exception, grpc.RpcError): - return exceptions.from_grpc_error(exception) - return exception +def _wrap_as_exception(maybe_exception): + """Wrap an object as a Python exception, if needed. + + Args: + maybe_exception (Any): The object to wrap, usually a gRPC exception class. + + Returns: + The argument itself if an instance of ``BaseException``, otherwise + the argument represented as an instance of ``Exception`` (sub)class. + """ + if isinstance(maybe_exception, grpc.RpcError): + return exceptions.from_grpc_error(maybe_exception) + elif isinstance(maybe_exception, BaseException): + return maybe_exception + + return Exception(maybe_exception) def _wrap_callback_errors(callback, on_callback_error, message): @@ -656,7 +667,7 @@ def _should_recover(self, exception): Will be :data:`True` if the ``exception`` is "acceptable", i.e. in a list of retryable / idempotent exceptions. """ - exception = _maybe_wrap_exception(exception) + exception = _wrap_as_exception(exception) # If this is in the list of idempotent exceptions, then we want to # recover. if isinstance(exception, _RETRYABLE_STREAM_ERRORS): @@ -678,7 +689,7 @@ def _should_terminate(self, exception): Will be :data:`True` if the ``exception`` is "acceptable", i.e. in a list of terminating exceptions. """ - exception = _maybe_wrap_exception(exception) + exception = _wrap_as_exception(exception) if isinstance(exception, _TERMINATING_STREAM_ERRORS): _LOGGER.info("Observed terminating stream error %s", exception) return True @@ -697,9 +708,9 @@ def _on_rpc_done(self, future): background consumer and preventing it from being ``joined()``. """ _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.") - future = _maybe_wrap_exception(future) + error = _wrap_as_exception(future) thread = threading.Thread( - name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future} + name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": error} ) thread.daemon = True thread.start() diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index d1bac4335..d3eb4351b 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -44,11 +44,13 @@ mock.create_autospec(grpc.RpcError, instance=True), exceptions.GoogleAPICallError, ), + ({"error": "RPC terminated"}, Exception), + ("something broke", Exception), ], ) -def test__maybe_wrap_exception(exception, expected_cls): +def test__wrap_as_exception(exception, expected_cls): assert isinstance( - streaming_pull_manager._maybe_wrap_exception(exception), expected_cls + streaming_pull_manager._wrap_as_exception(exception), expected_cls ) @@ -948,8 +950,12 @@ def test__on_rpc_done(thread): manager._on_rpc_done(mock.sentinel.error) thread.assert_called_once_with( - name=mock.ANY, target=manager.close, kwargs={"reason": mock.sentinel.error} + name=mock.ANY, target=manager.close, kwargs={"reason": mock.ANY} ) + _, kwargs = thread.call_args + reason = kwargs["kwargs"]["reason"] + assert isinstance(reason, Exception) + assert reason.args == (mock.sentinel.error,) # Exception wraps the original error def test_activate_ordering_keys():