Skip to content

Commit 60628f9

Browse files
committed
Send message to all connected web socket clients fix jooby-project#645
1 parent 7c9f59b commit 60628f9

File tree

6 files changed

+375
-10
lines changed

6 files changed

+375
-10
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package org.jooby.issues;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.util.Arrays;
6+
import java.util.LinkedList;
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import org.jooby.test.ServerFeature;
11+
import org.junit.After;
12+
import org.junit.Before;
13+
import org.junit.Test;
14+
15+
import com.ning.http.client.AsyncHttpClient;
16+
import com.ning.http.client.AsyncHttpClientConfig;
17+
import com.ning.http.client.ws.WebSocket;
18+
import com.ning.http.client.ws.WebSocketTextListener;
19+
import com.ning.http.client.ws.WebSocketUpgradeHandler;
20+
21+
public class Issue645 extends ServerFeature {
22+
23+
{
24+
ws("/ws", (ws) -> {
25+
26+
ws.onMessage(message -> {
27+
System.out.println(Thread.currentThread());
28+
ws.broadcast("=" + message.value(), () -> {
29+
System.out.println(Thread.currentThread());
30+
ws.close();
31+
});
32+
});
33+
});
34+
35+
}
36+
37+
private AsyncHttpClient client;
38+
39+
@Before
40+
public void before() {
41+
client = new AsyncHttpClient(new AsyncHttpClientConfig.Builder().build());
42+
}
43+
44+
@After
45+
public void after() {
46+
client.close();
47+
}
48+
49+
@Test
50+
public void sendText() throws Exception {
51+
LinkedList<String> messages = new LinkedList<>();
52+
53+
CountDownLatch latch = new CountDownLatch(1);
54+
55+
client.prepareGet(ws("ws").toString())
56+
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
57+
new WebSocketTextListener() {
58+
59+
@Override
60+
public void onMessage(final String message) {
61+
messages.add(message);
62+
latch.countDown();
63+
}
64+
65+
@Override
66+
public void onOpen(final WebSocket websocket) {
67+
websocket.sendMessage("hey!");
68+
}
69+
70+
@Override
71+
public void onClose(final WebSocket websocket) {
72+
}
73+
74+
@Override
75+
public void onError(final Throwable t) {
76+
}
77+
}).build())
78+
.get();
79+
if (latch.await(1L, TimeUnit.SECONDS)) {
80+
assertEquals(Arrays.asList("=hey!"), messages);
81+
}
82+
}
83+
84+
}

jooby/src/main/java/org/jooby/WebSocket.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -734,4 +734,56 @@ default void send(final Object data, final OnError err) throws Exception {
734734
*/
735735
void send(Object data, SuccessCallback success, OnError err) throws Exception;
736736

737+
/**
738+
* Send data to all connected sessions.
739+
*
740+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
741+
*
742+
* @param data Data to send.
743+
* @throws Exception If something goes wrong.
744+
*/
745+
default void broadcast(final Object data) throws Exception {
746+
broadcast(data, SUCCESS, ERR);
747+
}
748+
749+
/**
750+
* Send data to all connected sessions.
751+
*
752+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
753+
*
754+
* @param data Data to send.
755+
* @param success A success callback.
756+
* @throws Exception If something goes wrong.
757+
*/
758+
default void broadcast(final Object data, final SuccessCallback success) throws Exception {
759+
broadcast(data, success, ERR);
760+
}
761+
762+
/**
763+
* Send data to all connected sessions.
764+
*
765+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
766+
*
767+
* @param data Data to send.
768+
* @param err An err callback.
769+
* @throws Exception If something goes wrong.
770+
*/
771+
default void broadcast(final Object data, final OnError err) throws Exception {
772+
broadcast(data, SUCCESS, err);
773+
;
774+
}
775+
776+
/**
777+
* Send data to all connected sessions.
778+
*
779+
* If the web socket is closed this method throw an {@link Err} with {@link #NORMAL} close status.
780+
*
781+
* @param data Data to send.
782+
* @param success A success callback.
783+
* @param err An err callback.
784+
* @throws Exception If something goes wrong.
785+
*/
786+
void broadcast(Object data, SuccessCallback success, OnError err)
787+
throws Exception;
788+
737789
}

jooby/src/main/java/org/jooby/internal/WebSocketImpl.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.Locale;
2727
import java.util.Map;
2828
import java.util.NoSuchElementException;
29+
import java.util.Queue;
30+
import java.util.concurrent.ConcurrentLinkedQueue;
2931

3032
import org.jooby.Err;
3133
import org.jooby.MediaType;
@@ -58,6 +60,9 @@ public class WebSocketImpl implements WebSocket {
5860
/** The logging system. */
5961
private final Logger log = LoggerFactory.getLogger(WebSocket.class);
6062

63+
/** All connected websocket. */
64+
private static final Queue<WebSocket> sessions = new ConcurrentLinkedQueue<>();
65+
6166
private Locale locale;
6267

6368
private String path;
@@ -103,6 +108,7 @@ public WebSocketImpl(final OnOpen handler, final String path,
103108

104109
@Override
105110
public void close(final CloseStatus status) {
111+
sessions.remove(this);
106112
synchronized (this) {
107113
open = false;
108114
ws.close(status.code(), status.reason());
@@ -111,6 +117,7 @@ public void close(final CloseStatus status) {
111117

112118
@Override
113119
public void resume() {
120+
sessions.add(this);
114121
synchronized (this) {
115122
if (suspended) {
116123
ws.resume();
@@ -121,6 +128,7 @@ public void resume() {
121128

122129
@Override
123130
public void pause() {
131+
sessions.remove(this);
124132
synchronized (this) {
125133
if (!suspended) {
126134
ws.pause();
@@ -131,6 +139,7 @@ public void pause() {
131139

132140
@Override
133141
public void terminate() throws Exception {
142+
sessions.remove(this);
134143
synchronized (this) {
135144
open = false;
136145
ws.terminate();
@@ -142,6 +151,18 @@ public boolean isOpen() {
142151
return open && ws.isOpen();
143152
}
144153

154+
@Override
155+
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
156+
throws Exception {
157+
for (WebSocket ws : sessions) {
158+
try {
159+
ws.send(data, success, err);
160+
} catch (Exception ex) {
161+
err.onError(ex);
162+
}
163+
}
164+
}
165+
145166
@Override
146167
public void send(final Object data, final SuccessCallback success, final OnError err)
147168
throws Exception {
@@ -160,7 +181,7 @@ public void send(final Object data, final SuccessCallback success, final OnError
160181
success,
161182
err).render(data);
162183
} else {
163-
throw new Err(WebSocket.NORMAL, "Cannot send message on closed web socket");
184+
throw new Err(WebSocket.NORMAL, "WebSocket is closed.");
164185
}
165186
}
166187
}
@@ -190,20 +211,24 @@ public void connect(final Injector injector, final Request req, final NativeWebS
190211
new StrParamReferenceImpl("body", "message", ImmutableList.of(message))))))
191212
.onFailure(this::handleErr));
192213

193-
ws.onCloseMessage((code, reason) -> Try
194-
.run(sync(() -> {
195-
this.open = false;
196-
if (closeCallback != null) {
197-
closeCallback.onClose(reason.map(r -> WebSocket.CloseStatus.of(code, r))
198-
.orElse(WebSocket.CloseStatus.of(code)));
199-
}
200-
closeCallback = null;
201-
})).onFailure(this::handleErr));
214+
ws.onCloseMessage((code, reason) -> {
215+
sessions.remove(this);
216+
217+
Try.run(sync(() -> {
218+
this.open = false;
219+
if (closeCallback != null) {
220+
closeCallback.onClose(reason.map(r -> WebSocket.CloseStatus.of(code, r))
221+
.orElse(WebSocket.CloseStatus.of(code)));
222+
}
223+
closeCallback = null;
224+
})).onFailure(this::handleErr);
225+
});
202226

203227
ws.onErrorMessage(this::handleErr);
204228

205229
// connect now
206230
try {
231+
sessions.add(this);
207232
handler.onOpen(req, this);
208233
} catch (Throwable ex) {
209234
handleErr(ex);

jooby/src/test/java/org/jooby/WebSocketTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public void send(final Object data, final SuccessCallback success, final OnError
5656
throw new UnsupportedOperationException();
5757
}
5858

59+
@Override
60+
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
61+
throws Exception {
62+
throw new UnsupportedOperationException();
63+
}
64+
5965
@Override
6066
public void onMessage(final OnMessage<Mutant> callback) throws Exception {
6167
throw new UnsupportedOperationException();
@@ -260,6 +266,27 @@ public void send(final Object data, final SuccessCallback success, final OnError
260266
assertEquals(data, dataList.getFirst());
261267
}
262268

269+
@SuppressWarnings("resource")
270+
@Test
271+
public void broadcast() throws Exception {
272+
Object data = new Object();
273+
WebSocket.SuccessCallback SUCCESS_ = WebSocket.SUCCESS;
274+
WebSocket.OnError ERR_ = WebSocket.ERR;
275+
LinkedList<Object> dataList = new LinkedList<>();
276+
WebSocket ws = new WebSocketMock() {
277+
@Override
278+
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
279+
throws Exception {
280+
dataList.add(data);
281+
assertEquals(SUCCESS_, success);
282+
assertEquals(ERR_, err);
283+
}
284+
};
285+
ws.broadcast(data);
286+
assertTrue(dataList.size() > 0);
287+
assertEquals(data, dataList.getFirst());
288+
}
289+
263290
@SuppressWarnings("resource")
264291
@Test
265292
public void sendCustomSuccess() throws Exception {
@@ -282,6 +309,28 @@ public void send(final Object data, final SuccessCallback success, final OnError
282309
assertEquals(data, dataList.getFirst());
283310
}
284311

312+
@SuppressWarnings("resource")
313+
@Test
314+
public void broadcastCustomSuccess() throws Exception {
315+
Object data = new Object();
316+
WebSocket.SuccessCallback SUCCESS_ = () -> {
317+
};
318+
WebSocket.OnError ERR_ = WebSocket.ERR;
319+
LinkedList<Object> dataList = new LinkedList<>();
320+
WebSocket ws = new WebSocketMock() {
321+
@Override
322+
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
323+
throws Exception {
324+
dataList.add(data);
325+
assertEquals(SUCCESS_, success);
326+
assertEquals(ERR_, err);
327+
}
328+
};
329+
ws.broadcast(data, SUCCESS_);
330+
assertTrue(dataList.size() > 0);
331+
assertEquals(data, dataList.getFirst());
332+
}
333+
285334
@SuppressWarnings("resource")
286335
@Test
287336
public void sendCustomErr() throws Exception {
@@ -304,6 +353,28 @@ public void send(final Object data, final SuccessCallback success, final OnError
304353
assertEquals(data, dataList.getFirst());
305354
}
306355

356+
@SuppressWarnings("resource")
357+
@Test
358+
public void broadcastCustomErr() throws Exception {
359+
Object data = new Object();
360+
WebSocket.SuccessCallback SUCCESS_ = WebSocket.SUCCESS;
361+
WebSocket.OnError ERR_ = (ex) -> {
362+
};
363+
LinkedList<Object> dataList = new LinkedList<>();
364+
WebSocket ws = new WebSocketMock() {
365+
@Override
366+
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
367+
throws Exception {
368+
dataList.add(data);
369+
assertEquals(SUCCESS_, success);
370+
assertEquals(ERR_, err);
371+
}
372+
};
373+
ws.broadcast(data, ERR_);
374+
assertTrue(dataList.size() > 0);
375+
assertEquals(data, dataList.getFirst());
376+
}
377+
307378
@SuppressWarnings("resource")
308379
@Test
309380
public void sendCustomSuccessAndErr() throws Exception {
@@ -327,6 +398,29 @@ public void send(final Object data, final SuccessCallback success, final OnError
327398
assertEquals(data, dataList.getFirst());
328399
}
329400

401+
@SuppressWarnings("resource")
402+
@Test
403+
public void broadcastCustomSuccessAndErr() throws Exception {
404+
Object data = new Object();
405+
WebSocket.SuccessCallback SUCCESS_ = () -> {
406+
};
407+
WebSocket.OnError ERR_ = (ex) -> {
408+
};
409+
LinkedList<Object> dataList = new LinkedList<>();
410+
WebSocket ws = new WebSocketMock() {
411+
@Override
412+
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
413+
throws Exception {
414+
dataList.add(data);
415+
assertEquals(SUCCESS_, success);
416+
assertEquals(ERR_, err);
417+
}
418+
};
419+
ws.broadcast(data, SUCCESS_, ERR_);
420+
assertTrue(dataList.size() > 0);
421+
assertEquals(data, dataList.getFirst());
422+
}
423+
330424
@SuppressWarnings("resource")
331425
@Test
332426
public void getInstance() throws Exception {

0 commit comments

Comments
 (0)