@@ -74,11 +74,10 @@ public void close(ScheduledExecutorService instance) {
7474 private final AtomicInteger queuedCallbacks ;
7575 private final int maxQueuedCallbacks ;
7676 private final Object futureLock = new Object ();
77- private final Runnable scheduleRunnable ;
77+ private final Runnable consumerRunnable ;
7878 private boolean closed ;
7979 private Future <?> scheduledFuture ;
8080 private PullFuture pullerFuture ;
81- private boolean stopped = true ;
8281
8382 /**
8483 * Default executor factory for the message processor executor. By default a single-threaded
@@ -99,6 +98,37 @@ public void release(ExecutorService executor) {
9998 }
10099 }
101100
101+ class ConsumerRunnable implements Runnable {
102+
103+ @ Override
104+ public void run () {
105+ if (closed ) {
106+ return ;
107+ }
108+ pullerFuture = pubsubRpc .pull (createPullRequest ());
109+ pullerFuture .addCallback (new PullCallback () {
110+ @ Override
111+ public void success (PullResponse response ) {
112+ List <com .google .pubsub .v1 .ReceivedMessage > messages = response .getReceivedMessagesList ();
113+ queuedCallbacks .addAndGet (messages .size ());
114+ for (com .google .pubsub .v1 .ReceivedMessage message : messages ) {
115+ deadlineRenewer .add (subscription , message .getAckId ());
116+ ReceivedMessage receivedMessage = ReceivedMessage .fromPb (pubsub , subscription , message );
117+ executor .execute (ackingRunnable (receivedMessage ));
118+ }
119+ nextPull ();
120+ }
121+
122+ @ Override
123+ public void failure (Throwable error ) {
124+ if (!(error instanceof CancellationException )) {
125+ nextPull ();
126+ }
127+ }
128+ });
129+ }
130+ }
131+
102132 private MessageConsumerImpl (Builder builder ) {
103133 this .pubsubOptions = builder .pubsubOptions ;
104134 this .subscription = builder .subscription ;
@@ -111,17 +141,7 @@ private MessageConsumerImpl(Builder builder) {
111141 this .executorFactory = firstNonNull (builder .executorFactory , new DefaultExecutorFactory ());
112142 this .executor = executorFactory .get ();
113143 this .maxQueuedCallbacks = firstNonNull (builder .maxQueuedCallbacks , MAX_QUEUED_CALLBACKS );
114- this .scheduleRunnable = new Runnable () {
115- @ Override
116- public void run () {
117- synchronized (futureLock ) {
118- if (closed ) {
119- return ;
120- }
121- pull ();
122- }
123- }
124- };
144+ this .consumerRunnable = new ConsumerRunnable ();
125145 nextPull ();
126146 }
127147
@@ -155,51 +175,23 @@ private PullRequest createPullRequest() {
155175
156176 private void scheduleNextPull (long delay , TimeUnit timeUnit ) {
157177 synchronized (futureLock ) {
158- if (! closed && stopped ) {
159- scheduledFuture = timer . schedule ( scheduleRunnable , delay , timeUnit ) ;
178+ if (closed || scheduledFuture != null ) {
179+ return ;
160180 }
181+ scheduledFuture = timer .schedule (consumerRunnable , delay , timeUnit );
161182 }
162183 }
163184
164185 private void nextPull () {
165186 synchronized (futureLock ) {
166- if (closed ) {
187+ if (closed || queuedCallbacks .get () == maxQueuedCallbacks ) {
188+ scheduledFuture = null ;
167189 return ;
168190 }
169- if (queuedCallbacks .get () == maxQueuedCallbacks ) {
170- stopped = true ;
171- } else {
172- stopped = false ;
173- scheduledFuture = timer .submit (scheduleRunnable );
174- }
191+ scheduledFuture = timer .submit (consumerRunnable );
175192 }
176193 }
177194
178- private void pull () {
179- pullerFuture = pubsubRpc .pull (createPullRequest ());
180- pullerFuture .addCallback (new PullCallback () {
181- @ Override
182- public void success (PullResponse response ) {
183- List <com .google .pubsub .v1 .ReceivedMessage > messages = response .getReceivedMessagesList ();
184- queuedCallbacks .addAndGet (messages .size ());
185- for (com .google .pubsub .v1 .ReceivedMessage message : messages ) {
186- deadlineRenewer .add (subscription , message .getAckId ());
187- final ReceivedMessage receivedMessage =
188- ReceivedMessage .fromPb (pubsub , subscription , message );
189- executor .execute (ackingRunnable (receivedMessage ));
190- }
191- nextPull ();
192- }
193-
194- @ Override
195- public void failure (Throwable error ) {
196- if (!(error instanceof CancellationException )) {
197- nextPull ();
198- }
199- }
200- });
201- }
202-
203195 @ Override
204196 public void close () {
205197 synchronized (futureLock ) {
@@ -268,14 +260,4 @@ static Builder builder(PubSubOptions pubsubOptions, String subscription,
268260 AckDeadlineRenewer deadlineRenewer , MessageProcessor messageProcessor ) {
269261 return new Builder (pubsubOptions , subscription , deadlineRenewer , messageProcessor );
270262 }
271-
272- /**
273- * Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from
274- * which messages must be pulled, the acknowledge deadline renewer and a message processor used to
275- * process messages.
276- */
277- static Builder of (PubSubOptions pubsubOptions , String subscription ,
278- AckDeadlineRenewer deadlineRenewer , MessageProcessor messageProcessor ) {
279- return new Builder (pubsubOptions , subscription , deadlineRenewer , messageProcessor );
280- }
281263}
0 commit comments