Skip to content

Commit fd88437

Browse files
committed
in progress commit
1 parent f6afbfb commit fd88437

8 files changed

Lines changed: 46 additions & 78 deletions

File tree

example/AutobahnServerTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,7 @@ public void onError( WebSocket conn, Exception ex ) {
3838

3939
@Override
4040
public void onMessage( WebSocket conn, String message ) {
41-
try {
42-
conn.send( message );
43-
} catch ( InterruptedException e ) {
44-
e.printStackTrace();
45-
}
41+
conn.send( message );
4642
}
4743

4844
@Override

example/EmptyClient.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

example/ServerStressTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void adjust() throws InterruptedException {
155155
}
156156
int totalclients = clients.getValue();
157157
while ( websockets.size() < totalclients ) {
158-
WebSocketClient cl = new EmptyClient( uri ) {
158+
WebSocketClient cl = new ExampleClient( uri ) {
159159
@Override
160160
public void onClose( int code, String reason, boolean remote ) {
161161
System.out.println( "Closed duo " + code + " " + reason );

src/org/java_websocket/WebSocket.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.List;
1111
import java.util.concurrent.BlockingQueue;
1212
import java.util.concurrent.LinkedBlockingQueue;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1314
import java.util.concurrent.atomic.AtomicLong;
1415

1516
import org.java_websocket.drafts.Draft;
@@ -112,6 +113,8 @@ public enum Role {
112113

113114
private SocketChannel sockchannel;
114115

116+
public AtomicBoolean hasWrite = new AtomicBoolean( false );
117+
115118
// CONSTRUCTOR /////////////////////////////////////////////////////////////
116119
/**
117120
* Used in {@link WebSocketServer} and {@link WebSocketClient}.
@@ -144,7 +147,7 @@ public WebSocket( WebSocketListener listener , List<Draft> drafts , SocketChanne
144147

145148
private void init( WebSocketListener listener, Draft draft, SocketChannel socketchannel ) {
146149
this.sockchannel = socketchannel;
147-
this.bufferQueue = new LinkedBlockingQueue<ByteBuffer>( 3 );
150+
this.bufferQueue = new LinkedBlockingQueue<ByteBuffer>();
148151
this.socketBuffer = ByteBuffer.allocate( BUFFERSIZE );
149152
socketBuffer.flip();
150153
this.wsl = listener;
@@ -166,16 +169,16 @@ private void init( WebSocketListener listener, Draft draft, SocketChannel socket
166169
socketBuffer.limit( socketBuffer.capacity() );
167170
if( sockchannel.read( socketBuffer ) == -1 ) {
168171
if( draft == null ) {
169-
closeConnection( CloseFrame.ABNOROMAL_CLOSE, true );
172+
closeConnection( CloseFrame.ABNORMAL_CLOSE, true );
170173
} else if( draft.getCloseHandshakeType() == CloseHandshakeType.NONE ) {
171174
closeConnection( CloseFrame.NORMAL, true );
172175
} else if( draft.getCloseHandshakeType() == CloseHandshakeType.ONEWAY ) {
173176
if( role == Role.SERVER )
174-
closeConnection( CloseFrame.ABNOROMAL_CLOSE, true );
177+
closeConnection( CloseFrame.ABNORMAL_CLOSE, true );
175178
else
176179
closeConnection( CloseFrame.NORMAL, true );
177180
} else {
178-
closeConnection( CloseFrame.ABNOROMAL_CLOSE, true );
181+
closeConnection( CloseFrame.ABNORMAL_CLOSE, true );
179182
}
180183

181184
}
@@ -368,14 +371,14 @@ public void close( int code, String message ) {
368371
try {
369372
closeDirect( code, message );
370373
} catch ( IOException e ) {
371-
closeConnection( CloseFrame.ABNOROMAL_CLOSE, true );
374+
closeConnection( CloseFrame.ABNORMAL_CLOSE, true );
372375
}
373376
}
374377

375378
public void closeDirect( int code, String message ) throws IOException {
376379
if( !closeHandshakeSent ) {
377380
if( handshakeComplete ) {
378-
if( code == CloseFrame.ABNOROMAL_CLOSE ) {
381+
if( code == CloseFrame.ABNORMAL_CLOSE ) {
379382
closeConnection( code, true );
380383
closeHandshakeSent = true;
381384
return;
@@ -386,7 +389,7 @@ public void closeDirect( int code, String message ) throws IOException {
386389
sendFrameDirect( new CloseFrameBuilder( code, message ) );
387390
} catch ( InvalidDataException e ) {
388391
wsl.onWebsocketError( this, e );
389-
closeConnection( CloseFrame.ABNOROMAL_CLOSE, "generated frame is invalid", false );
392+
closeConnection( CloseFrame.ABNORMAL_CLOSE, "generated frame is invalid", false );
390393
}
391394
} else {
392395
closeConnection( code, false );
@@ -450,7 +453,7 @@ public void close( InvalidDataException e ) {
450453
* @throws InterruptedException
451454
* @throws NotYetConnectedException
452455
*/
453-
public void send( String text ) throws NotYetConnectedException , InterruptedException {
456+
public void send( String text ) throws NotYetConnectedException {
454457
if( text == null )
455458
throw new IllegalArgumentException( "Cannot send 'null' data to a WebSocket." );
456459
send( draft.createFrames( text, role == Role.CLIENT ) );
@@ -473,15 +476,15 @@ public void send( byte[] bytes ) throws IllegalArgumentException , NotYetConnect
473476
send( ByteBuffer.wrap( bytes ) );
474477
}
475478

476-
private void send( Collection<Framedata> frames ) throws InterruptedException {
479+
private void send( Collection<Framedata> frames ) {
477480
if( !this.handshakeComplete )
478481
throw new NotYetConnectedException();
479482
for( Framedata f : frames ) {
480483
sendFrame( f );
481484
}
482485
}
483486

484-
public void sendFrame( Framedata framedata ) throws InterruptedException {
487+
public void sendFrame( Framedata framedata ) {
485488
if( DEBUG )
486489
System.out.println( "send frame: " + framedata );
487490
channelWrite( draft.createBinaryFrame( framedata ) );
@@ -493,7 +496,7 @@ private void sendFrameDirect( Framedata framedata ) throws IOException {
493496
channelWriteDirect( draft.createBinaryFrame( framedata ) );
494497
}
495498

496-
boolean hasBufferedData() {
499+
public boolean hasBufferedData() {
497500
return !this.bufferQueue.isEmpty();
498501
}
499502

@@ -502,14 +505,14 @@ boolean hasBufferedData() {
502505
*
503506
* @return Amount of Data still in Queue and not sent yet of the socket
504507
*/
505-
long bufferedDataAmount() {
508+
public long bufferedDataAmount() {
506509
return bufferQueueTotalAmount.get();
507510
}
508511

509512
/**
510513
* Empty the internal buffer, sending all the pending data before continuing.
511514
*/
512-
public void flush() throws IOException {
515+
public synchronized void flush() throws IOException {
513516
ByteBuffer buffer = this.bufferQueue.peek();
514517
while ( buffer != null ) {
515518
int written = sockchannel.write( buffer );
@@ -525,6 +528,23 @@ public void flush() throws IOException {
525528
}
526529
}
527530

531+
public boolean batch() throws IOException {
532+
ByteBuffer buffer = this.bufferQueue.peek();
533+
while ( buffer != null ) {
534+
int written = sockchannel.write( buffer );
535+
if( buffer.remaining() > 0 ) {
536+
return false;
537+
} else {
538+
// subtract this amount of data from the total queued (synchronized over this object)
539+
bufferQueueTotalAmount.addAndGet( -written );
540+
541+
this.bufferQueue.poll(); // Buffer finished. Remove it.
542+
buffer = this.bufferQueue.peek();
543+
}
544+
}
545+
return true;
546+
}
547+
528548
public HandshakeState isFlashEdgeCase( ByteBuffer request ) {
529549
if( request.limit() > Draft.FLASH_POLICY_REQUEST.length )
530550
return HandshakeState.NOT_MATCHED;
@@ -562,24 +582,17 @@ public void startHandshake( ClientHandshakeBuilder handshakedata ) throws Invali
562582
channelWrite( draft.createHandshake( this.handshakerequest, role ) );
563583
}
564584

565-
private void channelWrite( ByteBuffer buf ) throws InterruptedException {
585+
private void channelWrite( ByteBuffer buf ) {
566586
if( DEBUG )
567587
System.out.println( "write(" + buf.remaining() + "): {" + ( buf.remaining() > 1000 ? "too big to display" : new String( buf.array() ) ) + "}" );
568588

569589
// add up the number of bytes to the total queued (synchronized over this object)
570590
bufferQueueTotalAmount.addAndGet( buf.remaining() );
571-
572-
if( !bufferQueue.offer( buf ) ) {
573-
try {
574-
flush();
575-
} catch ( IOException e ) {
576-
wsl.onWebsocketError( this, e );
577-
closeConnection( CloseFrame.ABNOROMAL_CLOSE, true );
578-
return;
579-
}
591+
try {
580592
bufferQueue.put( buf );
593+
} catch ( InterruptedException e ) {
594+
e.printStackTrace();
581595
}
582-
583596
wsl.onWriteDemand( this );
584597
}
585598

@@ -593,11 +606,13 @@ private void channelWriteDirect( ByteBuffer buf ) throws IOException {
593606
while ( buf.hasRemaining() )
594607
sockchannel.write( buf );
595608
}
609+
596610
private void writeDirect( List<ByteBuffer> bufs ) throws IOException {
597611
for( ByteBuffer b : bufs ) {
598612
channelWriteDirect( b );
599613
}
600614
}
615+
601616
private void deliverMessage( Framedata d ) throws InvalidDataException {
602617
if( d.getOpcode() == Opcode.TEXT ) {
603618
wsl.onWebsocketMessage( this, Charsetfunctions.stringUtf8( d.getPayloadData() ) );

src/org/java_websocket/WebSocketAdapter.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,7 @@ public void onWebsocketMessage( WebSocket conn, ByteBuffer blob ) {
8989
public void onWebsocketPing( WebSocket conn, Framedata f ) {
9090
FramedataImpl1 resp = new FramedataImpl1( f );
9191
resp.setOptcode( Opcode.PONG );
92-
try {
93-
conn.sendFrame( resp );
94-
} catch ( InterruptedException e ) {
95-
e.printStackTrace();
96-
}
92+
conn.sendFrame( resp );
9793
}
9894

9995
/**

src/org/java_websocket/WebSocketClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ protected final void interruptableRun() {
212212
}
213213
} catch ( IOException e ) {
214214
onError( e );
215-
conn.close( CloseFrame.ABNOROMAL_CLOSE );
215+
conn.close( CloseFrame.ABNORMAL_CLOSE );
216216
return;
217217
} catch ( RuntimeException e ) {
218218
// this catch case covers internal errors only and indicates a bug in this websocket implementation
@@ -258,7 +258,7 @@ public void flush() {
258258
conn.flush();
259259
} catch ( IOException e ) {
260260
onError( e );
261-
conn.closeConnection( CloseFrame.ABNOROMAL_CLOSE, true );
261+
conn.closeConnection( CloseFrame.ABNORMAL_CLOSE, true );
262262
return;
263263
}
264264
}

src/org/java_websocket/WebSocketServer.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public void run() {
248248
private void handleIOException( WebSocket conn, IOException ex ) {
249249
onWebsocketError( conn, ex );// conn may be null here
250250
if( conn != null ) {
251-
conn.close( CloseFrame.ABNOROMAL_CLOSE );
251+
conn.close( CloseFrame.ABNORMAL_CLOSE );
252252
}
253253
}
254254

@@ -325,12 +325,6 @@ public final void onWriteDemand( WebSocket conn ) {
325325
handleIOException( conn, e );
326326
}
327327
}
328-
/*synchronized ( write_demands ) {
329-
if( !write_demands.contains( conn ) ) {
330-
write_demands.add( conn );
331-
flusher.submit( new WebsocketWriteTask( conn ) );
332-
}
333-
}*/
334328
}
335329

336330
// ABTRACT METHODS /////////////////////////////////////////////////////////

src/org/java_websocket/framing/CloseFrame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public interface CloseFrame extends Framedata {
4141
* connection was closed abnormally, e.g. without sending or
4242
* receiving a Close control frame.
4343
*/
44-
public static final int ABNOROMAL_CLOSE = 1006;
44+
public static final int ABNORMAL_CLOSE = 1006;
4545
/**
4646
* 1007 indicates that an endpoint is terminating the connection
4747
* because it has received data within a message that was not

0 commit comments

Comments
 (0)