Skip to content

Commit 59ddf8c

Browse files
authored
Revert "pubsub: acquire FlowController before releasing (#1831)"
This reverts commit 3717ac6.
1 parent 3717ac6 commit 59ddf8c

1 file changed

Lines changed: 18 additions & 39 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.ApiClock;
2019
import com.google.api.gax.core.FlowController;
20+
import com.google.api.gax.core.ApiClock;
2121
import com.google.api.stats.Distribution;
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.Lists;
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.PubsubMessage;
2929
import com.google.pubsub.v1.ReceivedMessage;
3030
import java.util.ArrayList;
31-
import java.util.Collection;
3231
import java.util.Collections;
3332
import java.util.HashSet;
3433
import java.util.Iterator;
@@ -261,47 +260,23 @@ public int getMessageDeadlineSeconds() {
261260
}
262261

263262
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
264-
if (responseMessages.isEmpty()) {
263+
int receivedMessagesCount = responseMessages.size();
264+
if (receivedMessagesCount == 0) {
265265
return;
266266
}
267-
267+
Instant now = new Instant(clock.millisTime());
268+
int totalByteCount = 0;
268269
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
269270
for (ReceivedMessage pubsubMessage : responseMessages) {
270-
int size = pubsubMessage.getMessage().getSerializedSize();
271-
AckHandler handler = new AckHandler(pubsubMessage.getAckId(), size);
272-
ackHandlers.add(handler);
271+
int messageSize = pubsubMessage.getMessage().getSerializedSize();
272+
totalByteCount += messageSize;
273+
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
273274
}
274-
275-
Instant now = new Instant(clock.millisTime());
276275
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
277276
logger.log(
278277
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
279278

280-
// We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281-
// Otherwise, the alarm might go off before we can add the handlers.
282-
synchronized (outstandingAckHandlers) {
283-
// AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
284-
// We will also later iterate over ackHandlers when we give messages to user code.
285-
// We must create a new list to pass to outstandingAckHandlers,
286-
// so that we can't iterate and modify the list concurrently.
287-
ArrayList<AckHandler> ackHandlersCopy = new ArrayList<>(ackHandlers);
288-
outstandingAckHandlers.add(
289-
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlersCopy));
290-
}
291-
292-
// Deadline extension must be set up before we reserve flow control.
293-
// Flow control might block for a while, and extension will keep messages from expiring.
294-
setupNextAckDeadlineExtensionAlarm(expiration);
295-
296-
// Reserving flow control must happen before we give the messages to the user,
297-
// otherwise the user code might be given too many messages to process at once.
298-
try {
299-
flowController.reserve(responseMessages.size(), getTotalMessageSize(responseMessages));
300-
} catch (FlowController.FlowControlException e) {
301-
throw new IllegalStateException("Flow control unexpected exception", e);
302-
}
303279
messagesWaiter.incrementPendingMessages(responseMessages.size());
304-
305280
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
306281
for (ReceivedMessage userMessage : responseMessages) {
307282
final PubsubMessage message = userMessage.getMessage();
@@ -327,14 +302,18 @@ public void run() {
327302
}
328303
});
329304
}
330-
}
331305

332-
private static int getTotalMessageSize(Collection<ReceivedMessage> messages) {
333-
int total = 0;
334-
for (ReceivedMessage message : messages) {
335-
total += message.getMessage().getSerializedSize();
306+
synchronized (outstandingAckHandlers) {
307+
outstandingAckHandlers.add(
308+
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
309+
}
310+
setupNextAckDeadlineExtensionAlarm(expiration);
311+
312+
try {
313+
flowController.reserve(receivedMessagesCount, totalByteCount);
314+
} catch (FlowController.FlowControlException unexpectedException) {
315+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
336316
}
337-
return total;
338317
}
339318

340319
private void setupPendingAcksAlarm() {

0 commit comments

Comments
 (0)