Skip to content

Commit 4a191c6

Browse files
committed
WebSocket API: cleanup + refactor
- rename WebSocketContext to WebSocketListner - Make WebSocketHandler two-arg callback: (context, configurer)
1 parent 646a611 commit 4a191c6

File tree

9 files changed

+161
-86
lines changed

9 files changed

+161
-86
lines changed

examples/src/main/java/examples/WebSocketApp.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,14 @@ public class WebSocketApp extends Jooby {
1313
{
1414
assets("/?*", Paths.get(System.getProperty("user.dir"), "examples", "www", "websocket"));
1515

16-
ScheduledExecutorService executor = Executors
17-
.newSingleThreadScheduledExecutor();
18-
ws("/ws", ctx -> {
19-
AtomicInteger counter = new AtomicInteger();
20-
ctx.onConnect(ws -> {
21-
executor.scheduleWithFixedDelay(() -> {
22-
ws.send("" + counter.incrementAndGet());
23-
}, 0, 3, TimeUnit.SECONDS);
16+
ws("/ws", (ctx, initializer) -> {
17+
initializer.onConnect(ws -> {
18+
ws.send("Welcome");
2419
});
25-
ctx.onMessage((ws, msg) -> {
26-
System.out.println("msg: " + counter.incrementAndGet() + " => " + msg.value());
27-
System.out.println(Thread.currentThread());
28-
// ws.send("Got: " + msg.value());
20+
initializer.onMessage((ws, msg) -> {
21+
ws.send("Got: " + msg.value(), true);
2922
});
30-
ctx.onClose((ws, closeStatus) -> {
23+
initializer.onClose((ws, closeStatus) -> {
3124
System.out.println("Closed " + closeStatus);
3225
});
3326

jooby/src/main/java/io/jooby/WebSocket.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package io.jooby;
22

3+
import javax.annotation.Nonnull;
4+
import java.util.Map;
5+
36
public interface WebSocket {
47
interface Handler {
5-
void apply(WebSocketContext ctx);
8+
void init(Context ctx, WebSocketListener initializer);
69
}
710

811
interface OnConnect {
@@ -23,9 +26,55 @@ interface OnError {
2326

2427
Context getContext();
2528

26-
WebSocket send(String message);
29+
/**
30+
* Context attributes (a.k.a request attributes).
31+
*
32+
* @return Context attributes.
33+
*/
34+
default @Nonnull Map<String, Object> getAttributes() {
35+
return getContext().getAttributes();
36+
}
37+
38+
/**
39+
* Get an attribute by his key. This is just an utility method around {@link #getAttributes()}.
40+
* This method look first in current context and fallback to application attributes.
41+
*
42+
* @param key Attribute key.
43+
* @param <T> Attribute type.
44+
* @return Attribute value.
45+
*/
46+
default @Nonnull <T> T attribute(@Nonnull String key) {
47+
return getContext().attribute(key);
48+
}
49+
50+
/**
51+
* Set an application attribute.
52+
*
53+
* @param key Attribute key.
54+
* @param value Attribute value.
55+
* @return This router.
56+
*/
57+
default @Nonnull WebSocket attribute(@Nonnull String key, Object value) {
58+
getContext().attribute(key, value);
59+
return this;
60+
}
61+
62+
default WebSocket send(String message) {
63+
return send(message, false);
64+
}
65+
66+
WebSocket send(String message, boolean broadcast);
67+
68+
default WebSocket send(byte[] bytes) {
69+
return send(bytes, false);
70+
}
71+
72+
WebSocket send(byte[] bytes, boolean broadcast);
73+
74+
default WebSocket render(Object message) {
75+
return render(message, false);
76+
}
2777

28-
WebSocket send(byte[] bytes);
78+
WebSocket render(Object message, boolean broadcast);
2979

30-
WebSocket render(Object message);
3180
}

jooby/src/main/java/io/jooby/WebSocketContext.java

Lines changed: 0 additions & 11 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.jooby;
2+
3+
public interface WebSocketListener {
4+
5+
WebSocketListener onConnect(WebSocket.OnConnect callback);
6+
7+
WebSocketListener onMessage(WebSocket.OnMessage callback);
8+
9+
WebSocketListener onError(WebSocket.OnError callback);
10+
11+
WebSocketListener onClose(WebSocket.OnClose callback);
12+
}

jooby/src/main/java/io/jooby/internal/WebSocketMessageImpl.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import io.jooby.DefaultContext;
66
import io.jooby.ForwardingContext;
77
import io.jooby.MediaType;
8-
import io.jooby.SneakyThrows;
98
import io.jooby.WebSocketMessage;
109

1110
import javax.annotation.Nonnull;
@@ -43,26 +42,8 @@ public WebSocketMessageImpl(Context ctx, byte[] bytes) {
4342
super(ctx, bytes);
4443
}
4544

46-
@Override public byte[] bytes() {
47-
byte[] bytes = super.bytes();
48-
System.out.println(new String(bytes));
49-
return bytes;
50-
}
51-
5245
@Nonnull @Override public <T> T to(@Nonnull Type type) {
5346
MediaType contentType = ctx.getRoute().getConsumes().get(0);
5447
return new WebSocketMessageBody(ctx, this).decode(type, contentType);
55-
// try {
56-
// MediaType contentType = ctx.getRoute().getConsumes().get(0);
57-
// if (MediaType.text.equals(contentType)) {
58-
// T result = ValueConverters.convert(this, type, ctx.getRouter());
59-
// if (result != null) {
60-
// return result;
61-
// }
62-
// }
63-
// return ctx.decoder(contentType).decode(new WebSocketMessageBody(ctx, this), type);
64-
// } catch (Exception x) {
65-
// throw SneakyThrows.propagate(x);
66-
// }
6748
}
6849
}

jooby/src/main/java/io/jooby/internal/handler/WebSocketHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public WebSocketHandler(WebSocket.Handler handler) {
1414
this.handler = handler;
1515
}
1616

17-
@Nonnull @Override public Object apply(@Nonnull Context ctx) throws Exception {
17+
@Nonnull @Override public Object apply(@Nonnull Context ctx) {
1818
boolean webSocket = ctx.header("Upgrade").value("")
1919
.equalsIgnoreCase("WebSocket");
2020
if (webSocket) {

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public class NettyContext implements DefaultContext, ChannelFutureListener {
117117
private Map<String, String> cookies;
118118
private Map<String, String> responseCookies;
119119
private Boolean resetHeadersOnError;
120-
NettyWebSocketContext webSocket;
120+
NettyWebSocket webSocket;
121121

122122
public NettyContext(ChannelHandlerContext ctx, HttpRequest req, Router router, String path,
123123
int bufferSize) {
@@ -277,8 +277,8 @@ public NettyContext(ChannelHandlerContext ctx, HttpRequest req, Router router, S
277277
.maxFramePayloadLength(131072)
278278
.build();
279279
responseStarted = true;
280-
webSocket = new NettyWebSocketContext(this);
281-
handler.apply(webSocket);
280+
webSocket = new NettyWebSocket(this);
281+
handler.init(Context.readOnly(this), webSocket);
282282
DefaultFullHttpRequest fullHttpRequest = new DefaultFullHttpRequest(req.protocolVersion(),
283283
req.method(), req.uri(), Unpooled.EMPTY_BUFFER, req.headers(), EmptyHttpHeaders.INSTANCE);
284284
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(webSocketURL,

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyWebSocketContext.java renamed to modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyWebSocket.java

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import io.jooby.SneakyThrows;
77
import io.jooby.WebSocket;
88
import io.jooby.WebSocketCloseStatus;
9-
import io.jooby.WebSocketContext;
9+
import io.jooby.WebSocketListener;
1010
import io.jooby.WebSocketMessage;
1111
import io.netty.buffer.ByteBuf;
1212
import io.netty.buffer.Unpooled;
@@ -17,26 +17,62 @@
1717
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
1818
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1919

20-
public class NettyWebSocketContext implements WebSocketContext, WebSocket {
20+
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
24+
import java.util.concurrent.CopyOnWriteArrayList;
25+
26+
public class NettyWebSocket implements WebSocketListener, WebSocket {
27+
/** All connected websocket. */
28+
private static final ConcurrentMap<String, List<WebSocket>> all = new ConcurrentHashMap<>();
29+
2130
private final NettyContext netty;
2231
private final boolean dispatch;
32+
private final String key;
2333
private ByteBuf buffer;
2434
private WebSocket.OnConnect connectCallback;
2535
private WebSocket.OnMessage messageCallback;
2636
private OnClose onCloseCallback;
2737
private OnError onErrorCallback;
2838

29-
public NettyWebSocketContext(NettyContext ctx) {
39+
public NettyWebSocket(NettyContext ctx) {
3040
this.netty = ctx;
41+
this.key = ctx.pathString();
3142
dispatch = !ctx.isInIoThread();
3243
}
3344

34-
public WebSocket send(String text) {
35-
return send(new TextWebSocketFrame(text));
45+
public WebSocket send(String text, boolean broadcast) {
46+
if (broadcast) {
47+
for (WebSocket ws : all.getOrDefault(key, Collections.emptyList())) {
48+
ws.send(text, false);
49+
}
50+
} else {
51+
send(new TextWebSocketFrame(text));
52+
}
53+
return this;
54+
}
55+
56+
public WebSocket send(byte[] bytes, boolean broadcast) {
57+
if (broadcast) {
58+
for (WebSocket ws : all.getOrDefault(key, Collections.emptyList())) {
59+
ws.send(bytes, false);
60+
}
61+
} else {
62+
send(new TextWebSocketFrame(Unpooled.wrappedBuffer(bytes)));
63+
}
64+
return this;
3665
}
3766

38-
public WebSocket send(byte[] bytes) {
39-
return send(new TextWebSocketFrame(Unpooled.wrappedBuffer(bytes)));
67+
@Override public WebSocket render(Object message, boolean broadcast) {
68+
if (broadcast) {
69+
for (WebSocket ws : all.getOrDefault(key, Collections.emptyList())) {
70+
ws.render(message, false);
71+
}
72+
} else {
73+
Context.websocket(netty, this).render(message);
74+
}
75+
return this;
4076
}
4177

4278
private WebSocket send(TextWebSocketFrame frame) {
@@ -48,11 +84,6 @@ private WebSocket send(TextWebSocketFrame frame) {
4884
return this;
4985
}
5086

51-
@Override public WebSocket render(Object message) {
52-
Context.websocket(netty, this).render(message);
53-
return this;
54-
}
55-
5687
@Override public Context getContext() {
5788
return Context.readOnly(netty);
5889
}
@@ -61,20 +92,24 @@ public boolean isOpen() {
6192
return netty.ctx.channel().isOpen();
6293
}
6394

64-
@Override public void onConnect(WebSocket.OnConnect callback) {
95+
@Override public WebSocketListener onConnect(WebSocket.OnConnect callback) {
6596
connectCallback = callback;
97+
return this;
6698
}
6799

68-
@Override public void onMessage(WebSocket.OnMessage callback) {
100+
@Override public WebSocketListener onMessage(WebSocket.OnMessage callback) {
69101
messageCallback = callback;
102+
return this;
70103
}
71104

72-
@Override public void onClose(WebSocket.OnClose callback) {
105+
@Override public WebSocketListener onClose(WebSocket.OnClose callback) {
73106
onCloseCallback = callback;
107+
return this;
74108
}
75109

76-
@Override public void onError(OnError callback) {
110+
@Override public WebSocketListener onError(OnError callback) {
77111
onErrorCallback = callback;
112+
return this;
78113
}
79114

80115
void handleFrame(WebSocketFrame frame) {
@@ -113,22 +148,26 @@ private void handleMessage(WebSocketFrame frame) {
113148
}
114149

115150
private void handleClose(WebSocketCloseStatus closeStatus) {
116-
if (isOpen()) {
117-
if (onCloseCallback != null) {
118-
Runnable task = webSocketTask(() -> onCloseCallback.onClose(this, closeStatus));
119-
Runnable closeCallback = () -> {
120-
try {
121-
task.run();
122-
} finally {
123-
netty.ctx.channel()
124-
.writeAndFlush(
125-
new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason()))
126-
.addListener(ChannelFutureListener.CLOSE);
127-
}
128-
};
129-
130-
fireCallback(closeCallback);
151+
try {
152+
if (isOpen()) {
153+
if (onCloseCallback != null) {
154+
Runnable task = webSocketTask(() -> onCloseCallback.onClose(this, closeStatus));
155+
Runnable closeCallback = () -> {
156+
try {
157+
task.run();
158+
} finally {
159+
netty.ctx.channel()
160+
.writeAndFlush(
161+
new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason()))
162+
.addListener(ChannelFutureListener.CLOSE);
163+
}
164+
};
165+
166+
fireCallback(closeCallback);
167+
}
131168
}
169+
} finally {
170+
removeSession(this);
132171
}
133172
}
134173

@@ -150,6 +189,7 @@ private void handleError(Throwable x) {
150189
}
151190

152191
void fireConnect() {
192+
addSession(this);
153193
if (connectCallback != null) {
154194
fireCallback(webSocketTask(() -> connectCallback.onConnect(this)));
155195
}
@@ -193,4 +233,15 @@ private static WebSocketCloseStatus toWebSocketCloseStatus(CloseWebSocketFrame f
193233
frame.release();
194234
}
195235
}
236+
237+
private void addSession(NettyWebSocket ws) {
238+
all.computeIfAbsent(ws.key, k -> new CopyOnWriteArrayList<>()).add(ws);
239+
}
240+
241+
private void removeSession(NettyWebSocket ws) {
242+
List<WebSocket> sockets = all.get(ws.key);
243+
if (sockets != null) {
244+
sockets.remove(ws);
245+
}
246+
}
196247
}

tests/src/test/java/io/jooby/WebSocketTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ public class WebSocketTest {
1212
public void webSocket() {
1313
new JoobyRunner(app -> {
1414

15-
app.ws("/ws/{key}", ctx -> {
16-
ctx.onMessage((ws, message) -> {
15+
app.ws("/ws/{key}", (ctx, initializer) -> {
16+
initializer.onMessage((ws, message) -> {
1717
ws.send("Hi " + message.value() + "!");
1818
});
1919
});
@@ -31,8 +31,8 @@ public void webSocketJson() {
3131
new JoobyRunner(app -> {
3232
app.install(new JacksonModule());
3333

34-
app.ws("/wsjson", ctx -> {
35-
ctx.onMessage((ws, message) -> {
34+
app.ws("/wsjson", (ctx, initializer) -> {
35+
initializer.onMessage((ws, message) -> {
3636
JsonNode node = message.to(JsonNode.class);
3737
ws.render(node);
3838
});

0 commit comments

Comments
 (0)