Skip to content

Commit 8a84f54

Browse files
Pub/Sub: rename input Spring message channel and lint (GoogleCloudPlatform#3866)
* rename input message channel and lint * lint
1 parent 7092398 commit 8a84f54

File tree

1 file changed

+19
-18
lines changed

1 file changed

+19
-18
lines changed

pubsub/spring/src/main/java/demo/PubSubApplication.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.Random;
2020
import java.util.function.Consumer;
2121
import java.util.function.Supplier;
22-
import java.util.stream.Stream;
2322
import org.apache.commons.logging.Log;
2423
import org.apache.commons.logging.LogFactory;
2524
import org.springframework.beans.factory.annotation.Qualifier;
@@ -56,15 +55,15 @@ public static void main(String[] args) {
5655
// [START pubsub_spring_inbound_channel_adapter]
5756
// Create a message channel for messages arriving from the subscription `sub-one`.
5857
@Bean
59-
public MessageChannel inputMessageChannelForSubOne() {
58+
public MessageChannel inputMessageChannel() {
6059
return new PublishSubscribeChannel();
6160
}
6261

6362
// Create an inbound channel adapter to listen to the subscription `sub-one` and send
6463
// messages to the input message channel.
6564
@Bean
6665
public PubSubInboundChannelAdapter inboundChannelAdapter(
67-
@Qualifier("inputMessageChannelForSubOne") MessageChannel messageChannel,
66+
@Qualifier("inputMessageChannel") MessageChannel messageChannel,
6867
PubSubTemplate pubSubTemplate) {
6968
PubSubInboundChannelAdapter adapter =
7069
new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
@@ -75,7 +74,7 @@ public PubSubInboundChannelAdapter inboundChannelAdapter(
7574
}
7675

7776
// Define what happens to the messages arriving in the message channel.
78-
@ServiceActivator(inputChannel = "inputMessageChannelForSubOne")
77+
@ServiceActivator(inputChannel = "inputMessageChannel")
7978
public void messageReceiver(
8079
String payload,
8180
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
@@ -88,7 +87,7 @@ public void messageReceiver(
8887
// Create an outbound channel adapter to send messages from the input message channel to the
8988
// topic `topic-two`.
9089
@Bean
91-
@ServiceActivator(inputChannel = "inputMessageChannelForSubOne")
90+
@ServiceActivator(inputChannel = "inputMessageChannel")
9291
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
9392
PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
9493

@@ -124,20 +123,22 @@ public Consumer<Message<String>> receiveMessageFromTopicTwo() {
124123
@Bean
125124
public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
126125
return () ->
127-
Flux.<Message<String>>generate(sink -> {
128-
try {
129-
Thread.sleep(10000);
130-
} catch (InterruptedException e) {
131-
// stop sleep earlier.
132-
}
126+
Flux.<Message<String>>generate(
127+
sink -> {
128+
try {
129+
Thread.sleep(10000);
130+
} catch (InterruptedException e) {
131+
// stop sleep earlier.
132+
}
133133

134-
Message<String> message =
135-
MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
136-
LOGGER.info(
137-
"Sending a message via the output binder to topic-one! Payload: "
138-
+ message.getPayload());
139-
sink.next(message);
140-
}).subscribeOn(Schedulers.elastic());
134+
Message<String> message =
135+
MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
136+
LOGGER.info(
137+
"Sending a message via the output binder to topic-one! Payload: "
138+
+ message.getPayload());
139+
sink.next(message);
140+
})
141+
.subscribeOn(Schedulers.elastic());
141142
}
142143
// [END pubsub_spring_cloud_stream_output_binder]
143144
}

0 commit comments

Comments
 (0)