Skip to content

Commit 983829b

Browse files
authored
Un-confuse the "active"<->"stopped" relationship in Pub/Sub consumer. (googleapis#4562)
I messed this up during googleapis#4549, and as a result, there is no lease maintenance in the `0.29.3` release. Yay Friday releases!
1 parent af2e6ca commit 983829b

2 files changed

Lines changed: 18 additions & 15 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,15 +173,15 @@ def ack(self, ack_id, time_to_ack=None, byte_size=None):
173173
if time_to_ack is not None:
174174
self.histogram.add(int(time_to_ack))
175175

176-
# Send the request to ack the message.
177-
# However, if the consumer is inactive, then queue the ack_id here
178-
# instead; it will be acked as part of the initial request when the
179-
# consumer is started again.
180176
if self._consumer.stopped.is_set():
177+
# If the consumer is inactive, then queue the ack_id here; it
178+
# will be acked as part of the initial request when the consumer
179+
# is started again.
180+
self._ack_on_resume.add(ack_id)
181+
else:
182+
# Send the request to ack the message.
181183
request = types.StreamingPullRequest(ack_ids=[ack_id])
182184
self._consumer.send_request(request)
183-
else:
184-
self._ack_on_resume.add(ack_id)
185185

186186
# Remove the message from lease management.
187187
self.drop(ack_id=ack_id, byte_size=byte_size)
@@ -311,7 +311,7 @@ def maintain_leases(self):
311311
"""
312312
while True:
313313
# Sanity check: Should this infinite loop quit?
314-
if not self._consumer.stopped.is_set():
314+
if self._consumer.stopped.is_set():
315315
_LOGGER.debug('Consumer inactive, ending lease maintenance.')
316316
return
317317

@@ -326,7 +326,7 @@ def maintain_leases(self):
326326
# because it is more efficient to make a single request.
327327
ack_ids = list(self.managed_ack_ids)
328328
_LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids))
329-
if ack_ids and self._consumer.stopped.is_set():
329+
if ack_ids and not self._consumer.stopped.is_set():
330330
request = types.StreamingPullRequest(
331331
modify_deadline_ack_ids=ack_ids,
332332
modify_deadline_seconds=[p99] * len(ack_ids),

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def test_subscription():
8585

8686
def test_ack():
8787
policy = create_policy()
88-
policy._consumer.stopped.set()
88+
policy._consumer.stopped.clear()
8989
with mock.patch.object(policy._consumer, 'send_request') as send_request:
9090
policy.ack('ack_id_string', 20)
9191
send_request.assert_called_once_with(types.StreamingPullRequest(
@@ -97,7 +97,7 @@ def test_ack():
9797

9898
def test_ack_no_time():
9999
policy = create_policy()
100-
policy._consumer.stopped.set()
100+
policy._consumer.stopped.clear()
101101
with mock.patch.object(policy._consumer, 'send_request') as send_request:
102102
policy.ack('ack_id_string')
103103
send_request.assert_called_once_with(types.StreamingPullRequest(
@@ -110,6 +110,7 @@ def test_ack_paused():
110110
policy = create_policy()
111111
policy._paused = True
112112
consumer = policy._consumer
113+
consumer.stopped.set()
113114

114115
with mock.patch.object(consumer, 'resume') as resume:
115116
policy.ack('ack_id_string')
@@ -206,20 +207,21 @@ def test_modify_ack_deadline():
206207

207208
def test_maintain_leases_inactive_consumer():
208209
policy = create_policy()
209-
policy._consumer.stopped.clear()
210+
policy._consumer.stopped.set()
210211
assert policy.maintain_leases() is None
211212

212213

213214
def test_maintain_leases_ack_ids():
214215
policy = create_policy()
215-
policy._consumer.stopped.set()
216+
policy._consumer.stopped.clear()
216217
policy.lease('my ack id', 50)
217218

218219
# Mock the sleep object.
219220
with mock.patch.object(time, 'sleep', autospec=True) as sleep:
220221
def trigger_inactive(seconds):
221222
assert 0 < seconds < 10
222-
policy._consumer.stopped.clear()
223+
policy._consumer.stopped.set()
224+
223225
sleep.side_effect = trigger_inactive
224226

225227
# Also mock the consumer, which sends the request.
@@ -234,11 +236,12 @@ def trigger_inactive(seconds):
234236

235237
def test_maintain_leases_no_ack_ids():
236238
policy = create_policy()
237-
policy._consumer.stopped.set()
239+
policy._consumer.stopped.clear()
238240
with mock.patch.object(time, 'sleep', autospec=True) as sleep:
239241
def trigger_inactive(seconds):
240242
assert 0 < seconds < 10
241-
policy._consumer.stopped.clear()
243+
policy._consumer.stopped.set()
244+
242245
sleep.side_effect = trigger_inactive
243246
policy.maintain_leases()
244247
sleep.assert_called()

0 commit comments

Comments
 (0)