1313import java .util .HashSet ;
1414import java .util .Iterator ;
1515import java .util .Set ;
16+ import java .util .concurrent .Callable ;
17+ import java .util .concurrent .ExecutorService ;
18+ import java .util .concurrent .Executors ;
1619
1720import org .java_websocket .drafts .Draft ;
1821import org .java_websocket .framing .CloseFrame ;
2730 */
2831public abstract class WebSocketServer extends WebSocketAdapter implements Runnable {
2932
30- // INSTANCE PROPERTIES /////////////////////////////////////////////////////
33+ public int DECODERS = Runtime .getRuntime ().availableProcessors ();
34+
3135 /**
32- * Holds the list of active WebSocket connections. "Active" means WebSocket
36+ * Holds the list of active_websocktes WebSocket connections. "Active" means WebSocket
3337 * handshake is complete and socket can be written to, or read from.
3438 */
3539 private final Set <WebSocket > connections = new HashSet <WebSocket >();
@@ -51,7 +55,13 @@ public abstract class WebSocketServer extends WebSocketAdapter implements Runnab
5155 */
5256 private Draft draft ;
5357
54- private Thread thread ;
58+ private Thread selectorthread ;
59+
60+ private ExecutorService decoders = Executors .newFixedThreadPool ( DECODERS );
61+ private ExecutorService flusher = Executors .newSingleThreadExecutor ();
62+
63+ private Set <WebSocket > active_websocktes = new HashSet <WebSocket >();
64+ private Set <WebSocket > write_demands = new HashSet <WebSocket >();
5565
5666 // CONSTRUCTORS ////////////////////////////////////////////////////////////
5767 /**
@@ -88,20 +98,20 @@ public WebSocketServer( InetSocketAddress address , Draft draft ) {
8898 }
8999
90100 /**
91- * Starts the server thread that binds to the currently set port number and
101+ * Starts the server selectorthread that binds to the currently set port number and
92102 * listeners for WebSocket connection requests.
93103 *
94104 * @throws IllegalStateException
95105 */
96106 public void start () {
97- if ( thread != null )
107+ if ( selectorthread != null )
98108 throw new IllegalStateException ( "Already started" );
99109 new Thread ( this ).start ();
100110 }
101111
102112 /**
103113 * Closes all connected clients sockets, then closes the underlying
104- * ServerSocketChannel, effectively killing the server socket thread and
114+ * ServerSocketChannel, effectively killing the server socket selectorthread and
105115 * freeing the port the server was bound to.
106116 *
107117 * @throws IOException
@@ -113,7 +123,7 @@ public void stop() throws IOException {
113123 ws .close ( CloseFrame .NORMAL );
114124 }
115125 }
116- thread .interrupt ();
126+ selectorthread .interrupt ();
117127 this .server .close ();
118128
119129 }
@@ -158,9 +168,9 @@ public Draft getDraft() {
158168
159169 // Runnable IMPLEMENTATION /////////////////////////////////////////////////
160170 public void run () {
161- if ( thread != null )
171+ if ( selectorthread != null )
162172 throw new IllegalStateException ( "This instance of " + getClass ().getSimpleName () + " can only be started once the same time." );
163- thread = Thread .currentThread ();
173+ selectorthread = Thread .currentThread ();
164174 try {
165175 server = ServerSocketChannel .open ();
166176 server .configureBlocking ( false );
@@ -172,67 +182,82 @@ public void run() {
172182 onWebsocketError ( null , ex );
173183 return ;
174184 }
185+ try {
186+ while ( !selectorthread .isInterrupted () ) {
187+ SelectionKey key = null ;
188+ WebSocket conn = null ;
189+ try {
190+ selector .select ();
191+ Set <SelectionKey > keys = selector .selectedKeys ();
192+ Iterator <SelectionKey > i = keys .iterator ();
193+
194+ while ( i .hasNext () ) {
195+ key = i .next ();
196+
197+ i .remove ();
198+ if ( !key .isValid () ) {
199+ continue ;
200+ }
175201
176- while ( !thread .isInterrupted () ) {
177- SelectionKey key = null ;
178- WebSocket conn = null ;
179- try {
180- selector .select ();
181- Set <SelectionKey > keys = selector .selectedKeys ();
182- Iterator <SelectionKey > i = keys .iterator ();
183-
184- while ( i .hasNext () ) {
185- key = i .next ();
186-
187- // Remove the current key
188- i .remove ();
189-
190- // if isAcceptable == true
191- // then a client required a connection
192- if ( key .isAcceptable () ) {
193- SocketChannel client = server .accept ();
194- client .configureBlocking ( false );
195- WebSocket c = new WebSocket ( this , Collections .singletonList ( draft ), client );
196- client .register ( selector , SelectionKey .OP_READ , c );
197- }
202+ if ( key .isAcceptable () ) {
203+ SocketChannel client = server .accept ();
204+ client .configureBlocking ( false );
205+ WebSocket c = new WebSocket ( this , Collections .singletonList ( draft ), client );
206+ client .register ( selector , SelectionKey .OP_READ , c );
207+ }
198208
199- // if isReadable == true
200- // then the server is ready to read
201- if ( key .isReadable () ) {
202- conn = (WebSocket ) key .attachment ();
203- conn .handleRead ();
204- }
209+ // if isReadable == true
210+ // then the server is ready to read
211+ if ( key .isReadable () ) {
212+ conn = (WebSocket ) key .attachment ();
213+ asyncQueueRead ( conn );
214+ // conn.handleRead();
215+ }
205216
206- // if isWritable == true
207- // then we need to send the rest of the data to the client
208- if ( key .isValid () && key .isWritable () ) {
209- conn = (WebSocket ) key .attachment ();
210- conn .flush ();
211- key .channel ().register ( selector , SelectionKey .OP_READ , conn );
212- }
213- }
214- synchronized ( connections ) {
215- Iterator <WebSocket > it = this .connections .iterator ();
216- while ( it .hasNext () ) {
217- // We have to do this check here, and not in the thread that
218- // adds the buffered data to the WebSocket, because the
219- // Selector is not thread-safe, and can only be accessed
220- // by this thread.
221- conn = it .next ();
222- if ( conn .hasBufferedData () ) {
217+ // if isWritable == true
218+ // then we need to send the rest of the data to the client
219+ /*if( key.isValid() && key.isWritable() ) {
220+ conn = (WebSocket) key.attachment();
223221 conn.flush();
224- // key.channel().register( selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE , conn );
225- }
222+ key.channel().register( selector, SelectionKey.OP_READ, conn );
223+ }*/
226224 }
227- }
228- } catch ( IOException ex ) {
229- if ( key != null )
230- key .cancel ();
231- onWebsocketError ( conn , ex );// conn may be null here
232- if ( conn != null ) {
233- conn .close ( CloseFrame .ABNROMAL_CLOSE );
225+ /*synchronized ( connections ) {
226+ Iterator<WebSocket> it = this.connections.iterator();
227+ while ( it.hasNext() ) {
228+ // We have to do this check here, and not in the selectorthread that
229+ // adds the buffered data to the WebSocket, because the
230+ // Selector is not selectorthread-safe, and can only be accessed
231+ // by this selectorthread.
232+ conn = it.next();
233+ if( conn.hasBufferedData() ) {
234+ conn.flush();
235+ // key.channel().register( selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, conn );
236+ }
237+ }
238+ }*/
239+ } catch ( IOException ex ) {
240+ if ( key != null )
241+ key .cancel ();
242+ handleIOException ( conn , ex );
234243 }
235244 }
245+ } catch ( RuntimeException e ) {
246+ // should hopefully never occur
247+ onError ( null , e );
248+ try {
249+ selector .close ();
250+ } catch ( IOException e1 ) {
251+ onError ( null , e1 );
252+ }
253+ decoders .shutdown ();
254+ }
255+ }
256+
257+ private void handleIOException ( WebSocket conn , IOException ex ) {
258+ onWebsocketError ( conn , ex );// conn may be null here
259+ if ( conn != null ) {
260+ conn .close ( CloseFrame .ABNROMAL_CLOSE );
236261 }
237262 }
238263
@@ -292,7 +317,17 @@ public final void onWebsocketError( WebSocket conn, Exception ex ) {
292317
293318 @ Override
294319 public final void onWriteDemand ( WebSocket conn ) {
295- selector .wakeup ();
320+ try {
321+ conn .flush ();
322+ } catch ( IOException e ) {
323+ handleIOException ( conn , e );
324+ }
325+ /*synchronized ( write_demands ) {
326+ if( !write_demands.contains( conn ) ) {
327+ write_demands.add( conn );
328+ flusher.submit( new WebsocketWriteTask( conn ) );
329+ }
330+ }*/
296331 }
297332
298333 // ABTRACT METHODS /////////////////////////////////////////////////////////
@@ -303,4 +338,62 @@ public final void onWriteDemand( WebSocket conn ) {
303338 public void onMessage ( WebSocket conn , ByteBuffer message ) {
304339 };
305340
341+ private boolean asyncQueueRead ( WebSocket ws ) {
342+ synchronized ( active_websocktes ) {
343+ if ( active_websocktes .contains ( ws ) ) {
344+ return false ;
345+ }
346+ active_websocktes .add ( ws );// will add ws only if it is not already added
347+ decoders .submit ( new WebsocketReadTask ( ws ) );
348+ return true ;
349+ }
350+ }
351+ class WebsocketReadTask implements Callable <Boolean > {
352+
353+ private WebSocket ws ;
354+
355+ private WebsocketReadTask ( WebSocket ws ) {
356+ this .ws = ws ;
357+ }
358+
359+ @ Override
360+ public Boolean call () throws Exception {
361+ try {
362+ ws .handleRead ();
363+ } catch ( IOException e ) {
364+ e .printStackTrace ();
365+ return false ;
366+ } finally {
367+ synchronized ( active_websocktes ) {
368+ active_websocktes .remove ( ws );
369+ }
370+ selector .wakeup ();
371+ }
372+ return true ;
373+ }
374+ }
375+
376+ class WebsocketWriteTask implements Callable <Boolean > {
377+
378+ private WebSocket ws ;
379+
380+ private WebsocketWriteTask ( WebSocket ws ) {
381+ this .ws = ws ;
382+ }
383+
384+ @ Override
385+ public Boolean call () throws Exception {
386+ try {
387+ ws .flush ();
388+ } catch ( IOException e ) {
389+ handleIOException ( ws , e );
390+ } finally {
391+ }
392+ synchronized ( write_demands ) {
393+ write_demands .remove ( ws );
394+ }
395+ return true ;
396+ }
397+ }
398+
306399}
0 commit comments