Skip to content

Commit 76de334

Browse files
authored
Ensure SPM methods check that 'self._consumer' is not None before use. (googleapis#5758)
Closes googleapis#5751.
1 parent 8ace982 commit 76de334

2 files changed

Lines changed: 23 additions & 5 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,11 @@ def add_close_callback(self, callback):
208208

209209
def maybe_pause_consumer(self):
210210
"""Check the current load and pause the consumer if needed."""
211-
if self.load >= 1.0 and not self._consumer.is_paused:
212-
_LOGGER.debug(
213-
'Message backlog over load at %.2f, pausing.', self.load)
214-
self._consumer.pause()
211+
if self.load >= 1.0:
212+
if self._consumer is not None and not self._consumer.is_paused:
213+
_LOGGER.debug(
214+
'Message backlog over load at %.2f, pausing.', self.load)
215+
self._consumer.pause()
215216

216217
def maybe_resume_consumer(self):
217218
"""Check the current load and resume the consumer if needed."""
@@ -221,7 +222,7 @@ def maybe_resume_consumer(self):
221222
# In order to not thrash too much, require us to have passed below
222223
# the resume threshold (80% by default) of each flow control setting
223224
# before restarting.
224-
if not self._consumer.is_paused:
225+
if self._consumer is None or not self._consumer.is_paused:
225226
return
226227

227228
if self.load < self.flow_control.resume_threshold:

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,17 @@ def test_ack_deadline():
111111
assert manager.ack_deadline == 20
112112

113113

114+
def test_maybe_pause_consumer_wo_consumer_set():
115+
manager = make_manager(
116+
flow_control=types.FlowControl(max_messages=10, max_bytes=1000))
117+
manager.maybe_pause_consumer() # no raise
118+
# Ensure load > 1
119+
_leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
120+
_leaser.message_count = 100
121+
_leaser.bytes = 10000
122+
manager.maybe_pause_consumer() # no raise
123+
124+
114125
def test_lease_load_and_pause():
115126
manager = make_manager(
116127
flow_control=types.FlowControl(max_messages=10, max_bytes=1000))
@@ -177,6 +188,12 @@ def test_resume_not_paused():
177188
manager._consumer.resume.assert_not_called()
178189

179190

191+
def test_maybe_resume_consumer_wo_consumer_set():
192+
manager = make_manager(
193+
flow_control=types.FlowControl(max_messages=10, max_bytes=1000))
194+
manager.maybe_resume_consumer() # no raise
195+
196+
180197
def test_send_unary():
181198
manager = make_manager()
182199
manager._UNARY_REQUESTS = True

0 commit comments

Comments
 (0)