Skip to content

Commit 984397c

Browse files
committed
WebSocket: Close doesn't work
- Implement close on netty, undertow and jetty - Fix jooby-project#2565
1 parent 0fed5ee commit 984397c

File tree

3 files changed

+123
-62
lines changed

3 files changed

+123
-62
lines changed

modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyWebSocket.java

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,6 @@
55
*/
66
package io.jooby.internal.jetty;
77

8-
import io.jooby.Context;
9-
import io.jooby.Server;
10-
import io.jooby.SneakyThrows;
11-
import io.jooby.WebSocket;
12-
import io.jooby.WebSocketCloseStatus;
13-
import io.jooby.WebSocketConfigurer;
14-
import io.jooby.WebSocketMessage;
15-
import org.eclipse.jetty.websocket.api.CloseException;
16-
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
17-
import org.eclipse.jetty.websocket.api.Session;
18-
import org.eclipse.jetty.websocket.api.WebSocketListener;
19-
import org.eclipse.jetty.websocket.api.WriteCallback;
20-
21-
import javax.annotation.Nonnull;
228
import java.nio.charset.StandardCharsets;
239
import java.util.ArrayList;
2410
import java.util.Collections;
@@ -27,6 +13,24 @@
2713
import java.util.concurrent.ConcurrentMap;
2814
import java.util.concurrent.CopyOnWriteArrayList;
2915
import java.util.concurrent.TimeoutException;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
19+
import javax.annotation.Nonnull;
20+
21+
import org.eclipse.jetty.websocket.api.CloseException;
22+
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
23+
import org.eclipse.jetty.websocket.api.Session;
24+
import org.eclipse.jetty.websocket.api.WebSocketListener;
25+
import org.eclipse.jetty.websocket.api.WriteCallback;
26+
27+
import io.jooby.Context;
28+
import io.jooby.Server;
29+
import io.jooby.SneakyThrows;
30+
import io.jooby.WebSocket;
31+
import io.jooby.WebSocketCloseStatus;
32+
import io.jooby.WebSocketConfigurer;
33+
import io.jooby.WebSocketMessage;
3034

3135
public class JettyWebSocket implements WebSocketListener, WebSocketConfigurer, WebSocket,
3236
WriteCallback {
@@ -41,8 +45,9 @@ public class JettyWebSocket implements WebSocketListener, WebSocketConfigurer, W
4145
private Session session;
4246
private WebSocket.OnConnect onConnectCallback;
4347
private WebSocket.OnMessage onMessageCallback;
44-
private WebSocket.OnClose onCloseCallback;
48+
private AtomicReference<WebSocket.OnClose> onCloseCallback = new AtomicReference<>();
4549
private WebSocket.OnError onErrorCallback;
50+
private AtomicBoolean open = new AtomicBoolean(false);
4651

4752
public JettyWebSocket(JettyContext ctx) {
4853
this.ctx = ctx;
@@ -73,6 +78,7 @@ public JettyWebSocket(JettyContext ctx) {
7378

7479
@Override public void onWebSocketConnect(Session session) {
7580
try {
81+
open.set(true);
7682
this.session = session;
7783
addSession(this);
7884
if (onConnectCallback != null) {
@@ -86,7 +92,7 @@ public JettyWebSocket(JettyContext ctx) {
8692
@Override public void onWebSocketError(Throwable x) {
8793
// should close?
8894
if (!isTimeout(x)) {
89-
if (Server.connectionLost(x) || SneakyThrows.isFatal(x)) {
95+
if (isOpen() && (Server.connectionLost(x) || SneakyThrows.isFatal(x))) {
9096
handleClose(WebSocketCloseStatus.SERVER_ERROR);
9197
}
9298

@@ -131,7 +137,7 @@ private boolean isTimeout(Throwable x) {
131137
}
132138

133139
@Nonnull @Override public WebSocketConfigurer onClose(@Nonnull WebSocket.OnClose callback) {
134-
onCloseCallback = callback;
140+
onCloseCallback.set(callback);
135141
return this;
136142
}
137143

@@ -150,7 +156,7 @@ private boolean isTimeout(Throwable x) {
150156
}
151157

152158
@Override public boolean isOpen() {
153-
return session.isOpen();
159+
return open.get() && session.isOpen();
154160
}
155161

156162
@Nonnull @Override public WebSocket send(@Nonnull String message, boolean broadcast) {
@@ -167,7 +173,8 @@ private boolean isTimeout(Throwable x) {
167173
onWebSocketError(x);
168174
}
169175
} else {
170-
onWebSocketError(new IllegalStateException("Attempt to send a message on closed web socket"));
176+
onWebSocketError(
177+
new IllegalStateException("Attempt to send a message on closed web socket"));
171178
}
172179
}
173180
return this;
@@ -209,14 +216,34 @@ private boolean isTimeout(Throwable x) {
209216
}
210217

211218
private void handleClose(WebSocketCloseStatus closeStatus) {
219+
WebSocket.OnClose callback = this.onCloseCallback.getAndSet(null);
220+
Throwable cause = null;
221+
// 1. close socket
212222
try {
213-
if (onCloseCallback != null) {
214-
onCloseCallback.onClose(this, closeStatus);
223+
if (isOpen()) {
224+
session.close(closeStatus.getCode(), closeStatus.getReason());
215225
}
216226
} catch (Throwable x) {
217-
onWebSocketError(x);
218-
} finally {
219-
removeSession(this);
227+
cause = x;
228+
}
229+
open.set(false);
230+
// fire callback:
231+
if (callback != null) {
232+
try {
233+
callback.onClose(this, closeStatus);
234+
} catch (Throwable x) {
235+
if (cause != null) {
236+
x.addSuppressed(cause);
237+
}
238+
cause = x;
239+
}
240+
}
241+
// clear from active sessions:
242+
removeSession(this);
243+
244+
if (cause != null) {
245+
// fire error:
246+
onWebSocketError(cause);
220247
}
221248
}
222249

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyWebSocket.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.concurrent.ConcurrentMap;
1414
import java.util.concurrent.CopyOnWriteArrayList;
1515
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
import java.util.concurrent.atomic.AtomicReference;
1618

1719
import javax.annotation.Nonnull;
1820

@@ -48,9 +50,10 @@ public class NettyWebSocket implements WebSocketConfigurer, WebSocket, ChannelFu
4850
private ByteBuf buffer;
4951
private WebSocket.OnConnect connectCallback;
5052
private WebSocket.OnMessage messageCallback;
51-
private OnClose onCloseCallback;
53+
private AtomicReference<OnClose> onCloseCallback = new AtomicReference<>();
5254
private OnError onErrorCallback;
5355
private CountDownLatch ready = new CountDownLatch(1);
56+
private AtomicBoolean open = new AtomicBoolean(false);
5457

5558
public NettyWebSocket(NettyContext ctx) {
5659
this.netty = ctx;
@@ -112,7 +115,7 @@ private WebSocket send(ByteBuf buffer, boolean broadcast) {
112115
}
113116

114117
public boolean isOpen() {
115-
return netty.ctx.channel().isOpen();
118+
return open.get() && netty.ctx.channel().isOpen();
116119
}
117120

118121
@Override public WebSocketConfigurer onConnect(WebSocket.OnConnect callback) {
@@ -126,7 +129,7 @@ public boolean isOpen() {
126129
}
127130

128131
@Override public WebSocketConfigurer onClose(WebSocket.OnClose callback) {
129-
onCloseCallback = callback;
132+
onCloseCallback.set(callback);
130133
return this;
131134
}
132135

@@ -179,23 +182,22 @@ private void handleMessage(WebSocketFrame frame) {
179182
}
180183

181184
private void handleClose(WebSocketCloseStatus closeStatus) {
185+
OnClose callback = onCloseCallback.getAndSet(null);
186+
if (isOpen()) {
187+
// close socket:
188+
netty.ctx.channel()
189+
.writeAndFlush(
190+
new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason()))
191+
.addListener(ChannelFutureListener.CLOSE);
192+
}
193+
open.set(false);
182194
try {
183-
if (isOpen()) {
184-
if (onCloseCallback != null) {
185-
Runnable closeCallback = () -> {
186-
try {
187-
webSocketTask(() -> onCloseCallback.onClose(this, closeStatus), false).run();
188-
} finally {
189-
netty.ctx.channel()
190-
.writeAndFlush(
191-
new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason()))
192-
.addListener(ChannelFutureListener.CLOSE);
193-
}
194-
};
195-
fireCallback(closeCallback);
196-
}
195+
if (callback != null) {
196+
// fire callback:
197+
fireCallback(webSocketTask(() -> callback.onClose(this, closeStatus), false));
197198
}
198199
} finally {
200+
// clear from active sessions:
199201
this.netty.ctx.channel().attr(WS).set(null);
200202
removeSession(this);
201203
}
@@ -291,6 +293,7 @@ private Runnable webSocketTask(Runnable runnable, boolean isInit) {
291293
private void waitForConnect() {
292294
try {
293295
ready.await();
296+
open.set(true);
294297
} catch (InterruptedException x) {
295298
Thread.currentThread().interrupt();
296299
}

modules/jooby-utow/src/main/java/io/jooby/internal/utow/UtowWebSocket.java

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,26 @@
55
*/
66
package io.jooby.internal.utow;
77

8+
import static io.undertow.websockets.core.WebSockets.sendClose;
9+
10+
import java.io.IOException;
11+
import java.nio.ByteBuffer;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.ArrayList;
14+
import java.util.Collections;
15+
import java.util.List;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.concurrent.ConcurrentMap;
18+
import java.util.concurrent.CopyOnWriteArrayList;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import javax.annotation.Nonnull;
25+
26+
import org.xnio.IoUtils;
27+
828
import com.typesafe.config.Config;
929
import io.jooby.Context;
1030
import io.jooby.Server;
@@ -20,19 +40,6 @@
2040
import io.undertow.websockets.core.WebSocketChannel;
2141
import io.undertow.websockets.core.WebSockets;
2242

23-
import javax.annotation.Nonnull;
24-
import java.io.IOException;
25-
import java.nio.ByteBuffer;
26-
import java.nio.charset.StandardCharsets;
27-
import java.util.ArrayList;
28-
import java.util.Collections;
29-
import java.util.List;
30-
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.ConcurrentMap;
32-
import java.util.concurrent.CopyOnWriteArrayList;
33-
import java.util.concurrent.CountDownLatch;
34-
import java.util.concurrent.TimeUnit;
35-
3643
public class UtowWebSocket extends AbstractReceiveListener
3744
implements WebSocketConfigurer, WebSocket, WebSocketCallback<Void> {
3845

@@ -44,10 +51,11 @@ public class UtowWebSocket extends AbstractReceiveListener
4451
private final boolean dispatch;
4552
private OnConnect onConnectCallback;
4653
private OnMessage onMessageCallback;
47-
private OnClose onCloseCallback;
54+
private AtomicReference<OnClose> onCloseCallback = new AtomicReference<>();
4855
private OnError onErrorCallback;
4956
private String key;
5057
private CountDownLatch ready = new CountDownLatch(1);
58+
private AtomicBoolean open = new AtomicBoolean(false);
5159

5260
public UtowWebSocket(UtowContext ctx, WebSocketChannel channel) {
5361
this.ctx = ctx;
@@ -79,7 +87,7 @@ public UtowWebSocket(UtowContext ctx, WebSocketChannel channel) {
7987
}
8088

8189
@Override public boolean isOpen() {
82-
return channel.isOpen();
90+
return open.get() && channel.isOpen();
8391
}
8492

8593
@Nonnull @Override public WebSocket send(@Nonnull String message, boolean broadcast) {
@@ -142,7 +150,7 @@ public UtowWebSocket(UtowContext ctx, WebSocketChannel channel) {
142150
}
143151

144152
@Nonnull @Override public WebSocketConfigurer onClose(@Nonnull OnClose callback) {
145-
onCloseCallback = callback;
153+
onCloseCallback.set(callback);
146154
return this;
147155
}
148156

@@ -182,6 +190,7 @@ void fireConnect() {
182190
private void waitForConnect() {
183191
try {
184192
ready.await();
193+
open.set(true);
185194
} catch (InterruptedException x) {
186195
Thread.currentThread().interrupt();
187196
}
@@ -198,7 +207,7 @@ private void dispatch(Runnable runnable) {
198207
@Override protected void onError(WebSocketChannel channel, Throwable x) {
199208
// should close?
200209
if (Server.connectionLost(x) || SneakyThrows.isFatal(x)) {
201-
if (channel.isOpen()) {
210+
if (isOpen()) {
202211
handleClose(WebSocketCloseStatus.SERVER_ERROR);
203212
}
204213
}
@@ -222,20 +231,42 @@ private void dispatch(Runnable runnable) {
222231

223232
@Override protected void onCloseMessage(CloseMessage cm,
224233
WebSocketChannel channel) {
225-
if (channel.isOpen()) {
234+
if (isOpen()) {
226235
handleClose(WebSocketCloseStatus.valueOf(cm.getCode())
227236
.orElseGet(() -> new WebSocketCloseStatus(cm.getCode(), cm.getReason())));
228237
}
229238
}
230239

231-
private void handleClose(WebSocketCloseStatus closeStatus) {
240+
private void handleClose(WebSocketCloseStatus status) {
241+
OnClose callback = onCloseCallback.getAndSet(null);
242+
if (isOpen()) {
243+
// close socket:
244+
sendClose(status.getCode(), status.getReason(), channel,
245+
new WebSocketCallback<UtowWebSocket>() {
246+
@Override
247+
public void onError(final WebSocketChannel channel, final UtowWebSocket ws,
248+
final Throwable throwable) {
249+
ws.onError(channel, throwable);
250+
IoUtils.safeClose(channel);
251+
}
252+
253+
@Override
254+
public void complete(final WebSocketChannel channel, final UtowWebSocket ws) {
255+
IoUtils.safeClose(channel);
256+
}
257+
}, this);
258+
}
259+
open.set(false);
232260
try {
233-
if (onCloseCallback != null) {
234-
onCloseCallback.onClose(this, closeStatus);
261+
// fire callback:
262+
if (callback != null) {
263+
callback.onClose(this, status);
235264
}
236265
} catch (Throwable x) {
266+
// fire error:
237267
onError(channel, x);
238268
} finally {
269+
// clear from active sessions:
239270
removeSession(this);
240271
}
241272
}

0 commit comments

Comments
 (0)