|
1 | 1 | package com.baeldung.reactive.websocket; |
2 | 2 |
|
3 | | -import org.springframework.web.reactive.socket.WebSocketSession; |
4 | | - |
5 | 3 | import com.fasterxml.jackson.core.JsonProcessingException; |
6 | 4 | import com.fasterxml.jackson.databind.ObjectMapper; |
7 | | - |
8 | 5 | import org.springframework.stereotype.Component; |
9 | 6 | import org.springframework.web.reactive.socket.WebSocketHandler; |
10 | 7 | import org.springframework.web.reactive.socket.WebSocketMessage; |
11 | | - |
| 8 | +import org.springframework.web.reactive.socket.WebSocketSession; |
12 | 9 | import reactor.core.publisher.Flux; |
13 | 10 | import reactor.core.publisher.Mono; |
14 | 11 |
|
15 | 12 | import java.time.Duration; |
16 | | -import java.time.LocalDateTime; |
17 | | -import java.util.UUID; |
| 13 | + |
| 14 | +import static java.time.LocalDateTime.now; |
| 15 | +import static java.util.UUID.randomUUID; |
18 | 16 |
|
19 | 17 | @Component |
20 | 18 | public class ReactiveWebSocketHandler implements WebSocketHandler { |
21 | 19 |
|
22 | | - private Flux<Event> eventFlux = Flux.generate(e -> { |
23 | | - Event event = new Event(UUID.randomUUID().toString(), LocalDateTime.now().toString()); |
24 | | - e.next(event); |
25 | | - }); |
| 20 | + private static final ObjectMapper json = new ObjectMapper(); |
26 | 21 |
|
27 | | - private Flux<Event> intervalFlux = Flux.interval(Duration.ofMillis(1000L)).zipWith(eventFlux, (time, event) -> event); |
| 22 | + private Flux<String> eventFlux = Flux.generate(sink -> { |
| 23 | + Event event = new Event(randomUUID().toString(), now().toString()); |
| 24 | + try { |
| 25 | + sink.next(json.writeValueAsString(event)); |
| 26 | + } catch (JsonProcessingException e) { |
| 27 | + sink.error(e); |
| 28 | + } |
| 29 | + }); |
28 | 30 |
|
29 | | - private ObjectMapper json = new ObjectMapper(); |
| 31 | + private Flux<String> intervalFlux = Flux.interval(Duration.ofMillis(1000L)) |
| 32 | + .zipWith(eventFlux, (time, event) -> event); |
30 | 33 |
|
31 | 34 | @Override |
32 | 35 | public Mono<Void> handle(WebSocketSession webSocketSession) { |
33 | | - |
34 | | - return webSocketSession.send(intervalFlux.map(event -> { |
35 | | - try { |
36 | | - String jsonEvent = json.writeValueAsString(event); |
37 | | - System.out.println(jsonEvent); |
38 | | - return jsonEvent; |
39 | | - } catch (JsonProcessingException e) { |
40 | | - e.printStackTrace(); |
41 | | - return ""; |
42 | | - } |
43 | | - }).map(webSocketSession::textMessage)) |
44 | | - |
45 | | - .and(webSocketSession.receive().map(WebSocketMessage::getPayloadAsText).log()); |
| 36 | + return webSocketSession.send(intervalFlux |
| 37 | + .map(webSocketSession::textMessage)) |
| 38 | + .and(webSocketSession.receive() |
| 39 | + .map(WebSocketMessage::getPayloadAsText).log()); |
46 | 40 | } |
47 | | - |
48 | 41 | } |
0 commit comments