Skip to content

Commit 9404a36

Browse files
committed
simplifying publisher, subscriber snippets
1 parent 011c88f commit 9404a36

File tree

3 files changed

+80
-93
lines changed

3 files changed

+80
-93
lines changed

pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,60 +28,45 @@
2828

2929
public class PublisherExample {
3030

31+
static final int MESSAGE_COUNT = 5;
32+
3133
// use the default project id
3234
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
3335

34-
private static final int MESSAGE_COUNT = 5;
36+
//publish message asynchronously one at a time.
37+
private static ApiFuture<String> publishMessage(Publisher publisher, String message) throws Exception {
38+
// schedule publishing : messages get automatically batched
39+
// convert message to bytes
40+
ByteString data = ByteString.copyFromUtf8(message);
41+
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
42+
return publisher.publish(pubsubMessage);
43+
}
3544

36-
//publish messages asynchronously one at a time.
37-
static List<String> publishMessages(String topicId) throws Exception {
38-
List<String> messageIds;
39-
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
45+
public static void main(String... args) throws Exception {
46+
// topic id, eg. "my-topic-id"
47+
String topicId = args[0];
4048
TopicName topicName = TopicName.create(PROJECT_ID, topicId);
4149
Publisher publisher = null;
50+
List<ApiFuture<String>> apiFutures = new ArrayList<>();
4251
try {
4352
// Create a publisher instance with default settings bound to the topic
4453
publisher = Publisher.defaultBuilder(topicName).build();
45-
List<String> messages = getMessages();
46-
47-
// schedule publishing one message at a time : messages get automatically batched
48-
for (String message : messages) {
49-
// convert message to bytes
50-
ByteString data = ByteString.copyFromUtf8(message);
51-
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
52-
53-
// Once published, returns a server-assigned message id (unique within the topic)
54-
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
55-
messageIdFutures.add(messageIdFuture);
54+
for (int i = 0; i < MESSAGE_COUNT; i++) {
55+
String message = "message-" + i;
56+
ApiFuture<String> messageId = publishMessage(publisher, message);
57+
apiFutures.add(messageId);
5658
}
5759
} finally {
58-
// wait on any pending publish requests.
59-
messageIds = ApiFutures.allAsList(messageIdFutures).get();
60-
60+
// Once published, returns server-assigned message ids (unique within the topic)
61+
List<String> messageIds = ApiFutures.allAsList(apiFutures).get();
6162
for (String messageId : messageIds) {
62-
System.out.println("published with message ID: " + messageId);
63+
System.out.println(messageId);
6364
}
64-
6565
if (publisher != null) {
6666
// When finished with the publisher, shutdown to free up resources.
6767
publisher.shutdown();
6868
}
6969
}
70-
return messageIds;
71-
}
72-
73-
private static List<String> getMessages() {
74-
List<String> messages = new ArrayList<>();
75-
for (int i = 0; i < MESSAGE_COUNT; i++) {
76-
messages.add("message-" + String.valueOf(i));
77-
}
78-
return messages;
79-
}
80-
81-
public static void main(String... args) throws Exception {
82-
// topic id, eg. "my-topic-id"
83-
String topicId = args[0];
84-
publishMessages(topicId);
8570
}
8671
}
8772
// [END pubsub_quickstart_quickstart]

pubsub/cloud-client/src/main/java/com/example/pubsub/SubscriberExample.java

Lines changed: 18 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,73 +29,47 @@
2929
import java.util.concurrent.BlockingQueue;
3030
import java.util.concurrent.LinkedBlockingDeque;
3131

32-
public class SubscriberExample implements Runnable {
32+
public class SubscriberExample {
33+
3334
// use the default project id
3435
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
3536

36-
private final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
37+
private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();
3738

3839
private final List<String> receivedMessageIds = new ArrayList<>();
3940

40-
private final String subscriptionId;
41-
42-
private volatile boolean listen = true;
41+
static class MessageReceiverExample implements MessageReceiver {
42+
@Override
43+
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
44+
messages.offer(message);
45+
consumer.ack();
46+
}
47+
}
4348

44-
public SubscriberExample(String subscriptionId) {
45-
this.subscriptionId = subscriptionId;
49+
List<String> getReceivedMessages() {
50+
return ImmutableList.copyOf(receivedMessageIds);
4651
}
4752

48-
@Override
49-
public void run() {
50-
MessageReceiver receiver =
51-
new MessageReceiver() {
52-
@Override
53-
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
54-
messages.offer(message);
55-
consumer.ack();
56-
}
57-
};
53+
public static void main(String... args) throws Exception {
54+
// set subscriber id, eg. my-subscriber-id
55+
String subscriptionId = args[0];
5856
SubscriptionName subscriptionName = SubscriptionName.create(PROJECT_ID, subscriptionId);
5957
Subscriber subscriber = null;
6058
try {
6159
// create a subscriber bound to the asynchronous message receiver
62-
subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
60+
subscriber = Subscriber.defaultBuilder(subscriptionName, new MessageReceiverExample()).build();
6361
subscriber.startAsync().awaitRunning();
64-
// continue to wait on received messages, Ctrl-C to exit
65-
while (listen) {
66-
// block on receiving a message
62+
// Continue to listen to messages
63+
while (true) {
6764
PubsubMessage message = messages.take();
6865
System.out.println("Message Id: " + message.getMessageId());
6966
System.out.println("Data: " + message.getData().toStringUtf8());
70-
receivedMessageIds.add(message.getMessageId());
7167
}
72-
} catch (InterruptedException e) {
73-
throw new RuntimeException(e);
7468
} finally {
7569
if (subscriber != null) {
7670
subscriber.stopAsync();
7771
}
7872
}
7973
}
80-
81-
void stopSubscriber() {
82-
listen = false;
83-
}
84-
85-
List<String> getReceivedMessages() {
86-
return ImmutableList.copyOf(receivedMessageIds);
87-
}
88-
89-
public static void main(String... args) throws Exception {
90-
// set subscriber id, eg. my-subscriber-id
91-
String subscriberId = args[0];
92-
SubscriberExample subscriber = new SubscriberExample(subscriberId);
93-
Thread t = new Thread(subscriber);
94-
t.start();
95-
// Stop subscriber after 5 minutes of listening
96-
Thread.sleep(5 * 60000);
97-
subscriber.stopSubscriber();
98-
t.join();
99-
}
10074
}
10175
// [END pubsub_quickstart_subscriber]

pubsub/cloud-client/src/test/java/com/example/pubsub/QuickStartIT.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import com.google.cloud.pubsub.spi.v1.TopicAdminClient;
2424
import com.google.pubsub.v1.SubscriptionName;
2525
import com.google.pubsub.v1.TopicName;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.HashSet;
29+
import java.util.Set;
2630
import org.junit.After;
2731
import org.junit.Before;
2832
import org.junit.Rule;
@@ -42,18 +46,35 @@
4246
public class QuickStartIT {
4347

4448
private ByteArrayOutputStream bout;
45-
private PrintStream out;
4649

4750
private String projectId = ServiceOptions.getDefaultProjectId();
4851
private String topicId = formatForTest("my-topic-id");
4952
private String subscriptionId = formatForTest("my-subscription-id");
5053

54+
class SubscriberRunnable implements Runnable {
55+
56+
private String subscriptionId;
57+
58+
SubscriberRunnable(String subscriptionId) {
59+
this.subscriptionId = subscriptionId;
60+
}
61+
62+
@Override
63+
public void run() {
64+
try {
65+
SubscriberExample.main(subscriptionId);
66+
} catch (Exception e) {
67+
e.printStackTrace();
68+
}
69+
}
70+
}
71+
5172
@Rule public Timeout globalTimeout = Timeout.seconds(300); // 5 minute timeout
5273

5374
@Before
5475
public void setUp() {
5576
bout = new ByteArrayOutputStream();
56-
out = new PrintStream(bout);
77+
PrintStream out = new PrintStream(bout);
5778
System.setOut(out);
5879
try {
5980
deleteTestSubscription();
@@ -82,22 +103,29 @@ public void testQuickstart() throws Exception {
82103
got = bout.toString();
83104
assertThat(got).contains(subscriptionId + " created.");
84105

106+
bout.reset();
85107
// publish messages
86-
List<String> published = PublisherExample.publishMessages(topicId);
87-
assertThat(published).hasSize(5);
108+
PublisherExample.main(topicId);
109+
String[] messageIds = bout.toString().split("\n");
110+
assertThat(messageIds).hasLength(PublisherExample.MESSAGE_COUNT);
88111

89-
SubscriberExample subscriberExample = new SubscriberExample(subscriptionId);
112+
bout.reset();
90113
// receive messages
91-
Thread subscriberThread = new Thread(subscriberExample);
114+
Thread subscriberThread = new Thread(new SubscriberRunnable(subscriptionId));
92115
subscriberThread.start();
93-
94-
List<String> received;
95-
while ((received = subscriberExample.getReceivedMessages()).size() < 5) {
96-
Thread.sleep(1000);
116+
Set<String> expectedMessageIds = new HashSet<>();
117+
List<String> receivedMessageIds = new ArrayList<>();
118+
expectedMessageIds.addAll(Arrays.asList(messageIds));
119+
while (!expectedMessageIds.isEmpty()) {
120+
for (String expectedId : expectedMessageIds) {
121+
if (bout.toString().contains(expectedId)) {
122+
receivedMessageIds.add(expectedId);
123+
}
124+
}
125+
expectedMessageIds.removeAll(receivedMessageIds);
97126
}
98-
99-
assertThat(received).containsAllIn(published);
100-
subscriberExample.stopSubscriber();
127+
subscriberThread.interrupt();
128+
assertThat(expectedMessageIds).isEmpty();
101129
}
102130

103131
private String formatForTest(String name) {

0 commit comments

Comments
 (0)