Skip to content

Commit 47256b1

Browse files
committed
simplified WebSocketClient by using blocking instead of nonblocking channels
1 parent c00c3d9 commit 47256b1

File tree

2 files changed

+66
-87
lines changed

2 files changed

+66
-87
lines changed

src/main/java/org/java_websocket/SocketChannelIOHelper.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.nio.ByteBuffer;
55
import java.nio.channels.ByteChannel;
6+
import java.nio.channels.spi.AbstractSelectableChannel;
67

78
public class SocketChannelIOHelper {
89

@@ -33,10 +34,11 @@ public static boolean readMore( final ByteBuffer buf, WebSocketImpl ws, WrappedB
3334
/** Returns whether the whole outQueue has been flushed */
3435
public static boolean batch( WebSocketImpl ws, ByteChannel sockchannel ) throws IOException {
3536
ByteBuffer buffer = ws.outQueue.peek();
37+
WrappedByteChannel c = null;
3638

3739
if( buffer == null ) {
3840
if( sockchannel instanceof WrappedByteChannel ) {
39-
WrappedByteChannel c = (WrappedByteChannel) sockchannel;
41+
c = (WrappedByteChannel) sockchannel;
4042
if( c.isNeedWrite() ) {
4143
c.writeMore();
4244
}
@@ -53,12 +55,21 @@ public static boolean batch( WebSocketImpl ws, ByteChannel sockchannel ) throws
5355
} while ( buffer != null );
5456
}
5557

56-
if( ws.outQueue.isEmpty() && ws.isFlushAndClose() ) {
58+
if( ws.outQueue.isEmpty() && ws.isFlushAndClose() /*&& ( c == null || c.isNeedWrite() )*/) {
5759
synchronized ( ws ) {
5860
ws.closeConnection();
5961
}
6062
}
61-
return sockchannel instanceof WrappedByteChannel == true ? !( (WrappedByteChannel) sockchannel ).isNeedWrite() : true;
63+
return c != null ? !( (WrappedByteChannel) sockchannel ).isNeedWrite() : true;
64+
}
65+
66+
public static void writeBlocking( WebSocketImpl ws, ByteChannel channel ) throws InterruptedException , IOException {
67+
assert ( channel instanceof AbstractSelectableChannel == true ? ( (AbstractSelectableChannel) channel ).isBlocking() : true );
68+
assert ( channel instanceof WrappedByteChannel == true ? ( (WrappedByteChannel) channel ).isBlocking() : true );
69+
70+
ByteBuffer buf = ws.outQueue.take();
71+
while ( buf.hasRemaining() )
72+
channel.write( buf );
6273
}
6374

6475
}

src/main/java/org/java_websocket/client/WebSocketClient.java

Lines changed: 52 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@
1010
import java.nio.channels.ClosedByInterruptException;
1111
import java.nio.channels.NotYetConnectedException;
1212
import java.nio.channels.SelectionKey;
13-
import java.nio.channels.Selector;
1413
import java.nio.channels.SocketChannel;
15-
import java.util.Iterator;
14+
import java.nio.channels.spi.SelectorProvider;
1615
import java.util.List;
1716
import java.util.Map;
18-
import java.util.Set;
1917
import java.util.concurrent.CountDownLatch;
2018

2119
import org.java_websocket.SocketChannelIOHelper;
@@ -58,13 +56,9 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab
5856

5957
private ByteChannel wrappedchannel = null;
6058

61-
private SelectionKey key = null;
62-
/**
63-
* The 'Selector' used to get event keys from the underlying socket.
64-
*/
65-
private Selector selector = null;
59+
private Thread writethread;
6660

67-
private Thread thread;
61+
private Thread readthread;
6862

6963
private Draft draft;
7064

@@ -88,8 +82,10 @@ public WebSocket createWebSocket( WebSocketAdapter a, List<Draft> d, Socket s )
8882
}
8983

9084
@Override
91-
public ByteChannel wrapChannel( SelectionKey c, String host, int port ) {
92-
return (ByteChannel) c.channel();
85+
public ByteChannel wrapChannel( SocketChannel channel, SelectionKey c, String host, int port ) {
86+
if( c == null )
87+
return channel;
88+
return channel;
9389
}
9490
};
9591

@@ -117,6 +113,7 @@ public WebSocketClient( URI serverUri , Draft draft , Map<String,String> headers
117113
this.draft = draft;
118114
this.headers = headers;
119115
this.timeout = connecttimeout;
116+
120117
}
121118

122119
/**
@@ -139,10 +136,10 @@ public Draft getDraft() {
139136
* <var>setURI</var>.
140137
*/
141138
public void connect() {
142-
if( thread != null )
139+
if( writethread != null )
143140
throw new IllegalStateException( "WebSocketClient objects are not reuseable" );
144-
thread = new Thread( this );
145-
thread.start();
141+
writethread = new Thread( this );
142+
writethread.start();
146143
}
147144

148145
/**
@@ -156,7 +153,7 @@ public boolean connectBlocking() throws InterruptedException {
156153
}
157154

158155
public void close() {
159-
if( thread != null && conn != null ) {
156+
if( writethread != null && conn != null ) {
160157
conn.close( CloseFrame.NORMAL );
161158
}
162159
}
@@ -190,34 +187,34 @@ public void send( byte[] data ) throws NotYetConnectedException {
190187
}
191188
}
192189

193-
private void tryToConnect( InetSocketAddress remote ) throws IOException {
194-
channel = SocketChannel.open();
195-
channel.configureBlocking( false );
190+
private void tryToConnect( InetSocketAddress remote ) throws IOException , InvalidHandshakeException {
191+
channel = SelectorProvider.provider().openSocketChannel();
192+
channel.configureBlocking( true );
196193
channel.connect( remote );
197-
selector = Selector.open();
198-
key = channel.register( selector, SelectionKey.OP_CONNECT );
194+
199195
}
200196

201197
// Runnable IMPLEMENTATION /////////////////////////////////////////////////
202198
public void run() {
203-
if( thread == null )
204-
thread = Thread.currentThread();
199+
if( writethread == null )
200+
writethread = Thread.currentThread();
205201
interruptableRun();
206202

207203
assert ( !channel.isOpen() );
208204

209-
try {
210-
if( selector != null ) // if the initialization in <code>tryToConnect</code> fails, it could be null
211-
selector.close();
212-
} catch ( IOException e ) {
213-
onError( e );
214-
}
215-
216205
}
217206

218207
private final void interruptableRun() {
219208
try {
220-
tryToConnect( new InetSocketAddress( uri.getHost(), getPort() ) );
209+
String host = uri.getHost();
210+
int port = getPort();
211+
tryToConnect( new InetSocketAddress( host, port ) );
212+
conn = (WebSocketImpl) wf.createWebSocket( this, draft, channel.socket() );
213+
conn.channel = wrappedchannel = wf.wrapChannel( channel, null, host, port );
214+
timeout = 0; // since connect is over
215+
sendHandshake();
216+
readthread = new Thread( new WebsocketWriteThread() );
217+
readthread.start();
221218
} catch ( ClosedByInterruptException e ) {
222219
onWebsocketError( null, e );
223220
return;
@@ -226,46 +223,16 @@ private final void interruptableRun() {
226223
conn.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() );
227224
return;
228225
}
229-
conn = (WebSocketImpl) wf.createWebSocket( this, draft, channel.socket() );
226+
230227
ByteBuffer buff = ByteBuffer.allocate( WebSocket.RCVBUF );
231228
try/*IO*/{
232229
while ( channel.isOpen() ) {
233-
SelectionKey key = null;
234-
selector.select( timeout );
235-
Set<SelectionKey> keys = selector.selectedKeys();
236-
Iterator<SelectionKey> i = keys.iterator();
237-
if( conn.getReadyState() == READYSTATE.NOT_YET_CONNECTED && !i.hasNext() ) {
238-
// Hack for issue #140:
239-
// Android does simply return form select without closing the channel if address is not reachable(which seems to be a bug in the android nio proivder)
240-
// TODO provide a way to fix this problem which does not require this hack
241-
throw new IOException( "Host is not reachable(Android Hack)" );
242-
}
243-
while ( i.hasNext() ) {
244-
key = i.next();
245-
i.remove();
246-
if( !key.isValid() ) {
247-
conn.eot();
248-
continue;
249-
}
250-
if( key.isReadable() && SocketChannelIOHelper.read( buff, this.conn, wrappedchannel ) ) {
251-
conn.decode( buff );
252-
}
253-
if( key.isConnectable() ) {
254-
try {
255-
finishConnect( key );
256-
} catch ( InvalidHandshakeException e ) {
257-
conn.close( e ); // http error
258-
}
259-
}
260-
if( key.isWritable() ) {
261-
if( SocketChannelIOHelper.batch( conn, wrappedchannel ) ) {
262-
if( key.isValid() )
263-
key.interestOps( SelectionKey.OP_READ );
264-
} else {
265-
key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE );
266-
}
267-
}
230+
if( SocketChannelIOHelper.read( buff, this.conn, wrappedchannel ) ) {
231+
conn.decode( buff );
232+
} else {
233+
conn.eot();
268234
}
235+
269236
if( wrappedchannel instanceof WrappedByteChannel ) {
270237
WrappedByteChannel w = (WrappedByteChannel) wrappedchannel;
271238
if( w.isNeedRead() ) {
@@ -302,17 +269,6 @@ private int getPort() {
302269
return port;
303270
}
304271

305-
private void finishConnect( SelectionKey key ) throws IOException , InvalidHandshakeException {
306-
if( !channel.finishConnect() )
307-
return;
308-
// Now that we're connected, re-register for only 'READ' keys.
309-
conn.key = key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE );
310-
311-
conn.channel = wrappedchannel = wf.wrapChannel( key, uri.getHost(), getPort() );
312-
timeout = 0; // since connect is over
313-
sendHandshake();
314-
}
315-
316272
private void sendHandshake() throws InvalidHandshakeException {
317273
String path;
318274
String part1 = uri.getPath();
@@ -384,6 +340,7 @@ public final void onWebsocketOpen( WebSocket conn, Handshakedata handshake ) {
384340
public final void onWebsocketClose( WebSocket conn, int code, String reason, boolean remote ) {
385341
connectLatch.countDown();
386342
closeLatch.countDown();
343+
readthread.interrupt();
387344
onClose( code, reason, remote );
388345
}
389346

@@ -399,12 +356,7 @@ public final void onWebsocketError( WebSocket conn, Exception ex ) {
399356

400357
@Override
401358
public final void onWriteDemand( WebSocket conn ) {
402-
try {
403-
key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE );
404-
selector.wakeup();
405-
} catch ( CancelledKeyException e ) {
406-
// since such an exception/event will also occur on the selector there is no need to do anything herec
407-
}
359+
// nothing to do
408360
}
409361

410362
@Override
@@ -444,6 +396,22 @@ public void onMessage( ByteBuffer bytes ) {
444396
};
445397

446398
public interface WebSocketClientFactory extends WebSocketFactory {
447-
public ByteChannel wrapChannel( SelectionKey key, String host, int port ) throws IOException;
399+
public ByteChannel wrapChannel( SocketChannel channel, SelectionKey key, String host, int port ) throws IOException;
400+
}
401+
402+
private class WebsocketWriteThread implements Runnable {
403+
@Override
404+
public void run() {
405+
Thread.currentThread().setName( "WebsocketWriteThread" );
406+
try {
407+
while ( !Thread.interrupted() ) {
408+
SocketChannelIOHelper.writeBlocking( conn, wrappedchannel );
409+
}
410+
} catch ( IOException e ) {
411+
conn.eot();
412+
} catch ( InterruptedException e ) {
413+
// this thread is regulary terminated via an interrupt
414+
}
415+
}
448416
}
449417
}

0 commit comments

Comments
 (0)