diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index d3fd0d956a90..d9bb4e74dcab 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -193,50 +193,50 @@ def _commit(self): self._status = base.BatchStatus.SUCCESS return - # Begin the request to publish these messages. - # Log how long the underlying request takes. - start = time.time() - - try: - response = self._client.api.publish(self._topic, self._messages) - except google.api_core.exceptions.GoogleAPIError as exc: - # We failed to publish, set the exception on all futures and - # exit. - self._status = base.BatchStatus.ERROR - - for future in self._futures: - future.set_exception(exc) - - _LOGGER.exception("Failed to publish %s messages.", len(self._futures)) - return + # Begin the request to publish these messages. + # Log how long the underlying request takes. + start = time.time() + + try: + response = self._client.api.publish(self._topic, self._messages) + except google.api_core.exceptions.GoogleAPIError as exc: + # We failed to publish, set the exception on all futures and + # exit. + self._status = base.BatchStatus.ERROR + + for future in self._futures: + future.set_exception(exc) + + _LOGGER.exception("Failed to publish %s messages.", len(self._futures)) + return + + end = time.time() + _LOGGER.debug("gRPC Publish took %s seconds.", end - start) + + if len(response.message_ids) == len(self._futures): + # Iterate over the futures on the queue and return the response + # IDs. We are trusting that there is a 1:1 mapping, and raise + # an exception if not. + self._status = base.BatchStatus.SUCCESS + zip_iter = six.moves.zip(response.message_ids, self._futures) + for message_id, future in zip_iter: + future.set_result(message_id) + else: + # Sanity check: If the number of message IDs is not equal to + # the number of futures I have, then something went wrong. + self._status = base.BatchStatus.ERROR + exception = exceptions.PublishError( + "Some messages were not successfully published." + ) - end = time.time() - _LOGGER.debug("gRPC Publish took %s seconds.", end - start) + for future in self._futures: + future.set_exception(exception) - if len(response.message_ids) == len(self._futures): - # Iterate over the futures on the queue and return the response - # IDs. We are trusting that there is a 1:1 mapping, and raise - # an exception if not. - self._status = base.BatchStatus.SUCCESS - zip_iter = six.moves.zip(response.message_ids, self._futures) - for message_id, future in zip_iter: - future.set_result(message_id) - else: - # Sanity check: If the number of message IDs is not equal to - # the number of futures I have, then something went wrong. - self._status = base.BatchStatus.ERROR - exception = exceptions.PublishError( - "Some messages were not successfully published." - ) - - for future in self._futures: - future.set_exception(exception) - - _LOGGER.error( - "Only %s of %s messages were published.", - len(response.message_ids), - len(self._futures), - ) + _LOGGER.error( + "Only %s of %s messages were published.", + len(response.message_ids), + len(self._futures), + ) def monitor(self): """Commit this batch after sufficient time has elapsed.