Skip to content

Commit 262b5af

Browse files
dpcollins-googlesduskis
authored andcommitted
Change MessageDispatcher to be synchronous instead of asynchronous. (#4916)
* Change MessageDispatcher to be synchronous instead of asynchronous. This removes the failure mode described in #2452 that can occur when MaxOutstandingElementCount is low and there is more than one connection. In this case, it is possible for an individual MessageDispatcher to have no outstanding in-flight messages, but also be blocked by flow control with a whole new batch outstanding. In this case, it will never make progress on that batch since it will never receive another batch and the queue was made to not be shared in #4590, so the batch will never be pulled off by another MessageDispatcher. By changing this to use a blocking flow controller, this will never happen, as each batch will synchronously wait until it is allowed by flow control before being processed. * Run mvn com.coveo:fmt-maven-plugin:format
1 parent b697a8b commit 262b5af

4 files changed

Lines changed: 45 additions & 107 deletions

File tree

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 21 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.api.gax.batching.FlowController;
2525
import com.google.api.gax.batching.FlowController.FlowControlException;
2626
import com.google.api.gax.core.Distribution;
27-
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
2827
import com.google.common.primitives.Ints;
2928
import com.google.common.util.concurrent.MoreExecutors;
3029
import com.google.pubsub.v1.PubsubMessage;
@@ -33,14 +32,11 @@
3332
import java.util.Arrays;
3433
import java.util.Collection;
3534
import java.util.Collections;
36-
import java.util.Deque;
37-
import java.util.LinkedList;
3835
import java.util.List;
3936
import java.util.Map;
4037
import java.util.concurrent.ConcurrentHashMap;
4138
import java.util.concurrent.ConcurrentMap;
4239
import java.util.concurrent.Executor;
43-
import java.util.concurrent.LinkedBlockingDeque;
4440
import java.util.concurrent.LinkedBlockingQueue;
4541
import java.util.concurrent.ScheduledExecutorService;
4642
import java.util.concurrent.ScheduledFuture;
@@ -91,9 +87,6 @@ class MessageDispatcher {
9187
private final Lock jobLock;
9288
private ScheduledFuture<?> backgroundJob;
9389

94-
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95-
new LinkedBlockingDeque<>();
96-
9790
// To keep track of number of seconds the receiver takes to process messages.
9891
private final Distribution ackLatencyDistribution;
9992

@@ -155,7 +148,6 @@ private void forget() {
155148
}
156149
flowController.release(1, outstandingBytes);
157150
messagesWaiter.incrementPendingMessages(-1);
158-
processOutstandingBatches();
159151
}
160152

161153
@Override
@@ -296,50 +288,19 @@ int getMessageDeadlineSeconds() {
296288
return messageDeadlineSeconds.get();
297289
}
298290

299-
static class OutstandingMessageBatch {
300-
private final Deque<OutstandingMessage> messages;
301-
private final Runnable doneCallback;
302-
303-
static class OutstandingMessage {
304-
private final ReceivedMessage receivedMessage;
305-
private final AckHandler ackHandler;
306-
307-
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
308-
this.receivedMessage = receivedMessage;
309-
this.ackHandler = ackHandler;
310-
}
311-
312-
public ReceivedMessage receivedMessage() {
313-
return receivedMessage;
314-
}
315-
316-
public AckHandler ackHandler() {
317-
return ackHandler;
318-
}
319-
}
291+
static class OutstandingMessage {
292+
private final ReceivedMessage receivedMessage;
293+
private final AckHandler ackHandler;
320294

321-
public OutstandingMessageBatch(Runnable doneCallback) {
322-
this.messages = new LinkedList<>();
323-
this.doneCallback = doneCallback;
324-
}
325-
326-
public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
327-
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
328-
}
329-
330-
public Deque<OutstandingMessage> messages() {
331-
return messages;
295+
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
296+
this.receivedMessage = receivedMessage;
297+
this.ackHandler = ackHandler;
332298
}
333299
}
334300

335-
public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
336-
if (messages.isEmpty()) {
337-
doneCallback.run();
338-
return;
339-
}
340-
301+
public void processReceivedMessages(List<ReceivedMessage> messages) {
341302
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
342-
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
303+
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
343304
for (ReceivedMessage message : messages) {
344305
AckHandler ackHandler =
345306
new AckHandler(
@@ -355,42 +316,25 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
355316
// totally expire so that pubsub service sends us the message again.
356317
continue;
357318
}
358-
outstandingBatch.addMessage(message, ackHandler);
319+
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
359320
pendingReceipts.add(message.getAckId());
360321
}
361322

362-
if (outstandingBatch.messages.isEmpty()) {
363-
doneCallback.run();
364-
return;
365-
}
366-
367-
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
outstandingMessageBatches.add(outstandingBatch);
369-
processOutstandingBatches();
323+
processBatch(outstandingBatch);
370324
}
371325

372-
private void processOutstandingBatches() {
373-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374-
nextBatch != null;
375-
nextBatch = outstandingMessageBatches.poll()) {
376-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377-
nextMessage != null;
378-
nextMessage = nextBatch.messages.poll()) {
379-
try {
380-
// This is a non-blocking flow controller.
381-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382-
} catch (FlowController.MaxOutstandingElementCountReachedException
383-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384-
// Unwind previous changes in the batches outstanding.
385-
nextBatch.messages.addFirst(nextMessage);
386-
outstandingMessageBatches.addFirst(nextBatch);
387-
return;
388-
} catch (FlowControlException unexpectedException) {
389-
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
390-
}
391-
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
326+
private void processBatch(List<OutstandingMessage> batch) {
327+
messagesWaiter.incrementPendingMessages(batch.size());
328+
for (OutstandingMessage message : batch) {
329+
// This is a blocking flow controller. We have already incremented MessageWaiter, so
330+
// shutdown will block on processing of all these messages anyway.
331+
try {
332+
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
333+
} catch (FlowControlException unexpectedException) {
334+
// This should be a blocking flow controller and never throw an exception.
335+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
392336
}
393-
nextBatch.doneCallback.run();
337+
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
394338
}
395339
}
396340

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -151,26 +151,20 @@ public void onStart(StreamController controller) {
151151
@Override
152152
public void onResponse(StreamingPullResponse response) {
153153
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
154-
messageDispatcher.processReceivedMessages(
155-
response.getReceivedMessagesList(),
156-
new Runnable() {
157-
@Override
158-
public void run() {
159-
// Only request more if we're not shutdown.
160-
// If errorFuture is done, the stream has either failed or hung up,
161-
// and we don't need to request.
162-
if (isAlive() && !errorFuture.isDone()) {
163-
lock.lock();
164-
try {
165-
thisController.request(1);
166-
} catch (Exception e) {
167-
logger.log(Level.WARNING, "cannot request more messages", e);
168-
} finally {
169-
lock.unlock();
170-
}
171-
}
172-
}
173-
});
154+
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
155+
// Only request more if we're not shutdown.
156+
// If errorFuture is done, the stream has either failed or hung up,
157+
// and we don't need to request.
158+
if (isAlive() && !errorFuture.isDone()) {
159+
lock.lock();
160+
try {
161+
thisController.request(1);
162+
} catch (Exception e) {
163+
logger.log(Level.WARNING, "cannot request more messages", e);
164+
} finally {
165+
lock.unlock();
166+
}
167+
}
174168
}
175169

176170
@Override

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private Subscriber(Builder builder) {
130130
builder
131131
.flowControlSettings
132132
.toBuilder()
133-
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
133+
.setLimitExceededBehavior(LimitExceededBehavior.Block)
134134
.build());
135135

136136
this.numPullers = builder.parallelPullCount;

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void sendAckOperations(
105105
new FlowController(
106106
FlowControlSettings.newBuilder()
107107
.setMaxOutstandingElementCount(1L)
108-
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
108+
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
109109
.build());
110110

111111
dispatcher =
@@ -124,31 +124,31 @@ public void sendAckOperations(
124124

125125
@Test
126126
public void testReceipt() throws Exception {
127-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
127+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
128128
dispatcher.processOutstandingAckOperations();
129129
assertThat(sentModAcks)
130130
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
131131
}
132132

133133
@Test
134134
public void testAck() throws Exception {
135-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
135+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
136136
consumers.take().ack();
137137
dispatcher.processOutstandingAckOperations();
138138
assertThat(sentAcks).contains(TEST_MESSAGE.getAckId());
139139
}
140140

141141
@Test
142142
public void testNack() throws Exception {
143-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
143+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
144144
consumers.take().nack();
145145
dispatcher.processOutstandingAckOperations();
146146
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
147147
}
148148

149149
@Test
150150
public void testExtension() throws Exception {
151-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
151+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
152152
dispatcher.extendDeadlines();
153153
assertThat(sentModAcks)
154154
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -161,7 +161,7 @@ public void testExtension() throws Exception {
161161

162162
@Test
163163
public void testExtension_Close() throws Exception {
164-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
164+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
165165
dispatcher.extendDeadlines();
166166
assertThat(sentModAcks)
167167
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception {
176176

177177
@Test
178178
public void testExtension_GiveUp() throws Exception {
179-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
179+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
180180
dispatcher.extendDeadlines();
181181
assertThat(sentModAcks)
182182
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
@@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception {
188188
dispatcher.extendDeadlines();
189189
assertThat(sentModAcks).isEmpty();
190190

191-
// We should be able to reserve another item in the flow controller and not block shutdown
191+
// We should be able to reserve another item in the flow controller and not block.
192192
flowController.reserve(1, 0);
193193
dispatcher.stop();
194194
}
@@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception {
197197
public void testDeadlineAdjustment() throws Exception {
198198
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10);
199199

200-
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
200+
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
201201
clock.advance(42, TimeUnit.SECONDS);
202202
consumers.take().ack();
203203

0 commit comments

Comments
 (0)