Skip to content

Commit 28d348d

Browse files
committed
pr comment
1 parent 9cf81c5 commit 28d348d

File tree

1 file changed

+35
-14
lines changed

1 file changed

+35
-14
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.FlowController;
2019
import com.google.api.gax.core.ApiClock;
20+
import com.google.api.gax.core.FlowController;
2121
import com.google.api.stats.Distribution;
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.Lists;
@@ -28,6 +28,7 @@
2828
import com.google.pubsub.v1.PubsubMessage;
2929
import com.google.pubsub.v1.ReceivedMessage;
3030
import java.util.ArrayList;
31+
import java.util.Collection;
3132
import java.util.Collections;
3233
import java.util.HashSet;
3334
import java.util.Iterator;
@@ -260,28 +261,46 @@ public int getMessageDeadlineSeconds() {
260261
}
261262

262263
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
263-
if (responseMessages.size() == 0) {
264+
if (responseMessages.isEmpty()) {
264265
return;
265266
}
266-
Instant now = new Instant(clock.millisTime());
267-
int totalByteCount = 0;
267+
268268
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
269269
for (ReceivedMessage pubsubMessage : responseMessages) {
270-
int messageSize = pubsubMessage.getMessage().getSerializedSize();
271-
totalByteCount += messageSize;
272-
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
270+
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), pubsubMessage.getMessage().getSerializedSize()));
273271
}
272+
273+
Instant now = new Instant(clock.millisTime());
274274
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
275275
logger.log(
276276
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
277277

278+
synchronized (outstandingAckHandlers) {
279+
// AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
280+
// We will also later iterate over ackHandlers when we give messages to user code.
281+
// We must create a new list to pass to outstandingAckHandlers,
282+
// so that we can't iterate and modify the list concurrently.
283+
outstandingAckHandlers.add(
284+
new ExtensionJob(
285+
expiration,
286+
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
287+
new ArrayList<AckHandler>(ackHandlers)));
288+
}
289+
setupNextAckDeadlineExtensionAlarm(expiration);
290+
291+
// Deadline extension must be set up before we reserve flow control.
292+
// Flow control might block for a while, and extension will keep messages from expiring.
293+
278294
try {
279-
flowController.reserve(responseMessages.size(), totalByteCount);
280-
} catch (FlowController.FlowControlException unexpectedException) {
281-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
295+
flowController.reserve(responseMessages.size(), totalMessageSize(responseMessages));
296+
} catch (FlowController.FlowControlException e) {
297+
throw new IllegalStateException("Flow control unexpected exception", e);
282298
}
283299
messagesWaiter.incrementPendingMessages(responseMessages.size());
284300

301+
// Reserving flow control must happen before we give the messages to the user,
302+
// otherwise the user code might be given too many messages to process at once.
303+
285304
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
286305
for (ReceivedMessage userMessage : responseMessages) {
287306
final PubsubMessage message = userMessage.getMessage();
@@ -307,12 +326,14 @@ public void run() {
307326
}
308327
});
309328
}
329+
}
310330

311-
synchronized (outstandingAckHandlers) {
312-
outstandingAckHandlers.add(
313-
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
331+
private static int totalMessageSize(Collection<ReceivedMessage> messages) {
332+
int total = 0;
333+
for (ReceivedMessage message : messages) {
334+
total += message.getMessage().getSerializedSize();
314335
}
315-
setupNextAckDeadlineExtensionAlarm(expiration);
336+
return total;
316337
}
317338

318339
private void setupPendingAcksAlarm() {

0 commit comments

Comments
 (0)