1919import java .util .Random ;
2020import java .util .function .Consumer ;
2121import java .util .function .Supplier ;
22- import java .util .stream .Stream ;
2322import org .apache .commons .logging .Log ;
2423import org .apache .commons .logging .LogFactory ;
2524import org .springframework .beans .factory .annotation .Qualifier ;
@@ -56,15 +55,15 @@ public static void main(String[] args) {
5655 // [START pubsub_spring_inbound_channel_adapter]
5756 // Create a message channel for messages arriving from the subscription `sub-one`.
5857 @ Bean
59- public MessageChannel inputMessageChannelForSubOne () {
58+ public MessageChannel inputMessageChannel () {
6059 return new PublishSubscribeChannel ();
6160 }
6261
6362 // Create an inbound channel adapter to listen to the subscription `sub-one` and send
6463 // messages to the input message channel.
6564 @ Bean
6665 public PubSubInboundChannelAdapter inboundChannelAdapter (
67- @ Qualifier ("inputMessageChannelForSubOne " ) MessageChannel messageChannel ,
66+ @ Qualifier ("inputMessageChannel " ) MessageChannel messageChannel ,
6867 PubSubTemplate pubSubTemplate ) {
6968 PubSubInboundChannelAdapter adapter =
7069 new PubSubInboundChannelAdapter (pubSubTemplate , "sub-one" );
@@ -75,7 +74,7 @@ public PubSubInboundChannelAdapter inboundChannelAdapter(
7574 }
7675
7776 // Define what happens to the messages arriving in the message channel.
78- @ ServiceActivator (inputChannel = "inputMessageChannelForSubOne " )
77+ @ ServiceActivator (inputChannel = "inputMessageChannel " )
7978 public void messageReceiver (
8079 String payload ,
8180 @ Header (GcpPubSubHeaders .ORIGINAL_MESSAGE ) BasicAcknowledgeablePubsubMessage message ) {
@@ -88,7 +87,7 @@ public void messageReceiver(
8887 // Create an outbound channel adapter to send messages from the input message channel to the
8988 // topic `topic-two`.
9089 @ Bean
91- @ ServiceActivator (inputChannel = "inputMessageChannelForSubOne " )
90+ @ ServiceActivator (inputChannel = "inputMessageChannel " )
9291 public MessageHandler messageSender (PubSubTemplate pubsubTemplate ) {
9392 PubSubMessageHandler adapter = new PubSubMessageHandler (pubsubTemplate , "topic-two" );
9493
@@ -124,20 +123,22 @@ public Consumer<Message<String>> receiveMessageFromTopicTwo() {
124123 @ Bean
125124 public Supplier <Flux <Message <String >>> sendMessageToTopicOne () {
126125 return () ->
127- Flux .<Message <String >>generate (sink -> {
128- try {
129- Thread .sleep (10000 );
130- } catch (InterruptedException e ) {
131- // stop sleep earlier.
132- }
126+ Flux .<Message <String >>generate (
127+ sink -> {
128+ try {
129+ Thread .sleep (10000 );
130+ } catch (InterruptedException e ) {
131+ // stop sleep earlier.
132+ }
133133
134- Message <String > message =
135- MessageBuilder .withPayload ("message-" + rand .nextInt (1000 )).build ();
136- LOGGER .info (
137- "Sending a message via the output binder to topic-one! Payload: "
138- + message .getPayload ());
139- sink .next (message );
140- }).subscribeOn (Schedulers .elastic ());
134+ Message <String > message =
135+ MessageBuilder .withPayload ("message-" + rand .nextInt (1000 )).build ();
136+ LOGGER .info (
137+ "Sending a message via the output binder to topic-one! Payload: "
138+ + message .getPayload ());
139+ sink .next (message );
140+ })
141+ .subscribeOn (Schedulers .elastic ());
141142 }
142143 // [END pubsub_spring_cloud_stream_output_binder]
143144}
0 commit comments