Skip to content

Commit c40e0d4

Browse files
authored
Merge pull request googleapis#2773 from daspecster/support-multiple-commits-if-failed-2771
Fix double encoding of messages when commit() fails.
2 parents 8001d45 + 51d2c3f commit c40e0d4

2 files changed

Lines changed: 27 additions & 3 deletions

File tree

pubsub/google/cloud/pubsub/_http.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Create / interact with Google Cloud Pub/Sub connections."""
1616

1717
import base64
18+
import copy
1819
import functools
1920
import os
2021

@@ -203,9 +204,10 @@ def topic_publish(self, topic_path, messages):
203204
:rtype: list of string
204205
:returns: list of opaque IDs for published messages.
205206
"""
206-
_transform_messages_base64(messages, _base64_unicode)
207+
messages_to_send = copy.deepcopy(messages)
208+
_transform_messages_base64(messages_to_send, _base64_unicode)
207209
conn = self._connection
208-
data = {'messages': messages}
210+
data = {'messages': messages_to_send}
209211
response = conn.api_request(
210212
method='POST', path='/%s:publish' % (topic_path,), data=data)
211213
return response['messageIds']

pubsub/unit_tests/test__http.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,32 @@ def test_topic_publish_hit(self):
284284
msg_data = connection._called_with['data']['messages'][0]['data']
285285
self.assertEqual(msg_data, B64_PAYLOAD)
286286

287+
def test_topic_publish_twice(self):
288+
import base64
289+
290+
PAYLOAD = b'This is the message text'
291+
B64_PAYLOAD = base64.b64encode(PAYLOAD).decode('ascii')
292+
MESSAGE = {'data': PAYLOAD, 'attributes': {}}
293+
RETURNED = {'messageIds': []}
294+
connection = _Connection(RETURNED, RETURNED)
295+
client = _Client(connection, self.PROJECT)
296+
api = self._make_one(client)
297+
298+
api.topic_publish(self.TOPIC_PATH, [MESSAGE])
299+
api.topic_publish(self.TOPIC_PATH, [MESSAGE])
300+
301+
messages = connection._called_with['data']['messages']
302+
self.assertEqual(len(messages), 1)
303+
self.assertEqual(messages[0]['data'], B64_PAYLOAD)
304+
287305
def test_topic_publish_miss(self):
306+
import base64
288307
from google.cloud.exceptions import NotFound
308+
289309
PAYLOAD = b'This is the message text'
310+
B64_PAYLOAD = base64.b64encode(PAYLOAD).decode('ascii')
290311
MESSAGE = {'data': PAYLOAD, 'attributes': {}}
312+
B64MSG = {'data': B64_PAYLOAD, 'attributes': {}}
291313
connection = _Connection()
292314
client = _Client(connection, self.PROJECT)
293315
api = self._make_one(client)
@@ -299,7 +321,7 @@ def test_topic_publish_miss(self):
299321
path = '/%s:publish' % (self.TOPIC_PATH,)
300322
self.assertEqual(connection._called_with['path'], path)
301323
self.assertEqual(connection._called_with['data'],
302-
{'messages': [MESSAGE]})
324+
{'messages': [B64MSG]})
303325

304326
def test_topic_list_subscriptions_no_paging(self):
305327
from google.cloud.pubsub.topic import Topic

0 commit comments

Comments
 (0)