Skip to content

Commit 58c26b1

Browse files
committed
The user can now specify more than one Draft.java in the servers constructor
Test implementation of an only partially blocking writing in the WebSocketServer.java
1 parent a1ef53a commit 58c26b1

File tree

3 files changed

+59
-22
lines changed

3 files changed

+59
-22
lines changed

example/AutobahnServerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import java.net.InetSocketAddress;
22
import java.net.UnknownHostException;
33
import java.nio.ByteBuffer;
4+
import java.util.Collections;
45

56
import org.java_websocket.WebSocket;
67
import org.java_websocket.WebSocketServer;
@@ -11,12 +12,12 @@
1112
public class AutobahnServerTest extends WebSocketServer {
1213
private static int counter = 0;
1314

14-
public AutobahnServerTest( int port, Draft d ) throws UnknownHostException {
15-
super( new InetSocketAddress( "localhost", port ), d );
15+
public AutobahnServerTest( int port , Draft d ) throws UnknownHostException {
16+
super( new InetSocketAddress( port ), Collections.singletonList( d ) );
1617
}
1718

1819
public AutobahnServerTest( InetSocketAddress address, Draft d ) {
19-
super( address, d );
20+
super( address, Collections.singletonList( d ) );
2021
}
2122

2223
@Override

src/org/java_websocket/WebSocketServer.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
import java.util.Collections;
1313
import java.util.HashSet;
1414
import java.util.Iterator;
15+
import java.util.List;
1516
import java.util.Set;
1617
import java.util.concurrent.Callable;
1718
import java.util.concurrent.ExecutorService;
1819
import java.util.concurrent.Executors;
20+
import java.util.concurrent.ThreadFactory;
1921

2022
import org.java_websocket.drafts.Draft;
2123
import org.java_websocket.framing.CloseFrame;
@@ -30,7 +32,7 @@
3032
*/
3133
public abstract class WebSocketServer extends WebSocketAdapter implements Runnable {
3234

33-
public int DECODERS = Runtime.getRuntime().availableProcessors();
35+
public static int DECODERS = 1;
3436

3537
/**
3638
* Holds the list of active WebSocket connections. "Active" means WebSocket
@@ -53,12 +55,11 @@ public abstract class WebSocketServer extends WebSocketAdapter implements Runnab
5355
/**
5456
* The Draft of the WebSocket protocol the Server is adhering to.
5557
*/
56-
private Draft draft;
58+
private List<Draft> drafts;
5759

5860
private Thread selectorthread;
5961

60-
private ExecutorService decoders = Executors.newFixedThreadPool( DECODERS );
61-
// private ExecutorService flusher = Executors.newSingleThreadExecutor();
62+
private ExecutorService decoders;
6263

6364
private Set<WebSocket> active_websocktes = new HashSet<WebSocket>();
6465
// private Set<WebSocket> write_demands = new HashSet<WebSocket>();
@@ -69,7 +70,7 @@ public abstract class WebSocketServer extends WebSocketAdapter implements Runnab
6970
* listen on port <var>WebSocket.DEFAULT_PORT</var>.
7071
*/
7172
public WebSocketServer() throws UnknownHostException {
72-
this( new InetSocketAddress( WebSocket.DEFAULT_PORT ), null );
73+
this( new InetSocketAddress( WebSocket.DEFAULT_PORT ), DECODERS, null );
7374
}
7475

7576
/**
@@ -79,7 +80,11 @@ public WebSocketServer() throws UnknownHostException {
7980
* The address (host:port) this server should listen on.
8081
*/
8182
public WebSocketServer( InetSocketAddress address ) {
82-
this( address, null );
83+
this( address, DECODERS, null );
84+
}
85+
86+
public WebSocketServer( InetSocketAddress address , int decoders ) {
87+
this( address, decoders, null );
8388
}
8489

8590
/**
@@ -92,9 +97,17 @@ public WebSocketServer( InetSocketAddress address ) {
9297
* The version of the WebSocket protocol that this server
9398
* instance should comply to.
9499
*/
95-
public WebSocketServer( InetSocketAddress address , Draft draft ) {
96-
this.draft = draft;
100+
public WebSocketServer( InetSocketAddress address , List<Draft> drafts ) {
101+
this( address, DECODERS, drafts );
102+
}
103+
104+
public WebSocketServer( InetSocketAddress address , int decodercount , List<Draft> drafts ) {
105+
if( drafts == null )
106+
this.drafts = Collections.emptyList();
107+
else
108+
this.drafts = drafts;
97109
setAddress( address );
110+
this.decoders = Executors.newFixedThreadPool( decodercount, new WebsocketThreadFactory() );
98111
}
99112

100113
/**
@@ -165,8 +178,8 @@ public int getPort() {
165178
return getAddress().getPort();
166179
}
167180

168-
public Draft getDraft() {
169-
return this.draft;
181+
public List<Draft> getDraft() {
182+
return Collections.unmodifiableList( drafts );
170183
}
171184

172185
// Runnable IMPLEMENTATION /////////////////////////////////////////////////
@@ -205,7 +218,7 @@ public void run() {
205218
if( key.isAcceptable() ) {
206219
SocketChannel client = server.accept();
207220
client.configureBlocking( false );
208-
WebSocket c = new WebSocket( this, Collections.singletonList( draft ), client );
221+
WebSocket c = new WebSocket( this, drafts, client );
209222
client.register( selector, SelectionKey.OP_READ, c );
210223
i.remove();
211224
} else if( key.isReadable() ) {
@@ -302,10 +315,12 @@ public final void onWebsocketError( WebSocket conn, Exception ex ) {
302315

303316
@Override
304317
public final void onWriteDemand( WebSocket conn ) {
305-
try {
306-
conn.flush();
307-
} catch ( IOException e ) {
308-
handleIOException( conn, e );
318+
if( Thread.currentThread() instanceof WebsocketExecutorThread == false ) {
319+
try {
320+
conn.flush();
321+
} catch ( IOException e ) {
322+
handleIOException( conn, e );
323+
}
309324
}
310325
/*synchronized ( write_demands ) {
311326
if( !write_demands.contains( conn ) ) {
@@ -326,14 +341,16 @@ public void onMessage( WebSocket conn, ByteBuffer message ) {
326341
private boolean asyncQueueRead( WebSocket ws ) {
327342
synchronized ( active_websocktes ) {
328343
if( active_websocktes.contains( ws ) ) {
344+
System.out.println( "queue failed" );
329345
return false;
330346
}
331347
active_websocktes.add( ws );// will add ws only if it is not already added
332348
decoders.submit( new WebsocketReadTask( ws ) );
333349
return true;
334350
}
335351
}
336-
class WebsocketReadTask implements Callable<Boolean> {
352+
353+
private class WebsocketReadTask implements Callable<Boolean> {
337354

338355
private WebSocket ws;
339356

@@ -345,6 +362,8 @@ private WebsocketReadTask( WebSocket ws ) {
345362
public Boolean call() throws Exception {
346363
try {
347364
ws.handleRead();
365+
ws.flush();
366+
System.out.println( "Assync Complete" );
348367
} catch ( IOException e ) {
349368
handleIOException( ws, e );
350369
} catch ( RuntimeException e ) {
@@ -358,4 +377,24 @@ public Boolean call() throws Exception {
358377
return true;
359378
}
360379
}
380+
381+
private class WebsocketThreadFactory implements ThreadFactory {
382+
383+
@Override
384+
public Thread newThread( Runnable r ) {
385+
return new WebsocketExecutorThread( r );
386+
}
387+
}
388+
389+
private class WebsocketExecutorThread extends Thread {
390+
private Runnable r;
391+
WebsocketExecutorThread( Runnable r ) {
392+
this.r = r;
393+
394+
}
395+
@Override
396+
public void run() {
397+
r.run();
398+
}
399+
}
361400
}

src/org/java_websocket/drafts/Draft.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import java.util.Iterator;
66
import java.util.List;
77
import java.util.Locale;
8-
import java.util.regex.Pattern;
98

109
import org.java_websocket.WebSocket.Role;
1110
import org.java_websocket.exeptions.InvalidDataException;
@@ -44,8 +43,6 @@ public enum CloseHandshakeType {
4443
public static int INITIAL_FAMESIZE = 64;
4544

4645
public static final byte[] FLASH_POLICY_REQUEST = Charsetfunctions.utf8Bytes( "<policy-file-request/>\0" );
47-
private static Pattern getpattern = Pattern.compile( "" ); // GET / HTTP/1.1
48-
private static Pattern statuspattern = Pattern.compile( "" ); // HTTP/1.1 101 Switching Protocols
4946

5047
/** In some cases the handshake will be parsed different depending on whether */
5148
protected Role role = null;

0 commit comments

Comments
 (0)