Skip to content

Commit 8ec721e

Browse files
authored
Use returnImmediately=false and disable timeouts for pullAsync (#1387)
1 parent 89f83f3 commit 8ec721e

File tree

5 files changed

+60
-18
lines changed

5 files changed

+60
-18
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -702,9 +702,10 @@ interface MessageConsumer extends AutoCloseable {
702702
/**
703703
* Pulls messages from the provided subscription. This method possibly returns no messages if no
704704
* message was available at the time the request was processed by the Pub/Sub service (i.e. the
705-
* system is not allowed to wait until at least one message is available). Pulled messages have
706-
* their acknowledge deadline automatically renewed until they are explicitly consumed using
707-
* {@link Iterator#next()}.
705+
* system is not allowed to wait until at least one message is available -
706+
* <a href="https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PullRequest.FIELDS.bool.google.pubsub.v1.PullRequest.return_immediately">return_immediately</a>
707+
* option is set to {@code true}). Pulled messages have their acknowledge deadline automatically
708+
* renewed until they are explicitly consumed using {@link Iterator#next()}.
708709
*
709710
* <p>Example of pulling a maximum number of messages from a subscription.
710711
* <pre> {@code
@@ -728,9 +729,12 @@ interface MessageConsumer extends AutoCloseable {
728729
/**
729730
* Sends a request for pulling messages from the provided subscription. This method returns a
730731
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
731-
* This method possibly returns no messages if no message was available at the time the request
732-
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
733-
* message is available).
732+
* When using this method the system is allowed to wait until at least one message is available
733+
* rather than returning no messages (i.e.
734+
* <a href="https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PullRequest.FIELDS.bool.google.pubsub.v1.PullRequest.return_immediately">return_immediately</a>
735+
* option is set to {@code false}). The client may cancel the request by calling
736+
* {@link Future#cancel(boolean)} if it does not wish to wait any longer. Notice that the Pub/Sub
737+
* service might still return no messages if a timeout is reached on the service side.
734738
*
735739
* <p>Example of asynchronously pulling a maximum number of messages from a subscription.
736740
* <pre> {@code

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -512,18 +512,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
512512
return listSubscriptionsAsync(topic, getOptions(), optionMap(options));
513513
}
514514

515-
@Override
516-
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
517-
return get(pullAsync(subscription, maxMessages));
518-
}
519-
520-
@Override
521-
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages) {
522-
PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
515+
private Future<Iterator<ReceivedMessage>> pullAsync(final String subscription,
516+
int maxMessages, boolean returnImmediately) {
517+
PullRequest request = PullRequest.newBuilder()
523518
.setSubscription(
524519
SubscriberApi.formatSubscriptionName(getOptions().getProjectId(), subscription))
525520
.setMaxMessages(maxMessages)
526-
.setReturnImmediately(true)
521+
.setReturnImmediately(returnImmediately)
527522
.build();
528523
PullFuture future = rpc.pull(request);
529524
future.addCallback(new PubSubRpc.PullCallback() {
@@ -555,6 +550,16 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag
555550
});
556551
}
557552

553+
@Override
554+
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
555+
return get(pullAsync(subscription, maxMessages, true));
556+
}
557+
558+
@Override
559+
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
560+
return pullAsync(subscription, maxMessages, false);
561+
}
562+
558563
@Override
559564
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
560565
PullOption... options) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
import io.grpc.netty.NegotiationType;
6969
import io.grpc.netty.NettyChannelBuilder;
7070

71+
import org.joda.time.Duration;
72+
7173
import java.io.IOException;
7274
import java.util.Set;
7375
import java.util.concurrent.Future;
@@ -77,6 +79,7 @@ public class DefaultPubSubRpc implements PubSubRpc {
7779

7880
private final PublisherApi publisherApi;
7981
private final SubscriberApi subscriberApi;
82+
private final SubscriberApi noTimeoutSubscriberApi;
8083
private final ScheduledExecutorService executor;
8184
private final ProviderManager providerManager;
8285
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
@@ -164,6 +167,12 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
164167
.applyToAllApiMethods(callSettingsBuilder);
165168
publisherApi = PublisherApi.create(pubBuilder.build());
166169
subscriberApi = SubscriberApi.create(subBuilder.build());
170+
callSettingsBuilder.setRetrySettingsBuilder(callSettingsBuilder.getRetrySettingsBuilder()
171+
.setTotalTimeout(Duration.millis(Long.MAX_VALUE))
172+
.setInitialRpcTimeout(Duration.millis(Long.MAX_VALUE))
173+
.setMaxRpcTimeout(Duration.millis(Long.MAX_VALUE)));
174+
subBuilder.applyToAllApiMethods(callSettingsBuilder);
175+
noTimeoutSubscriberApi = SubscriberApi.create(subBuilder.build());
167176
} catch (Exception ex) {
168177
throw new IOException(ex);
169178
}
@@ -256,9 +265,14 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
256265
return translate(subscriberApi.acknowledgeCallable().futureCall(request), false);
257266
}
258267

268+
private static PullFuture pull(SubscriberApi subscriberApi, PullRequest request) {
269+
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
270+
}
271+
259272
@Override
260273
public PullFuture pull(PullRequest request) {
261-
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
274+
return request.getReturnImmediately()
275+
? pull(subscriberApi, request) : pull(noTimeoutSubscriberApi, request);
262276
}
263277

264278
@Override
@@ -290,6 +304,7 @@ public void close() throws Exception {
290304
}
291305
closed = true;
292306
subscriberApi.close();
307+
noTimeoutSubscriberApi.close();
293308
publisherApi.close();
294309
providerManager.getChannel().shutdown();
295310
executorFactory.release(executor);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,24 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
549549
assertTrue(pubsub().deleteTopic(topic));
550550
}
551551

552+
@Test
553+
public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException {
554+
String topic = formatForTest("test-pull-messages-async-non-immediately-topic");
555+
pubsub().create(TopicInfo.of(topic));
556+
String subscription = formatForTest("test-pull-messages-async-subscription");
557+
pubsub().create(SubscriptionInfo.of(topic, subscription));
558+
Future<Iterator<ReceivedMessage>> future = pubsub().pullAsync(subscription, 2);
559+
Message message1 = Message.of("payload1");
560+
Message message2 = Message.of("payload2");
561+
List<String> messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
562+
assertEquals(2, messageIds.size());
563+
Iterator<ReceivedMessage> iterator = future.get();
564+
assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
565+
assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
566+
assertTrue(pubsub().deleteSubscription(subscription));
567+
assertTrue(pubsub().deleteTopic(topic));
568+
}
569+
552570
@Test
553571
public void testPullAsyncNonExistingSubscription()
554572
throws ExecutionException, InterruptedException {

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,7 +1302,7 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
13021302
PullRequest request = PullRequest.newBuilder()
13031303
.setSubscription(SUBSCRIPTION_NAME_PB)
13041304
.setMaxMessages(42)
1305-
.setReturnImmediately(true)
1305+
.setReturnImmediately(false)
13061306
.build();
13071307
List<ReceivedMessage> messageList = ImmutableList.of(
13081308
ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1),
@@ -1363,7 +1363,7 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE
13631363
PullRequest request = PullRequest.newBuilder()
13641364
.setSubscription(SUBSCRIPTION_NAME_PB)
13651365
.setMaxMessages(42)
1366-
.setReturnImmediately(true)
1366+
.setReturnImmediately(false)
13671367
.build();
13681368
PubSubException exception = new PubSubException(new IOException(), false);
13691369
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);

0 commit comments

Comments
 (0)