222222
223223import java .nio .channels .ClosedChannelException ;
224224import java .nio .charset .StandardCharsets ;
225+ import java .util .Collections ;
225226import java .util .List ;
226227import java .util .Locale ;
227228import java .util .Map ;
228229import java .util .NoSuchElementException ;
229- import java .util .Queue ;
230- import java .util .concurrent .ConcurrentLinkedQueue ;
230+ import java .util .Optional ;
231+ import java .util .concurrent .ConcurrentHashMap ;
232+ import java .util .concurrent .ConcurrentMap ;
233+ import java .util .concurrent .CopyOnWriteArrayList ;
231234
232235@ SuppressWarnings ("unchecked" )
233236public class WebSocketImpl implements WebSocket {
@@ -243,7 +246,7 @@ public class WebSocketImpl implements WebSocket {
243246 private final Logger log = LoggerFactory .getLogger (WebSocket .class );
244247
245248 /** All connected websocket. */
246- private static final Queue < WebSocket > sessions = new ConcurrentLinkedQueue <>();
249+ private static final ConcurrentMap < String , List < WebSocket >> sessions = new ConcurrentHashMap <>();
247250
248251 private Locale locale ;
249252
@@ -290,7 +293,7 @@ public WebSocketImpl(final OnOpen handler, final String path,
290293
291294 @ Override
292295 public void close (final CloseStatus status ) {
293- sessions . remove (this );
296+ removeSession (this );
294297 synchronized (this ) {
295298 open = false ;
296299 ws .close (status .code (), status .reason ());
@@ -299,7 +302,7 @@ public void close(final CloseStatus status) {
299302
300303 @ Override
301304 public void resume () {
302- sessions . add (this );
305+ addSession (this );
303306 synchronized (this ) {
304307 if (suspended ) {
305308 ws .resume ();
@@ -310,7 +313,7 @@ public void resume() {
310313
311314 @ Override
312315 public void pause () {
313- sessions . remove (this );
316+ removeSession (this );
314317 synchronized (this ) {
315318 if (!suspended ) {
316319 ws .pause ();
@@ -321,7 +324,7 @@ public void pause() {
321324
322325 @ Override
323326 public void terminate () throws Exception {
324- sessions . remove (this );
327+ removeSession (this );
325328 synchronized (this ) {
326329 open = false ;
327330 ws .terminate ();
@@ -336,7 +339,7 @@ public boolean isOpen() {
336339 @ Override
337340 public void broadcast (final Object data , final SuccessCallback success , final OnError err )
338341 throws Exception {
339- for (WebSocket ws : sessions ) {
342+ for (WebSocket ws : sessions . getOrDefault ( this . pattern , Collections . emptyList ()) ) {
340343 try {
341344 ws .send (data , success , err );
342345 } catch (Exception ex ) {
@@ -394,7 +397,7 @@ public void connect(final Injector injector, final Request req, final NativeWebS
394397 .onFailure (this ::handleErr ));
395398
396399 ws .onCloseMessage ((code , reason ) -> {
397- sessions . remove (this );
400+ removeSession (this );
398401
399402 Try .run (sync (() -> {
400403 this .open = false ;
@@ -410,7 +413,7 @@ public void connect(final Injector injector, final Request req, final NativeWebS
410413
411414 // connect now
412415 try {
413- sessions . add (this );
416+ addSession (this );
414417 handler .onOpen (req , this );
415418 } catch (Throwable ex ) {
416419 handleErr (ex );
@@ -515,5 +518,11 @@ private Throwing.Runnable sync(final Throwing.Runnable task) {
515518 };
516519 }
517520
518- ;
521+ private static void addSession (WebSocketImpl ws ) {
522+ sessions .computeIfAbsent (ws .pattern , k -> new CopyOnWriteArrayList <>()).add (ws );
523+ }
524+
525+ private static void removeSession (WebSocketImpl ws ) {
526+ Optional .ofNullable (sessions .get (ws .pattern )).ifPresent (list -> list .remove (ws ));
527+ }
519528}
0 commit comments