Skip to content

Commit b444907

Browse files
committed
WebSocket API: Undertow implementation
1 parent ec9f4c5 commit b444907

File tree

10 files changed

+276
-10
lines changed

10 files changed

+276
-10
lines changed

examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
</dependency>
3737
<dependency>
3838
<groupId>io.jooby</groupId>
39-
<artifactId>jooby-netty</artifactId>
39+
<artifactId>jooby-utow</artifactId>
4040
<version>${jooby.version}</version>
4141
</dependency>
4242
<dependency>

examples/src/main/java/examples/WebSocketApp.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ public class WebSocketApp extends Jooby {
1515
assets("/?*", Paths.get(System.getProperty("user.dir"), "examples", "www", "websocket"));
1616

1717
ws("/ws", (ctx, initializer) -> {
18+
System.out.println(Thread.currentThread());
19+
System.out.println("Response Started: " + ctx.isResponseStarted());
1820
initializer.onConnect(ws -> {
21+
System.out.println("Connected: " + Thread.currentThread());
1922
ws.send("Welcome");
20-
ws.close();
2123
});
2224
initializer.onMessage((ws, msg) -> {
2325
ws.send("Got: " + msg.value(), true);
@@ -26,11 +28,14 @@ public class WebSocketApp extends Jooby {
2628
System.out.println("Closed " + closeStatus);
2729
});
2830

31+
initializer.onError((ws, cause) -> {
32+
ws.getContext().getRouter().getLog().error("error ", cause);
33+
});
34+
2935
});
3036
}
3137

3238
public static void main(String[] args) {
3339
runApp(args, ExecutionMode.DEFAULT, WebSocketApp::new);
34-
// runApp(args, ExecutionMode.EVENT_LOOP, WebSocketApp::new);
3540
}
3641
}

jooby/src/main/java/io/jooby/WebSocket.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.jooby;
22

33
import javax.annotation.Nonnull;
4+
import java.util.List;
45
import java.util.Map;
56

67
/**
@@ -76,12 +77,12 @@ interface OnMessage {
7677

7778
/**
7879
* On close callback. Generated when client close the connection or when explicit calls to
79-
* {@link #close(WebSocketCloseStatus)} or {@link #disconnect()}.
80+
* {@link #close(WebSocketCloseStatus)}.
8081
*/
8182
interface OnClose {
8283
/**
8384
* Generated when client close the connection or when explicit calls to
84-
* {@link #close(WebSocketCloseStatus)} or {@link #disconnect()}.
85+
* {@link #close(WebSocketCloseStatus)}.
8586
*
8687
* @param ws WebSocket.
8788
* @param closeStatus Close status.
@@ -155,6 +156,21 @@ interface OnError {
155156
return send(message, false);
156157
}
157158

159+
/**
160+
* Web sockets connected to the same path. This method doesn't include the current
161+
* websocket.
162+
*
163+
* @return Web sockets or empty list.
164+
*/
165+
@Nonnull List<WebSocket> getSessions();
166+
167+
/**
168+
* True if websocket is open.
169+
*
170+
* @return True when open.
171+
*/
172+
boolean isOpen();
173+
158174
/**
159175
* Send a text message to current client (broadcast = false) or to ALL connected clients under the
160176
* websocket path (broadcast = true).

jooby/src/main/java/io/jooby/WebSocketMessage.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import javax.annotation.Nonnull;
66
import java.lang.reflect.Type;
7+
import java.nio.charset.StandardCharsets;
78

89
/**
910
* Websocket message generated from a {@link WebSocket.OnMessage} callback. Message is a subclass.
@@ -32,4 +33,15 @@ public interface WebSocketMessage extends Value {
3233
static @Nonnull WebSocketMessage create(@Nonnull Context ctx, @Nonnull byte[] bytes) {
3334
return new WebSocketMessageImpl(ctx, bytes);
3435
}
36+
37+
/**
38+
* Creates a websocket message.
39+
*
40+
* @param ctx HTTP context.
41+
* @param message Text message.
42+
* @return A websocket message.
43+
*/
44+
static @Nonnull WebSocketMessage create(@Nonnull Context ctx, @Nonnull String message) {
45+
return new WebSocketMessageImpl(ctx, message.getBytes(StandardCharsets.UTF_8));
46+
}
3547
}

jooby/src/main/java/io/jooby/internal/ReadOnlyContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public ReadOnlyContext(@Nonnull Context context) {
3434
super(context);
3535
}
3636

37+
@Override public boolean isResponseStarted() {
38+
return true;
39+
}
40+
3741
@Nonnull @Override public Context send(@Nonnull Path file) {
3842
throw new IllegalStateException(MESSAGE);
3943
}

modules/jooby-jackson/src/main/java/io/jooby/json/JacksonModule.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package io.jooby.json;
77

8+
import com.fasterxml.jackson.core.JsonGenerator;
89
import com.fasterxml.jackson.core.JsonParseException;
910
import com.fasterxml.jackson.databind.JsonNode;
1011
import com.fasterxml.jackson.databind.Module;
@@ -25,8 +26,10 @@
2526
import io.jooby.StatusCode;
2627

2728
import javax.annotation.Nonnull;
29+
import java.io.DataOutput;
2830
import java.io.InputStream;
2931
import java.lang.reflect.Type;
32+
import java.nio.charset.StandardCharsets;
3033
import java.util.HashSet;
3134
import java.util.Set;
3235

@@ -129,7 +132,7 @@ public JacksonModule module(Class<? extends Module> module) {
129132

130133
@Override public byte[] encode(@Nonnull Context ctx, @Nonnull Object value) throws Exception {
131134
ctx.setDefaultResponseType(MediaType.json);
132-
return mapper.writeValueAsBytes(value);
135+
return mapper.writer().writeValueAsBytes(value);
133136
}
134137

135138
@Override public <T> T decode(Context ctx, Type type) throws Exception {

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyWebSocket.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
1818
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1919

20+
import javax.annotation.Nonnull;
21+
import java.util.ArrayList;
2022
import java.util.Collections;
2123
import java.util.List;
2224
import java.util.concurrent.ConcurrentHashMap;
@@ -38,8 +40,8 @@ public class NettyWebSocket implements WebSocketConfigurer, WebSocket {
3840

3941
public NettyWebSocket(NettyContext ctx) {
4042
this.netty = ctx;
41-
this.key = ctx.pathString();
42-
dispatch = !ctx.isInIoThread();
43+
this.key = ctx.getRoute().getPattern();
44+
this.dispatch = !ctx.isInIoThread();
4345
}
4446

4547
public WebSocket send(String text, boolean broadcast) {
@@ -88,6 +90,16 @@ private WebSocket send(TextWebSocketFrame frame) {
8890
return Context.readOnly(netty);
8991
}
9092

93+
@Nonnull @Override public List<WebSocket> getSessions() {
94+
List<WebSocket> sessions = all.get(key);
95+
if (sessions == null) {
96+
return Collections.emptyList();
97+
}
98+
List<WebSocket> result = new ArrayList<>(sessions);
99+
result.remove(this);
100+
return result;
101+
}
102+
91103
public boolean isOpen() {
92104
return netty.ctx.channel().isOpen();
93105
}

modules/jooby-utow/src/main/java/io/jooby/internal/utow/UtowContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.jooby.StatusCode;
2424
import io.jooby.Value;
2525
import io.jooby.ValueNode;
26+
import io.jooby.WebSocket;
27+
import io.undertow.Handlers;
2628
import io.undertow.io.IoCallback;
2729
import io.undertow.io.Sender;
2830
import io.undertow.server.HttpServerExchange;
@@ -32,6 +34,9 @@
3234
import io.undertow.util.Headers;
3335
import io.undertow.util.HttpString;
3436
import io.undertow.util.SameThreadExecutor;
37+
import io.undertow.websockets.WebSocketConnectionCallback;
38+
import io.undertow.websockets.core.WebSocketChannel;
39+
import io.undertow.websockets.spi.WebSocketHttpExchange;
3540
import org.slf4j.Logger;
3641

3742
import javax.annotation.Nonnull;
@@ -222,6 +227,19 @@ public UtowContext(HttpServerExchange exchange, Router router) {
222227
return this;
223228
}
224229

230+
@Nonnull @Override public Context upgrade(@Nonnull WebSocket.Initializer handler) {
231+
try {
232+
Handlers.websocket((exchange, channel) -> {
233+
UtowWebSocket ws = new UtowWebSocket(this, channel);
234+
handler.init(Context.readOnly(this), ws);
235+
ws.fireConnect();
236+
}).handleRequest(exchange);
237+
return this;
238+
} catch (Exception x) {
239+
throw SneakyThrows.propagate(x);
240+
}
241+
}
242+
225243
@Nonnull @Override public StatusCode getResponseCode() {
226244
return StatusCode.valueOf(exchange.getStatusCode());
227245
}

0 commit comments

Comments
 (0)