Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit ac9af40

Browse files
committed
Raise publish flow control errors through futures
1 parent 2fe776f commit ac9af40

2 files changed

Lines changed: 33 additions & 1 deletion

File tree

google/cloud/pubsub_v1/publisher/client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
from google.cloud.pubsub_v1 import types
3232
from google.cloud.pubsub_v1.gapic import publisher_client
3333
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
34+
from google.cloud.pubsub_v1.publisher import exceptions
35+
from google.cloud.pubsub_v1.publisher import futures
3436
from google.cloud.pubsub_v1.publisher._batch import thread
3537
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
3638
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
@@ -379,7 +381,12 @@ def publish(self, topic, data, ordering_key="", **attrs):
379381

380382
# Messages should go through flow control to prevent excessive
381383
# queuing on the client side (depending on the settings).
382-
self._flow_controller.add(message)
384+
try:
385+
self._flow_controller.add(message)
386+
except exceptions.FlowControlLimitError as exc:
387+
future = futures.Future()
388+
future.set_exception(exc)
389+
return future
383390

384391
def on_publish_done(future):
385392
self._flow_controller.release(message)

tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from google.cloud.pubsub_v1 import publisher
2626
from google.cloud.pubsub_v1 import types
2727

28+
from google.cloud.pubsub_v1.publisher import exceptions
2829
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
2930

3031

@@ -156,6 +157,30 @@ def test_publish():
156157
)
157158

158159

160+
def test_publish_error_exceeding_flow_control_limits():
161+
creds = mock.Mock(spec=credentials.Credentials)
162+
publisher_options = types.PublisherOptions(
163+
flow_control=types.PublishFlowControl(
164+
message_limit=10,
165+
byte_limit=150,
166+
limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
167+
)
168+
)
169+
client = publisher.Client(credentials=creds, publisher_options=publisher_options)
170+
171+
mock_batch = mock.Mock(spec=client._batch_class)
172+
mock_batch.will_accept.return_value = True
173+
topic = "topic/path"
174+
client._set_batch(topic, mock_batch)
175+
176+
future1 = client.publish(topic, b"a" * 100)
177+
future2 = client.publish(topic, b"b" * 100)
178+
179+
future1.result() # no error, still within flow control limits
180+
with pytest.raises(exceptions.FlowControlLimitError):
181+
future2.result()
182+
183+
159184
def test_publish_data_not_bytestring_error():
160185
creds = mock.Mock(spec=credentials.Credentials)
161186
client = publisher.Client(credentials=creds)

0 commit comments

Comments
 (0)