Skip to content

Commit cd64578

Browse files
authored
Merge pull request eugenp#3522 from eugenp/reactive-websocket-refactor
Reactive exception handling
2 parents f888a3f + 955a2e7 commit cd64578

1 file changed

Lines changed: 19 additions & 26 deletions

File tree

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,41 @@
11
package com.baeldung.reactive.websocket;
22

3-
import org.springframework.web.reactive.socket.WebSocketSession;
4-
53
import com.fasterxml.jackson.core.JsonProcessingException;
64
import com.fasterxml.jackson.databind.ObjectMapper;
7-
85
import org.springframework.stereotype.Component;
96
import org.springframework.web.reactive.socket.WebSocketHandler;
107
import org.springframework.web.reactive.socket.WebSocketMessage;
11-
8+
import org.springframework.web.reactive.socket.WebSocketSession;
129
import reactor.core.publisher.Flux;
1310
import reactor.core.publisher.Mono;
1411

1512
import java.time.Duration;
16-
import java.time.LocalDateTime;
17-
import java.util.UUID;
13+
14+
import static java.time.LocalDateTime.now;
15+
import static java.util.UUID.randomUUID;
1816

1917
@Component
2018
public class ReactiveWebSocketHandler implements WebSocketHandler {
2119

22-
private Flux<Event> eventFlux = Flux.generate(e -> {
23-
Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString());
24-
e.next(event);
25-
});
20+
private static final ObjectMapper json = new ObjectMapper();
2621

27-
private Flux<Event> intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event);
22+
private Flux<String> eventFlux = Flux.generate(sink -> {
23+
Event event = new Event(randomUUID().toString(), now().toString());
24+
try {
25+
sink.next(json.writeValueAsString(event));
26+
} catch (JsonProcessingException e) {
27+
sink.error(e);
28+
}
29+
});
2830

29-
private ObjectMapper json = new ObjectMapper();
31+
private Flux<String> intervalFlux = Flux.interval(Duration.ofMillis(1000L))
32+
.zipWith(eventFlux, (time, event) -> event);
3033

3134
@Override
3235
public Mono<Void> handle(WebSocketSession webSocketSession) {
33-
34-
return webSocketSession.send(intervalFlux.map(event -> {
35-
try {
36-
String jsonEvent = json.writeValueAsString(event);
37-
System.out.println(jsonEvent);
38-
return jsonEvent;
39-
} catch (JsonProcessingException e) {
40-
e.printStackTrace();
41-
return "";
42-
}
43-
}).map(webSocketSession::textMessage))
44-
45-
.and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log());
36+
return webSocketSession.send(intervalFlux
37+
.map(webSocketSession::textMessage))
38+
.and(webSocketSession.receive()
39+
.map(WebSocketMessage::getPayloadAsText).log());
4640
}
47-
4841
}

0 commit comments

Comments
 (0)