Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Subscriber future hangs with large number of messages #242

@Spawn

Description

@Spawn

Environment details

  • OS type and version: python:3.6-alpine
  • Python version: 3.6
  • pip version: 20.2.3
  • google-cloud-pubsub 1.7.0

Problem

We have an high-loaded worker in kubernetes which listens pub/sub queue. Problem is that when pub/sub queue receive several thousands messages worker is stopping processing without any errors (just freezes). Other workers whose processing time is short are working normally but problem worker sometimes could process one message up to several minutes (http requests, database requests) and after processing several hundreds messages logs are stop printing anything.

So to reproduce issue you should have pub/sub subscription with several thousands messages and create python subscriber which will process one message up to 1 minute and longer sometimes.

Regarding longer processing time I specified timeout=60 for future so if processing time taking so long it should throw TimeoutError where I'm calling future.cancel() and here is the problem - cancel() is waiting for graceful shutdown of consuming messages thread and when I did some debugging for it I found out that sometimes cancel() never ends. For instance on server this kind of freeze took 10 days without any logs/errors/etc.

I'm wondering if there way to force future canceling?

Subscription config

Selection_999(241)

Code example

def run():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    future = subscriber.subscribe(subscription_path, callback=some_long_working_code)
    logger.info('Subscriber initialized, listening...')

    with subscriber:
        try:
            future.result(timeout=60)
        except TimeoutError:  # from concurrent.futures import TimeoutError
            logger.warning('Timeout reached, resubscribing...')
            future.cancel()

Thank you for your time!

Metadata

Metadata

Assignees

Labels

api: pubsubIssues related to the googleapis/python-pubsub API.type: questionRequest for information or clarification. Not an issue.

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions