Skip to content

Commit 0699ba6

Browse files
authored
fix(pubsub): include request overhead when computing publish batch size overflow (googleapis#9911)
* Clarify the description of BatchSettings.max_bytes

* Include overhead in batch overflow calculation

  The maximum allowed size for a PublishRequest on the backend is lower than
  a mere sum of the byte sizes of individual messages. This commit adjusts
  the batch size overflow calculation to account for this overhead. It also
  caps the effective maximum BatchSettings.max_bytes value to 10_000_000
  bytes (the limit on the backend).

  (credit also to GitHub @relud for outlining the main idea first in the
  issue description)

* Access settings inside Batch in a consistent way.

* Clean up and refactor a few code snippets

* Raise a more specific error if a message is too large
1 parent 6838a4f commit 0699ba6

6 files changed

Lines changed: 137 additions & 31 deletions

File tree

pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,12 @@ def messages(self):
7575
def size(self):
7676
"""Return the total size of all of the messages currently in the batch.
7777
78+
The size includes any overhead of the actual ``PublishRequest`` that is
79+
sent to the backend.
80+
7881
Returns:
7982
int: The total size of all of the messages currently
80-
in the batch, in bytes.
83+
in the batch (including the request overhead), in bytes.
8184
"""
8285
raise NotImplementedError
8386

pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
_LOGGER = logging.getLogger(__name__)
3131
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
32+
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest
3233

3334

3435
class Batch(base.Batch):
@@ -79,13 +80,17 @@ def __init__(self, client, topic, settings, autocommit=True):
7980
# in order to avoid race conditions
8081
self._futures = []
8182
self._messages = []
82-
self._size = 0
8383
self._status = base.BatchStatus.ACCEPTING_MESSAGES
8484

85+
# The initial size is not zero, we need to account for the size overhead
86+
# of the PublishRequest message itself.
87+
self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
88+
self._size = self._base_request_size
89+
8590
# If max latency is specified, start a thread to monitor the batch and
8691
# commit when the max latency is reached.
8792
self._thread = None
88-
if autocommit and self._settings.max_latency < float("inf"):
93+
if autocommit and self.settings.max_latency < float("inf"):
8994
self._thread = threading.Thread(
9095
name="Thread-MonitorBatchPublisher", target=self.monitor
9196
)
@@ -124,9 +129,12 @@ def settings(self):
124129
def size(self):
125130
"""Return the total size of all of the messages currently in the batch.
126131
132+
The size includes any overhead of the actual ``PublishRequest`` that is
133+
sent to the backend.
134+
127135
Returns:
128136
int: The total size of all of the messages currently
129-
in the batch, in bytes.
137+
in the batch (including the request overhead), in bytes.
130138
"""
131139
return self._size
132140

@@ -251,14 +259,14 @@ def _commit(self):
251259
def monitor(self):
252260
"""Commit this batch after sufficient time has elapsed.
253261
254-
This simply sleeps for ``self._settings.max_latency`` seconds,
262+
This simply sleeps for ``self.settings.max_latency`` seconds,
255263
and then calls commit unless the batch has already been committed.
256264
"""
257265
# NOTE: This blocks; it is up to the calling code to call it
258266
# in a separate thread.
259267

260268
# Sleep for however long we should be waiting.
261-
time.sleep(self._settings.max_latency)
269+
time.sleep(self.settings.max_latency)
262270

263271
_LOGGER.debug("Monitor is waking up")
264272
return self._commit()
@@ -281,6 +289,10 @@ def publish(self, message):
281289
the :class:`~concurrent.futures.Future` interface or :data:`None`.
282290
If :data:`None` is returned, that signals that the batch cannot
283291
accept a message.
292+
293+
Raises:
294+
pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
295+
the ``message`` would exceed the max size limit on the backend.
284296
"""
285297
# Coerce the type, just in case.
286298
if not isinstance(message, types.PubsubMessage):
@@ -292,12 +304,21 @@ def publish(self, message):
292304
if not self.will_accept(message):
293305
return future
294306

295-
new_size = self._size + message.ByteSize()
307+
size_increase = types.PublishRequest(messages=[message]).ByteSize()
308+
309+
if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
310+
err_msg = (
311+
"The message being published would produce too large a publish "
312+
"request that would exceed the maximum allowed size on the "
313+
"backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
314+
)
315+
raise exceptions.MessageTooLargeError(err_msg)
316+
317+
new_size = self._size + size_increase
296318
new_count = len(self._messages) + 1
297-
overflow = (
298-
new_size > self.settings.max_bytes
299-
or new_count >= self._settings.max_messages
300-
)
319+
320+
size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
321+
overflow = new_size > size_limit or new_count >= self.settings.max_messages
301322

302323
if not self._messages or not overflow:
303324

pubsub/google/cloud/pubsub_v1/publisher/exceptions.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@ class PublishError(GoogleAPICallError):
2222
pass
2323

2424

25-
__all__ = ("PublishError", "TimeoutError")
25+
class MessageTooLargeError(ValueError):
26+
"""Attempt to publish a message that would exceed the server max size limit."""
27+
28+
29+
__all__ = ("MessageTooLargeError", "PublishError", "TimeoutError")

pubsub/google/cloud/pubsub_v1/types.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
BatchSettings.__doc__ = "The settings for batch publishing the messages."
4949
BatchSettings.max_bytes.__doc__ = (
5050
"The maximum total size of the messages to collect before automatically "
51-
"publishing the batch."
51+
"publishing the batch, including any byte size overhead of the publish "
52+
"request itself. The maximum value is bound by the server-side limit of "
53+
"10_000_000 bytes."
5254
)
5355
BatchSettings.max_latency.__doc__ = (
5456
"The maximum number of seconds to wait for additional messages before "

pubsub/tests/system.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,24 +74,52 @@ def cleanup():
7474

7575

7676
def test_publish_messages(publisher, topic_path, cleanup):
77-
futures = []
7877
# Make sure the topic gets deleted.
7978
cleanup.append((publisher.delete_topic, topic_path))
8079

8180
publisher.create_topic(topic_path)
82-
for index in six.moves.range(500):
83-
futures.append(
84-
publisher.publish(
85-
topic_path,
86-
b"The hail in Wales falls mainly on the snails.",
87-
num=str(index),
88-
)
81+
82+
futures = [
83+
publisher.publish(
84+
topic_path, b"The hail in Wales falls mainly on the snails.", num=str(i)
8985
)
86+
for i in six.moves.range(500)
87+
]
88+
9089
for future in futures:
9190
result = future.result()
9291
assert isinstance(result, six.string_types)
9392

9493

94+
def test_publish_large_messages(publisher, topic_path, cleanup):
95+
# Make sure the topic gets deleted.
96+
cleanup.append((publisher.delete_topic, topic_path))
97+
98+
# Each message should be smaller than 10**7 bytes (the server side limit for
99+
# PublishRequest), but all messages combined in a PublishRequest should
100+
# slightly exceed that threshold to make sure the publish code handles these
101+
# cases well.
102+
# Mind that the total PublishRequest size must still be smaller than
103+
# 10 * 1024 * 1024 bytes in order to not exceed the max request body size limit.
104+
msg_data = b"x" * (2 * 10 ** 6)
105+
106+
publisher.batch_settings = types.BatchSettings(
107+
max_bytes=11 * 1000 * 1000, # more than the server limit of 10 ** 7
108+
max_latency=2.0, # so that autocommit happens after publishing all messages
109+
max_messages=100,
110+
)
111+
publisher.create_topic(topic_path)
112+
113+
futures = [publisher.publish(topic_path, msg_data, num=str(i)) for i in range(5)]
114+
115+
# If the publishing logic correctly split all messages into more than a
116+
# single batch despite a high BatchSettings.max_bytes limit, there should
117+
# be no "InvalidArgument: request_size is too large" error.
118+
for future in futures:
119+
result = future.result(timeout=10)
120+
assert isinstance(result, six.string_types) # the message ID
121+
122+
95123
def test_subscribe_to_messages(
96124
publisher, topic_path, subscriber, subscription_path, cleanup
97125
):

pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ def create_client():
3434
return publisher.Client(credentials=creds)
3535

3636

37-
def create_batch(autocommit=False, **batch_settings):
37+
def create_batch(autocommit=False, topic="topic_name", **batch_settings):
3838
"""Return a batch object suitable for testing.
3939
4040
Args:
4141
autocommit (bool): Whether the batch should commit after
4242
``max_latency`` seconds. By default, this is ``False``
4343
for unit testing.
44+
topic (str): The name of the topic the batch should publish
45+
the messages to.
4446
batch_settings (dict): Arguments passed on to the
4547
:class:``~.pubsub_v1.types.BatchSettings`` constructor.
4648
@@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings):
4951
"""
5052
client = create_client()
5153
settings = types.BatchSettings(**batch_settings)
52-
return Batch(client, "topic_name", settings, autocommit=autocommit)
54+
return Batch(client, topic, settings, autocommit=autocommit)
5355

5456

5557
def test_init():
@@ -299,8 +301,8 @@ def test_monitor_already_committed():
299301
assert batch._status == status
300302

301303

302-
def test_publish():
303-
batch = create_batch()
304+
def test_publish_updating_batch_size():
305+
batch = create_batch(topic="topic_foo")
304306
messages = (
305307
types.PubsubMessage(data=b"foobarbaz"),
306308
types.PubsubMessage(data=b"spameggs"),
@@ -314,22 +316,27 @@ def test_publish():
314316
assert len(batch.messages) == 3
315317
assert batch._futures == futures
316318

317-
# The size should have been incremented by the sum of the size of the
318-
# messages.
319-
expected_size = sum([message_pb.ByteSize() for message_pb in messages])
320-
assert batch.size == expected_size
319+
# The size should have been incremented by the sum of the size
320+
# contributions of each message to the PublishRequest.
321+
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
322+
expected_request_size = base_request_size + sum(
323+
types.PublishRequest(messages=[msg]).ByteSize() for msg in messages
324+
)
325+
326+
assert batch.size == expected_request_size
321327
assert batch.size > 0 # I do not always trust protobuf.
322328

323329

324330
def test_publish_not_will_accept():
325-
batch = create_batch(max_messages=0)
331+
batch = create_batch(topic="topic_foo", max_messages=0)
332+
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
326333

327334
# Publish the message.
328335
message = types.PubsubMessage(data=b"foobarbaz")
329336
future = batch.publish(message)
330337

331338
assert future is None
332-
assert batch.size == 0
339+
assert batch.size == base_request_size
333340
assert batch.messages == []
334341
assert batch._futures == []
335342

@@ -361,6 +368,47 @@ def test_publish_exceed_max_messages():
361368
assert batch._futures == futures
362369

363370

371+
@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
372+
def test_publish_single_message_size_exceeds_server_size_limit():
373+
batch = create_batch(
374+
topic="topic_foo",
375+
max_messages=1000,
376+
max_bytes=1000 * 1000, # way larger than (mocked) server side limit
377+
)
378+
379+
big_message = types.PubsubMessage(data=b"x" * 984)
380+
381+
request_size = types.PublishRequest(
382+
topic="topic_foo", messages=[big_message]
383+
).ByteSize()
384+
assert request_size == 1001 # sanity check, just above the (mocked) server limit
385+
386+
with pytest.raises(exceptions.MessageTooLargeError):
387+
batch.publish(big_message)
388+
389+
390+
@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
391+
def test_publish_total_messages_size_exceeds_server_size_limit():
392+
batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500)
393+
394+
messages = (
395+
types.PubsubMessage(data=b"x" * 500),
396+
types.PubsubMessage(data=b"x" * 600),
397+
)
398+
399+
# Sanity check - request size is still below BatchSettings.max_bytes,
400+
# but it exceeds the server-side size limit.
401+
request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize()
402+
assert 1000 < request_size < 1500
403+
404+
with mock.patch.object(batch, "commit") as fake_commit:
405+
batch.publish(messages[0])
406+
batch.publish(messages[1])
407+
408+
# The server side limit should kick in and cause a commit.
409+
fake_commit.assert_called_once()
410+
411+
364412
def test_publish_dict():
365413
batch = create_batch()
366414
future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}})

0 commit comments

Comments (0)