|
44 | 44 | exceptions.GatewayTimeout, |
45 | 45 | exceptions.Aborted, |
46 | 46 | ) |
| 47 | +_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,) |
47 | 48 | _MAX_LOAD = 1.0 |
48 | 49 | """The load threshold above which to pause the incoming message stream.""" |
49 | 50 |
|
@@ -410,6 +411,7 @@ def open(self, callback, on_callback_error): |
410 | 411 | start_rpc=self._client.api.streaming_pull, |
411 | 412 | initial_request=get_initial_request, |
412 | 413 | should_recover=self._should_recover, |
| 414 | + should_terminate=self._should_terminate, |
413 | 415 | throttle_reopen=True, |
414 | 416 | ) |
415 | 417 | self._rpc.add_done_callback(self._on_rpc_done) |
@@ -598,6 +600,26 @@ def _should_recover(self, exception): |
598 | 600 | _LOGGER.info("Observed non-recoverable stream error %s", exception) |
599 | 601 | return False |
600 | 602 |
|
| 603 | + def _should_terminate(self, exception): |
| 604 | + """Determine if an error on the RPC stream should be terminated. |
| 605 | +
|
| 606 | + If the exception is one of the terminating exceptions, this will signal |
| 607 | + to the consumer thread that it should terminate. |
| 608 | +
|
| 609 | + This will cause the stream to exit when it returns :data:`True`. |
| 610 | +
|
| 611 | + Returns: |
| 612 | + bool: Indicates if the caller should terminate or attempt recovery. |
| 613 | + Will be :data:`True` if the ``exception`` is "acceptable", i.e. |
| 614 | + in a list of terminating exceptions. |
| 615 | + """ |
| 616 | + exception = _maybe_wrap_exception(exception) |
| 617 | + if isinstance(exception, _TERMINATING_STREAM_ERRORS): |
| 618 | + _LOGGER.info("Observed terminating stream error %s", exception) |
| 619 | + return True |
| 620 | + _LOGGER.info("Observed non-terminating stream error %s", exception) |
| 621 | + return False |
| 622 | + |
601 | 623 | def _on_rpc_done(self, future): |
602 | 624 | """Triggered whenever the underlying RPC terminates without recovery. |
603 | 625 |
|
|
0 commit comments