Skip to content

Commit 32e2af2

Browse files
authored
Revert "pubsub: acquire FlowController before releasing (#1831)" (#1872)
This reverts commit 3717ac6. This change brings up another serious bug. If the number of messages we pull in one RPC is greater than the number size of the semaphore, we deadlock forever. Will redo this later.
1 parent 3717ac6 commit 32e2af2

1 file changed

Lines changed: 18 additions & 39 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import com.google.api.gax.core.ApiClock;
2019
import com.google.api.gax.core.FlowController;
20+
import com.google.api.gax.core.ApiClock;
2121
import com.google.api.stats.Distribution;
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.Lists;
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.PubsubMessage;
2929
import com.google.pubsub.v1.ReceivedMessage;
3030
import java.util.ArrayList;
31-
import java.util.Collection;
3231
import java.util.Collections;
3332
import java.util.HashSet;
3433
import java.util.Iterator;
@@ -261,47 +260,23 @@ public int getMessageDeadlineSeconds() {
261260
}
262261

263262
public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
264-
if (responseMessages.isEmpty()) {
263+
int receivedMessagesCount = responseMessages.size();
264+
if (receivedMessagesCount == 0) {
265265
return;
266266
}
267-
267+
Instant now = new Instant(clock.millisTime());
268+
int totalByteCount = 0;
268269
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
269270
for (ReceivedMessage pubsubMessage : responseMessages) {
270-
int size = pubsubMessage.getMessage().getSerializedSize();
271-
AckHandler handler = new AckHandler(pubsubMessage.getAckId(), size);
272-
ackHandlers.add(handler);
271+
int messageSize = pubsubMessage.getMessage().getSerializedSize();
272+
totalByteCount += messageSize;
273+
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
273274
}
274-
275-
Instant now = new Instant(clock.millisTime());
276275
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
277276
logger.log(
278277
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});
279278

280-
// We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm.
281-
// Otherwise, the alarm might go off before we can add the handlers.
282-
synchronized (outstandingAckHandlers) {
283-
// AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
284-
// We will also later iterate over ackHandlers when we give messages to user code.
285-
// We must create a new list to pass to outstandingAckHandlers,
286-
// so that we can't iterate and modify the list concurrently.
287-
ArrayList<AckHandler> ackHandlersCopy = new ArrayList<>(ackHandlers);
288-
outstandingAckHandlers.add(
289-
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlersCopy));
290-
}
291-
292-
// Deadline extension must be set up before we reserve flow control.
293-
// Flow control might block for a while, and extension will keep messages from expiring.
294-
setupNextAckDeadlineExtensionAlarm(expiration);
295-
296-
// Reserving flow control must happen before we give the messages to the user,
297-
// otherwise the user code might be given too many messages to process at once.
298-
try {
299-
flowController.reserve(responseMessages.size(), getTotalMessageSize(responseMessages));
300-
} catch (FlowController.FlowControlException e) {
301-
throw new IllegalStateException("Flow control unexpected exception", e);
302-
}
303279
messagesWaiter.incrementPendingMessages(responseMessages.size());
304-
305280
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
306281
for (ReceivedMessage userMessage : responseMessages) {
307282
final PubsubMessage message = userMessage.getMessage();
@@ -327,14 +302,18 @@ public void run() {
327302
}
328303
});
329304
}
330-
}
331305

332-
private static int getTotalMessageSize(Collection<ReceivedMessage> messages) {
333-
int total = 0;
334-
for (ReceivedMessage message : messages) {
335-
total += message.getMessage().getSerializedSize();
306+
synchronized (outstandingAckHandlers) {
307+
outstandingAckHandlers.add(
308+
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
309+
}
310+
setupNextAckDeadlineExtensionAlarm(expiration);
311+
312+
try {
313+
flowController.reserve(receivedMessagesCount, totalByteCount);
314+
} catch (FlowController.FlowControlException unexpectedException) {
315+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
336316
}
337-
return total;
338317
}
339318

340319
private void setupPendingAcksAlarm() {

0 commit comments

Comments
 (0)