Skip to content

Commit 54aa55d

Browse files
dpcollins-googlesduskis
authored andcommitted
Change MessageDispatcher to own its own queue. (#4590)
1 parent 1e18a7d commit 54aa55d

File tree

4 files changed

+3
-13
lines changed

4 files changed

+3
-13
lines changed

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ConcurrentHashMap;
4141
import java.util.concurrent.ConcurrentMap;
4242
import java.util.concurrent.Executor;
43+
import java.util.concurrent.LinkedBlockingDeque;
4344
import java.util.concurrent.LinkedBlockingQueue;
4445
import java.util.concurrent.ScheduledExecutorService;
4546
import java.util.concurrent.ScheduledFuture;
@@ -90,7 +91,8 @@ class MessageDispatcher {
9091
private final Lock jobLock;
9192
private ScheduledFuture<?> backgroundJob;
9293

93-
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
94+
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
95+
new LinkedBlockingDeque<>();
9496

9597
// To keep track of number of seconds the receiver takes to process messages.
9698
private final Distribution ackLatencyDistribution;
@@ -200,7 +202,6 @@ void sendAckOperations(
200202
Duration maxAckExtensionPeriod,
201203
Distribution ackLatencyDistribution,
202204
FlowController flowController,
203-
Deque<OutstandingMessageBatch> outstandingMessageBatches,
204205
Executor executor,
205206
ScheduledExecutorService systemExecutor,
206207
ApiClock clock) {
@@ -211,7 +212,6 @@ void sendAckOperations(
211212
this.receiver = receiver;
212213
this.ackProcessor = ackProcessor;
213214
this.flowController = flowController;
214-
this.outstandingMessageBatches = outstandingMessageBatches;
215215
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
216216
this.ackLatencyDistribution = ackLatencyDistribution;
217217
jobLock = new ReentrantLock();

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.pubsub.v1.StreamingPullRequest;
4343
import com.google.pubsub.v1.StreamingPullResponse;
4444
import io.grpc.Status;
45-
import java.util.Deque;
4645
import java.util.List;
4746
import java.util.concurrent.ScheduledExecutorService;
4847
import java.util.concurrent.TimeUnit;
@@ -84,7 +83,6 @@ public StreamingSubscriberConnection(
8483
SubscriberStub stub,
8584
int channelAffinity,
8685
FlowController flowController,
87-
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
8886
ScheduledExecutorService executor,
8987
ScheduledExecutorService systemExecutor,
9088
ApiClock clock) {
@@ -100,7 +98,6 @@ public StreamingSubscriberConnection(
10098
maxAckExtensionPeriod,
10199
ackLatencyDistribution,
102100
flowController,
103-
outstandingMessageBatches,
104101
executor,
105102
systemExecutor,
106103
clock);

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import com.google.pubsub.v1.ProjectSubscriptionName;
4545
import java.io.IOException;
4646
import java.util.ArrayList;
47-
import java.util.Deque;
48-
import java.util.LinkedList;
4947
import java.util.List;
5048
import java.util.concurrent.ScheduledExecutorService;
5149
import java.util.logging.Level;
@@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService {
115113

116114
private final MessageReceiver receiver;
117115
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
118-
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
119-
new LinkedList<>();
120116
private final ApiClock clock;
121117
private final List<AutoCloseable> closeables = new ArrayList<>();
122118

@@ -329,7 +325,6 @@ private void startStreamingConnections() {
329325
subStub,
330326
i,
331327
flowController,
332-
outstandingMessageBatches,
333328
executor,
334329
alarmsExecutor,
335330
clock));

google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.pubsub.v1.ReceivedMessage;
2929
import java.util.ArrayList;
3030
import java.util.Collections;
31-
import java.util.LinkedList;
3231
import java.util.List;
3332
import java.util.concurrent.LinkedBlockingQueue;
3433
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -117,7 +116,6 @@ public void sendAckOperations(
117116
Duration.ofMinutes(60),
118117
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
119118
flowController,
120-
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
121119
MoreExecutors.directExecutor(),
122120
systemExecutor,
123121
clock);

0 commit comments

Comments
 (0)