1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import com .google .api .gax .core .FlowController ;
2019import com .google .api .gax .core .ApiClock ;
20+ import com .google .api .gax .core .FlowController ;
2121import com .google .api .stats .Distribution ;
2222import com .google .common .annotations .VisibleForTesting ;
2323import com .google .common .collect .Lists ;
2828import com .google .pubsub .v1 .PubsubMessage ;
2929import com .google .pubsub .v1 .ReceivedMessage ;
3030import java .util .ArrayList ;
31+ import java .util .Collection ;
3132import java .util .Collections ;
3233import java .util .HashSet ;
3334import java .util .Iterator ;
@@ -260,23 +261,47 @@ public int getMessageDeadlineSeconds() {
260261 }
261262
262263 public void processReceivedMessages (List <com .google .pubsub .v1 .ReceivedMessage > responseMessages ) {
263- int receivedMessagesCount = responseMessages .size ();
264- if (receivedMessagesCount == 0 ) {
264+ if (responseMessages .isEmpty ()) {
265265 return ;
266266 }
267- Instant now = new Instant (clock .millisTime ());
268- int totalByteCount = 0 ;
267+
269268 final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
270269 for (ReceivedMessage pubsubMessage : responseMessages ) {
271- int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
272- totalByteCount += messageSize ;
273- ackHandlers .add (new AckHandler ( pubsubMessage . getAckId (), messageSize ) );
270+ int size = pubsubMessage .getMessage ().getSerializedSize ();
271+ AckHandler handler = new AckHandler ( pubsubMessage . getAckId (), size ) ;
272+ ackHandlers .add (handler );
274273 }
274+
275+ Instant now = new Instant (clock .millisTime ());
275276 Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
276277 logger .log (
277278 Level .FINER , "Received {0} messages at {1}" , new Object [] {responseMessages .size (), now });
278279
280+ // We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281+ // Otherwise, the alarm might go off before we can add the handlers.
282+ synchronized (outstandingAckHandlers ) {
283+ // AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
284+ // We will also later iterate over ackHandlers when we give messages to user code.
285+ // We must create a new list to pass to outstandingAckHandlers,
286+ // so that we can't iterate and modify the list concurrently.
287+ ArrayList <AckHandler > ackHandlersCopy = new ArrayList <>(ackHandlers );
288+ outstandingAckHandlers .add (
289+ new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlersCopy ));
290+ }
291+
292+ // Deadline extension must be set up before we reserve flow control.
293+ // Flow control might block for a while, and extension will keep messages from expiring.
294+ setupNextAckDeadlineExtensionAlarm (expiration );
295+
296+ // Reserving flow control must happen before we give the messages to the user,
297+ // otherwise the user code might be given too many messages to process at once.
298+ try {
299+ flowController .reserve (responseMessages .size (), getTotalMessageSize (responseMessages ));
300+ } catch (FlowController .FlowControlException e ) {
301+ throw new IllegalStateException ("Flow control unexpected exception" , e );
302+ }
279303 messagesWaiter .incrementPendingMessages (responseMessages .size ());
304+
280305 Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
281306 for (ReceivedMessage userMessage : responseMessages ) {
282307 final PubsubMessage message = userMessage .getMessage ();
@@ -302,18 +327,14 @@ public void run() {
302327 }
303328 });
304329 }
330+ }
305331
306- synchronized (outstandingAckHandlers ) {
307- outstandingAckHandlers .add (
308- new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
309- }
310- setupNextAckDeadlineExtensionAlarm (expiration );
311-
312- try {
313- flowController .reserve (receivedMessagesCount , totalByteCount );
314- } catch (FlowController .FlowControlException unexpectedException ) {
315- throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
332+ private static int getTotalMessageSize (Collection <ReceivedMessage > messages ) {
333+ int total = 0 ;
334+ for (ReceivedMessage message : messages ) {
335+ total += message .getMessage ().getSerializedSize ();
316336 }
337+ return total ;
317338 }
318339
319340 private void setupPendingAcksAlarm () {
0 commit comments