1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import com .google .api .gax .core .FlowController ;
2019import com .google .api .gax .core .ApiClock ;
20+ import com .google .api .gax .core .FlowController ;
2121import com .google .api .stats .Distribution ;
2222import com .google .common .annotations .VisibleForTesting ;
2323import com .google .common .collect .Lists ;
2828import com .google .pubsub .v1 .PubsubMessage ;
2929import com .google .pubsub .v1 .ReceivedMessage ;
3030import java .util .ArrayList ;
31+ import java .util .Collection ;
3132import java .util .Collections ;
3233import java .util .HashSet ;
3334import java .util .Iterator ;
@@ -260,28 +261,46 @@ public int getMessageDeadlineSeconds() {
260261 }
261262
262263 public void processReceivedMessages (List <com .google .pubsub .v1 .ReceivedMessage > responseMessages ) {
263- if (responseMessages .size () == 0 ) {
264+ if (responseMessages .isEmpty () ) {
264265 return ;
265266 }
266- Instant now = new Instant (clock .millisTime ());
267- int totalByteCount = 0 ;
267+
268268 final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
269269 for (ReceivedMessage pubsubMessage : responseMessages ) {
270- int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
271- totalByteCount += messageSize ;
272- ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), messageSize ));
270+ ackHandlers .add (new AckHandler (pubsubMessage .getAckId (), pubsubMessage .getMessage ().getSerializedSize ()));
273271 }
272+
273+ Instant now = new Instant (clock .millisTime ());
274274 Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
275275 logger .log (
276276 Level .FINER , "Received {0} messages at {1}" , new Object [] {responseMessages .size (), now });
277277
278+ synchronized (outstandingAckHandlers ) {
279+ // AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
280+ // We will also later iterate over ackHandlers when we give messages to user code.
281+ // We must create a new list to pass to outstandingAckHandlers,
282+ // so that we can't iterate and modify the list concurrently.
283+ outstandingAckHandlers .add (
284+ new ExtensionJob (
285+ expiration ,
286+ INITIAL_ACK_DEADLINE_EXTENSION_SECONDS ,
287+ new ArrayList <AckHandler >(ackHandlers )));
288+ }
289+ setupNextAckDeadlineExtensionAlarm (expiration );
290+
291+ // Deadline extension must be set up before we reserve flow control.
292+ // Flow control might block for a while, and extension will keep messages from expiring.
293+
278294 try {
279- flowController .reserve (responseMessages .size (), totalByteCount );
280- } catch (FlowController .FlowControlException unexpectedException ) {
281- throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
295+ flowController .reserve (responseMessages .size (), totalMessageSize ( responseMessages ) );
296+ } catch (FlowController .FlowControlException e ) {
297+ throw new IllegalStateException ("Flow control unexpected exception" , e );
282298 }
283299 messagesWaiter .incrementPendingMessages (responseMessages .size ());
284300
301+ // Reserving flow control must happen before we give the messages to the user,
302+ // otherwise the user code might be given too many messages to process at once.
303+
285304 Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
286305 for (ReceivedMessage userMessage : responseMessages ) {
287306 final PubsubMessage message = userMessage .getMessage ();
@@ -307,12 +326,14 @@ public void run() {
307326 }
308327 });
309328 }
329+ }
310330
311- synchronized (outstandingAckHandlers ) {
312- outstandingAckHandlers .add (
313- new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
331+ private static int totalMessageSize (Collection <ReceivedMessage > messages ) {
332+ int total = 0 ;
333+ for (ReceivedMessage message : messages ) {
334+ total += message .getMessage ().getSerializedSize ();
314335 }
315- setupNextAckDeadlineExtensionAlarm ( expiration ) ;
336+ return total ;
316337 }
317338
318339 private void setupPendingAcksAlarm () {
0 commit comments