From 26ac207ce355ccd58cafcad9feb1f788a2f91143 Mon Sep 17 00:00:00 2001 From: Yap Sok Ann Date: Wed, 10 Apr 2019 16:29:49 +0700 Subject: [PATCH] Release the state lock before calling the publish api Prior to this change, any problem in the communication path to pubsub (e.g. bad connection, slow servers, etc) would not only tie up the calling thread itself, but also other threads waiting to get hold of the state lock as they try to publish over the same batch. We only need to hold the state lock for the transition from ACCEPTING_MESSAGES / STARTING to IN_PROGRESS. After that, since only one thread is able to transition to IN_PROGRESS, we can safely release the state lock before calling the publish api and eventually transitioning to SUCCESS / ERROR. Co-authored-by: Rencana Tarigan --- .../pubsub_v1/publisher/_batch/thread.py | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index d3fd0d956a90..d9bb4e74dcab 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -193,50 +193,50 @@ def _commit(self): self._status = base.BatchStatus.SUCCESS return - # Begin the request to publish these messages. - # Log how long the underlying request takes. - start = time.time() - - try: - response = self._client.api.publish(self._topic, self._messages) - except google.api_core.exceptions.GoogleAPIError as exc: - # We failed to publish, set the exception on all futures and - # exit. - self._status = base.BatchStatus.ERROR - - for future in self._futures: - future.set_exception(exc) - - _LOGGER.exception("Failed to publish %s messages.", len(self._futures)) - return + # Begin the request to publish these messages. + # Log how long the underlying request takes. + start = time.time() + + try: + response = self._client.api.publish(self._topic, self._messages) + except google.api_core.exceptions.GoogleAPIError as exc: + # We failed to publish, set the exception on all futures and + # exit. + self._status = base.BatchStatus.ERROR + + for future in self._futures: + future.set_exception(exc) + + _LOGGER.exception("Failed to publish %s messages.", len(self._futures)) + return + + end = time.time() + _LOGGER.debug("gRPC Publish took %s seconds.", end - start) + + if len(response.message_ids) == len(self._futures): + # Iterate over the futures on the queue and return the response + # IDs. We are trusting that there is a 1:1 mapping, and raise + # an exception if not. + self._status = base.BatchStatus.SUCCESS + zip_iter = six.moves.zip(response.message_ids, self._futures) + for message_id, future in zip_iter: + future.set_result(message_id) + else: + # Sanity check: If the number of message IDs is not equal to + # the number of futures I have, then something went wrong. + self._status = base.BatchStatus.ERROR + exception = exceptions.PublishError( + "Some messages were not successfully published." + ) - end = time.time() - _LOGGER.debug("gRPC Publish took %s seconds.", end - start) + for future in self._futures: + future.set_exception(exception) - if len(response.message_ids) == len(self._futures): - # Iterate over the futures on the queue and return the response - # IDs. We are trusting that there is a 1:1 mapping, and raise - # an exception if not. - self._status = base.BatchStatus.SUCCESS - zip_iter = six.moves.zip(response.message_ids, self._futures) - for message_id, future in zip_iter: - future.set_result(message_id) - else: - # Sanity check: If the number of message IDs is not equal to - # the number of futures I have, then something went wrong. - self._status = base.BatchStatus.ERROR - exception = exceptions.PublishError( - "Some messages were not successfully published." - ) - - for future in self._futures: - future.set_exception(exception) - - _LOGGER.error( - "Only %s of %s messages were published.", - len(response.message_ids), - len(self._futures), - ) + _LOGGER.error( + "Only %s of %s messages were published.", + len(response.message_ids), + len(self._futures), + ) def monitor(self): """Commit this batch after sufficient time has elapsed.