Skip to content

Commit 8e78889

Browse files
Pub/Sub: update Spring tutorial with test + other nits (GoogleCloudPlatform#6446)
* Pub/Sub: update Spring tutorial * lint * onSuccess and license header * fix empty catch block in IT * revert to original logger message * update README and more test output
1 parent a2eac28 commit 8e78889

File tree

4 files changed

+164
-28
lines changed

4 files changed

+164
-28
lines changed

pubsub/spring/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
The code samples demonstrate two ways to send messages to and receive messages from [Cloud Pub/Sub](https://cloud.google.com/pubsub/docs/) from your Spring application using:
44

5-
* [Spring Integration Channel Adapters](https://cloud.spring.io/spring-cloud-gcp/reference/html/#channel-adapters-for-cloud-pubsub)
6-
* [Spring Cloud Stream Binders](https://cloud.spring.io/spring-cloud-gcp/reference/html/#spring-cloud-stream)
5+
* [Spring Integration Channel Adapters](https://googlecloudplatform.github.io/spring-cloud-gcp/reference/html/index.html#channel-adapters-for-cloud-pubsub)
6+
* [Spring Cloud Stream Binders](https://googlecloudplatform.github.io/spring-cloud-gcp/reference/html/index.html#spring-cloud-stream)
77

88
When the application starts, it will do the following every ten seconds:
99
1. send a message which contains a random integer [0-1000) to a Pub/Sub topic `topic-one` via a Spring Cloud Stream output binder;

pubsub/spring/pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,29 @@
8686
<artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
8787
</dependency>
8888
<!-- [END pubsub_spring_cloud_stream_binder] -->
89+
<dependency>
90+
<groupId>junit</groupId>
91+
<artifactId>junit</artifactId>
92+
<version>4.13.2</version>
93+
<scope>test</scope>
94+
</dependency>
95+
<dependency>
96+
<groupId>com.google.truth</groupId>
97+
<artifactId>truth</artifactId>
98+
<version>1.1.3</version>
99+
<scope>test</scope>
100+
</dependency>
101+
<dependency>
102+
<groupId>com.google.cloud</groupId>
103+
<artifactId>google-cloud-core</artifactId>
104+
<version>2.3.1</version>
105+
<classifier>tests</classifier>
106+
</dependency>
107+
<dependency>
108+
<groupId>org.springframework.boot</groupId>
109+
<artifactId>spring-boot-test</artifactId>
110+
<scope>test</scope>
111+
</dependency>
89112
</dependencies>
90113

91114
<build>

pubsub/spring/src/main/java/demo/PubSubApplication.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.springframework.messaging.MessageChannel;
3939
import org.springframework.messaging.MessageHandler;
4040
import org.springframework.messaging.handler.annotation.Header;
41-
import org.springframework.util.concurrent.ListenableFutureCallback;
4241
import reactor.core.publisher.Flux;
4342
import reactor.core.scheduler.Schedulers;
4443

@@ -91,18 +90,13 @@ public void messageReceiver(
9190
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
9291
PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
9392

94-
adapter.setPublishCallback(
95-
new ListenableFutureCallback<String>() {
96-
@Override
97-
public void onFailure(Throwable throwable) {
98-
LOGGER.info("There was an error sending the message.");
99-
}
93+
adapter.setSuccessCallback(
94+
((ackId, message) ->
95+
LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
96+
97+
adapter.setFailureCallback(
98+
(cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
10099

101-
@Override
102-
public void onSuccess(String result) {
103-
LOGGER.info("Message was sent via the outbound channel adapter to topic-two!");
104-
}
105-
});
106100
return adapter;
107101
}
108102
// [END pubsub_spring_outbound_channel_adapter]
@@ -124,21 +118,21 @@ public Consumer<Message<String>> receiveMessageFromTopicTwo() {
124118
public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
125119
return () ->
126120
Flux.<Message<String>>generate(
127-
sink -> {
128-
try {
129-
Thread.sleep(10000);
130-
} catch (InterruptedException e) {
131-
// stop sleep earlier.
132-
}
121+
sink -> {
122+
try {
123+
Thread.sleep(10000);
124+
} catch (InterruptedException e) {
125+
// Stop sleep earlier.
126+
}
133127

134-
Message<String> message =
135-
MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
136-
LOGGER.info(
137-
"Sending a message via the output binder to topic-one! Payload: "
138-
+ message.getPayload());
139-
sink.next(message);
140-
})
141-
.subscribeOn(Schedulers.elastic());
128+
Message<String> message =
129+
MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
130+
LOGGER.info(
131+
"Sending a message via the output binder to topic-one! Payload: "
132+
+ message.getPayload());
133+
sink.next(message);
134+
})
135+
.subscribeOn(Schedulers.boundedElastic());
142136
}
143137
// [END pubsub_spring_cloud_stream_output_binder]
144138
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package demo;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static junit.framework.TestCase.assertNotNull;
21+
22+
import com.google.api.gax.rpc.AlreadyExistsException;
23+
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
24+
import com.google.cloud.pubsub.v1.TopicAdminClient;
25+
import com.google.pubsub.v1.ProjectSubscriptionName;
26+
import com.google.pubsub.v1.Subscription;
27+
import com.google.pubsub.v1.TopicName;
28+
import java.io.ByteArrayOutputStream;
29+
import java.io.PrintStream;
30+
import java.util.concurrent.TimeUnit;
31+
import org.junit.After;
32+
import org.junit.Before;
33+
import org.junit.BeforeClass;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
import org.junit.rules.Timeout;
37+
38+
public class PubSubApplicationIT {
39+
private ByteArrayOutputStream bout;
40+
private PrintStream out;
41+
42+
private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
43+
private static final String topicOneId = "topic-one";
44+
private static final String topicTwoId = "topic-two";
45+
private static final String subscriptionOneId = "sub-one";
46+
private static final String subscriptionTwoId = "sub-two";
47+
48+
private static void requireEnvVar(String varName) {
49+
assertNotNull(
50+
"Environment variable " + varName + " is required to perform these tests.",
51+
System.getenv(varName));
52+
}
53+
54+
@Rule public Timeout globalTimeout = Timeout.seconds(600); // 10 minute timeout
55+
56+
@BeforeClass
57+
public static void checkRequirements() {
58+
requireEnvVar("GOOGLE_CLOUD_PROJECT");
59+
}
60+
61+
@Before
62+
public void setUp() throws Exception {
63+
bout = new ByteArrayOutputStream();
64+
out = new PrintStream(bout);
65+
System.setOut(out);
66+
67+
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
68+
try {
69+
topicAdminClient.createTopic(TopicName.of(projectId, topicOneId));
70+
topicAdminClient.createTopic(TopicName.of(projectId, topicTwoId));
71+
} catch (AlreadyExistsException ignore) {
72+
System.out.println("Using existing topics.");
73+
}
74+
}
75+
76+
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
77+
Subscription subscriptionOne =
78+
Subscription.newBuilder()
79+
.setName(String.valueOf(ProjectSubscriptionName.of(projectId, subscriptionOneId)))
80+
.setTopic(String.valueOf(TopicName.of(projectId, topicOneId)))
81+
.build();
82+
Subscription subscriptionTwo =
83+
Subscription.newBuilder()
84+
.setName(String.valueOf(ProjectSubscriptionName.of(projectId, subscriptionTwoId)))
85+
.setTopic(String.valueOf(TopicName.of(projectId, topicTwoId)))
86+
.build();
87+
88+
try {
89+
subscriptionAdminClient.createSubscription(subscriptionOne);
90+
subscriptionAdminClient.createSubscription(subscriptionTwo);
91+
} catch (AlreadyExistsException ignore) {
92+
System.out.println("Using existing subscriptions");
93+
}
94+
}
95+
}
96+
97+
@After
98+
public void tearDown() {
99+
// No need to clean up these pairs of topics and subscriptions.
100+
System.setOut(null);
101+
}
102+
103+
@Test
104+
public void testPubSubApplication() throws Exception {
105+
bout.reset();
106+
107+
demo.PubSubApplication.main(new String[] {});
108+
109+
TimeUnit.MINUTES.sleep(1);
110+
111+
assertThat(bout.toString()).contains("Started PubSubApplication");
112+
assertThat(bout.toString()).contains("Sending a message via the output binder to topic-one!");
113+
assertThat(bout.toString())
114+
.contains("Message arrived via an inbound channel adapter from sub-one!");
115+
assertThat(bout.toString())
116+
.contains("Message was sent via the outbound channel adapter to topic-two!");
117+
assertThat(bout.toString()).contains("Message arrived via an input binder from topic-two!");
118+
}
119+
}

0 commit comments

Comments
 (0)