1010import java .nio .channels .ClosedByInterruptException ;
1111import java .nio .channels .NotYetConnectedException ;
1212import java .nio .channels .SelectionKey ;
13- import java .nio .channels .Selector ;
1413import java .nio .channels .SocketChannel ;
15- import java .util . Iterator ;
14+ import java .nio . channels . spi . SelectorProvider ;
1615import java .util .List ;
1716import java .util .Map ;
18- import java .util .Set ;
1917import java .util .concurrent .CountDownLatch ;
2018
2119import org .java_websocket .SocketChannelIOHelper ;
@@ -58,13 +56,9 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab
5856
5957 private ByteChannel wrappedchannel = null ;
6058
61- private SelectionKey key = null ;
62- /**
63- * The 'Selector' used to get event keys from the underlying socket.
64- */
65- private Selector selector = null ;
59+ private Thread writethread ;
6660
67- private Thread thread ;
61+ private Thread readthread ;
6862
6963 private Draft draft ;
7064
@@ -88,8 +82,10 @@ public WebSocket createWebSocket( WebSocketAdapter a, List<Draft> d, Socket s )
8882 }
8983
9084 @ Override
91- public ByteChannel wrapChannel ( SelectionKey c , String host , int port ) {
92- return (ByteChannel ) c .channel ();
85+ public ByteChannel wrapChannel ( SocketChannel channel , SelectionKey c , String host , int port ) {
86+ if ( c == null )
87+ return channel ;
88+ return channel ;
9389 }
9490 };
9591
@@ -117,6 +113,7 @@ public WebSocketClient( URI serverUri , Draft draft , Map<String,String> headers
117113 this .draft = draft ;
118114 this .headers = headers ;
119115 this .timeout = connecttimeout ;
116+
120117 }
121118
122119 /**
@@ -139,10 +136,10 @@ public Draft getDraft() {
139136 * <var>setURI</var>.
140137 */
141138 public void connect () {
142- if ( thread != null )
139+ if ( writethread != null )
143140 throw new IllegalStateException ( "WebSocketClient objects are not reuseable" );
144- thread = new Thread ( this );
145- thread .start ();
141+ writethread = new Thread ( this );
142+ writethread .start ();
146143 }
147144
148145 /**
@@ -156,7 +153,7 @@ public boolean connectBlocking() throws InterruptedException {
156153 }
157154
158155 public void close () {
159- if ( thread != null && conn != null ) {
156+ if ( writethread != null && conn != null ) {
160157 conn .close ( CloseFrame .NORMAL );
161158 }
162159 }
@@ -190,34 +187,34 @@ public void send( byte[] data ) throws NotYetConnectedException {
190187 }
191188 }
192189
193- private void tryToConnect ( InetSocketAddress remote ) throws IOException {
194- channel = SocketChannel . open ();
195- channel .configureBlocking ( false );
190+ private void tryToConnect ( InetSocketAddress remote ) throws IOException , InvalidHandshakeException {
191+ channel = SelectorProvider . provider (). openSocketChannel ();
192+ channel .configureBlocking ( true );
196193 channel .connect ( remote );
197- selector = Selector .open ();
198- key = channel .register ( selector , SelectionKey .OP_CONNECT );
194+
199195 }
200196
201197 // Runnable IMPLEMENTATION /////////////////////////////////////////////////
202198 public void run () {
203- if ( thread == null )
204- thread = Thread .currentThread ();
199+ if ( writethread == null )
200+ writethread = Thread .currentThread ();
205201 interruptableRun ();
206202
207203 assert ( !channel .isOpen () );
208204
209- try {
210- if ( selector != null ) // if the initialization in <code>tryToConnect</code> fails, it could be null
211- selector .close ();
212- } catch ( IOException e ) {
213- onError ( e );
214- }
215-
216205 }
217206
218207 private final void interruptableRun () {
219208 try {
220- tryToConnect ( new InetSocketAddress ( uri .getHost (), getPort () ) );
209+ String host = uri .getHost ();
210+ int port = getPort ();
211+ tryToConnect ( new InetSocketAddress ( host , port ) );
212+ conn = (WebSocketImpl ) wf .createWebSocket ( this , draft , channel .socket () );
213+ conn .channel = wrappedchannel = wf .wrapChannel ( channel , null , host , port );
214+ timeout = 0 ; // since connect is over
215+ sendHandshake ();
216+ readthread = new Thread ( new WebsocketWriteThread () );
217+ readthread .start ();
221218 } catch ( ClosedByInterruptException e ) {
222219 onWebsocketError ( null , e );
223220 return ;
@@ -226,46 +223,16 @@ private final void interruptableRun() {
226223 conn .closeConnection ( CloseFrame .NEVER_CONNECTED , e .getMessage () );
227224 return ;
228225 }
229- conn = ( WebSocketImpl ) wf . createWebSocket ( this , draft , channel . socket () );
226+
230227 ByteBuffer buff = ByteBuffer .allocate ( WebSocket .RCVBUF );
231228 try /*IO*/ {
232229 while ( channel .isOpen () ) {
233- SelectionKey key = null ;
234- selector .select ( timeout );
235- Set <SelectionKey > keys = selector .selectedKeys ();
236- Iterator <SelectionKey > i = keys .iterator ();
237- if ( conn .getReadyState () == READYSTATE .NOT_YET_CONNECTED && !i .hasNext () ) {
238- // Hack for issue #140:
239- // Android does simply return form select without closing the channel if address is not reachable(which seems to be a bug in the android nio proivder)
240- // TODO provide a way to fix this problem which does not require this hack
241- throw new IOException ( "Host is not reachable(Android Hack)" );
242- }
243- while ( i .hasNext () ) {
244- key = i .next ();
245- i .remove ();
246- if ( !key .isValid () ) {
247- conn .eot ();
248- continue ;
249- }
250- if ( key .isReadable () && SocketChannelIOHelper .read ( buff , this .conn , wrappedchannel ) ) {
251- conn .decode ( buff );
252- }
253- if ( key .isConnectable () ) {
254- try {
255- finishConnect ( key );
256- } catch ( InvalidHandshakeException e ) {
257- conn .close ( e ); // http error
258- }
259- }
260- if ( key .isWritable () ) {
261- if ( SocketChannelIOHelper .batch ( conn , wrappedchannel ) ) {
262- if ( key .isValid () )
263- key .interestOps ( SelectionKey .OP_READ );
264- } else {
265- key .interestOps ( SelectionKey .OP_READ | SelectionKey .OP_WRITE );
266- }
267- }
230+ if ( SocketChannelIOHelper .read ( buff , this .conn , wrappedchannel ) ) {
231+ conn .decode ( buff );
232+ } else {
233+ conn .eot ();
268234 }
235+
269236 if ( wrappedchannel instanceof WrappedByteChannel ) {
270237 WrappedByteChannel w = (WrappedByteChannel ) wrappedchannel ;
271238 if ( w .isNeedRead () ) {
@@ -302,17 +269,6 @@ private int getPort() {
302269 return port ;
303270 }
304271
305- private void finishConnect ( SelectionKey key ) throws IOException , InvalidHandshakeException {
306- if ( !channel .finishConnect () )
307- return ;
308- // Now that we're connected, re-register for only 'READ' keys.
309- conn .key = key .interestOps ( SelectionKey .OP_READ | SelectionKey .OP_WRITE );
310-
311- conn .channel = wrappedchannel = wf .wrapChannel ( key , uri .getHost (), getPort () );
312- timeout = 0 ; // since connect is over
313- sendHandshake ();
314- }
315-
316272 private void sendHandshake () throws InvalidHandshakeException {
317273 String path ;
318274 String part1 = uri .getPath ();
@@ -384,6 +340,7 @@ public final void onWebsocketOpen( WebSocket conn, Handshakedata handshake ) {
384340 public final void onWebsocketClose ( WebSocket conn , int code , String reason , boolean remote ) {
385341 connectLatch .countDown ();
386342 closeLatch .countDown ();
343+ readthread .interrupt ();
387344 onClose ( code , reason , remote );
388345 }
389346
@@ -399,12 +356,7 @@ public final void onWebsocketError( WebSocket conn, Exception ex ) {
399356
400357 @ Override
401358 public final void onWriteDemand ( WebSocket conn ) {
402- try {
403- key .interestOps ( SelectionKey .OP_READ | SelectionKey .OP_WRITE );
404- selector .wakeup ();
405- } catch ( CancelledKeyException e ) {
406- // since such an exception/event will also occur on the selector there is no need to do anything herec
407- }
359+ // nothing to do
408360 }
409361
410362 @ Override
@@ -444,6 +396,22 @@ public void onMessage( ByteBuffer bytes ) {
444396 };
445397
446398 public interface WebSocketClientFactory extends WebSocketFactory {
447- public ByteChannel wrapChannel ( SelectionKey key , String host , int port ) throws IOException ;
399+ public ByteChannel wrapChannel ( SocketChannel channel , SelectionKey key , String host , int port ) throws IOException ;
400+ }
401+
402+ private class WebsocketWriteThread implements Runnable {
403+ @ Override
404+ public void run () {
405+ Thread .currentThread ().setName ( "WebsocketWriteThread" );
406+ try {
407+ while ( !Thread .interrupted () ) {
408+ SocketChannelIOHelper .writeBlocking ( conn , wrappedchannel );
409+ }
410+ } catch ( IOException e ) {
411+ conn .eot ();
412+ } catch ( InterruptedException e ) {
413+ // this thread is regulary terminated via an interrupt
414+ }
415+ }
448416 }
449417}
0 commit comments