1212import java .util .Collections ;
1313import java .util .HashSet ;
1414import java .util .Iterator ;
15+ import java .util .List ;
1516import java .util .Set ;
1617import java .util .concurrent .Callable ;
1718import java .util .concurrent .ExecutorService ;
1819import java .util .concurrent .Executors ;
20+ import java .util .concurrent .ThreadFactory ;
1921
2022import org .java_websocket .drafts .Draft ;
2123import org .java_websocket .framing .CloseFrame ;
3032 */
3133public abstract class WebSocketServer extends WebSocketAdapter implements Runnable {
3234
33- public int DECODERS = Runtime . getRuntime (). availableProcessors () ;
35+ public static int DECODERS = 1 ;
3436
3537 /**
3638 * Holds the list of active WebSocket connections. "Active" means WebSocket
@@ -53,12 +55,11 @@ public abstract class WebSocketServer extends WebSocketAdapter implements Runnab
5355 /**
5456 * The Draft of the WebSocket protocol the Server is adhering to.
5557 */
56- private Draft draft ;
58+ private List < Draft > drafts ;
5759
5860 private Thread selectorthread ;
5961
60- private ExecutorService decoders = Executors .newFixedThreadPool ( DECODERS );
61- // private ExecutorService flusher = Executors.newSingleThreadExecutor();
62+ private ExecutorService decoders ;
6263
6364 private Set <WebSocket > active_websocktes = new HashSet <WebSocket >();
6465 // private Set<WebSocket> write_demands = new HashSet<WebSocket>();
@@ -69,7 +70,7 @@ public abstract class WebSocketServer extends WebSocketAdapter implements Runnab
6970 * listen on port <var>WebSocket.DEFAULT_PORT</var>.
7071 */
7172 public WebSocketServer () throws UnknownHostException {
72- this ( new InetSocketAddress ( WebSocket .DEFAULT_PORT ), null );
73+ this ( new InetSocketAddress ( WebSocket .DEFAULT_PORT ), DECODERS , null );
7374 }
7475
7576 /**
@@ -79,7 +80,11 @@ public WebSocketServer() throws UnknownHostException {
7980 * The address (host:port) this server should listen on.
8081 */
8182 public WebSocketServer ( InetSocketAddress address ) {
82- this ( address , null );
83+ this ( address , DECODERS , null );
84+ }
85+
86+ public WebSocketServer ( InetSocketAddress address , int decoders ) {
87+ this ( address , decoders , null );
8388 }
8489
8590 /**
@@ -92,9 +97,17 @@ public WebSocketServer( InetSocketAddress address ) {
9297 * The version of the WebSocket protocol that this server
9398 * instance should comply to.
9499 */
95- public WebSocketServer ( InetSocketAddress address , Draft draft ) {
96- this .draft = draft ;
100+ public WebSocketServer ( InetSocketAddress address , List <Draft > drafts ) {
101+ this ( address , DECODERS , drafts );
102+ }
103+
104+ public WebSocketServer ( InetSocketAddress address , int decodercount , List <Draft > drafts ) {
105+ if ( drafts == null )
106+ this .drafts = Collections .emptyList ();
107+ else
108+ this .drafts = drafts ;
97109 setAddress ( address );
110+ this .decoders = Executors .newFixedThreadPool ( decodercount , new WebsocketThreadFactory () );
98111 }
99112
100113 /**
@@ -165,8 +178,8 @@ public int getPort() {
165178 return getAddress ().getPort ();
166179 }
167180
168- public Draft getDraft () {
169- return this . draft ;
181+ public List < Draft > getDraft () {
182+ return Collections . unmodifiableList ( drafts ) ;
170183 }
171184
172185 // Runnable IMPLEMENTATION /////////////////////////////////////////////////
@@ -205,7 +218,7 @@ public void run() {
205218 if ( key .isAcceptable () ) {
206219 SocketChannel client = server .accept ();
207220 client .configureBlocking ( false );
208- WebSocket c = new WebSocket ( this , Collections . singletonList ( draft ) , client );
221+ WebSocket c = new WebSocket ( this , drafts , client );
209222 client .register ( selector , SelectionKey .OP_READ , c );
210223 i .remove ();
211224 } else if ( key .isReadable () ) {
@@ -302,10 +315,12 @@ public final void onWebsocketError( WebSocket conn, Exception ex ) {
302315
303316 @ Override
304317 public final void onWriteDemand ( WebSocket conn ) {
305- try {
306- conn .flush ();
307- } catch ( IOException e ) {
308- handleIOException ( conn , e );
318+ if ( Thread .currentThread () instanceof WebsocketExecutorThread == false ) {
319+ try {
320+ conn .flush ();
321+ } catch ( IOException e ) {
322+ handleIOException ( conn , e );
323+ }
309324 }
310325 /*synchronized ( write_demands ) {
311326 if( !write_demands.contains( conn ) ) {
@@ -326,14 +341,16 @@ public void onMessage( WebSocket conn, ByteBuffer message ) {
326341 private boolean asyncQueueRead ( WebSocket ws ) {
327342 synchronized ( active_websocktes ) {
328343 if ( active_websocktes .contains ( ws ) ) {
344+ System .out .println ( "queue failed" );
329345 return false ;
330346 }
331347 active_websocktes .add ( ws );// will add ws only if it is not already added
332348 decoders .submit ( new WebsocketReadTask ( ws ) );
333349 return true ;
334350 }
335351 }
336- class WebsocketReadTask implements Callable <Boolean > {
352+
353+ private class WebsocketReadTask implements Callable <Boolean > {
337354
338355 private WebSocket ws ;
339356
@@ -345,6 +362,8 @@ private WebsocketReadTask( WebSocket ws ) {
345362 public Boolean call () throws Exception {
346363 try {
347364 ws .handleRead ();
365+ ws .flush ();
366+ System .out .println ( "Assync Complete" );
348367 } catch ( IOException e ) {
349368 handleIOException ( ws , e );
350369 } catch ( RuntimeException e ) {
@@ -358,4 +377,24 @@ public Boolean call() throws Exception {
358377 return true ;
359378 }
360379 }
380+
381+ private class WebsocketThreadFactory implements ThreadFactory {
382+
383+ @ Override
384+ public Thread newThread ( Runnable r ) {
385+ return new WebsocketExecutorThread ( r );
386+ }
387+ }
388+
389+ private class WebsocketExecutorThread extends Thread {
390+ private Runnable r ;
391+ WebsocketExecutorThread ( Runnable r ) {
392+ this .r = r ;
393+
394+ }
395+ @ Override
396+ public void run () {
397+ r .run ();
398+ }
399+ }
361400}
0 commit comments