Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.

Commit 849bf6a

Browse files
committed
WebSockets: #isOpen fix jooby-project#574
* synchronized ws access
1 parent c35e10e commit 849bf6a

File tree

10 files changed

+321
-54
lines changed

10 files changed

+321
-54
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.jooby.issues;
2+
3+
import java.nio.channels.ClosedChannelException;
4+
import java.util.concurrent.CountDownLatch;
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.jooby.Err;
8+
import org.jooby.test.ServerFeature;
9+
import org.junit.After;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
13+
import com.ning.http.client.AsyncHttpClient;
14+
import com.ning.http.client.AsyncHttpClientConfig;
15+
import com.ning.http.client.ws.WebSocket;
16+
import com.ning.http.client.ws.WebSocketUpgradeHandler;
17+
18+
public class Issue636 extends ServerFeature {
19+
20+
private static CountDownLatch closeLatch;
21+
22+
{
23+
ws("/636", ws -> {
24+
25+
ws.onClose(status -> {
26+
try {
27+
ws.send("636");
28+
} catch (Err x) {
29+
closeLatch.countDown();
30+
}
31+
});
32+
});
33+
34+
}
35+
36+
private AsyncHttpClient client;
37+
38+
@Before
39+
public void before() {
40+
client = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().build());
41+
}
42+
43+
@After
44+
public void after() {
45+
client.close();
46+
}
47+
48+
@Test
49+
public void sendClose() throws Exception {
50+
closeLatch = new CountDownLatch(1);
51+
52+
WebSocket ws = client.prepareGet(ws("636").toString())
53+
.execute(new WebSocketUpgradeHandler.Builder().build())
54+
.get();
55+
56+
ws.sendMessage("foo");
57+
Thread.sleep(100L);
58+
ws.close();
59+
Thread.sleep(100L);
60+
closeLatch.await(1L, TimeUnit.SECONDS);
61+
}
62+
63+
}

jooby-netty/src/main/java/org/jooby/internal/netty/NettyHandler.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
2222
import static java.util.Objects.requireNonNull;
2323

24-
import java.io.IOException;
25-
24+
import org.jooby.internal.ConnectionResetByPeer;
2625
import org.jooby.spi.HttpHandler;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
@@ -114,7 +113,7 @@ public void channelReadComplete(final ChannelHandlerContext ctx) throws Exceptio
114113
@Override
115114
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
116115
try {
117-
if (connectionResetByPeer(cause)) {
116+
if (ConnectionResetByPeer.test(cause)) {
118117
log.trace("execution of: " + ctx.channel().attr(PATH).get() + " resulted in error", cause);
119118
} else {
120119
Attribute<NettyWebSocket> ws = ctx.channel().attr(NettyWebSocket.KEY);
@@ -131,11 +130,6 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
131130

132131
}
133132

134-
private boolean connectionResetByPeer(final Throwable cause) {
135-
return cause instanceof IOException
136-
&& cause.getMessage().toLowerCase().contains("connection reset by peer");
137-
}
138-
139133
@Override
140134
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt)
141135
throws Exception {

jooby/src/main/java/org/jooby/Err.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,19 @@ public Err(final Status status, final String message, final Throwable cause) {
144144
* @param cause The cause of the problem.
145145
*/
146146
public Err(final int status, final String message, final Throwable cause) {
147-
this(Status.valueOf(status), message, cause);
147+
super(message("", status, message), cause);
148+
this.status = status;
149+
}
150+
151+
/**
152+
* Creates a new {@link Err}.
153+
*
154+
* @param status A web socket close status. Required.
155+
* @param message Close message.
156+
*/
157+
public Err(final WebSocket.CloseStatus status, String message) {
158+
super(message(status.reason(), status.code(), message));
159+
this.status = status.code();
148160
}
149161

150162
/**
@@ -251,7 +263,19 @@ public Map<String, Object> toMap() {
251263
* @return An error message.
252264
*/
253265
private static String message(final Status status, final String tail) {
254-
requireNonNull(status, "A HTTP Status is required.");
255-
return status.reason() + "(" + status.value() + ")" + (tail == null ? "" : ": " + tail);
266+
return message(status.reason(), status.value(), tail);
256267
}
268+
269+
/**
270+
* Build an error message using the HTTP status.
271+
*
272+
* @param reason Reason.
273+
* @param status The Status.
274+
* @param tail A message to append.
275+
* @return An error message.
276+
*/
277+
private static String message(final String reason, int status, final String tail) {
278+
return reason + "(" + status + ")" + (tail == null ? "" : ": " + tail);
279+
}
280+
257281
}

jooby/src/main/java/org/jooby/WebSocket.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static java.util.Objects.requireNonNull;
2222

2323
import java.io.Closeable;
24+
import java.nio.channels.ClosedChannelException;
2425
import java.util.Map;
2526
import java.util.Optional;
2627
import java.util.Set;
@@ -585,6 +586,13 @@ default void close() {
585586
close(NORMAL);
586587
}
587588

589+
/**
590+
* True if the websocket is still open.
591+
*
592+
* @return True if the websocket is still open.
593+
*/
594+
boolean isOpen();
595+
588596
/**
589597
* Gracefully closes the connection, after sending a description message
590598
*
@@ -612,6 +620,8 @@ default void close() {
612620
/**
613621
* Send data through the connection.
614622
*
623+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
624+
*
615625
* @param data Data to send.
616626
* @throws Exception If something goes wrong.
617627
*/
@@ -622,6 +632,8 @@ default void send(final Object data) throws Exception {
622632
/**
623633
* Send data through the connection.
624634
*
635+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
636+
*
625637
* @param data Data to send.
626638
* @param success A success callback.
627639
* @throws Exception If something goes wrong.
@@ -633,24 +645,26 @@ default void send(final Object data, final SuccessCallback success) throws Excep
633645
/**
634646
* Send data through the connection.
635647
*
648+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
649+
*
636650
* @param data Data to send.
637651
* @param err An err callback.
638652
* @throws Exception If something goes wrong.
639653
*/
640-
default void send(final Object data, final ErrCallback err)
641-
throws Exception {
654+
default void send(final Object data, final ErrCallback err) throws Exception {
642655
send(data, SUCCESS, err);
643656
}
644657

645658
/**
646659
* Send data through the connection.
647660
*
661+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
662+
*
648663
* @param data Data to send.
649664
* @param success A success callback.
650665
* @param err An err callback.
651666
* @throws Exception If something goes wrong.
652667
*/
653-
void send(Object data, SuccessCallback success, ErrCallback err)
654-
throws Exception;
668+
void send(Object data, SuccessCallback success, ErrCallback err) throws Exception;
655669

656670
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.jooby.internal;
2+
3+
import static javaslang.Predicates.instanceOf;
4+
5+
import java.io.IOException;
6+
import java.util.Objects;
7+
import java.util.Optional;
8+
9+
public class ConnectionResetByPeer {
10+
11+
public static boolean test(final Throwable cause) {
12+
return Optional.ofNullable(cause)
13+
.filter(instanceOf(IOException.class))
14+
.map(x -> x.getMessage())
15+
.filter(Objects::nonNull)
16+
.map(message -> message.toLowerCase().contains("connection reset by peer"))
17+
.orElse(false);
18+
}
19+
}

0 commit comments

Comments
 (0)