Skip to content

Commit e3ca6d6

Browse files
kamalaboulhosnanguillanneuf
authored andcommitted
Make Pub/Sub subscriber example simpler and more idiomatic (GoogleCloudPlatform#1534)
* Make Pub/Sub subscriber example more idiomatic by acking only once a message has been printed out. * Add documentation and improve formatting * Fix comment * Simplify example to not use a BlockingQueue.
1 parent 4a01da2 commit e3ca6d6

File tree

1 file changed

+7
-16
lines changed

1 file changed

+7
-16
lines changed

pubsub/cloud-client/src/main/java/com/example/pubsub/SubscriberExample.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,18 @@
2424
import com.google.cloud.pubsub.v1.Subscriber;
2525
import com.google.pubsub.v1.ProjectSubscriptionName;
2626
import com.google.pubsub.v1.PubsubMessage;
27-
import java.util.concurrent.BlockingQueue;
28-
import java.util.concurrent.LinkedBlockingDeque;
2927

3028
public class SubscriberExample {
31-
3229
// use the default project id
3330
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
3431

35-
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
36-
3732
static class MessageReceiverExample implements MessageReceiver {
3833

3934
@Override
4035
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
41-
messages.offer(message);
36+
System.out.println(
37+
"Message Id: " + message.getMessageId() + " Data: " + message.getData().toStringUtf8());
38+
// Ack only after all work for the message is complete.
4239
consumer.ack();
4340
}
4441
}
@@ -55,16 +52,10 @@ public static void main(String... args) throws Exception {
5552
subscriber =
5653
Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
5754
subscriber.startAsync().awaitRunning();
58-
// Continue to listen to messages
59-
while (true) {
60-
PubsubMessage message = messages.take();
61-
System.out.println("Message Id: " + message.getMessageId());
62-
System.out.println("Data: " + message.getData().toStringUtf8());
63-
}
64-
} finally {
65-
if (subscriber != null) {
66-
subscriber.stopAsync();
67-
}
55+
// Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
56+
subscriber.awaitTerminated();
57+
} catch (IllegalStateException e) {
58+
System.out.println("Subscriber unexpectedly stopped: " + e);
6859
}
6960
}
7061
}

0 commit comments

Comments
 (0)