File tree Expand file tree Collapse file tree 1 file changed +14
-20
lines changed
pubsub/spring/src/main/java/demo Expand file tree Collapse file tree 1 file changed +14
-20
lines changed Original file line number Diff line number Diff line change @@ -124,26 +124,20 @@ public Consumer<Message<String>> receiveMessageFromTopicTwo() {
124124 @ Bean
125125 public Supplier <Flux <Message <String >>> sendMessageToTopicOne () {
126126 return () ->
127- Flux .fromStream (
128- // Generate a stream that sends a numbered message every 10 seconds.
129- Stream .generate (
130- new Supplier <Message <String >>() {
131- @ Override
132- public Message <String > get () {
133- try {
134- Thread .sleep (10000 );
135- } finally {
136- Message <String > message =
137- MessageBuilder .withPayload ("message-" + rand .nextInt (1000 )).build ();
138- LOGGER .info (
139- "Sending a message via the output binder to topic-one! Payload: "
140- + message .getPayload ());
141- return message ;
142- }
143- }
144- }))
145- .subscribeOn (Schedulers .elastic ())
146- .share ();
127+ Flux .<Message <String >>generate (sink -> {
128+ try {
129+ Thread .sleep (10000 );
130+ } catch (InterruptedException e ) {
131+ // stop sleep earlier.
132+ }
133+
134+ Message <String > message =
135+ MessageBuilder .withPayload ("message-" + rand .nextInt (1000 )).build ();
136+ LOGGER .info (
137+ "Sending a message via the output binder to topic-one! Payload: "
138+ + message .getPayload ());
139+ sink .next (message );
140+ }).subscribeOn (Schedulers .elastic ());
147141 }
148142 // [END pubsub_spring_cloud_stream_output_binder]
149143}
You can’t perform that action at this time.
0 commit comments