File tree Expand file tree Collapse file tree 2 files changed +24
-4
lines changed
google/cloud/pubsub_v1/subscriber/policy
tests/unit/pubsub_v1/subscriber Expand file tree Collapse file tree 2 files changed +24
-4
lines changed Original file line number Diff line number Diff line change @@ -329,8 +329,20 @@ def on_exception(self, exception):
329329 def on_response (self , response ):
330330 """Process all received Pub/Sub messages.
331331
332- For each message, schedule a callback with the executor.
332+ For each message, send a modified acknowledgement request to the
333+ server. This prevents expiration of the message due to buffering by
334+ gRPC or proxy/firewall. This makes the server and client expiration
335+ timer closer to each other thus preventing the message being
336+ redelivered multiple times.
337+
338+ After the messages have all had their ack deadline updated, execute
339+ the callback for each message using the executor.
333340 """
341+ items = [
342+ base .ModAckRequest (message .ack_id , self .histogram .percentile (99 ))
343+ for message in response .received_messages
344+ ]
345+ self .modify_ack_deadline (items )
334346 for msg in response .received_messages :
335347 _LOGGER .debug (
336348 'Using %s to process message with ack_id %s.' ,
Original file line number Diff line number Diff line change @@ -209,9 +209,17 @@ def test_on_response():
209209 ],
210210 )
211211
212- # Actually run the method and prove that executor.submit and
213- # future.add_done_callback were called in the expected way.
214- policy .on_response (response )
212+ # Actually run the method and prove that modack and executor.submit
213+ # are called in the expected way.
214+ modack_patch = mock .patch .object (
215+ policy , 'modify_ack_deadline' , autospec = True )
216+ with modack_patch as modack :
217+ policy .on_response (response )
218+
219+ modack .assert_called_once_with (
220+ [base .ModAckRequest ('fack' , 10 ),
221+ base .ModAckRequest ('back' , 10 )]
222+ )
215223
216224 submit_calls = [m for m in executor .method_calls if m [0 ] == 'submit' ]
217225 assert len (submit_calls ) == 2
You can’t perform that action at this time.
0 commit comments