Skip to content

Commit 72cbe41

Browse files
authored
Moving will_accept() check out of PublisherClient.batch() factory. (googleapis#4613)
This is the first stage of moving that check directly into `Batch.publish()`. This check **must** occur when the message is being published because concurrent access to the messages and futures on a `Batch` makes it impossible to reliably do an LBYL "will accept?" check.
1 parent 25d93e6 commit 72cbe41

File tree

2 files changed

+121
-62
lines changed

2 files changed

+121
-62
lines changed

pubsub/google/cloud/pubsub_v1/publisher/client.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,30 +98,34 @@ def target(self):
9898
"""
9999
return publisher_client.PublisherClient.SERVICE_ADDRESS
100100

101-
def batch(self, topic, message, create=True, autocommit=True):
101+
def batch(self, topic, create=False, autocommit=True):
102102
"""Return the current batch for the provided topic.
103103
104-
This will create a new batch only if no batch currently exists.
104+
This will create a new batch if ``create=True`` or if no batch
105+
currently exists.
105106
106107
Args:
107108
topic (str): A string representing the topic.
108-
message (~google.cloud.pubsub_v1.types.PubsubMessage): The message
109-
that will be committed.
110-
create (bool): Whether to create a new batch if no batch is
111-
found. Defaults to True.
112-
autocommit (bool): Whether to autocommit this batch.
113-
This is primarily useful for debugging.
109+
create (bool): Whether to create a new batch. Defaults to
110+
:data:`False`. If :data:`True`, this will create a new batch
111+
even if one already exists.
112+
autocommit (bool): Whether to autocommit this batch. This is
113+
primarily useful for debugging and testing, since it allows
114+
the caller to avoid some side effects that batch creation
115+
might have (e.g. spawning a worker to publish a batch).
114116
115117
Returns:
116118
~.pubsub_v1.batch.Batch: The batch object.
117119
"""
118120
# If there is no matching batch yet, then potentially create one
119121
# and place it on the batches dictionary.
120122
with self._batch_lock:
121-
batch = self._batches.get(topic, None)
122-
if batch is None or not batch.will_accept(message):
123-
if not create:
124-
return None
123+
if not create:
124+
batch = self._batches.get(topic)
125+
if batch is None:
126+
create = True
127+
128+
if create:
125129
batch = self._batch_class(
126130
autocommit=autocommit,
127131
client=self,
@@ -130,7 +134,6 @@ def batch(self, topic, message, create=True, autocommit=True):
130134
)
131135
self._batches[topic] = batch
132136

133-
# Simply return the appropriate batch.
134137
return batch
135138

136139
def publish(self, topic, data, **attrs):
@@ -190,4 +193,12 @@ def publish(self, topic, data, **attrs):
190193
message = types.PubsubMessage(data=data, attributes=attrs)
191194

192195
# Delegate the publishing to the batch.
193-
return self.batch(topic, message=message).publish(message)
196+
batch = self.batch(topic)
197+
future = None
198+
while future is None:
199+
if batch.will_accept(message):
200+
future = batch.publish(message)
201+
else:
202+
batch = self.batch(topic, create=True)
203+
204+
return future

pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 96 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -51,100 +51,148 @@ def test_init_emulator(monkeypatch):
5151
assert channel.target().decode('utf8') == '/foo/bar/'
5252

5353

54-
def test_batch_accepting():
55-
"""Establish that an existing batch is returned if it accepts messages."""
54+
def test_batch_create():
5655
creds = mock.Mock(spec=credentials.Credentials)
5756
client = publisher.Client(credentials=creds)
58-
message = types.PubsubMessage(data=b'foo')
5957

60-
# At first, there are no batches, so this should return a new batch
61-
# which is also saved to the object.
62-
ante = len(client._batches)
63-
batch = client.batch('topic_name', message, autocommit=False)
64-
assert len(client._batches) == ante + 1
65-
assert batch is client._batches['topic_name']
58+
assert len(client._batches) == 0
59+
topic = 'topic/path'
60+
batch = client.batch(topic, autocommit=False)
61+
assert client._batches == {topic: batch}
62+
63+
64+
def test_batch_exists():
65+
creds = mock.Mock(spec=credentials.Credentials)
66+
client = publisher.Client(credentials=creds)
67+
68+
topic = 'topic/path'
69+
client._batches[topic] = mock.sentinel.batch
6670

6771
# A subsequent request should return the same batch.
68-
batch2 = client.batch('topic_name', message, autocommit=False)
69-
assert batch is batch2
70-
assert batch2 is client._batches['topic_name']
72+
batch = client.batch(topic, autocommit=False)
73+
assert batch is mock.sentinel.batch
74+
assert client._batches == {topic: batch}
7175

7276

73-
def test_batch_without_autocreate():
77+
def test_batch_create_and_exists():
7478
creds = mock.Mock(spec=credentials.Credentials)
7579
client = publisher.Client(credentials=creds)
76-
message = types.PubsubMessage(data=b'foo')
7780

78-
# If `create=False` is sent, then when the batch is not found, None
79-
# is returned instead.
80-
ante = len(client._batches)
81-
batch = client.batch('topic_name', message, create=False)
82-
assert batch is None
83-
assert len(client._batches) == ante
81+
topic = 'topic/path'
82+
client._batches[topic] = mock.sentinel.batch
83+
84+
# A request with ``create=True`` should replace the existing batch with a new one.
85+
batch = client.batch(topic, create=True, autocommit=False)
86+
assert batch is not mock.sentinel.batch
87+
assert client._batches == {topic: batch}
8488

8589

8690
def test_publish():
8791
creds = mock.Mock(spec=credentials.Credentials)
8892
client = publisher.Client(credentials=creds)
8993

90-
# Use a mock in lieu of the actual batch class; set the mock up to claim
91-
# indiscriminately that it accepts all messages.
94+
# Use a mock in lieu of the actual batch class.
9295
batch = mock.Mock(spec=client._batch_class)
96+
# Set the mock up to claim indiscriminately that it accepts all messages.
9397
batch.will_accept.return_value = True
94-
client._batches['topic_name'] = batch
95-
96-
# Begin publishing.
97-
client.publish('topic_name', b'spam')
98-
client.publish('topic_name', b'foo', bar='baz')
9998

100-
# The batch's publish method should have been called twice.
101-
assert batch.publish.call_count == 2
99+
topic = 'topic/path'
100+
client._batches[topic] = batch
102101

103-
# In both cases
104-
# The first call should correspond to the first message.
105-
_, args, _ = batch.publish.mock_calls[0]
106-
assert args[0].data == b'spam'
107-
assert not args[0].attributes
102+
# Begin publishing.
103+
client.publish(topic, b'spam')
104+
client.publish(topic, b'foo', bar='baz')
108105

109-
# The second call should correspond to the second message.
110-
_, args, _ = batch.publish.mock_calls[1]
111-
assert args[0].data == b'foo'
112-
assert args[0].attributes == {u'bar': u'baz'}
106+
# Check mock.
107+
batch.publish.assert_has_calls(
108+
[
109+
mock.call(types.PubsubMessage(data=b'spam')),
110+
mock.call(types.PubsubMessage(
111+
data=b'foo',
112+
attributes={'bar': 'baz'},
113+
)),
114+
],
115+
)
113116

114117

115118
def test_publish_data_not_bytestring_error():
116119
creds = mock.Mock(spec=credentials.Credentials)
117120
client = publisher.Client(credentials=creds)
121+
topic = 'topic/path'
118122
with pytest.raises(TypeError):
119-
client.publish('topic_name', u'This is a text string.')
123+
client.publish(topic, u'This is a text string.')
120124
with pytest.raises(TypeError):
121-
client.publish('topic_name', 42)
125+
client.publish(topic, 42)
122126

123127

124128
def test_publish_attrs_bytestring():
125129
creds = mock.Mock(spec=credentials.Credentials)
126130
client = publisher.Client(credentials=creds)
127131

128-
# Use a mock in lieu of the actual batch class; set the mock up to claim
129-
# indiscriminately that it accepts all messages.
132+
# Use a mock in lieu of the actual batch class.
130133
batch = mock.Mock(spec=client._batch_class)
134+
# Set the mock up to claim indiscriminately that it accepts all messages.
131135
batch.will_accept.return_value = True
132-
client._batches['topic_name'] = batch
136+
137+
topic = 'topic/path'
138+
client._batches[topic] = batch
139+
140+
# Begin publishing.
141+
client.publish(topic, b'foo', bar=b'baz')
142+
143+
# The attributes should have been sent as text.
144+
batch.publish.assert_called_once_with(
145+
types.PubsubMessage(
146+
data=b'foo',
147+
attributes={'bar': u'baz'},
148+
),
149+
)
150+
151+
152+
def test_publish_new_batch_needed():
153+
creds = mock.Mock(spec=credentials.Credentials)
154+
client = publisher.Client(credentials=creds)
155+
156+
# Use mocks in lieu of the actual batch class.
157+
batch1 = mock.Mock(spec=client._batch_class)
158+
batch2 = mock.Mock(spec=client._batch_class)
159+
# Set the first mock up to claim indiscriminately that it rejects all
160+
# messages and the second accepts all.
161+
batch1.will_accept.return_value = False
162+
batch2.will_accept.return_value = True
163+
164+
topic = 'topic/path'
165+
client._batches[topic] = batch1
166+
167+
# Actually mock the batch class now.
168+
batch_class = mock.Mock(spec=(), return_value=batch2)
169+
client._batch_class = batch_class
133170

134171
# Begin publishing.
135-
client.publish('topic_name', b'foo', bar=b'baz')
172+
client.publish(topic, b'foo', bar=b'baz')
173+
batch_class.assert_called_once_with(
174+
autocommit=True,
175+
client=client,
176+
settings=client.batch_settings,
177+
topic=topic,
178+
)
136179

137180
# The attributes should have been sent as text.
138-
_, args, _ = batch.publish.mock_calls[0]
139-
assert args[0].data == b'foo'
140-
assert args[0].attributes == {u'bar': u'baz'}
181+
batch1.publish.assert_not_called()
182+
batch2.publish.assert_called_once_with(
183+
types.PubsubMessage(
184+
data=b'foo',
185+
attributes={'bar': u'baz'},
186+
),
187+
)
141188

142189

143190
def test_publish_attrs_type_error():
144191
creds = mock.Mock(spec=credentials.Credentials)
145192
client = publisher.Client(credentials=creds)
193+
topic = 'topic/path'
146194
with pytest.raises(TypeError):
147-
client.publish('topic_name', b'foo', answer=42)
195+
client.publish(topic, b'foo', answer=42)
148196

149197

150198
def test_gapic_instance_method():

0 commit comments

Comments
 (0)