From e782d97f3afcaf68e5583fc02f072171207055f0 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 24 Jul 2020 15:05:01 +0200 Subject: [PATCH] fix: convert all RPC error types to exceptions --- .../_protocol/streaming_pull_manager.py | 29 +++++++++++++------ .../subscriber/test_streaming_pull_manager.py | 12 ++++++-- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 4e3f24933..933ee6a3e 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -53,11 +53,22 @@ """The load threshold below which to resume the incoming message stream.""" -def _maybe_wrap_exception(exception): - """Wraps a gRPC exception class, if needed.""" - if isinstance(exception, grpc.RpcError): - return exceptions.from_grpc_error(exception) - return exception +def _wrap_as_exception(maybe_exception): + """Wrap an object as a Python exception, if needed. + + Args: + maybe_exception (Any): The object to wrap, usually a gRPC exception class. + + Returns: + The argument itself if an instance of ``BaseException``, otherwise + the argument represented as an instance of ``Exception`` (sub)class. + """ + if isinstance(maybe_exception, grpc.RpcError): + return exceptions.from_grpc_error(maybe_exception) + elif isinstance(maybe_exception, BaseException): + return maybe_exception + + return Exception(maybe_exception) def _wrap_callback_errors(callback, on_callback_error, message): @@ -651,7 +662,7 @@ def _should_recover(self, exception): Will be :data:`True` if the ``exception`` is "acceptable", i.e. in a list of retryable / idempotent exceptions. """ - exception = _maybe_wrap_exception(exception) + exception = _wrap_as_exception(exception) # If this is in the list of idempotent exceptions, then we want to # recover. if isinstance(exception, _RETRYABLE_STREAM_ERRORS): @@ -673,7 +684,7 @@ def _should_terminate(self, exception): Will be :data:`True` if the ``exception`` is "acceptable", i.e. in a list of terminating exceptions. """ - exception = _maybe_wrap_exception(exception) + exception = _wrap_as_exception(exception) if isinstance(exception, _TERMINATING_STREAM_ERRORS): _LOGGER.info("Observed terminating stream error %s", exception) return True @@ -692,9 +703,9 @@ def _on_rpc_done(self, future): background consumer and preventing it from being ``joined()``. """ _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.") - future = _maybe_wrap_exception(future) + error = _wrap_as_exception(future) thread = threading.Thread( - name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future} + name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": error} ) thread.daemon = True thread.start() diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 3f2881df6..3a26a5423 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -44,11 +44,13 @@ mock.create_autospec(grpc.RpcError, instance=True), exceptions.GoogleAPICallError, ), + ({"error": "RPC terminated"}, Exception), + ("something broke", Exception), ], ) -def test__maybe_wrap_exception(exception, expected_cls): +def test__wrap_as_exception(exception, expected_cls): assert isinstance( - streaming_pull_manager._maybe_wrap_exception(exception), expected_cls + streaming_pull_manager._wrap_as_exception(exception), expected_cls ) @@ -956,8 +958,12 @@ def test__on_rpc_done(thread): manager._on_rpc_done(mock.sentinel.error) thread.assert_called_once_with( - name=mock.ANY, target=manager.close, kwargs={"reason": mock.sentinel.error} + name=mock.ANY, target=manager.close, kwargs={"reason": mock.ANY} ) + _, kwargs = thread.call_args + reason = kwargs["kwargs"]["reason"] + assert isinstance(reason, Exception) + assert reason.args == (mock.sentinel.error,) # Exception wraps the original error def test_activate_ordering_keys():