Skip to content

Commit db14d1d

Browse files
committed
Prevent unhandled background error on SPM shutdown
1 parent d9127d7 commit db14d1d

2 files changed

Lines changed: 44 additions & 4 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,31 @@ def drop(self, items):
125125
Args:
126126
items(Sequence[DropRequest]): The items to drop.
127127
"""
128-
self._manager.leaser.remove(items)
129-
self._manager.maybe_resume_consumer()
128+
# If the manager is in the process of being shut down, the leaser might
129+
# not exist on it anymore, thus we need to obtain our own reference to
130+
# it and check it for None to avoid errors.
131+
# Nevertheless, calling maybe_resume_consumer() in the manager shutdown
132+
# state is still fine, as it is effectively a no-op at that point.
133+
leaser = getattr(self._manager, "leaser", None)
134+
if leaser is not None:
135+
leaser.remove(items)
136+
self._manager.maybe_resume_consumer()
130137

131138
def lease(self, items):
132139
"""Add the given messages to lease management.
133140
134141
Args:
135142
items(Sequence[LeaseRequest]): The items to lease.
136143
"""
137-
self._manager.leaser.add(items)
138-
self._manager.maybe_pause_consumer()
144+
# If the manager is in the process of being shut down, the leaser might
145+
# not exist on it anymore, thus we need to obtain our own reference to
146+
# it and check it for None to avoid errors.
147+
# Nevertheless, calling maybe_pause_consumer() in the manager shutdown
148+
# state is still fine, as it is effectively a no-op at that point.
149+
leaser = getattr(self._manager, "leaser", None)
150+
if leaser is not None:
151+
leaser.add(items)
152+
self._manager.maybe_pause_consumer()
139153

140154
def modify_ack_deadline(self, items):
141155
"""Modify the ack deadline for the given messages.

pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ def test_lease():
108108
manager.maybe_pause_consumer.assert_called_once()
109109

110110

111+
def test_lease_no_manager_leaser():
112+
manager = mock.create_autospec(
113+
streaming_pull_manager.StreamingPullManager, instance=True
114+
)
115+
manager.leaser = None # simulate manager being shut down
116+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
117+
118+
items = [requests.LeaseRequest(ack_id="ack_id_string", byte_size=10)]
119+
dispatcher_.lease(items) # no error
120+
121+
manager.maybe_pause_consumer.assert_not_called()
122+
123+
111124
def test_drop():
112125
manager = mock.create_autospec(
113126
streaming_pull_manager.StreamingPullManager, instance=True
@@ -121,6 +134,19 @@ def test_drop():
121134
manager.maybe_resume_consumer.assert_called_once()
122135

123136

137+
def test_drop_no_manager_leaser():
138+
manager = mock.create_autospec(
139+
streaming_pull_manager.StreamingPullManager, instance=True
140+
)
141+
manager.leaser = None # simulate manager being shut down
142+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
143+
144+
items = [requests.DropRequest(ack_id="ack_id_string", byte_size=10)]
145+
dispatcher_.drop(items) # no error
146+
147+
manager.maybe_resume_consumer.assert_not_called()
148+
149+
124150
def test_nack():
125151
manager = mock.create_autospec(
126152
streaming_pull_manager.StreamingPullManager, instance=True

0 commit comments

Comments
 (0)