Skip to content

Commit 89c671a

Browse files
authored
fix: convert all RPC error types to exceptions (googleapis#170)
1 parent f34e5b5 commit 89c671a

2 files changed

Lines changed: 29 additions & 12 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,22 @@
5454
"""The load threshold below which to resume the incoming message stream."""
5555

5656

57-
def _maybe_wrap_exception(exception):
58-
"""Wraps a gRPC exception class, if needed."""
59-
if isinstance(exception, grpc.RpcError):
60-
return exceptions.from_grpc_error(exception)
61-
return exception
57+
def _wrap_as_exception(maybe_exception):
58+
"""Wrap an object as a Python exception, if needed.
59+
60+
Args:
61+
maybe_exception (Any): The object to wrap, usually a gRPC exception class.
62+
63+
Returns:
64+
The argument itself if an instance of ``BaseException``, otherwise
65+
the argument represented as an instance of ``Exception`` (sub)class.
66+
"""
67+
if isinstance(maybe_exception, grpc.RpcError):
68+
return exceptions.from_grpc_error(maybe_exception)
69+
elif isinstance(maybe_exception, BaseException):
70+
return maybe_exception
71+
72+
return Exception(maybe_exception)
6273

6374

6475
def _wrap_callback_errors(callback, on_callback_error, message):
@@ -656,7 +667,7 @@ def _should_recover(self, exception):
656667
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
657668
in a list of retryable / idempotent exceptions.
658669
"""
659-
exception = _maybe_wrap_exception(exception)
670+
exception = _wrap_as_exception(exception)
660671
# If this is in the list of idempotent exceptions, then we want to
661672
# recover.
662673
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
@@ -678,7 +689,7 @@ def _should_terminate(self, exception):
678689
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
679690
in a list of terminating exceptions.
680691
"""
681-
exception = _maybe_wrap_exception(exception)
692+
exception = _wrap_as_exception(exception)
682693
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
683694
_LOGGER.info("Observed terminating stream error %s", exception)
684695
return True
@@ -697,9 +708,9 @@ def _on_rpc_done(self, future):
697708
background consumer and preventing it from being ``joined()``.
698709
"""
699710
_LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
700-
future = _maybe_wrap_exception(future)
711+
error = _wrap_as_exception(future)
701712
thread = threading.Thread(
702-
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future}
713+
name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": error}
703714
)
704715
thread.daemon = True
705716
thread.start()

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@
4444
mock.create_autospec(grpc.RpcError, instance=True),
4545
exceptions.GoogleAPICallError,
4646
),
47+
({"error": "RPC terminated"}, Exception),
48+
("something broke", Exception),
4749
],
4850
)
49-
def test__maybe_wrap_exception(exception, expected_cls):
51+
def test__wrap_as_exception(exception, expected_cls):
5052
assert isinstance(
51-
streaming_pull_manager._maybe_wrap_exception(exception), expected_cls
53+
streaming_pull_manager._wrap_as_exception(exception), expected_cls
5254
)
5355

5456

@@ -948,8 +950,12 @@ def test__on_rpc_done(thread):
948950
manager._on_rpc_done(mock.sentinel.error)
949951

950952
thread.assert_called_once_with(
951-
name=mock.ANY, target=manager.close, kwargs={"reason": mock.sentinel.error}
953+
name=mock.ANY, target=manager.close, kwargs={"reason": mock.ANY}
952954
)
955+
_, kwargs = thread.call_args
956+
reason = kwargs["kwargs"]["reason"]
957+
assert isinstance(reason, Exception)
958+
assert reason.args == (mock.sentinel.error,) # Exception wraps the original error
953959

954960

955961
def test_activate_ordering_keys():

0 commit comments

Comments
 (0)