google.cloud.pubsub.PublisherClient.publish doesn't enforce a maximum number of threads, which is eating memory.
Describe the solution you'd like
I wrote a new google.cloud.pubsub_v1.publisher._batch.async.Batch that implements google.cloud.pubsub_v1.publisher._batch.base.Batch. It uses asyncio to provide awaitable futures that automatically propagate exceptions. It uses a shared concurrent.futures.ThreadPoolExecutor together with asyncio.wrap_future to call Batch.client.publish asynchronously while enforcing a maximum number of workers. I deliberately wrapped only Batch.client.publish in a thread because (if I understand correctly) it only blocks on exclusive access to the gRPC channel, so it shouldn't cause the performance issues seen in the first alternative below.
I would like to submit this as a pull request, but only if it would be useful.
Describe alternatives you've considered
I tried patching google.cloud.pubsub_v1.publisher._batch.thread.Batch to use concurrent.futures.ThreadPoolExecutor. Unfortunately, it performed poorly: once every worker was blocked in a time.sleep, no worker was left to check whether not-yet-submitted tasks were ready.
I tried patching google.cloud.pubsub_v1.futures.Future to inherit from concurrent.futures.Future. This fixed compatibility with asyncio.wrap_future, but not the uncaught exceptions or the unlimited thread spawning.
I tried to patch google.cloud.pubsub_v1.publisher._batch.thread.Batch to join spawned threads, which would propagate uncaught exceptions, but I was unable to figure out a solution.
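The starvation problem in the first alternative can be illustrated with a small sketch. This is my own reduction of the effect, not the patched Batch code: monitor and run are hypothetical names, and the polling loop only imitates a worker that waits with time.sleep. With a single worker, the task that would unblock the poller sits in the queue behind it until the poller times out.

```python
import concurrent.futures
import threading
import time


def monitor(ready: threading.Event, timeout: float) -> bool:
    """Poll with time.sleep until `ready` is set or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if ready.is_set():
            return True
        time.sleep(0.01)
    return ready.is_set()


def run(max_workers: int) -> bool:
    """Return True if the poller saw the event before timing out."""
    ready = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        waiting = pool.submit(monitor, ready, 0.5)
        # With max_workers == 1 this task is queued behind the sleeping
        # poller and cannot run until the poller gives up.
        pool.submit(ready.set)
        return waiting.result()
```

Here run(1) returns False because the only worker is busy sleeping, while run(2) returns True because a second worker is free to set the event.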
Is your feature request related to a problem? Please describe.
I have an asyncio application that needs to publish messages to PubSub, but I'm having issues with google.cloud.pubsub.PublisherClient.publish: the future it returns does not work with await or asyncio.wrap_future, and Batch._commit can throw an uncaught exception (like in PubSub: RetryError in batch publish causes futures to never complete #7103 and PubSub: Propagate RetryError in PublisherClient.publish #7071).
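As an illustration of the uncaught-exception problem, here is a hedged sketch that does not use the library at all. commit, publish_in_thread and outcome are hypothetical names, with commit standing in for Batch._commit. When the function running in a bare thread raises before it resolves the future, nothing ever completes that future, so a caller waiting on it only sees a timeout.

```python
import concurrent.futures
import threading


def commit(future: concurrent.futures.Future, fail: bool) -> None:
    """Hypothetical stand-in for Batch._commit running in its own thread."""
    if fail:
        # The exception ends the thread; the future is never resolved.
        raise RuntimeError("publish failed")
    future.set_result("message-id")


def publish_in_thread(fail: bool) -> concurrent.futures.Future:
    future = concurrent.futures.Future()
    # A bare thread: nothing joins it or forwards its exception.
    thread = threading.Thread(target=commit, args=(future, fail), daemon=True)
    thread.start()
    return future


def outcome(future: concurrent.futures.Future, timeout: float = 0.5) -> str:
    """Wait for the future and report what the caller actually observes."""
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return "never completed"
```

outcome(publish_in_thread(False)) returns "message-id", whereas outcome(publish_in_thread(True)) returns "never completed"; the RuntimeError only shows up as a traceback printed by the thread, not through the future.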