From c112342f2be44e1b36ad994140e1073d8e6b896a Mon Sep 17 00:00:00 2001 From: Thea Flowers Date: Tue, 11 Sep 2018 13:57:42 -0700 Subject: [PATCH 1/3] Testing --- .../pubsub_v1/subscriber/_protocol/bidi.py | 108 ++++++++++++++--- .../_protocol/streaming_pull_manager.py | 3 +- pubsub/test.py | 110 ++++++++++++++++++ 3 files changed, 201 insertions(+), 20 deletions(-) create mode 100644 pubsub/test.py diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py index 00877e70058e..92c268000cab 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py @@ -207,8 +207,21 @@ def open(self): else: call.add_done_callback(self._on_call_done) + import time + import random + from unittest import mock + from google.cloud.pubsub_v1.proto import pubsub_pb2 + + mock_call = mock.MagicMock(wraps=call) + + def mock_next(): + #time.sleep(random.uniform(0.0, 1.0)) + return pubsub_pb2.StreamingPullResponse() + + mock_call.__next__.side_effect = mock_next + self._request_generator = request_generator - self.call = call + self.call = mock_call def close(self): """Closes the stream.""" @@ -239,6 +252,7 @@ def send(self, request): # to mean something semantically different. if self.call.is_active(): self._request_queue.put(request) + pass else: # calling next should cause the call to raise. next(self.call) @@ -330,18 +344,19 @@ def _on_call_done(self, future): # Unlike the base class, we only execute the callbacks on a terminal # error, not for errors that we can recover from. Note that grpc's # "future" here is also a grpc.RpcError. - if not self._should_recover(future): - self._finalize(future) - else: - _LOGGER.debug('Re-opening stream from gRPC callback.') - self._reopen() + with self._operational_lock: + if not self._should_recover(future): + self._finalize(future) + else: + _LOGGER.debug('Re-opening stream from gRPC callback.') + self._reopen() def _reopen(self): with self._operational_lock: # Another thread already managed to re-open this stream. if self.call is not None and self.call.is_active(): _LOGGER.debug('Stream was already re-established.') - return + #return self.call = None # Request generator should exit cleanly since the RPC its bound to @@ -361,6 +376,7 @@ def _reopen(self): # If re-opening or re-calling the method fails for any reason, # consider it a terminal error and finalize the stream. except Exception as exc: + _LOGGER.debug('Failed to re-open stream due to %s', exc) self._finalize(exc) raise @@ -385,19 +401,57 @@ def _recoverable(self, method, *args, **kwargs): return method(*args, **kwargs) except Exception as exc: - _LOGGER.debug('Call to retryable %r caused %s.', method, exc) - if not self._should_recover(exc): - self.close() - _LOGGER.debug('Not retrying %r due to %s.', method, exc) - self._finalize(exc) - raise exc + with self._operational_lock: + _LOGGER.debug( + 'Call to retryable %r caused %s.', method, exc) + + if not self._should_recover(exc): + self.close() + _LOGGER.debug( + 'Not retrying %r due to %s.', method, exc) + self._finalize(exc) + raise exc + + _LOGGER.debug( + 'Re-opening stream from retryable %r.', method) + self._reopen() + + def _send(self, request): + # Grab a reference to the RPC call. Because another thread (notably + # the gRPC error thread) can modify self.call (by invoking reopen), + # we should ensure our reference can not change underneath us. + # If self.call is modified (such as replaced with a new RPC call) then + # this will use the "old" RPC, which should result in the same + # exception passed into gRPC's error handler being raised here, which + # will be handled by the usual error handling in retryable. + with self._operational_lock: + call = self.call + + if call is None: + raise ValueError( + 'Can not send() on an RPC that has never been open()ed.') - _LOGGER.debug('Re-opening stream from retryable %r.', method) - self._reopen() + # Don't use self.is_active(), as ResumableBidiRpc will overload it + # to mean something semantically different. + if call.is_active(): + self._request_queue.put(request) + pass + else: + # calling next should cause the call to raise. + next(call) def send(self, request): - return self._recoverable( - super(ResumableBidiRpc, self).send, request) + return self._recoverable(self._send, request) + + def _recv(self): + with self._operational_lock: + call = self.call + + if call is None: + raise ValueError( + 'Can not recv() on an RPC that has never been open()ed.') + + return next(call) def recv(self): return self._recoverable( @@ -417,6 +471,16 @@ def is_active(self): return self.call is not None and not self._finalized +def meanie_thread_main(rpc): + import time + import random + + while True: + time.sleep(random.uniform(0, 0.1)) + print('Wahaha') + rpc._reopen() + + class BackgroundConsumer(object): """A bi-directional stream consumer that runs in a separate thread. @@ -472,6 +536,13 @@ def _thread_main(self): self._bidi_rpc.add_done_callback(self._on_call_done) self._bidi_rpc.open() + import functools + meanie_thread = threading.Thread( + name='meanie thread', + target=functools.partial(meanie_thread_main, self._bidi_rpc)) + meanie_thread.deamon = True + meanie_thread.start() + while self._bidi_rpc.is_active: # Do not allow the paused status to change at all during this # section. There is a condition where we could be resumed @@ -506,8 +577,7 @@ def _thread_main(self): else: _LOGGER.error( - 'The bidirectional RPC unexpectedly exited. This is a truly ' - 'exceptional case. Please file a bug with your logs.') + 'The bidirectional RPC exited.') _LOGGER.info('%s exiting', _BIDIRECTIONAL_CONSUMER_NAME) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 21a8f98851a0..5892c5f744b8 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -263,7 +263,8 @@ def send(self, request): """Queue a request to be sent to the RPC.""" if self._UNARY_REQUESTS: try: - self._send_unary_request(request) + pass + #self._send_unary_request(request) except exceptions.GoogleAPICallError as exc: _LOGGER.debug( 'Exception while sending unary RPC. This is typically ' diff --git a/pubsub/test.py b/pubsub/test.py new file mode 100644 index 000000000000..83757f2570d3 --- /dev/null +++ b/pubsub/test.py @@ -0,0 +1,110 @@ + +import logging +import random +import threading +import time + +import google.api_core.exceptions +from google.cloud import pubsub + +message_count = 0 +message_count_lock = threading.Lock() + + +def monitor(future, interval=10): + import datetime + import textwrap + import time + + manager = future._manager + start_time = datetime.datetime.now() + + while not future.done(): + run_time = datetime.datetime.now() - start_time + rate = message_count / run_time.total_seconds() + status = textwrap.dedent("""\ + Messages processed: {message_count} + Rate: {rate:.2f} Messages/second + Run time: {run_time} + Load: {load:.2f} + p99 ack: {ack_deadline} seconds + Leased Messages: {leased_messages} + Executor queue size: {work_queue} + Callback queue size: {callback_size} + Request queue size: {queue_size} + """).format( + message_count=message_count, + rate=rate, + run_time=run_time, + ack_deadline=manager.ack_deadline, + load=manager.load, + leased_messages=manager.leaser.message_count, + work_queue=manager._scheduler._executor._work_queue.qsize(), + callback_size=manager._scheduler.queue.qsize(), + queue_size=manager._rpc.pending_requests + ) + + #print('===== Subscriber Monitor =====') + #print(status) + + try: + time.sleep(interval) + except KeyboardInterrupt: + future.cancel() + + print('waiting on future...') + print(future.result()) + print('clean exit') + + +def incr_count(): + # Note: this should be done within a lock as multiple threads mess with + # this, however, using a lock slows down the program enough to possibly + # affect the results. Consider this count as incredibly inaccurate and + # best-effort. + global message_count + message_count += 1 + return message_count + + +def callback(message): + incr_count() + + # Sleep a random amount of time to simulate a heterogenous load. + time.sleep(random.uniform(5, 10)) + + message.ack() + + +def main(): + # Enabling logging will output a *ton* of stuff, but it might be helpful. + logging.basicConfig(level=logging.DEBUG) + logging.getLogger( + 'google.cloud.pubsub_v1.subscriber._protocol.leaser').setLevel('INFO') + + subscriber = pubsub.SubscriberClient() + topic = 'projects/{project_id}/topics/{topic}'.format( + project_id='python-docs-samples-tests', + topic='repro-topic', # Set this to something appropriate. + ) + subscription = 'projects/{project_id}/subscriptions/{sub}'.format( + project_id='python-docs-samples-tests', + sub='repro-sub2', # Set this to something appropriate. + ) + + try: + subscriber.create_subscription(subscription, topic) + except google.api_core.exceptions.AlreadyExists: + print('subscription exists') + pass + + future = subscriber.subscribe( + subscription=subscription, + callback=callback) + + print('listening') + monitor(future) + + +if __name__ == '__main__': + main() From df5cc6df61968f460170971a2103be9213dca926 Mon Sep 17 00:00:00 2001 From: Thea Flowers Date: Tue, 11 Sep 2018 14:04:32 -0700 Subject: [PATCH 2/3] Fix race condition in recv()'s usage of self.call. --- .../pubsub_v1/subscriber/_protocol/bidi.py | 38 +----- .../_protocol/streaming_pull_manager.py | 3 +- pubsub/test.py | 110 ------------------ 3 files changed, 4 insertions(+), 147 deletions(-) delete mode 100644 pubsub/test.py diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py index 92c268000cab..7c995c57652e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py @@ -207,21 +207,8 @@ def open(self): else: call.add_done_callback(self._on_call_done) - import time - import random - from unittest import mock - from google.cloud.pubsub_v1.proto import pubsub_pb2 - - mock_call = mock.MagicMock(wraps=call) - - def mock_next(): - #time.sleep(random.uniform(0.0, 1.0)) - return pubsub_pb2.StreamingPullResponse() - - mock_call.__next__.side_effect = mock_next - self._request_generator = request_generator - self.call = mock_call + self.call = call def close(self): """Closes the stream.""" @@ -252,7 +239,6 @@ def send(self, request): # to mean something semantically different. if self.call.is_active(): self._request_queue.put(request) - pass else: # calling next should cause the call to raise. next(self.call) @@ -356,7 +342,7 @@ def _reopen(self): # Another thread already managed to re-open this stream. if self.call is not None and self.call.is_active(): _LOGGER.debug('Stream was already re-established.') - #return + return self.call = None # Request generator should exit cleanly since the RPC its bound to @@ -454,8 +440,7 @@ def _recv(self): return next(call) def recv(self): - return self._recoverable( - super(ResumableBidiRpc, self).recv) + return self._recoverable(self._recv) @property def is_active(self): @@ -471,16 +456,6 @@ def is_active(self): return self.call is not None and not self._finalized -def meanie_thread_main(rpc): - import time - import random - - while True: - time.sleep(random.uniform(0, 0.1)) - print('Wahaha') - rpc._reopen() - - class BackgroundConsumer(object): """A bi-directional stream consumer that runs in a separate thread. @@ -536,13 +511,6 @@ def _thread_main(self): self._bidi_rpc.add_done_callback(self._on_call_done) self._bidi_rpc.open() - import functools - meanie_thread = threading.Thread( - name='meanie thread', - target=functools.partial(meanie_thread_main, self._bidi_rpc)) - meanie_thread.deamon = True - meanie_thread.start() - while self._bidi_rpc.is_active: # Do not allow the paused status to change at all during this # section. There is a condition where we could be resumed diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 5892c5f744b8..21a8f98851a0 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -263,8 +263,7 @@ def send(self, request): """Queue a request to be sent to the RPC.""" if self._UNARY_REQUESTS: try: - pass - #self._send_unary_request(request) + self._send_unary_request(request) except exceptions.GoogleAPICallError as exc: _LOGGER.debug( 'Exception while sending unary RPC. This is typically ' diff --git a/pubsub/test.py b/pubsub/test.py deleted file mode 100644 index 83757f2570d3..000000000000 --- a/pubsub/test.py +++ /dev/null @@ -1,110 +0,0 @@ - -import logging -import random -import threading -import time - -import google.api_core.exceptions -from google.cloud import pubsub - -message_count = 0 -message_count_lock = threading.Lock() - - -def monitor(future, interval=10): - import datetime - import textwrap - import time - - manager = future._manager - start_time = datetime.datetime.now() - - while not future.done(): - run_time = datetime.datetime.now() - start_time - rate = message_count / run_time.total_seconds() - status = textwrap.dedent("""\ - Messages processed: {message_count} - Rate: {rate:.2f} Messages/second - Run time: {run_time} - Load: {load:.2f} - p99 ack: {ack_deadline} seconds - Leased Messages: {leased_messages} - Executor queue size: {work_queue} - Callback queue size: {callback_size} - Request queue size: {queue_size} - """).format( - message_count=message_count, - rate=rate, - run_time=run_time, - ack_deadline=manager.ack_deadline, - load=manager.load, - leased_messages=manager.leaser.message_count, - work_queue=manager._scheduler._executor._work_queue.qsize(), - callback_size=manager._scheduler.queue.qsize(), - queue_size=manager._rpc.pending_requests - ) - - #print('===== Subscriber Monitor =====') - #print(status) - - try: - time.sleep(interval) - except KeyboardInterrupt: - future.cancel() - - print('waiting on future...') - print(future.result()) - print('clean exit') - - -def incr_count(): - # Note: this should be done within a lock as multiple threads mess with - # this, however, using a lock slows down the program enough to possibly - # affect the results. Consider this count as incredibly inaccurate and - # best-effort. - global message_count - message_count += 1 - return message_count - - -def callback(message): - incr_count() - - # Sleep a random amount of time to simulate a heterogenous load. - time.sleep(random.uniform(5, 10)) - - message.ack() - - -def main(): - # Enabling logging will output a *ton* of stuff, but it might be helpful. - logging.basicConfig(level=logging.DEBUG) - logging.getLogger( - 'google.cloud.pubsub_v1.subscriber._protocol.leaser').setLevel('INFO') - - subscriber = pubsub.SubscriberClient() - topic = 'projects/{project_id}/topics/{topic}'.format( - project_id='python-docs-samples-tests', - topic='repro-topic', # Set this to something appropriate. - ) - subscription = 'projects/{project_id}/subscriptions/{sub}'.format( - project_id='python-docs-samples-tests', - sub='repro-sub2', # Set this to something appropriate. - ) - - try: - subscriber.create_subscription(subscription, topic) - except google.api_core.exceptions.AlreadyExists: - print('subscription exists') - pass - - future = subscriber.subscribe( - subscription=subscription, - callback=callback) - - print('listening') - monitor(future) - - -if __name__ == '__main__': - main() From 3f43005fc7490c2bbf5d6428a742c086d5f8b45c Mon Sep 17 00:00:00 2001 From: Thea Flowers Date: Wed, 12 Sep 2018 10:48:59 -0700 Subject: [PATCH 3/3] Fixup tests --- .../unit/pubsub_v1/subscriber/test_bidi.py | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_bidi.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_bidi.py index 2e72a757600a..058cd53c29cf 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_bidi.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_bidi.py @@ -373,41 +373,21 @@ def test_recv_recover(self): assert bidi_rpc.call == call_2 assert bidi_rpc.is_active is True - def test_recv_recover_race_condition(self): - # This test checks the race condition where two threads recv() and - # encounter an error and must re-open the stream. Only one thread - # should succeed in doing so. - error = ValueError() - call_1 = CallStub([error, error]) - call_2 = CallStub([1, 2]) + def test_recv_recover_already_recovered(self): + call_1 = CallStub([]) + call_2 = CallStub([]) start_rpc = mock.create_autospec( grpc.StreamStreamMultiCallable, instance=True, side_effect=[call_1, call_2]) - recovered_event = threading.Event() - - def second_thread_main(): - assert bidi_rpc.recv() == 2 - - second_thread = threading.Thread(target=second_thread_main) - - def should_recover(exception): - assert exception == error - if threading.current_thread() == second_thread: - recovered_event.wait() - return True - - bidi_rpc = bidi.ResumableBidiRpc(start_rpc, should_recover) + bidi_rpc = bidi.ResumableBidiRpc(start_rpc, lambda _: True) bidi_rpc.open() - second_thread.start() - assert bidi_rpc.recv() == 1 - recovered_event.set() + bidi_rpc._reopen() - assert bidi_rpc.call == call_2 + assert bidi_rpc.call is call_1 assert bidi_rpc.is_active is True - second_thread.join() def test_recv_failure(self): error = ValueError() @@ -456,6 +436,18 @@ def test_reopen_failure_on_rpc_restart(self): assert bidi_rpc.is_active is False callback.assert_called_once_with(error2) + def test_send_not_open(self): + bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False) + + with pytest.raises(ValueError): + bidi_rpc.send(mock.sentinel.request) + + def test_recv_not_open(self): + bidi_rpc = bidi.ResumableBidiRpc(None, lambda _: False) + + with pytest.raises(ValueError): + bidi_rpc.recv() + def test_finalize_idempotent(self): error1 = ValueError('1') error2 = ValueError('2')