5454"""The load threshold below which to resume the incoming message stream."""
5555
5656
57- def _maybe_wrap_exception (exception ):
58- """Wraps a gRPC exception class, if needed."""
59- if isinstance (exception , grpc .RpcError ):
60- return exceptions .from_grpc_error (exception )
61- return exception
57+ def _wrap_as_exception (maybe_exception ):
58+ """Wrap an object as a Python exception, if needed.
59+
60+ Args:
61+ maybe_exception (Any): The object to wrap, usually a gRPC exception class.
62+
63+ Returns:
64+ The argument itself if an instance of ``BaseException``, otherwise
65+ the argument represented as an instance of ``Exception`` (sub)class.
66+ """
67+ if isinstance (maybe_exception , grpc .RpcError ):
68+ return exceptions .from_grpc_error (maybe_exception )
69+ elif isinstance (maybe_exception , BaseException ):
70+ return maybe_exception
71+
72+ return Exception (maybe_exception )
6273
6374
6475def _wrap_callback_errors (callback , on_callback_error , message ):
@@ -656,7 +667,7 @@ def _should_recover(self, exception):
656667 Will be :data:`True` if the ``exception`` is "acceptable", i.e.
657668 in a list of retryable / idempotent exceptions.
658669 """
659- exception = _maybe_wrap_exception (exception )
670+ exception = _wrap_as_exception (exception )
660671 # If this is in the list of idempotent exceptions, then we want to
661672 # recover.
662673 if isinstance (exception , _RETRYABLE_STREAM_ERRORS ):
@@ -678,7 +689,7 @@ def _should_terminate(self, exception):
678689 Will be :data:`True` if the ``exception`` is "acceptable", i.e.
679690 in a list of terminating exceptions.
680691 """
681- exception = _maybe_wrap_exception (exception )
692+ exception = _wrap_as_exception (exception )
682693 if isinstance (exception , _TERMINATING_STREAM_ERRORS ):
683694 _LOGGER .info ("Observed terminating stream error %s" , exception )
684695 return True
@@ -697,9 +708,9 @@ def _on_rpc_done(self, future):
697708 background consumer and preventing it from being ``joined()``.
698709 """
699710 _LOGGER .info ("RPC termination has signaled streaming pull manager shutdown." )
700- future = _maybe_wrap_exception (future )
711+ error = _wrap_as_exception (future )
701712 thread = threading .Thread (
702- name = _RPC_ERROR_THREAD_NAME , target = self .close , kwargs = {"reason" : future }
713+ name = _RPC_ERROR_THREAD_NAME , target = self .close , kwargs = {"reason" : error }
703714 )
704715 thread .daemon = True
705716 thread .start ()
0 commit comments