Skip to content

Commit 3717ac6

Browse files
authored
pubsub: acquire FlowController before releasing (#1831)
1 parent 38095d6 commit 3717ac6

File tree

1 file changed

+39
-18
lines changed

1 file changed

+39
-18
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.FlowController;
2019
import com.google.api.gax.core.ApiClock;
20+
import com.google.api.gax.core.FlowController;
2121
import com.google.api.stats.Distribution;
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.Lists;
@@ -28,6 +28,7 @@
2828
import com.google.pubsub.v1.PubsubMessage;
2929
import com.google.pubsub.v1.ReceivedMessage;
3030
import java.util.ArrayList;
31+
import java.util.Collection;
3132
import java.util.Collections;
3233
import java.util.HashSet;
3334
import java.util.Iterator;
@@ -260,23 +261,47 @@ public int getMessageDeadlineSeconds() {
260261
}
261262

262263
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
263-
int receivedMessagesCount = responseMessages.size();
264-
if (receivedMessagesCount == 0) {
264+
if (responseMessages.isEmpty()) {
265265
return;
266266
}
267-
Instant now = new Instant(clock.millisTime());
268-
int totalByteCount = 0;
267+
269268
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
270269
for (ReceivedMessage pubsubMessage : responseMessages) {
271-
int messageSize = pubsubMessage.getMessage().getSerializedSize();
272-
totalByteCount += messageSize;
273-
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
270+
int size = pubsubMessage.getMessage().getSerializedSize();
271+
AckHandler handler = new AckHandler(pubsubMessage.getAckId(), size);
272+
ackHandlers.add(handler);
274273
}
274+
275+
Instant now = new Instant(clock.millisTime());
275276
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
276277
logger.log(
277278
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
278279

280+
// We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281+
// Otherwise, the alarm might go off before we can add the handlers.
282+
synchronized (outstandingAckHandlers) {
283+
// AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
284+
// We will also later iterate over ackHandlers when we give messages to user code.
285+
// We must create a new list to pass to outstandingAckHandlers,
286+
// so that we can't iterate and modify the list concurrently.
287+
ArrayList<AckHandler> ackHandlersCopy = new ArrayList<>(ackHandlers);
288+
outstandingAckHandlers.add(
289+
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlersCopy));
290+
}
291+
292+
// Deadline extension must be set up before we reserve flow control.
293+
// Flow control might block for a while, and extension will keep messages from expiring.
294+
setupNextAckDeadlineExtensionAlarm(expiration);
295+
296+
// Reserving flow control must happen before we give the messages to the user,
297+
// otherwise the user code might be given too many messages to process at once.
298+
try {
299+
flowController.reserve(responseMessages.size(), getTotalMessageSize(responseMessages));
300+
} catch (FlowController.FlowControlException e) {
301+
throw new IllegalStateException("Flow control unexpected exception", e);
302+
}
279303
messagesWaiter.incrementPendingMessages(responseMessages.size());
304+
280305
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
281306
for (ReceivedMessage userMessage : responseMessages) {
282307
final PubsubMessage message = userMessage.getMessage();
@@ -302,18 +327,14 @@ public void run() {
302327
}
303328
});
304329
}
330+
}
305331

306-
synchronized (outstandingAckHandlers) {
307-
outstandingAckHandlers.add(
308-
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
309-
}
310-
setupNextAckDeadlineExtensionAlarm(expiration);
311-
312-
try {
313-
flowController.reserve(receivedMessagesCount, totalByteCount);
314-
} catch (FlowController.FlowControlException unexpectedException) {
315-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
332+
private static int getTotalMessageSize(Collection<ReceivedMessage> messages) {
333+
int total = 0;
334+
for (ReceivedMessage message : messages) {
335+
total += message.getMessage().getSerializedSize();
316336
}
337+
return total;
317338
}
318339

319340
private void setupPendingAcksAlarm() {

0 commit comments

Comments
 (0)