Skip to content

Commit 4a9a950

Browse files
authored
Checking Pub / Sub Batch.will_accept in thread-safe way. (googleapis#4616)
Fixes googleapis#4575.
1 parent 2edd612 commit 4a9a950

4 files changed

Lines changed: 118 additions & 64 deletions

File tree

pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py

Lines changed: 45 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ class Batch(base.Batch):
3737
"""A batch of messages.
3838
3939
The batch is the internal group of messages which are either awaiting
40-
publication or currently in-flight.
40+
publication or currently in progress.
4141
4242
A batch is automatically created by the PublisherClient when the first
4343
message to be published is received; subsequent messages are added to
@@ -72,9 +72,9 @@ def __init__(self, client, topic, settings, autocommit=True):
7272
self._topic = topic
7373
self._settings = settings
7474

75-
self._commit_lock = threading.Lock()
76-
# These objects are all communicated between threads; ensure that
77-
# any writes to them are atomic.
75+
self._state_lock = threading.Lock()
76+
# These members are all communicated between threads; ensure that
77+
# any writes to them use the "state lock" to remain atomic.
7878
self._futures = []
7979
self._messages = []
8080
self._size = 0
@@ -133,17 +133,24 @@ def status(self):
133133
def commit(self):
134134
"""Actually publish all of the messages on the active batch.
135135
136-
This synchronously sets the batch status to in-flight, and then opens
137-
a new thread, which handles actually sending the messages to Pub/Sub.
138-
139136
.. note::
140137
141138
This method is non-blocking. It opens a new thread, which calls
142139
:meth:`_commit`, which does block.
140+
141+
This synchronously sets the batch status to "starting", and then opens
142+
a new thread, which handles actually sending the messages to Pub/Sub.
143+
144+
If the current batch is **not** accepting messages, this method
145+
does nothing.
143146
"""
144147
# Set the status to "starting" synchronously, to ensure that
145148
# this batch will necessarily not accept new messages.
146-
self._status = base.BatchStatus.STARTING
149+
with self._state_lock:
150+
if self._status == base.BatchStatus.ACCEPTING_MESSAGES:
151+
self._status = base.BatchStatus.STARTING
152+
else:
153+
return
147154

148155
# Start a new thread to actually handle the commit.
149156
commit_thread = threading.Thread(
@@ -155,7 +162,7 @@ def commit(self):
155162
def _commit(self):
156163
"""Actually publish all of the messages on the active batch.
157164
158-
This moves the batch out from being the active batch to an in-flight
165+
This moves the batch out from being the active batch to an in progress
159166
batch on the publisher, and then the batch is discarded upon
160167
completion.
161168
@@ -164,7 +171,7 @@ def _commit(self):
164171
This method blocks. The :meth:`commit` method is the non-blocking
165172
version, which calls this one.
166173
"""
167-
with self._commit_lock:
174+
with self._state_lock:
168175
if self._status in _CAN_COMMIT:
169176
self._status = base.BatchStatus.IN_PROGRESS
170177
else:
@@ -213,13 +220,13 @@ def monitor(self):
213220
This simply sleeps for ``self._settings.max_latency`` seconds,
214221
and then calls commit unless the batch has already been committed.
215222
"""
216-
# Note: This thread blocks; it is up to the calling code to call it
217-
# in a separate thread.
218-
#
223+
# NOTE: This blocks; it is up to the calling code to call it
224+
# in a separate thread.
225+
219226
# Sleep for however long we should be waiting.
220227
time.sleep(self._settings.max_latency)
221228

222-
# Commit.
229+
_LOGGER.debug('Monitor is waking up')
223230
return self._commit()
224231

225232
def publish(self, message):
@@ -235,24 +242,34 @@ def publish(self, message):
235242
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.
236243
237244
Returns:
238-
~google.api_core.future.Future: An object conforming to
239-
the :class:`concurrent.futures.Future` interface.
245+
Optional[~google.api_core.future.Future]: An object conforming to
246+
the :class:`~concurrent.futures.Future` interface or :data:`None`.
247+
If :data:`None` is returned, that signals that the batch cannot
248+
accept a message.
240249
"""
241250
# Coerce the type, just in case.
242251
if not isinstance(message, types.PubsubMessage):
243252
message = types.PubsubMessage(**message)
244253

245-
# Add the size to the running total of the size, so we know
246-
# if future messages need to be rejected.
247-
self._size += message.ByteSize()
248-
249-
# Store the actual message in the batch's message queue.
250-
self._messages.append(message)
251-
if len(self._messages) >= self._settings.max_messages:
254+
with self._state_lock:
255+
if not self.will_accept(message):
256+
return None
257+
258+
# Add the size to the running total of the size, so we know
259+
# if future messages need to be rejected.
260+
self._size += message.ByteSize()
261+
# Store the actual message in the batch's message queue.
262+
self._messages.append(message)
263+
# Track the future on this batch (so that the result of the
264+
# future can be set).
265+
future = futures.Future()
266+
self._futures.append(future)
267+
# Determine the number of messages before releasing the lock.
268+
num_messages = len(self._messages)
269+
270+
# Try to commit, but it must be **without** the lock held, since
271+
# ``commit()`` will try to obtain the lock.
272+
if num_messages >= self._settings.max_messages:
252273
self.commit()
253274

254-
# Return a Future. That future needs to be aware of the status
255-
# of this batch.
256-
f = futures.Future()
257-
self._futures.append(f)
258-
return f
275+
return future

pubsub/google/cloud/pubsub_v1/publisher/client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,8 @@ def publish(self, topic, data, **attrs):
196196
batch = self.batch(topic)
197197
future = None
198198
while future is None:
199-
if batch.will_accept(message):
200-
future = batch.publish(message)
201-
else:
199+
future = batch.publish(message)
200+
if future is None:
202201
batch = self.batch(topic, create=True)
203202

204203
return future

pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,19 @@ def test_commit():
9797
assert batch.status == BatchStatus.STARTING
9898

9999

100+
def test_commit_no_op():
101+
batch = create_batch()
102+
batch._status = BatchStatus.IN_PROGRESS
103+
with mock.patch.object(threading, 'Thread', autospec=True) as Thread:
104+
batch.commit()
105+
106+
# Make sure a thread was not created.
107+
Thread.assert_not_called()
108+
109+
# Check that batch status is unchanged.
110+
assert batch.status == BatchStatus.IN_PROGRESS
111+
112+
100113
def test_blocking__commit():
101114
batch = create_batch()
102115
futures = (
@@ -217,21 +230,35 @@ def test_publish():
217230
)
218231

219232
# Publish each of the messages, which should save them to the batch.
220-
for message in messages:
221-
batch.publish(message)
233+
futures = [batch.publish(message) for message in messages]
222234

223235
# There should be three messages on the batch, and three futures.
224236
assert len(batch.messages) == 3
225-
assert len(batch._futures) == 3
237+
assert batch._futures == futures
226238

227239
# The size should have been incremented by the sum of the size of the
228240
# messages.
229-
assert batch.size == sum([m.ByteSize() for m in messages])
241+
expected_size = sum([message_pb.ByteSize() for message_pb in messages])
242+
assert batch.size == expected_size
230243
assert batch.size > 0 # I do not always trust protobuf.
231244

232245

233-
def test_publish_max_messages():
234-
batch = create_batch(max_messages=4)
246+
def test_publish_not_will_accept():
247+
batch = create_batch(max_messages=0)
248+
249+
# Publish the message.
250+
message = types.PubsubMessage(data=b'foobarbaz')
251+
future = batch.publish(message)
252+
253+
assert future is None
254+
assert batch.size == 0
255+
assert batch.messages == []
256+
assert batch._futures == []
257+
258+
259+
def test_publish_exceed_max_messages():
260+
max_messages = 4
261+
batch = create_batch(max_messages=max_messages)
235262
messages = (
236263
types.PubsubMessage(data=b'foobarbaz'),
237264
types.PubsubMessage(data=b'spameggs'),
@@ -240,27 +267,29 @@ def test_publish_max_messages():
240267

241268
# Publish each of the messages, which should save them to the batch.
242269
with mock.patch.object(batch, 'commit') as commit:
243-
for message in messages:
244-
batch.publish(message)
270+
futures = [batch.publish(message) for message in messages]
271+
assert batch._futures == futures
272+
assert len(futures) == max_messages - 1
245273

246274
# Commit should not yet have been called.
247275
assert commit.call_count == 0
248276

249277
# When a fourth message is published, commit should be called.
250-
batch.publish(types.PubsubMessage(data=b'last one'))
278+
future = batch.publish(types.PubsubMessage(data=b'last one'))
251279
commit.assert_called_once_with()
252280

281+
futures.append(future)
282+
assert batch._futures == futures
283+
assert len(futures) == max_messages
284+
253285

254286
def test_publish_dict():
255287
batch = create_batch()
256-
batch.publish({'data': b'foobarbaz', 'attributes': {'spam': 'eggs'}})
288+
future = batch.publish(
289+
{'data': b'foobarbaz', 'attributes': {'spam': 'eggs'}})
257290

258291
# There should be one message on the batch.
259-
assert len(batch.messages) == 1
260-
261-
# It should be an actual protobuf Message at this point, with the
262-
# expected values.
263-
message = batch.messages[0]
264-
assert isinstance(message, types.PubsubMessage)
265-
assert message.data == b'foobarbaz'
266-
assert message.attributes == {'spam': 'eggs'}
292+
expected_message = types.PubsubMessage(
293+
data=b'foobarbaz', attributes={'spam': 'eggs'})
294+
assert batch.messages == [expected_message]
295+
assert batch._futures == [future]

pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,20 @@ def test_publish():
9595
batch = mock.Mock(spec=client._batch_class)
9696
# Set the mock up to claim indiscriminately that it accepts all messages.
9797
batch.will_accept.return_value = True
98+
batch.publish.side_effect = (
99+
mock.sentinel.future1,
100+
mock.sentinel.future2,
101+
)
98102

99103
topic = 'topic/path'
100104
client._batches[topic] = batch
101105

102106
# Begin publishing.
103-
client.publish(topic, b'spam')
104-
client.publish(topic, b'foo', bar='baz')
107+
future1 = client.publish(topic, b'spam')
108+
future2 = client.publish(topic, b'foo', bar='baz')
109+
110+
assert future1 is mock.sentinel.future1
111+
assert future2 is mock.sentinel.future2
105112

106113
# Check mock.
107114
batch.publish.assert_has_calls(
@@ -138,7 +145,9 @@ def test_publish_attrs_bytestring():
138145
client._batches[topic] = batch
139146

140147
# Begin publishing.
141-
client.publish(topic, b'foo', bar=b'baz')
148+
future = client.publish(topic, b'foo', bar=b'baz')
149+
150+
assert future is batch.publish.return_value
142151

143152
# The attributes should have been sent as text.
144153
batch.publish.assert_called_once_with(
@@ -158,8 +167,8 @@ def test_publish_new_batch_needed():
158167
batch2 = mock.Mock(spec=client._batch_class)
159168
# Set the first mock up to claim indiscriminately that it rejects all
160169
# messages and the second accepts all.
161-
batch1.will_accept.return_value = False
162-
batch2.will_accept.return_value = True
170+
batch1.publish.return_value = None
171+
batch2.publish.return_value = mock.sentinel.future
163172

164173
topic = 'topic/path'
165174
client._batches[topic] = batch1
@@ -168,23 +177,23 @@ def test_publish_new_batch_needed():
168177
batch_class = mock.Mock(spec=(), return_value=batch2)
169178
client._batch_class = batch_class
170179

171-
# Begin publishing.
172-
client.publish(topic, b'foo', bar=b'baz')
180+
# Publish a message.
181+
future = client.publish(topic, b'foo', bar=b'baz')
182+
assert future is mock.sentinel.future
183+
184+
# Check the mocks.
173185
batch_class.assert_called_once_with(
174186
autocommit=True,
175187
client=client,
176188
settings=client.batch_settings,
177189
topic=topic,
178190
)
179-
180-
# The attributes should have been sent as text.
181-
batch1.publish.assert_not_called()
182-
batch2.publish.assert_called_once_with(
183-
types.PubsubMessage(
184-
data=b'foo',
185-
attributes={'bar': u'baz'},
186-
),
191+
message_pb = types.PubsubMessage(
192+
data=b'foo',
193+
attributes={'bar': u'baz'},
187194
)
195+
batch1.publish.assert_called_once_with(message_pb)
196+
batch2.publish.assert_called_once_with(message_pb)
188197

189198

190199
def test_publish_attrs_type_error():

0 commit comments

Comments
 (0)