Skip to content

Commit 50c8e88

Browse files
Pubsub batch autocommitting. (googleapis#2966)
* Pubsub batch autocommitting. This PR adds some functionality to the Batch object: * The ability to specify `max_messages` and have the batch automatically call `commit` when the number of messages gets that high. * The ability to specify `max_interval` and have the batch automatically commit when a publish occurs and the batch is at least as old as the specified interval. This is one of two changes requested by the PubSub team. * Addressing comments from @dhermes. * Remove unneeded -lt check @dhermes. * Make INFINITY have a leading underscore. @dhermes
1 parent d0a3819 commit 50c8e88

File tree

2 files changed

+114
-2
lines changed

2 files changed

+114
-2
lines changed

pubsub/google/cloud/pubsub/topic.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""Define API Topics."""
1616

17+
import time
18+
1719
from google.cloud._helpers import _datetime_to_rfc3339
1820
from google.cloud._helpers import _NOW
1921
from google.cloud.exceptions import NotFound
@@ -408,15 +410,40 @@ class Batch(object):
408410
:type topic: :class:`google.cloud.pubsub.topic.Topic`
409411
:param topic: the topic being published
410412
411-
:type client: :class:`google.cloud.pubsub.client.Client`
412413
:param client: The client to use.
414+
:type client: :class:`google.cloud.pubsub.client.Client`
415+
416+
:param max_interval: The maximum interval, in seconds, before the batch
417+
will automatically commit. Note that this does not
418+
run a background loop; it just checks when each
419+
message is published. Therefore, this is intended
420+
for situations where messages are published at
421+
reasonably regular intervals. Defaults to infinity
422+
(off).
423+
:type max_interval: float
424+
425+
:param max_messages: The maximum number of messages to hold in the batch
426+
before automatically committing. Defaults to infinity
427+
(off).
428+
:type max_messages: float
413429
"""
414-
def __init__(self, topic, client):
430+
_INFINITY = float('inf')
431+
432+
def __init__(self, topic, client, max_interval=_INFINITY,
             max_messages=_INFINITY):
    """Set up an empty batch bound to ``topic`` and ``client``.

    :param topic: the topic being published
    :param client: The client to use.
    :param max_interval: The maximum interval, in seconds, before the
                         batch autocommits; it is compared against the
                         batch's age whenever a message is published.
    :param max_messages: The number of held messages at which the batch
                         autocommits.
    """
    # Autocommit thresholds, consulted by .publish(). With the infinite
    # defaults neither threshold can be crossed, so autocommit is off.
    self._max_messages = max_messages
    self._max_interval = max_interval

    # Reference time against which the interval is measured.
    self._start_timestamp = time.time()

    # Batch state: both lists start out empty.
    self.client = client
    self.topic = topic
    self.message_ids = []
    self.messages = []
446+
420447
def __enter__(self):
421448
return self
422449

@@ -441,6 +468,20 @@ def publish(self, message, **attrs):
441468
{'data': message,
442469
'attributes': attrs})
443470

471+
# If too much time has elapsed since the batch was created (or
472+
# since the last interval-triggered autocommit), autocommit.
473+
now = time.time()
474+
if now - self._start_timestamp > self._max_interval:
475+
self.commit()
476+
self._start_timestamp = now
477+
return
478+
479+
# If the number of messages on the list has reached the
480+
# maximum allowed, autocommit (with the batch's client).
481+
if len(self.messages) >= self._max_messages:
482+
self.commit()
483+
return
484+
444485
def commit(self, client=None):
445486
"""Send saved messages as a single API call.
446487

pubsub/unit_tests/test_topic.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,77 @@ def test_context_mgr_failure(self):
779779
self.assertEqual(list(batch.messages), [MESSAGE1, MESSAGE2])
780780
self.assertEqual(getattr(api, '_topic_published', self), self)
781781

782+
def test_message_count_autocommit(self):
    """Check that a batch capped with ``max_messages`` autocommits.

    The batch must not commit while it holds fewer messages than the
    cap, must commit exactly once when the cap is reached, and must
    commit a second time when its context manager exits.
    """
    batch_class = self._get_target_class()
    client = _Client(project='PROJECT')
    topic = _Topic(name='TOPIC')

    # Swap ``commit`` for a mock so calls are recorded, not performed.
    with mock.patch.object(batch_class, 'commit') as commit_mock:
        batch_cm = self._make_one(topic, client=client, max_messages=5)
        with batch_cm as batch:
            self.assertIsInstance(batch, batch_class)

            # Four messages stay under the cap: no commit may happen.
            for index in range(4):
                payload = {
                    'attributes': {},
                    'data': 'Batch message %d.' % (index,),
                }
                batch.publish(payload)
                commit_mock.assert_not_called()

            # The fifth message reaches the cap and triggers the commit.
            final_payload = {
                'attributes': {},
                'data': 'The final call to trigger a commit!',
            }
            batch.publish(final_payload)
            commit_mock.assert_called_once_with()

        # Leaving the batch's context manager adds the second commit.
        self.assertEqual(commit_mock.call_count, 2)
814+
815+
@mock.patch('time.time')
def test_message_time_autocommit(self, mock_time):
    """Check that a batch older than ``max_interval`` autocommits.

    While the mocked clock stands still no publish may commit; once the
    clock has moved past the interval, the next publish must commit
    exactly once, and exiting the context manager must commit again.
    """
    batch_class = self._get_target_class()
    client = _Client(project='PROJECT')
    topic = _Topic(name='TOPIC')

    # Swap ``commit`` for a mock so calls are recorded, not performed.
    with mock.patch.object(batch_class, 'commit') as commit_mock:
        # Hold the mocked clock at zero while the batch is created.
        mock_time.return_value = 0.0
        batch_cm = self._make_one(topic, client=client, max_interval=5)
        with batch_cm as batch:
            self.assertIsInstance(batch, batch_class)

            # With the clock frozen, no number of publishes may commit.
            for index in range(10):
                payload = {
                    'attributes': {},
                    'data': 'Batch message %d.' % (index,),
                }
                batch.publish(payload)
                commit_mock.assert_not_called()

            # Advance the clock beyond the five-second interval.
            mock_time.return_value = 10.0

            # The next publish sees a stale batch and commits.
            final_payload = {
                'attributes': {},
                'data': 'The final call to trigger a commit!',
            }
            batch.publish(final_payload)
            commit_mock.assert_called_once_with()

        # Leaving the batch's context manager adds the second commit.
        self.assertEqual(commit_mock.call_count, 2)
852+
782853

783854
class _FauxPublisherAPI(object):
784855
_api_called = 0

0 commit comments

Comments
 (0)