1010import java .util .List ;
1111import java .util .concurrent .BlockingQueue ;
1212import java .util .concurrent .LinkedBlockingQueue ;
13+ import java .util .concurrent .atomic .AtomicBoolean ;
1314import java .util .concurrent .atomic .AtomicLong ;
1415
1516import org .java_websocket .drafts .Draft ;
@@ -112,6 +113,8 @@ public enum Role {
112113
113114 private SocketChannel sockchannel ;
114115
116+ public AtomicBoolean hasWrite = new AtomicBoolean ( false );
117+
115118 // CONSTRUCTOR /////////////////////////////////////////////////////////////
116119 /**
117120 * Used in {@link WebSocketServer} and {@link WebSocketClient}.
@@ -144,7 +147,7 @@ public WebSocket( WebSocketListener listener , List<Draft> drafts , SocketChanne
144147
145148 private void init ( WebSocketListener listener , Draft draft , SocketChannel socketchannel ) {
146149 this .sockchannel = socketchannel ;
147- this .bufferQueue = new LinkedBlockingQueue <ByteBuffer >( 3 );
150+ this .bufferQueue = new LinkedBlockingQueue <ByteBuffer >();
148151 this .socketBuffer = ByteBuffer .allocate ( BUFFERSIZE );
149152 socketBuffer .flip ();
150153 this .wsl = listener ;
@@ -166,16 +169,16 @@ private void init( WebSocketListener listener, Draft draft, SocketChannel socket
166169 socketBuffer .limit ( socketBuffer .capacity () );
167170 if ( sockchannel .read ( socketBuffer ) == -1 ) {
168171 if ( draft == null ) {
169- closeConnection ( CloseFrame .ABNOROMAL_CLOSE , true );
172+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
170173 } else if ( draft .getCloseHandshakeType () == CloseHandshakeType .NONE ) {
171174 closeConnection ( CloseFrame .NORMAL , true );
172175 } else if ( draft .getCloseHandshakeType () == CloseHandshakeType .ONEWAY ) {
173176 if ( role == Role .SERVER )
174- closeConnection ( CloseFrame .ABNOROMAL_CLOSE , true );
177+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
175178 else
176179 closeConnection ( CloseFrame .NORMAL , true );
177180 } else {
178- closeConnection ( CloseFrame .ABNOROMAL_CLOSE , true );
181+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
179182 }
180183
181184 }
@@ -368,14 +371,14 @@ public void close( int code, String message ) {
368371 try {
369372 closeDirect ( code , message );
370373 } catch ( IOException e ) {
371- closeConnection ( CloseFrame .ABNOROMAL_CLOSE , true );
374+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
372375 }
373376 }
374377
375378 public void closeDirect ( int code , String message ) throws IOException {
376379 if ( !closeHandshakeSent ) {
377380 if ( handshakeComplete ) {
378- if ( code == CloseFrame .ABNOROMAL_CLOSE ) {
381+ if ( code == CloseFrame .ABNORMAL_CLOSE ) {
379382 closeConnection ( code , true );
380383 closeHandshakeSent = true ;
381384 return ;
@@ -386,7 +389,7 @@ public void closeDirect( int code, String message ) throws IOException {
386389 sendFrameDirect ( new CloseFrameBuilder ( code , message ) );
387390 } catch ( InvalidDataException e ) {
388391 wsl .onWebsocketError ( this , e );
389- closeConnection ( CloseFrame .ABNOROMAL_CLOSE , "generated frame is invalid" , false );
392+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , "generated frame is invalid" , false );
390393 }
391394 } else {
392395 closeConnection ( code , false );
@@ -450,7 +453,7 @@ public void close( InvalidDataException e ) {
450453 * @throws InterruptedException
451454 * @throws NotYetConnectedException
452455 */
453- public void send ( String text ) throws NotYetConnectedException , InterruptedException {
456+ public void send ( String text ) throws NotYetConnectedException {
454457 if ( text == null )
455458 throw new IllegalArgumentException ( "Cannot send 'null' data to a WebSocket." );
456459 send ( draft .createFrames ( text , role == Role .CLIENT ) );
@@ -473,15 +476,15 @@ public void send( byte[] bytes ) throws IllegalArgumentException , NotYetConnect
473476 send ( ByteBuffer .wrap ( bytes ) );
474477 }
475478
476- private void send ( Collection <Framedata > frames ) throws InterruptedException {
479+ private void send ( Collection <Framedata > frames ) {
477480 if ( !this .handshakeComplete )
478481 throw new NotYetConnectedException ();
479482 for ( Framedata f : frames ) {
480483 sendFrame ( f );
481484 }
482485 }
483486
484- public void sendFrame ( Framedata framedata ) throws InterruptedException {
487+ public void sendFrame ( Framedata framedata ) {
485488 if ( DEBUG )
486489 System .out .println ( "send frame: " + framedata );
487490 channelWrite ( draft .createBinaryFrame ( framedata ) );
@@ -493,7 +496,7 @@ private void sendFrameDirect( Framedata framedata ) throws IOException {
493496 channelWriteDirect ( draft .createBinaryFrame ( framedata ) );
494497 }
495498
496- boolean hasBufferedData () {
499+ public boolean hasBufferedData () {
497500 return !this .bufferQueue .isEmpty ();
498501 }
499502
@@ -502,14 +505,14 @@ boolean hasBufferedData() {
502505 *
503506 * @return Amount of Data still in Queue and not sent yet of the socket
504507 */
505- long bufferedDataAmount () {
508+ public long bufferedDataAmount () {
506509 return bufferQueueTotalAmount .get ();
507510 }
508511
509512 /**
510513 * Empty the internal buffer, sending all the pending data before continuing.
511514 */
512- public void flush () throws IOException {
515+ public synchronized void flush () throws IOException {
513516 ByteBuffer buffer = this .bufferQueue .peek ();
514517 while ( buffer != null ) {
515518 int written = sockchannel .write ( buffer );
@@ -525,6 +528,23 @@ public void flush() throws IOException {
525528 }
526529 }
527530
531+ public boolean batch () throws IOException {
532+ ByteBuffer buffer = this .bufferQueue .peek ();
533+ while ( buffer != null ) {
534+ int written = sockchannel .write ( buffer );
535+ if ( buffer .remaining () > 0 ) {
536+ return false ;
537+ } else {
538+ // subtract this amount of data from the total queued (synchronized over this object)
539+ bufferQueueTotalAmount .addAndGet ( -written );
540+
541+ this .bufferQueue .poll (); // Buffer finished. Remove it.
542+ buffer = this .bufferQueue .peek ();
543+ }
544+ }
545+ return true ;
546+ }
547+
528548 public HandshakeState isFlashEdgeCase ( ByteBuffer request ) {
529549 if ( request .limit () > Draft .FLASH_POLICY_REQUEST .length )
530550 return HandshakeState .NOT_MATCHED ;
@@ -562,24 +582,17 @@ public void startHandshake( ClientHandshakeBuilder handshakedata ) throws Invali
562582 channelWrite ( draft .createHandshake ( this .handshakerequest , role ) );
563583 }
564584
565- private void channelWrite ( ByteBuffer buf ) throws InterruptedException {
585+ private void channelWrite ( ByteBuffer buf ) {
566586 if ( DEBUG )
567587 System .out .println ( "write(" + buf .remaining () + "): {" + ( buf .remaining () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
568588
569589 // add up the number of bytes to the total queued (synchronized over this object)
570590 bufferQueueTotalAmount .addAndGet ( buf .remaining () );
571-
572- if ( !bufferQueue .offer ( buf ) ) {
573- try {
574- flush ();
575- } catch ( IOException e ) {
576- wsl .onWebsocketError ( this , e );
577- closeConnection ( CloseFrame .ABNOROMAL_CLOSE , true );
578- return ;
579- }
591+ try {
580592 bufferQueue .put ( buf );
593+ } catch ( InterruptedException e ) {
594+ e .printStackTrace ();
581595 }
582-
583596 wsl .onWriteDemand ( this );
584597 }
585598
@@ -593,11 +606,13 @@ private void channelWriteDirect( ByteBuffer buf ) throws IOException {
593606 while ( buf .hasRemaining () )
594607 sockchannel .write ( buf );
595608 }
609+
596610 private void writeDirect ( List <ByteBuffer > bufs ) throws IOException {
597611 for ( ByteBuffer b : bufs ) {
598612 channelWriteDirect ( b );
599613 }
600614 }
615+
601616 private void deliverMessage ( Framedata d ) throws InvalidDataException {
602617 if ( d .getOpcode () == Opcode .TEXT ) {
603618 wsl .onWebsocketMessage ( this , Charsetfunctions .stringUtf8 ( d .getPayloadData () ) );
0 commit comments