Skip to content

Commit bb02dc4

Browse files
authored
change sending (GoogleCloudPlatform#3863)
1 parent f28c539 commit bb02dc4

File tree

1 file changed

+14
-20
lines changed

1 file changed

+14
-20
lines changed

pubsub/spring/src/main/java/demo/PubSubApplication.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,26 +124,20 @@ public Consumer<Message<String>> receiveMessageFromTopicTwo() {
124124
@Bean
125125
public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
126126
return () ->
127-
Flux.fromStream(
128-
// Generate a stream that sends a numbered message every 10 seconds.
129-
Stream.generate(
130-
new Supplier<Message<String>>() {
131-
@Override
132-
public Message<String> get() {
133-
try {
134-
Thread.sleep(10000);
135-
} finally {
136-
Message<String> message =
137-
MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
138-
LOGGER.info(
139-
"Sending a message via the output binder to topic-one! Payload: "
140-
+ message.getPayload());
141-
return message;
142-
}
143-
}
144-
}))
145-
.subscribeOn(Schedulers.elastic())
146-
.share();
127+
Flux.<Message<String>>generate(sink -> {
128+
try {
129+
Thread.sleep(10000);
130+
} catch (InterruptedException e) {
131+
// stop sleep earlier.
132+
}
133+
134+
Message<String> message =
135+
MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
136+
LOGGER.info(
137+
"Sending a message via the output binder to topic-one! Payload: "
138+
+ message.getPayload());
139+
sink.next(message);
140+
}).subscribeOn(Schedulers.elastic());
147141
}
148142
// [END pubsub_spring_cloud_stream_output_binder]
149143
}

0 commit comments

Comments
 (0)