Skip to content

Commit 732e145

Browse files
authored
Removing redundant Consumer.active member. (googleapis#4549)
Also renaming `Consumer._exiting` as `Consumer.stopped` (and making it a public member).
1 parent ea342a2 commit 732e145

4 files changed

Lines changed: 20 additions & 27 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,8 @@ def __init__(self, policy):
184184
"""
185185
self._policy = policy
186186
self._request_queue = queue.Queue()
187-
self._exiting = threading.Event()
187+
self.stopped = threading.Event()
188188
self._put_lock = threading.Lock()
189-
190-
self.active = False
191189
self._consumer_thread = None
192190

193191
def send_request(self, request):
@@ -299,7 +297,7 @@ def _blocking_consume(self):
299297
# exit cleanly when the user has called stop_consuming(). This
300298
# checks to make sure we're not exiting before opening a new
301299
# stream.
302-
if self._exiting.is_set():
300+
if self.stopped.is_set():
303301
_LOGGER.debug('Event signalled consumer exit.')
304302
break
305303

@@ -326,8 +324,7 @@ def _blocking_consume(self):
326324

327325
def start_consuming(self):
328326
"""Start consuming the stream."""
329-
self.active = True
330-
self._exiting.clear()
327+
self.stopped.clear()
331328
thread = threading.Thread(
332329
name=_BIDIRECTIONAL_CONSUMER_NAME,
333330
target=self._blocking_consume,
@@ -350,8 +347,7 @@ def _stop_no_join(self):
350347
threading.Thread: The worker ("consumer thread") that is being
351348
stopped.
352349
"""
353-
self.active = False
354-
self._exiting.set()
350+
self.stopped.set()
355351
_LOGGER.debug('Stopping helper thread %s', self._consumer_thread.name)
356352
self.send_request(_helper_threads.STOP)
357353
thread = self._consumer_thread

pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def ack(self, ack_id, time_to_ack=None, byte_size=None):
177177
# However, if the consumer is inactive, then queue the ack_id here
178178
# instead; it will be acked as part of the initial request when the
179179
# consumer is started again.
180-
if self._consumer.active:
180+
if self._consumer.stopped.is_set():
181181
request = types.StreamingPullRequest(ack_ids=[ack_id])
182182
self._consumer.send_request(request)
183183
else:
@@ -311,7 +311,7 @@ def maintain_leases(self):
311311
"""
312312
while True:
313313
# Sanity check: Should this infinite loop quit?
314-
if not self._consumer.active:
314+
if not self._consumer.stopped.is_set():
315315
_LOGGER.debug('Consumer inactive, ending lease maintenance.')
316316
return
317317

@@ -326,7 +326,7 @@ def maintain_leases(self):
326326
# because it is more efficient to make a single request.
327327
ack_ids = list(self.managed_ack_ids)
328328
_LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids))
329-
if ack_ids and self._consumer.active:
329+
if ack_ids and self._consumer.stopped.is_set():
330330
request = types.StreamingPullRequest(
331331
modify_deadline_ack_ids=ack_ids,
332332
modify_deadline_seconds=[p99] * len(ack_ids),

pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ def test_blocking_consume():
8080
@mock.patch.object(_consumer, '_LOGGER')
8181
def test_blocking_consume_when_exiting(_LOGGER):
8282
consumer = create_consumer()
83-
assert consumer._exiting.is_set() is False
84-
consumer._exiting.set()
83+
assert consumer.stopped.is_set() is False
84+
consumer.stopped.set()
8585

8686
# Make sure method cleanly exits.
8787
assert consumer._blocking_consume() is None
@@ -152,8 +152,7 @@ def test_start_consuming():
152152
with mock.patch.object(threading, 'Thread', autospec=True) as Thread:
153153
consumer.start_consuming()
154154

155-
assert consumer._exiting.is_set() is False
156-
assert consumer.active is True
155+
assert consumer.stopped.is_set() is False
157156
Thread.assert_called_once_with(
158157
name=_consumer._BIDIRECTIONAL_CONSUMER_NAME,
159158
target=consumer._blocking_consume,
@@ -163,16 +162,14 @@ def test_start_consuming():
163162

164163
def test_stop_consuming():
165164
consumer = create_consumer()
166-
consumer.active = True
167-
assert consumer._exiting.is_set() is False
165+
assert consumer.stopped.is_set() is False
168166
thread = mock.Mock(spec=threading.Thread)
169167
consumer._consumer_thread = thread
170168

171169
assert consumer.stop_consuming() is None
172170

173171
# Make sure state was updated.
174-
assert consumer.active is False
175-
assert consumer._exiting.is_set() is True
172+
assert consumer.stopped.is_set() is True
176173
assert consumer._consumer_thread is None
177174
# Check mocks.
178175
thread.join.assert_called_once_with()

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def test_subscription():
8585

8686
def test_ack():
8787
policy = create_policy()
88-
policy._consumer.active = True
88+
policy._consumer.stopped.set()
8989
with mock.patch.object(policy._consumer, 'send_request') as send_request:
9090
policy.ack('ack_id_string', 20)
9191
send_request.assert_called_once_with(types.StreamingPullRequest(
@@ -97,7 +97,7 @@ def test_ack():
9797

9898
def test_ack_no_time():
9999
policy = create_policy()
100-
policy._consumer.active = True
100+
policy._consumer.stopped.set()
101101
with mock.patch.object(policy._consumer, 'send_request') as send_request:
102102
policy.ack('ack_id_string')
103103
send_request.assert_called_once_with(types.StreamingPullRequest(
@@ -109,7 +109,7 @@ def test_ack_no_time():
109109
def test_ack_paused():
110110
policy = create_policy()
111111
policy._paused = True
112-
policy._consumer.active = False
112+
policy._consumer.stopped.clear()
113113
with mock.patch.object(policy, 'open') as open_:
114114
policy.ack('ack_id_string')
115115
open_.assert_called()
@@ -198,20 +198,20 @@ def test_modify_ack_deadline():
198198

199199
def test_maintain_leases_inactive_consumer():
200200
policy = create_policy()
201-
policy._consumer.active = False
201+
policy._consumer.stopped.clear()
202202
assert policy.maintain_leases() is None
203203

204204

205205
def test_maintain_leases_ack_ids():
206206
policy = create_policy()
207-
policy._consumer.active = True
207+
policy._consumer.stopped.set()
208208
policy.lease('my ack id', 50)
209209

210210
# Mock the sleep object.
211211
with mock.patch.object(time, 'sleep', autospec=True) as sleep:
212212
def trigger_inactive(seconds):
213213
assert 0 < seconds < 10
214-
policy._consumer.active = False
214+
policy._consumer.stopped.clear()
215215
sleep.side_effect = trigger_inactive
216216

217217
# Also mock the consumer, which sends the request.
@@ -226,11 +226,11 @@ def trigger_inactive(seconds):
226226

227227
def test_maintain_leases_no_ack_ids():
228228
policy = create_policy()
229-
policy._consumer.active = True
229+
policy._consumer.stopped.set()
230230
with mock.patch.object(time, 'sleep', autospec=True) as sleep:
231231
def trigger_inactive(seconds):
232232
assert 0 < seconds < 10
233-
policy._consumer.active = False
233+
policy._consumer.stopped.clear()
234234
sleep.side_effect = trigger_inactive
235235
policy.maintain_leases()
236236
sleep.assert_called()

0 commit comments

Comments
 (0)