1616
1717package com .google .cloud .pubsub .spi .v1 ;
1818
19- import com .google .api .gax .core .ApiClock ;
2019import com .google .api .gax .core .FlowController ;
20+ import com .google .api .gax .core .ApiClock ;
2121import com .google .api .stats .Distribution ;
2222import com .google .common .annotations .VisibleForTesting ;
2323import com .google .common .collect .Lists ;
2828import com .google .pubsub .v1 .PubsubMessage ;
2929import com .google .pubsub .v1 .ReceivedMessage ;
3030import java .util .ArrayList ;
31- import java .util .Collection ;
3231import java .util .Collections ;
3332import java .util .HashSet ;
3433import java .util .Iterator ;
@@ -261,47 +260,23 @@ public int getMessageDeadlineSeconds() {
261260 }
262261
263262 public void processReceivedMessages (List <com .google .pubsub .v1 .ReceivedMessage > responseMessages ) {
264- if (responseMessages .isEmpty ()) {
263+ int receivedMessagesCount = responseMessages .size ();
264+ if (receivedMessagesCount == 0 ) {
265265 return ;
266266 }
267-
267+ Instant now = new Instant (clock .millisTime ());
268+ int totalByteCount = 0 ;
268269 final ArrayList <AckHandler > ackHandlers = new ArrayList <>(responseMessages .size ());
269270 for (ReceivedMessage pubsubMessage : responseMessages ) {
270- int size = pubsubMessage .getMessage ().getSerializedSize ();
271- AckHandler handler = new AckHandler ( pubsubMessage . getAckId (), size ) ;
272- ackHandlers .add (handler );
271+ int messageSize = pubsubMessage .getMessage ().getSerializedSize ();
272+ totalByteCount += messageSize ;
273+ ackHandlers .add (new AckHandler ( pubsubMessage . getAckId (), messageSize ) );
273274 }
274-
275- Instant now = new Instant (clock .millisTime ());
276275 Instant expiration = now .plus (messageDeadlineSeconds * 1000 );
277276 logger .log (
278277 Level .FINER , "Received {0} messages at {1}" , new Object [] {responseMessages .size (), now });
279278
280- // We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281- // Otherwise, the alarm might go off before we can add the handlers.
282- synchronized (outstandingAckHandlers ) {
283- // AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
284- // We will also later iterate over ackHandlers when we give messages to user code.
285- // We must create a new list to pass to outstandingAckHandlers,
286- // so that we can't iterate and modify the list concurrently.
287- ArrayList <AckHandler > ackHandlersCopy = new ArrayList <>(ackHandlers );
288- outstandingAckHandlers .add (
289- new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlersCopy ));
290- }
291-
292- // Deadline extension must be set up before we reserve flow control.
293- // Flow control might block for a while, and extension will keep messages from expiring.
294- setupNextAckDeadlineExtensionAlarm (expiration );
295-
296- // Reserving flow control must happen before we give the messages to the user,
297- // otherwise the user code might be given too many messages to process at once.
298- try {
299- flowController .reserve (responseMessages .size (), getTotalMessageSize (responseMessages ));
300- } catch (FlowController .FlowControlException e ) {
301- throw new IllegalStateException ("Flow control unexpected exception" , e );
302- }
303279 messagesWaiter .incrementPendingMessages (responseMessages .size ());
304-
305280 Iterator <AckHandler > acksIterator = ackHandlers .iterator ();
306281 for (ReceivedMessage userMessage : responseMessages ) {
307282 final PubsubMessage message = userMessage .getMessage ();
@@ -327,14 +302,18 @@ public void run() {
327302 }
328303 });
329304 }
330- }
331305
332- private static int getTotalMessageSize (Collection <ReceivedMessage > messages ) {
333- int total = 0 ;
334- for (ReceivedMessage message : messages ) {
335- total += message .getMessage ().getSerializedSize ();
306+ synchronized (outstandingAckHandlers ) {
307+ outstandingAckHandlers .add (
308+ new ExtensionJob (expiration , INITIAL_ACK_DEADLINE_EXTENSION_SECONDS , ackHandlers ));
309+ }
310+ setupNextAckDeadlineExtensionAlarm (expiration );
311+
312+ try {
313+ flowController .reserve (receivedMessagesCount , totalByteCount );
314+ } catch (FlowController .FlowControlException unexpectedException ) {
315+ throw new IllegalStateException ("Flow control unexpected exception" , unexpectedException );
336316 }
337- return total ;
338317 }
339318
340319 private void setupPendingAcksAlarm () {
0 commit comments