Skip to content

Commit 5af57f9

Browse files
committed
Merge pull request #2 from lbt05/leancloud
try to avoid duplicated connect
2 parents 8157b9f + 3a5e11b commit 5af57f9

6 files changed

Lines changed: 139 additions & 79 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ repositories {
99
}
1010

1111
group = 'cn.leancloud.android'
12-
version = '1.3.1-leancloud'
12+
version = '1.3.2-leancloud-SNAPSHOT'
1313
sourceCompatibility = 1.6
1414
targetCompatibility = 1.6
1515

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<modelVersion>4.0.0</modelVersion>
77
<groupId>cn.leancloud.android</groupId>
88
<artifactId>Java-WebSocket</artifactId>
9-
<version>1.3.1-leancloud</version>
9+
<version>1.3.2-leancloud-SNAPSHOT</version>
1010
<packaging>jar</packaging>
1111
<name>Java WebSocket</name>
1212
<description>A barebones WebSocket client and server implementation written in 100% Java

src/main/java/com/avos/avoscloud/java_websocket/WebSocketImpl.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,5 @@
11
package com.avos.avoscloud.java_websocket;
22

3-
import java.io.IOException;
4-
import java.net.InetSocketAddress;
5-
import java.net.Socket;
6-
import java.nio.ByteBuffer;
7-
import java.nio.channels.ByteChannel;
8-
import java.nio.channels.NotYetConnectedException;
9-
import java.nio.channels.SelectionKey;
10-
import java.util.ArrayList;
11-
import java.util.Collection;
12-
import java.util.List;
13-
import java.util.concurrent.BlockingQueue;
14-
import java.util.concurrent.LinkedBlockingQueue;
15-
163
import com.avos.avoscloud.java_websocket.drafts.Draft;
174
import com.avos.avoscloud.java_websocket.drafts.Draft.CloseHandshakeType;
185
import com.avos.avoscloud.java_websocket.drafts.Draft.HandshakeState;
@@ -36,6 +23,19 @@
3623
import com.avos.avoscloud.java_websocket.server.WebSocketServer.WebSocketWorker;
3724
import com.avos.avoscloud.java_websocket.util.Charsetfunctions;
3825

26+
import java.io.IOException;
27+
import java.net.InetSocketAddress;
28+
import java.net.Socket;
29+
import java.nio.ByteBuffer;
30+
import java.nio.channels.ByteChannel;
31+
import java.nio.channels.NotYetConnectedException;
32+
import java.nio.channels.SelectionKey;
33+
import java.util.ArrayList;
34+
import java.util.Collection;
35+
import java.util.List;
36+
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.LinkedBlockingQueue;
38+
3939
/**
4040
* Represents one end (client or server) of a single WebSocketImpl connection.
4141
* Takes care of the "handshake" phase, then allows for easy sending of
@@ -462,17 +462,17 @@ protected synchronized void closeConnection( int code, String message, boolean r
462462
wsl.onWebsocketError( this, e );
463463
}
464464
}
465-
try {
466-
this.wsl.onWebsocketClose( this, code, message, remote );
467-
} catch ( RuntimeException e ) {
468-
wsl.onWebsocketError( this, e );
469-
}
470465
if( draft != null )
471466
draft.reset();
472467
handshakerequest = null;
473468

474469
readystate = READYSTATE.CLOSED;
475470
this.outQueue.clear();
471+
try {
472+
this.wsl.onWebsocketClose( this, code, message, remote );
473+
} catch ( RuntimeException e ) {
474+
wsl.onWebsocketError( this, e );
475+
}
476476
}
477477

478478
protected void closeConnection( int code, boolean remote ) {
@@ -512,7 +512,9 @@ protected synchronized void flushAndClose( int code, String message, boolean rem
512512
}
513513

514514
public void eot() {
515-
if( getReadyState() == READYSTATE.NOT_YET_CONNECTED ) {
515+
if(getReadyState()==READYSTATE.CLOSED){
516+
return;
517+
}else if( getReadyState() == READYSTATE.NOT_YET_CONNECTED ) {
516518
closeConnection( CloseFrame.NEVER_CONNECTED, true );
517519
} else if( flushandclosestate ) {
518520
closeConnection( closecode, closemessage, closedremotely );

src/main/java/com/avos/avoscloud/java_websocket/client/WebSocketClient.java

Lines changed: 34 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
package com.avos.avoscloud.java_websocket.client;
22

3-
import java.io.IOException;
4-
import java.io.InputStream;
5-
import java.io.OutputStream;
6-
import java.net.InetSocketAddress;
7-
import java.net.Proxy;
8-
import java.net.Socket;
9-
import java.net.URI;
10-
import java.nio.ByteBuffer;
11-
import java.nio.channels.NotYetConnectedException;
12-
import java.util.Map;
13-
import java.util.concurrent.CountDownLatch;
14-
153
import com.avos.avoscloud.java_websocket.WebSocket;
164
import com.avos.avoscloud.java_websocket.WebSocketAdapter;
175
import com.avos.avoscloud.java_websocket.WebSocketImpl;
@@ -24,6 +12,18 @@
2412
import com.avos.avoscloud.java_websocket.handshake.Handshakedata;
2513
import com.avos.avoscloud.java_websocket.handshake.ServerHandshake;
2614

15+
import java.io.IOException;
16+
import java.io.InputStream;
17+
import java.io.OutputStream;
18+
import java.net.InetSocketAddress;
19+
import java.net.Proxy;
20+
import java.net.Socket;
21+
import java.net.URI;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.NotYetConnectedException;
24+
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
26+
2727
/**
2828
* A subclass must implement at least <var>onOpen</var>, <var>onClose</var>, and <var>onMessage</var> to be
2929
* useful. At runtime the user is expected to establish a connection via {@link #connect()}, then receive events like {@link #onMessage(String)} via the overloaded methods and to {@link #send(String)} data to the server.
@@ -35,7 +35,7 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab
3535
*/
3636
protected URI uri = null;
3737

38-
private WebSocketImpl engine = null;
38+
private transient WebSocketImpl engine = null;
3939

4040
private Socket socket = null;
4141

@@ -47,6 +47,8 @@ public abstract class WebSocketClient extends WebSocketAdapter implements Runnab
4747

4848
private Thread writeThread;
4949

50+
private Thread readThread;
51+
5052
private Draft draft;
5153

5254
private Map<String,String> headers;
@@ -103,10 +105,17 @@ public Draft getDraft() {
103105
* Initiates the websocket connection. This method does not block.
104106
*/
105107
public void connect() {
106-
if( writeThread != null )
107-
throw new IllegalStateException( "WebSocketClient objects are not reuseable" );
108-
writeThread = new Thread( this );
109-
writeThread.start();
108+
if(isOpen()||isConnecting()){
109+
return;
110+
}else{
111+
if(writeThread!=null){
112+
writeThread.interrupt();
113+
readThread.interrupt();
114+
this.draft = draft.copyInstance();
115+
this.engine = new WebSocketImpl( this,this.draft);
116+
}
117+
new Thread(this).start();
118+
}
110119
}
111120

112121
/**
@@ -136,7 +145,7 @@ public void closeBlocking() throws InterruptedException {
136145

137146
/**
138147
* Sends <var>text</var> to the connected websocket server.
139-
*
148+
*
140149
* @param text
141150
* The string which will be transmitted.
142151
*/
@@ -146,7 +155,7 @@ public void send( String text ) throws NotYetConnectedException {
146155

147156
/**
148157
* Sends binary <var> data</var> to the connected webSocket server.
149-
*
158+
*
150159
* @param data
151160
* The byte-Array of data to send to the WebSocket server.
152161
*/
@@ -156,10 +165,8 @@ public void send( byte[] data ) throws NotYetConnectedException {
156165

157166
public void run() {
158167
try {
159-
if( socket == null ) {
168+
if( socket == null || socket.isClosed()) {
160169
socket = new Socket( proxy );
161-
} else if( socket.isClosed() ) {
162-
throw new IOException();
163170
}
164171
if( !socket.isBound() )
165172
socket.connect( new InetSocketAddress( uri.getHost(), getPort() ), connectTimeout );
@@ -173,25 +180,11 @@ public void run() {
173180
return;
174181
}
175182

176-
writeThread = new Thread( new WebsocketWriteThread() );
183+
writeThread = new Thread( new WebSocketWriteThread(engine,ostream));
177184
writeThread.start();
178185

179-
byte[] rawbuffer = new byte[ WebSocketImpl.RCVBUF ];
180-
int readBytes;
181-
182-
try {
183-
while ( !isClosed() && ( readBytes = istream.read( rawbuffer ) ) != -1 ) {
184-
engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) );
185-
}
186-
engine.eot();
187-
} catch ( IOException e ) {
188-
engine.eot();
189-
} catch ( RuntimeException e ) {
190-
// this catch case covers internal errors only and indicates a bug in this websocket implementation
191-
onError( e );
192-
engine.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() );
193-
}
194-
assert ( socket.isClosed() );
186+
readThread = new Thread(new WebSocketReadThread(this,engine,istream));
187+
readThread.start();
195188
}
196189
private int getPort() {
197190
int port = uri.getPort();
@@ -226,7 +219,7 @@ private void sendHandshake() throws InvalidHandshakeException {
226219
handshake.put( "Host", host );
227220
if( headers != null ) {
228221
for( Map.Entry<String,String> kv : headers.entrySet() ) {
229-
handshake.put( kv.getKey(), kv.getValue() );
222+
handshake.put(kv.getKey(), kv.getValue() );
230223
}
231224
}
232225
engine.startHandshake( handshake );
@@ -341,23 +334,6 @@ public void onMessage( ByteBuffer bytes ) {
341334
public void onFragment( Framedata frame ) {
342335
}
343336

344-
private class WebsocketWriteThread implements Runnable {
345-
@Override
346-
public void run() {
347-
Thread.currentThread().setName( "WebsocketWriteThread" );
348-
try {
349-
while ( !Thread.interrupted() ) {
350-
ByteBuffer buffer = engine.outQueue.take();
351-
ostream.write( buffer.array(), 0, buffer.limit() );
352-
ostream.flush();
353-
}
354-
} catch ( IOException e ) {
355-
engine.eot();
356-
} catch ( InterruptedException e ) {
357-
// this thread is regularly terminated via an interrupt
358-
}
359-
}
360-
}
361337

362338
public void setProxy( Proxy proxy ) {
363339
if( proxy == null )
@@ -445,7 +421,7 @@ public InetSocketAddress getLocalSocketAddress() {
445421
public InetSocketAddress getRemoteSocketAddress() {
446422
return engine.getRemoteSocketAddress();
447423
}
448-
424+
449425
@Override
450426
public String getResourceDescriptor() {
451427
return uri.getPath();
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.avos.avoscloud.java_websocket.client;
2+
3+
import com.avos.avoscloud.java_websocket.WebSocketImpl;
4+
import com.avos.avoscloud.java_websocket.WebSocketListener;
5+
import com.avos.avoscloud.java_websocket.framing.CloseFrame;
6+
7+
import java.io.IOException;
8+
import java.io.InputStream;
9+
import java.nio.ByteBuffer;
10+
11+
/**
12+
* Created by lbt05 on 5/24/16.
13+
*/
14+
class WebSocketReadThread implements Runnable {
15+
WebSocketImpl engine;
16+
InputStream istream;
17+
WebSocketListener wsl;
18+
19+
public WebSocketReadThread(WebSocketListener wsl, WebSocketImpl engine, InputStream istream) {
20+
this.engine = engine;
21+
this.istream = istream;
22+
this.wsl = wsl;
23+
}
24+
25+
@Override
26+
public void run() {
27+
byte[] rawbuffer = new byte[WebSocketImpl.RCVBUF];
28+
int readBytes;
29+
Thread.currentThread().setName("WebsocketReadThread");
30+
try {
31+
while (!Thread.interrupted() &&
32+
!engine.isClosed() &&
33+
(readBytes = istream.read(rawbuffer)) != -1) {
34+
engine.decode(ByteBuffer.wrap(rawbuffer, 0, readBytes));
35+
}
36+
engine.eot();
37+
} catch (IOException e) {
38+
engine.eot();
39+
} catch (RuntimeException e) {
40+
// this catch case covers internal errors only and indicates a bug in this websocket
41+
// implementation
42+
wsl.onWebsocketError(engine, e);
43+
engine.closeConnection(CloseFrame.ABNORMAL_CLOSE, e.getMessage());
44+
}
45+
}
46+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.avos.avoscloud.java_websocket.client;
2+
3+
import com.avos.avoscloud.java_websocket.WebSocketImpl;
4+
5+
import java.io.IOException;
6+
import java.io.OutputStream;
7+
import java.nio.ByteBuffer;
8+
9+
/**
10+
* Created by lbt05 on 5/24/16.
11+
*/
12+
class WebSocketWriteThread implements Runnable {
13+
WebSocketImpl engine;
14+
OutputStream ostream;
15+
16+
public WebSocketWriteThread(WebSocketImpl engine, OutputStream os) {
17+
this.engine = engine;
18+
this.ostream = os;
19+
}
20+
21+
@Override
22+
public void run() {
23+
Thread.currentThread().setName("WebsocketWriteThread");
24+
try {
25+
while (!Thread.interrupted()) {
26+
ByteBuffer buffer = engine.outQueue.take();
27+
ostream.write(buffer.array(), 0, buffer.limit());
28+
ostream.flush();
29+
}
30+
} catch (IOException e) {
31+
engine.eot();
32+
} catch (InterruptedException e) {
33+
// this thread is regularly terminated via an interrupt
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)