Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.

Commit 4963173

Browse files
committed
ws.broadcast broadcasts to all clients instead of broadcasting to all clients on a specific path/route fix jooby-project#889
1 parent 37a9c97 commit 4963173

File tree

3 files changed

+21
-13
lines changed

3 files changed

+21
-13
lines changed

jooby/src/main/java/org/jooby/WebSocket.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -962,7 +962,6 @@ default void broadcast(final Object data, final SuccessCallback success) throws
962962
*/
963963
default void broadcast(final Object data, final OnError err) throws Exception {
964964
broadcast(data, SUCCESS, err);
965-
;
966965
}
967966

968967
/**

jooby/src/main/java/org/jooby/internal/WebSocketImpl.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,15 @@
222222

223223
import java.nio.channels.ClosedChannelException;
224224
import java.nio.charset.StandardCharsets;
225+
import java.util.Collections;
225226
import java.util.List;
226227
import java.util.Locale;
227228
import java.util.Map;
228229
import java.util.NoSuchElementException;
229-
import java.util.Queue;
230-
import java.util.concurrent.ConcurrentLinkedQueue;
230+
import java.util.Optional;
231+
import java.util.concurrent.ConcurrentHashMap;
232+
import java.util.concurrent.ConcurrentMap;
233+
import java.util.concurrent.CopyOnWriteArrayList;
231234

232235
@SuppressWarnings("unchecked")
233236
public class WebSocketImpl implements WebSocket {
@@ -243,7 +246,7 @@ public class WebSocketImpl implements WebSocket {
243246
private final Logger log = LoggerFactory.getLogger(WebSocket.class);
244247

245248
/** All connected websocket. */
246-
private static final Queue<WebSocket> sessions = new ConcurrentLinkedQueue<>();
249+
private static final ConcurrentMap<String, List<WebSocket>> sessions = new ConcurrentHashMap<>();
247250

248251
private Locale locale;
249252

@@ -290,7 +293,7 @@ public WebSocketImpl(final OnOpen handler, final String path,
290293

291294
@Override
292295
public void close(final CloseStatus status) {
293-
sessions.remove(this);
296+
removeSession(this);
294297
synchronized (this) {
295298
open = false;
296299
ws.close(status.code(), status.reason());
@@ -299,7 +302,7 @@ public void close(final CloseStatus status) {
299302

300303
@Override
301304
public void resume() {
302-
sessions.add(this);
305+
addSession(this);
303306
synchronized (this) {
304307
if (suspended) {
305308
ws.resume();
@@ -310,7 +313,7 @@ public void resume() {
310313

311314
@Override
312315
public void pause() {
313-
sessions.remove(this);
316+
removeSession(this);
314317
synchronized (this) {
315318
if (!suspended) {
316319
ws.pause();
@@ -321,7 +324,7 @@ public void pause() {
321324

322325
@Override
323326
public void terminate() throws Exception {
324-
sessions.remove(this);
327+
removeSession(this);
325328
synchronized (this) {
326329
open = false;
327330
ws.terminate();
@@ -336,7 +339,7 @@ public boolean isOpen() {
336339
@Override
337340
public void broadcast(final Object data, final SuccessCallback success, final OnError err)
338341
throws Exception {
339-
for (WebSocket ws : sessions) {
342+
for (WebSocket ws : sessions.getOrDefault(this.pattern, Collections.emptyList())) {
340343
try {
341344
ws.send(data, success, err);
342345
} catch (Exception ex) {
@@ -394,7 +397,7 @@ public void connect(final Injector injector, final Request req, final NativeWebS
394397
.onFailure(this::handleErr));
395398

396399
ws.onCloseMessage((code, reason) -> {
397-
sessions.remove(this);
400+
removeSession(this);
398401

399402
Try.run(sync(() -> {
400403
this.open = false;
@@ -410,7 +413,7 @@ public void connect(final Injector injector, final Request req, final NativeWebS
410413

411414
// connect now
412415
try {
413-
sessions.add(this);
416+
addSession(this);
414417
handler.onOpen(req, this);
415418
} catch (Throwable ex) {
416419
handleErr(ex);
@@ -515,5 +518,11 @@ private Throwing.Runnable sync(final Throwing.Runnable task) {
515518
};
516519
}
517520

518-
;
521+
private static void addSession(WebSocketImpl ws) {
522+
sessions.computeIfAbsent(ws.pattern, k -> new CopyOnWriteArrayList<>()).add(ws);
523+
}
524+
525+
private static void removeSession(WebSocketImpl ws) {
526+
Optional.ofNullable(sessions.get(ws.pattern)).ifPresent(list-> list.remove(ws));
527+
}
519528
}

jooby/src/test/java/org/jooby/internal/WebSocketImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void sendString() throws Exception {
124124
public void resetSessions() throws Exception {
125125
Field field = WebSocketImpl.class.getDeclaredField("sessions");
126126
field.setAccessible(true);
127-
Queue<WebSocket> sessions = (Queue<WebSocket>) field.get(null);
127+
Map<String, List<WebSocket>> sessions = (Map<String, List<WebSocket>>) field.get(null);
128128
sessions.clear();
129129
}
130130

0 commit comments

Comments
 (0)