Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class StreamingPullManager(object):
``projects/{project}/subscriptions/{subscription}``.
flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow
control settings.
use_legacy_flow_control (bool): Disables enforcing flow control settings
Comment thread
pradn marked this conversation as resolved.
at the Cloud PubSub server and uses the less accurate method of only
enforcing flow control at the client side.
scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler
to use to process messages. If not provided, a thread pool-based
scheduler will be used.
Expand All @@ -115,11 +118,17 @@ class StreamingPullManager(object):
RPC instead of over the streaming RPC."""

def __init__(
self, client, subscription, flow_control=types.FlowControl(), scheduler=None
self,
client,
subscription,
flow_control=types.FlowControl(),
scheduler=None,
use_legacy_flow_control=False,
):
self._client = client
self._subscription = subscription
self._flow_control = flow_control
self._use_legacy_flow_control = use_legacy_flow_control
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline = 10
Expand Down Expand Up @@ -587,8 +596,12 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
max_outstanding_messages=self._flow_control.max_messages,
max_outstanding_bytes=self._flow_control.max_bytes,
max_outstanding_messages=(
0 if self._use_legacy_flow_control else self._flow_control.max_messages
Comment thread
pradn marked this conversation as resolved.
),
max_outstanding_bytes=(
0 if self._use_legacy_flow_control else self._flow_control.max_bytes
),
)

# Return the initial request.
Expand Down
19 changes: 17 additions & 2 deletions google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ def api(self):
"""The underlying gapic API client."""
return self._api

def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
def subscribe(
self,
subscription,
callback,
flow_control=(),
scheduler=None,
use_legacy_flow_control=False,
):
"""Asynchronously start receiving messages on a given subscription.

This method starts a background thread to begin pulling messages from
Expand All @@ -179,6 +186,10 @@ def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
settings may lead to faster throughput for messages that do not take
a long time to process.

The ``use_legacy_flow_control`` argument disables enforcing flow control
settings at the Cloud PubSub server and uses the less accurate method of
only enforcing flow control at the client side.

This method starts the receiver in the background and returns a
*Future* representing its execution. Waiting on the future (calling
``result()``) will block forever or until a non-recoverable error
Expand Down Expand Up @@ -238,7 +249,11 @@ def callback(message):
flow_control = types.FlowControl(*flow_control)

manager = streaming_pull_manager.StreamingPullManager(
self, subscription, flow_control=flow_control, scheduler=scheduler
self,
subscription,
flow_control=flow_control,
scheduler=scheduler,
use_legacy_flow_control=use_legacy_flow_control,
)

future = futures.StreamingPullFuture(manager)
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ def test_streaming_flow_control():
assert request.max_outstanding_bytes == 1000


def test_streaming_flow_control_use_legacy_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000),
use_legacy_flow_control=True,
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
assert request.max_outstanding_messages == 0
assert request.max_outstanding_bytes == 0


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)
Expand Down