From 6ebb16a27d0c60dccb0a87e2058d5f1209f39cbc Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 24 May 2019 18:09:13 +0200 Subject: [PATCH] Prevent unhandled background error on SPM shutdown --- .../_protocol/streaming_pull_manager.py | 13 +++++- .../subscriber/test_streaming_pull_manager.py | 46 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 74008bc94fcb..159bdfd8d9e5 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -428,12 +428,23 @@ def close(self, reason=None): _LOGGER.debug("Stopping scheduler.") self._scheduler.shutdown() self._scheduler = None + + # Leaser and dispatcher reference each other through the shared + # StreamingPullManager instance, i.e. "self", thus do not set their + # references to None until both have been shut down. + # + # NOTE: Even if the dispatcher operates on an inactive leaser using + # the latter's add() and remove() methods, these have no impact on + # the stopped leaser (the leaser is never again re-started). Ditto + # for the manager's maybe_resume_consumer() / maybe_pause_consumer(), + # because the consumer gets shut down first. _LOGGER.debug("Stopping leaser.") self._leaser.stop() - self._leaser = None _LOGGER.debug("Stopping dispatcher.") self._dispatcher.stop() self._dispatcher = None + # dispatcher terminated, OK to dispose the leaser reference now + self._leaser = None _LOGGER.debug("Stopping heartbeater.") self._heartbeater.stop() self._heartbeater = None diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 22585675a324..849137f7af7a 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -13,6 +13,8 @@ # limitations under the License. import logging +import threading +import time import types as stdlib_types import mock @@ -511,6 +513,50 @@ def test_close_idempotent(): assert scheduler.shutdown.call_count == 1 +class FakeDispatcher(object): + def __init__(self, manager, error_callback): + self._manager = manager + self._error_callback = error_callback + self._thread = None + self._stop = False + + def start(self): + self._thread = threading.Thread(target=self._do_work) + self._thread.daemon = True + self._thread.start() + + def stop(self): + self._stop = True + self._thread.join() + self._thread = None + + def _do_work(self): + while not self._stop: + try: + self._manager.leaser.add([mock.Mock()]) + except Exception as exc: + self._error_callback(exc) + time.sleep(0.1) + + # also try to interact with the leaser after the stop flag has been set + try: + self._manager.leaser.remove([mock.Mock()]) + except Exception as exc: + self._error_callback(exc) + + +def test_close_no_dispatcher_error(): + manager, _, _, _, _, _ = make_running_manager() + error_callback = mock.Mock(name="error_callback") + dispatcher = FakeDispatcher(manager=manager, error_callback=error_callback) + manager._dispatcher = dispatcher + dispatcher.start() + + manager.close() + + error_callback.assert_not_called() + + def test_close_callbacks(): manager, _, _, _, _, _ = make_running_manager()