Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,23 @@ def close(self, reason=None):
_LOGGER.debug("Stopping scheduler.")
self._scheduler.shutdown()
self._scheduler = None

# Leaser and dispatcher reference each other through the shared
# StreamingPullManager instance, i.e. "self", thus do not set their
# references to None until both have been shut down.
#
# NOTE: Even if the dispatcher operates on an inactive leaser using
# the latter's add() and remove() methods, these have no impact on
# the stopped leaser (the leaser is never again re-started). Ditto
# for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
# because the consumer gets shut down first.
_LOGGER.debug("Stopping leaser.")
self._leaser.stop()
self._leaser = None
_LOGGER.debug("Stopping dispatcher.")
self._dispatcher.stop()
self._dispatcher = None
# dispatcher terminated, OK to dispose the leaser reference now
self._leaser = None
_LOGGER.debug("Stopping heartbeater.")
self._heartbeater.stop()
self._heartbeater = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import logging
import threading
import time
import types as stdlib_types

import mock
Expand Down Expand Up @@ -511,6 +513,50 @@ def test_close_idempotent():
assert scheduler.shutdown.call_count == 1


class FakeDispatcher(object):
def __init__(self, manager, error_callback):
self._manager = manager
self._error_callback = error_callback
self._thread = None
self._stop = False

def start(self):
self._thread = threading.Thread(target=self._do_work)
self._thread.daemon = True
self._thread.start()

def stop(self):
self._stop = True
self._thread.join()
self._thread = None

def _do_work(self):
while not self._stop:
try:
self._manager.leaser.add([mock.Mock()])
except Exception as exc:
self._error_callback(exc)
time.sleep(0.1)

# also try to interact with the leaser after the stop flag has been set
try:
self._manager.leaser.remove([mock.Mock()])
except Exception as exc:
self._error_callback(exc)


def test_close_no_dispatcher_error():
manager, _, _, _, _, _ = make_running_manager()
error_callback = mock.Mock(name="error_callback")
dispatcher = FakeDispatcher(manager=manager, error_callback=error_callback)
manager._dispatcher = dispatcher
dispatcher.start()

manager.close()

error_callback.assert_not_called()


def test_close_callbacks():
manager, _, _, _, _, _ = make_running_manager()

Expand Down