Skip to content

Commit 60d4460

Browse files
authored
Add support for streaming pull receipts. (googleapis#4878)
* Closes (googleapis#4325) - PubSub: Implement StreamingPullRequest modify acknowledgement as message receipt * Review Changes * Minor fixups * Minor fixups
1 parent 59ab90c commit 60d4460

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,20 @@ def on_exception(self, exception):
329329
def on_response(self, response):
330330
"""Process all received Pub/Sub messages.
331331
332-
For each message, schedule a callback with the executor.
332+
For each message, send a modified acknowledgement request to the
333+
server. This prevents expiration of the message due to buffering by
334+
gRPC or proxy/firewall. This makes the server and client expiration
335+
timer closer to each other thus preventing the message being
336+
redelivered multiple times.
337+
338+
After the messages have all had their ack deadline updated, execute
339+
the callback for each message using the executor.
333340
"""
341+
items = [
342+
base.ModAckRequest(message.ack_id, self.histogram.percentile(99))
343+
for message in response.received_messages
344+
]
345+
self.modify_ack_deadline(items)
334346
for msg in response.received_messages:
335347
_LOGGER.debug(
336348
'Using %s to process message with ack_id %s.',

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,17 @@ def test_on_response():
209209
],
210210
)
211211

212-
# Actually run the method and prove that executor.submit and
213-
# future.add_done_callback were called in the expected way.
214-
policy.on_response(response)
212+
# Actually run the method and prove that modack and executor.submit
213+
# are called in the expected way.
214+
modack_patch = mock.patch.object(
215+
policy, 'modify_ack_deadline', autospec=True)
216+
with modack_patch as modack:
217+
policy.on_response(response)
218+
219+
modack.assert_called_once_with(
220+
[base.ModAckRequest('fack', 10),
221+
base.ModAckRequest('back', 10)]
222+
)
215223

216224
submit_calls = [m for m in executor.method_calls if m[0] == 'submit']
217225
assert len(submit_calls) == 2

0 commit comments

Comments
 (0)