Skip to content

Commit 6274007

Browse files
author
Jon Wayne Parrott
authored
Make the pausable response iterator aware of the RPC state to prevent deadlock (googleapis#5108)
1 parent f5e2e76 commit 6274007

2 files changed

Lines changed: 91 additions & 22 deletions

File tree

pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,36 @@ def __iter__(self):
228228
yield item
229229

230230

231+
def _pausable_response_iterator(iterator, can_continue, period=1):
232+
"""Converts a gRPC response iterator into one that can be paused.
233+
234+
The ``can_continue`` event can be used by an independent, concurrent
235+
worker to pause and resume the iteration over ``iterator``.
236+
237+
Args:
238+
iterator (grpc.RpcContext, Iterator[protobuf.Message]): A
239+
``grpc.RpcContext`` instance that is also an iterator of responses.
240+
This is a typically returned from grpc's streaming response call
241+
types.
242+
can_continue (threading.Event): An event which determines if we
243+
can advance to the next iteration. Will be ``wait()``-ed on
244+
before consuming more items from the iterator.
245+
period (float): The number of seconds to wait to be able to consume
246+
before checking if the RPC is cancelled. In practice, this
247+
determines the maximum amount of time that ``next()`` on this
248+
iterator will block after the RPC is cancelled.
249+
250+
Yields:
251+
Any: The items yielded from ``iterator``.
252+
"""
253+
while True:
254+
can_yield = can_continue.wait(timeout=period)
255+
# Calling next() on a cancelled RPC will cause it to raise the
256+
# grpc.RpcError associated with the cancellation.
257+
if can_yield or not iterator.is_active():
258+
yield next(iterator)
259+
260+
231261
class Consumer(object):
232262
"""Bi-directional streaming RPC consumer.
233263
@@ -328,7 +358,7 @@ def _blocking_consume(self, policy):
328358
self._request_queue, initial_request=initial_request)
329359
rpc = policy.call_rpc(iter(request_generator))
330360
request_generator.rpc = rpc
331-
responses = _pausable_iterator(rpc, self._can_consume)
361+
responses = _pausable_response_iterator(rpc, self._can_consume)
332362
try:
333363
for response in responses:
334364
_LOGGER.debug('Received response on stream')
@@ -439,23 +469,3 @@ def stop_consuming(self):
439469
"""
440470
thread = self._stop_no_join()
441471
thread.join()
442-
443-
444-
def _pausable_iterator(iterator, can_continue):
445-
"""Converts a standard iterator into one that can be paused.
446-
447-
The ``can_continue`` event can be used by an independent, concurrent
448-
worker to pause and resume the iteration over ``iterator``.
449-
450-
Args:
451-
iterator (Iterator): Any iterator to be iterated over.
452-
can_continue (threading.Event): An event which determines if we
453-
can advance to the next iteration. Will be ``wait()``-ed on
454-
before
455-
456-
Yields:
457-
Any: The items from ``iterator``.
458-
"""
459-
while True:
460-
can_continue.wait()
461-
yield next(iterator)

pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,38 @@ def test_exit_with_stop(self):
105105
assert items == []
106106

107107

108+
class _ResponseIterator(object):
109+
def __init__(self, items, active=True):
110+
self._items = iter(items)
111+
self._active = active
112+
113+
def is_active(self):
114+
return self._active
115+
116+
def __next__(self):
117+
return next(self._items)
118+
119+
next = __next__
120+
121+
122+
def test__pausable_response_iterator_active_but_cant_consume():
    """An active RPC that cannot consume yet is retried, not read from."""
    # Note: we can't autospec threading.Event because it's goofy on Python 2.
    can_consume = mock.Mock(spec=['wait'])
    # First call will return false, indicating the loop should try again.
    # second call will allow it to consume the first (and only) item.
    can_consume.wait.side_effect = [False, True]
    iterator = _ResponseIterator([1])

    pausable_iter = _consumer._pausable_response_iterator(
        iterator, can_consume)

    # Pull exactly one item instead of draining with list(): draining would
    # trigger a third wait() call, and the exhausted mock side_effect raises
    # StopIteration inside the generator, which PEP 479 turns into a
    # RuntimeError.
    assert next(pausable_iter) == 1
    # One timed-out wait followed by one successful wait.
    assert can_consume.wait.call_count == 2
138+
139+
108140
def test_send_request():
109141
consumer = _consumer.Consumer()
110142
request = types.StreamingPullRequest(subscription='foo')
@@ -176,9 +208,10 @@ class RaisingResponseGenerator(object):
176208
# rather than the **class** will not be iterable in Python 2.
177209
# This is problematic since a `Mock` just sets members.
178210

179-
def __init__(self, exception):
211+
def __init__(self, exception, active=True):
180212
self.exception = exception
181213
self.next_calls = 0
214+
self._active = active
182215

183216
def __next__(self):
184217
self.next_calls += 1
@@ -187,6 +220,32 @@ def __next__(self):
187220
def next(self):
188221
return self.__next__() # Python 2
189222

223+
def is_active(self):
224+
return self._active
225+
226+
227+
def test_blocking_consume_iter_exception_while_paused():
    """A paused consumer still surfaces the error of an inactive RPC.

    The response stream reports itself as inactive and raises on ``next()``.
    Even though the consumer is paused, ``_blocking_consume`` is expected to
    return instead of blocking forever, after handing the stream's exception
    to ``policy.on_exception``.
    """
    stream_error = TypeError('Bad things!')
    inactive_rpc = RaisingResponseGenerator(stream_error, active=False)

    policy = mock.create_autospec(base.BasePolicy, instance=True)
    policy.call_rpc.return_value = inactive_rpc
    policy.on_exception.side_effect = OnException()

    consumer = _consumer.Consumer()
    # Pause first so the response loop cannot consume normally.
    consumer.pause()
    consumer._consumer_thread = mock.Mock(spec=threading.Thread)

    # Run the consume loop inline. It should notice the RPC is inactive,
    # raise the exception from the stream and then exit because
    # on_exception returns false.
    consumer._blocking_consume(policy)

    assert consumer._consumer_thread is None

    # Check mocks.
    policy.call_rpc.assert_called_once()
    policy.on_exception.assert_called_once_with(stream_error)
248+
190249

191250
def test_blocking_consume_two_exceptions():
192251
policy = mock.create_autospec(base.BasePolicy, instance=True)

0 commit comments

Comments
 (0)