Skip to content

Commit c944701

Browse files
committed
enabled the server to decode using several threads
started some experiments on how to write out data the best way
1 parent 9064159 commit c944701

File tree

1 file changed

+157
-64
lines changed

1 file changed

+157
-64
lines changed

src/org/java_websocket/WebSocketServer.java

Lines changed: 157 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import java.util.HashSet;
1414
import java.util.Iterator;
1515
import java.util.Set;
16+
import java.util.concurrent.Callable;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
1619

1720
import org.java_websocket.drafts.Draft;
1821
import org.java_websocket.framing.CloseFrame;
@@ -27,9 +30,10 @@
2730
*/
2831
public abstract class WebSocketServer extends WebSocketAdapter implements Runnable {
2932

30-
// INSTANCE PROPERTIES /////////////////////////////////////////////////////
33+
public int DECODERS = Runtime.getRuntime().availableProcessors();
34+
3135
/**
32-
* Holds the list of active WebSocket connections. "Active" means WebSocket
36+
* Holds the list of active_websocktes WebSocket connections. "Active" means WebSocket
3337
* handshake is complete and socket can be written to, or read from.
3438
*/
3539
private final Set<WebSocket> connections = new HashSet<WebSocket>();
@@ -51,7 +55,13 @@ public abstract class WebSocketServer extends WebSocketAdapter implements Runnab
5155
*/
5256
private Draft draft;
5357

54-
private Thread thread;
58+
private Thread selectorthread;
59+
60+
private ExecutorService decoders = Executors.newFixedThreadPool( DECODERS );
61+
private ExecutorService flusher = Executors.newSingleThreadExecutor();
62+
63+
private Set<WebSocket> active_websocktes = new HashSet<WebSocket>();
64+
private Set<WebSocket> write_demands = new HashSet<WebSocket>();
5565

5666
// CONSTRUCTORS ////////////////////////////////////////////////////////////
5767
/**
@@ -88,20 +98,20 @@ public WebSocketServer( InetSocketAddress address , Draft draft ) {
8898
}
8999

90100
/**
91-
* Starts the server thread that binds to the currently set port number and
101+
* Starts the server selectorthread that binds to the currently set port number and
92102
* listeners for WebSocket connection requests.
93103
*
94104
* @throws IllegalStateException
95105
*/
96106
public void start() {
97-
if( thread != null )
107+
if( selectorthread != null )
98108
throw new IllegalStateException( "Already started" );
99109
new Thread( this ).start();
100110
}
101111

102112
/**
103113
* Closes all connected clients sockets, then closes the underlying
104-
* ServerSocketChannel, effectively killing the server socket thread and
114+
* ServerSocketChannel, effectively killing the server socket selectorthread and
105115
* freeing the port the server was bound to.
106116
*
107117
* @throws IOException
@@ -113,7 +123,7 @@ public void stop() throws IOException {
113123
ws.close( CloseFrame.NORMAL );
114124
}
115125
}
116-
thread.interrupt();
126+
selectorthread.interrupt();
117127
this.server.close();
118128

119129
}
@@ -158,9 +168,9 @@ public Draft getDraft() {
158168

159169
// Runnable IMPLEMENTATION /////////////////////////////////////////////////
160170
public void run() {
161-
if( thread != null )
171+
if( selectorthread != null )
162172
throw new IllegalStateException( "This instance of " + getClass().getSimpleName() + " can only be started once the same time." );
163-
thread = Thread.currentThread();
173+
selectorthread = Thread.currentThread();
164174
try {
165175
server = ServerSocketChannel.open();
166176
server.configureBlocking( false );
@@ -172,67 +182,82 @@ public void run() {
172182
onWebsocketError( null, ex );
173183
return;
174184
}
185+
try {
186+
while ( !selectorthread.isInterrupted() ) {
187+
SelectionKey key = null;
188+
WebSocket conn = null;
189+
try {
190+
selector.select();
191+
Set<SelectionKey> keys = selector.selectedKeys();
192+
Iterator<SelectionKey> i = keys.iterator();
193+
194+
while ( i.hasNext() ) {
195+
key = i.next();
196+
197+
i.remove();
198+
if( !key.isValid() ) {
199+
continue;
200+
}
175201

176-
while ( !thread.isInterrupted() ) {
177-
SelectionKey key = null;
178-
WebSocket conn = null;
179-
try {
180-
selector.select();
181-
Set<SelectionKey> keys = selector.selectedKeys();
182-
Iterator<SelectionKey> i = keys.iterator();
183-
184-
while ( i.hasNext() ) {
185-
key = i.next();
186-
187-
// Remove the current key
188-
i.remove();
189-
190-
// if isAcceptable == true
191-
// then a client required a connection
192-
if( key.isAcceptable() ) {
193-
SocketChannel client = server.accept();
194-
client.configureBlocking( false );
195-
WebSocket c = new WebSocket( this, Collections.singletonList( draft ), client );
196-
client.register( selector, SelectionKey.OP_READ, c );
197-
}
202+
if( key.isAcceptable() ) {
203+
SocketChannel client = server.accept();
204+
client.configureBlocking( false );
205+
WebSocket c = new WebSocket( this, Collections.singletonList( draft ), client );
206+
client.register( selector, SelectionKey.OP_READ, c );
207+
}
198208

199-
// if isReadable == true
200-
// then the server is ready to read
201-
if( key.isReadable() ) {
202-
conn = (WebSocket) key.attachment();
203-
conn.handleRead();
204-
}
209+
// if isReadable == true
210+
// then the server is ready to read
211+
if( key.isReadable() ) {
212+
conn = (WebSocket) key.attachment();
213+
asyncQueueRead( conn );
214+
// conn.handleRead();
215+
}
205216

206-
// if isWritable == true
207-
// then we need to send the rest of the data to the client
208-
if( key.isValid() && key.isWritable() ) {
209-
conn = (WebSocket) key.attachment();
210-
conn.flush();
211-
key.channel().register( selector, SelectionKey.OP_READ, conn );
212-
}
213-
}
214-
synchronized ( connections ) {
215-
Iterator<WebSocket> it = this.connections.iterator();
216-
while ( it.hasNext() ) {
217-
// We have to do this check here, and not in the thread that
218-
// adds the buffered data to the WebSocket, because the
219-
// Selector is not thread-safe, and can only be accessed
220-
// by this thread.
221-
conn = it.next();
222-
if( conn.hasBufferedData() ) {
217+
// if isWritable == true
218+
// then we need to send the rest of the data to the client
219+
/*if( key.isValid() && key.isWritable() ) {
220+
conn = (WebSocket) key.attachment();
223221
conn.flush();
224-
// key.channel().register( selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, conn );
225-
}
222+
key.channel().register( selector, SelectionKey.OP_READ, conn );
223+
}*/
226224
}
227-
}
228-
} catch ( IOException ex ) {
229-
if( key != null )
230-
key.cancel();
231-
onWebsocketError( conn, ex );// conn may be null here
232-
if( conn != null ) {
233-
conn.close( CloseFrame.ABNROMAL_CLOSE );
225+
/*synchronized ( connections ) {
226+
Iterator<WebSocket> it = this.connections.iterator();
227+
while ( it.hasNext() ) {
228+
// We have to do this check here, and not in the selectorthread that
229+
// adds the buffered data to the WebSocket, because the
230+
// Selector is not selectorthread-safe, and can only be accessed
231+
// by this selectorthread.
232+
conn = it.next();
233+
if( conn.hasBufferedData() ) {
234+
conn.flush();
235+
// key.channel().register( selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, conn );
236+
}
237+
}
238+
}*/
239+
} catch ( IOException ex ) {
240+
if( key != null )
241+
key.cancel();
242+
handleIOException( conn, ex );
234243
}
235244
}
245+
} catch ( RuntimeException e ) {
246+
// should hopefully never occur
247+
onError( null, e );
248+
try {
249+
selector.close();
250+
} catch ( IOException e1 ) {
251+
onError( null, e1 );
252+
}
253+
decoders.shutdown();
254+
}
255+
}
256+
257+
private void handleIOException( WebSocket conn, IOException ex ) {
258+
onWebsocketError( conn, ex );// conn may be null here
259+
if( conn != null ) {
260+
conn.close( CloseFrame.ABNROMAL_CLOSE );
236261
}
237262
}
238263

@@ -292,7 +317,17 @@ public final void onWebsocketError( WebSocket conn, Exception ex ) {
292317

293318
@Override
294319
public final void onWriteDemand( WebSocket conn ) {
295-
selector.wakeup();
320+
try {
321+
conn.flush();
322+
} catch ( IOException e ) {
323+
handleIOException( conn, e );
324+
}
325+
/*synchronized ( write_demands ) {
326+
if( !write_demands.contains( conn ) ) {
327+
write_demands.add( conn );
328+
flusher.submit( new WebsocketWriteTask( conn ) );
329+
}
330+
}*/
296331
}
297332

298333
// ABTRACT METHODS /////////////////////////////////////////////////////////
@@ -303,4 +338,62 @@ public final void onWriteDemand( WebSocket conn ) {
303338
public void onMessage( WebSocket conn, ByteBuffer message ) {
304339
};
305340

341+
private boolean asyncQueueRead( WebSocket ws ) {
342+
synchronized ( active_websocktes ) {
343+
if( active_websocktes.contains( ws ) ) {
344+
return false;
345+
}
346+
active_websocktes.add( ws );// will add ws only if it is not already added
347+
decoders.submit( new WebsocketReadTask( ws ) );
348+
return true;
349+
}
350+
}
351+
class WebsocketReadTask implements Callable<Boolean> {
352+
353+
private WebSocket ws;
354+
355+
private WebsocketReadTask( WebSocket ws ) {
356+
this.ws = ws;
357+
}
358+
359+
@Override
360+
public Boolean call() throws Exception {
361+
try {
362+
ws.handleRead();
363+
} catch ( IOException e ) {
364+
e.printStackTrace();
365+
return false;
366+
} finally {
367+
synchronized ( active_websocktes ) {
368+
active_websocktes.remove( ws );
369+
}
370+
selector.wakeup();
371+
}
372+
return true;
373+
}
374+
}
375+
376+
class WebsocketWriteTask implements Callable<Boolean> {
377+
378+
private WebSocket ws;
379+
380+
private WebsocketWriteTask( WebSocket ws ) {
381+
this.ws = ws;
382+
}
383+
384+
@Override
385+
public Boolean call() throws Exception {
386+
try {
387+
ws.flush();
388+
} catch ( IOException e ) {
389+
handleIOException( ws, e );
390+
} finally {
391+
}
392+
synchronized ( write_demands ) {
393+
write_demands.remove( ws );
394+
}
395+
return true;
396+
}
397+
}
398+
306399
}

0 commit comments

Comments
 (0)