
PubSub: support batching publish requests with asyncio #15654

@relud


Is your feature request related to a problem? Please describe.

I have an asyncio application that needs to publish messages to PubSub, but I'm having issues because google.cloud.pubsub.PublisherClient.publish:

  1. returns futures that aren't compatible with await or asyncio.wrap_future
  2. returns futures that never complete if Batch._commit throws an uncaught exception (like in PubSub: RetryError in batch publish causes futures to never complete #7103 and PubSub: Propagate RetryError in PublisherClient.publish #7071)
  3. doesn't enforce a maximum number of threads, so the spawned threads consume a lot of memory
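For problem 1, one common workaround is to bridge the future into the event loop by hand. The minimal sketch below assumes only that the future exposes add_done_callback(), result() and exception(); CallbackFuture and to_asyncio are hypothetical names, not part of google-cloud-pubsub. Because the done callback runs on whichever thread completed the future, the result or exception is copied onto the asyncio future with loop.call_soon_threadsafe.

```python
import asyncio
import threading


class CallbackFuture:
    """Hypothetical thread-completed future that is NOT a
    concurrent.futures.Future, so asyncio.wrap_future cannot wrap it."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done = False
        self._result = None
        self._exception = None
        self._callbacks = []

    def add_done_callback(self, fn):
        with self._lock:
            if not self._done:
                self._callbacks.append(fn)
                return
        fn(self)  # already finished: invoke immediately

    def _finish(self, result=None, exception=None):
        with self._lock:
            self._done = True
            self._result, self._exception = result, exception
            callbacks, self._callbacks = self._callbacks, []
        for fn in callbacks:
            fn(self)

    def set_result(self, result):
        self._finish(result=result)

    def set_exception(self, exc):
        self._finish(exception=exc)

    def exception(self):
        return self._exception

    def result(self):
        if self._exception is not None:
            raise self._exception
        return self._result


def to_asyncio(fut, loop):
    """Return an asyncio.Future that mirrors a callback-style future."""
    aio_fut = loop.create_future()

    def _copy_state(src):
        # May run on a worker thread; schedule the update on the loop thread.
        def _apply():
            if aio_fut.cancelled():
                return
            exc = src.exception()
            if exc is not None:
                aio_fut.set_exception(exc)
            else:
                aio_fut.set_result(src.result())
        loop.call_soon_threadsafe(_apply)

    fut.add_done_callback(_copy_state)
    return aio_fut


async def main():
    loop = asyncio.get_running_loop()
    ok, bad = CallbackFuture(), CallbackFuture()
    # Simulate background publisher threads completing the futures.
    threading.Thread(target=ok.set_result, args=("message-id-1",)).start()
    threading.Thread(target=bad.set_exception, args=(RuntimeError("commit failed"),)).start()
    message_id = await to_asyncio(ok, loop)
    error = None
    try:
        await to_asyncio(bad, loop)
    except RuntimeError as exc:
        error = str(exc)
    return message_id, error


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('message-id-1', 'commit failed')
```

This only addresses problem 1: if the underlying future is never completed (problem 2), the await still hangs forever.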

Describe the solution you'd like

I wrote a new google.cloud.pubsub_v1.publisher._batch.async.Batch that implements the google.cloud.pubsub_v1.publisher._batch.base.Batch interface. It uses asyncio to provide awaitable futures that automatically propagate exceptions. It uses a shared concurrent.futures.ThreadPoolExecutor together with asyncio.wrap_future to call Batch.client.publish asynchronously while enforcing a maximum number of workers. I deliberately wrapped only Batch.client.publish in a thread because, if I understand correctly, it only blocks while waiting for exclusive access to the gRPC channel, so it shouldn't cause the performance issues described in the first alternative below.

I would like to submit this as a pull request, but only if it would be useful.

Describe alternatives you've considered

  • I tried patching google.cloud.pubsub_v1.publisher._batch.thread.Batch to use concurrent.futures.ThreadPoolExecutor. Unfortunately, this performed poorly: when every worker was blocked in a time.sleep call, no worker was left to check whether batches that had not yet been submitted were ready.
  • I tried patching google.cloud.pubsub_v1.futures.Future to inherit from concurrent.futures.Future. This fixed compatibility with asyncio.wrap_future, but not the uncaught exceptions or the unlimited thread spawning.
  • I tried patching google.cloud.pubsub_v1.publisher._batch.thread.Batch to join spawned threads, which would propagate uncaught exceptions, but I was unable to find a working solution.
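The second alternative works because asyncio.wrap_future expects an instance of concurrent.futures.Future (asyncio futures are returned unchanged). In the minimal sketch below, PublishFuture is a hypothetical subclass standing in for a patched google.cloud.pubsub_v1.futures.Future, resolved from a plain thread; both a result and an exception reach the awaiting coroutine.

```python
import asyncio
import concurrent.futures
import threading


class PublishFuture(concurrent.futures.Future):
    """Hypothetical future type; subclassing concurrent.futures.Future is
    what makes it acceptable to asyncio.wrap_future."""


def resolve_in_thread(future, message_id=None, error=None):
    # Simulates a background batch thread completing the future.
    def _run():
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(message_id)
    threading.Thread(target=_run).start()


async def main():
    ok, failed = PublishFuture(), PublishFuture()
    resolve_in_thread(ok, message_id="42")
    resolve_in_thread(failed, error=RuntimeError("deadline exceeded"))
    message_id = await asyncio.wrap_future(ok)
    error = None
    try:
        await asyncio.wrap_future(failed)
    except RuntimeError as exc:
        error = str(exc)
    return message_id, error


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('42', 'deadline exceeded')
```

As the list above notes, this only helps when something actually resolves the future; it does nothing about batch threads that die before calling set_result or set_exception, or about the number of threads spawned.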

Labels

  • api: pubsub (Issues related to the Pub/Sub API.)
  • type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design.)
