@@ -176,6 +176,7 @@ public boolean read( ByteBuffer buf ) throws IOException {
176176 public void decode ( ByteBuffer socketBuffer ) throws IOException {
177177 if ( !socketBuffer .hasRemaining () )
178178 return ;
179+
179180 if ( DEBUG )
180181 System .out .println ( "process(" + socketBuffer .remaining () + "): {" + ( socketBuffer .remaining () > 1000 ? "too big to display" : new String ( socketBuffer .array (), socketBuffer .position (), socketBuffer .remaining () ) ) + "}" );
181182
@@ -214,8 +215,8 @@ private boolean decodeHandshake( ByteBuffer socketBufferNew ) throws IOException
214215 if ( draft == null ) {
215216 HandshakeState isflashedgecase = isFlashEdgeCase ( socketBuffer );
216217 if ( isflashedgecase == HandshakeState .MATCHED ) {
217- channelWriteDirect ( ByteBuffer .wrap ( Charsetfunctions .utf8Bytes ( wsl .getFlashPolicy ( this ) ) ) );
218- closeDirect ( CloseFrame .FLASHPOLICY , "" );
218+ write ( ByteBuffer .wrap ( Charsetfunctions .utf8Bytes ( wsl .getFlashPolicy ( this ) ) ) );
219+ close ( CloseFrame .FLASHPOLICY , "" );
219220 return false ;
220221 }
221222 }
@@ -243,7 +244,7 @@ private boolean decodeHandshake( ByteBuffer socketBufferNew ) throws IOException
243244 closeConnection ( e .getCloseCode (), e .getMessage (), false );
244245 return false ;
245246 }
246- writeDirect ( d .createHandshake ( d .postProcessHandshakeResponseAsServer ( handshake , response ), role ) );
247+ write ( d .createHandshake ( d .postProcessHandshakeResponseAsServer ( handshake , response ), role ) );
247248 draft = d ;
248249 open ( handshake );
249250 return true ;
@@ -382,33 +383,18 @@ private void decodeFrames( ByteBuffer socketBuffer ) {
382383 }
383384 }
384385
385- // PUBLIC INSTANCE METHODS /////////////////////////////////////////////////
386-
387- /**
388- * sends the closing handshake.
389- * may be send in response to an other handshake.
390- */
391386 @ Override
392387 public void close ( int code , String message ) {
393- try {
394- closeDirect ( code , message );
395- } catch ( IOException e ) {
396- closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
397- }
398- }
399-
400- public void closeDirect ( int code , String message ) throws IOException {
401388 if ( !closeHandshakeSent ) {
402389 if ( handshakeComplete ) {
403390 if ( code == CloseFrame .ABNORMAL_CLOSE ) {
404391 closeConnection ( code , true );
405392 closeHandshakeSent = true ;
406393 return ;
407394 }
408- flush ();
409395 if ( draft .getCloseHandshakeType () != CloseHandshakeType .NONE ) {
410396 try {
411- sendFrameDirect ( new CloseFrameBuilder ( code , message ) );
397+ sendFrame ( new CloseFrameBuilder ( code , message ) );
412398 } catch ( InvalidDataException e ) {
413399 wsl .onWebsocketError ( this , e );
414400 closeConnection ( CloseFrame .ABNORMAL_CLOSE , "generated frame is invalid" , false );
@@ -446,15 +432,12 @@ public void closeConnection( int code, String message, boolean remote ) {
446432 return ;
447433 }
448434 connectionClosed = true ;
449- try {
450- if ( key != null ) {
451- key .attach ( null );
452- key .cancel ();
453- }
454- sockchannel .close ();
455- } catch ( IOException e ) {
456- wsl .onWebsocketError ( this , e );
435+
436+ if ( key != null ) {
437+ key .attach ( null );
438+ key .cancel ();
457439 }
440+ // sockchannel.close();
458441 this .wsl .onWebsocketClose ( this , code , message , remote );
459442 if ( draft != null )
460443 draft .reset ();
@@ -522,13 +505,7 @@ private void send( Collection<Framedata> frames ) {
522505 public void sendFrame ( Framedata framedata ) {
523506 if ( DEBUG )
524507 System .out .println ( "send frame: " + framedata );
525- channelWrite ( draft .createBinaryFrame ( framedata ) );
526- }
527-
528- private void sendFrameDirect ( Framedata framedata ) throws IOException {
529- if ( DEBUG )
530- System .out .println ( "send frame: " + framedata );
531- channelWriteDirect ( draft .createBinaryFrame ( framedata ) );
508+ write ( draft .createBinaryFrame ( framedata ) );
532509 }
533510
534511 @ Override
@@ -548,24 +525,19 @@ public long bufferedDataAmount() {
548525
549526 /**
550527 * Empty the internal buffer, sending all the pending data before continuing.
551- */
552- public synchronized void flush () throws IOException {
553- ByteBuffer buffer = this .outQueue .peek ();
554- while ( buffer != null ) {
555- int written = sockchannel .write ( buffer );
556- if ( buffer .remaining () > 0 ) {
557- continue ;
558- } else {
559- // subtract this amount of data from the total queued (synchronized over this object)
560- bufferQueueTotalAmount .addAndGet ( -written );
528+ *
529+ * @deprecated
530+ **/
561531
562- this .outQueue .poll (); // Buffer finished. Remove it.
563- buffer = this .outQueue .peek ();
532+ public synchronized void flush () throws IOException {
533+ // this method is garbage and must be removed as soon as possible
534+ while ( !batch () )
535+ try {
536+ Thread .sleep ( 10 );
537+ } catch ( InterruptedException e ) {
564538 }
565- }
566539 }
567540
568- @ Override
569541 public boolean batch () throws IOException {
570542 ByteBuffer buffer = this .outQueue .peek ();
571543 while ( buffer != null ) {
@@ -580,6 +552,11 @@ public boolean batch() throws IOException {
580552 buffer = this .outQueue .peek ();
581553 }
582554 }
555+ if ( connectionClosed ) {
556+ synchronized ( this ) {
557+ sockchannel .close ();
558+ }
559+ }
583560 return true ;
584561 }
585562
@@ -618,10 +595,10 @@ public void startHandshake( ClientHandshakeBuilder handshakedata ) throws Invali
618595 }
619596
620597 // Send
621- channelWrite ( draft .createHandshake ( this .handshakerequest , role ) );
598+ write ( draft .createHandshake ( this .handshakerequest , role ) );
622599 }
623600
624- private void channelWrite ( ByteBuffer buf ) {
601+ private void write ( ByteBuffer buf ) {
625602 if ( DEBUG )
626603 System .out .println ( "write(" + buf .remaining () + "): {" + ( buf .remaining () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
627604
@@ -635,20 +612,9 @@ private void channelWrite( ByteBuffer buf ) {
635612 wsl .onWriteDemand ( this );
636613 }
637614
638- private void channelWrite ( List <ByteBuffer > bufs ) throws InterruptedException {
639- for ( ByteBuffer b : bufs ) {
640- channelWrite ( b );
641- }
642- }
643-
644- private void channelWriteDirect ( ByteBuffer buf ) throws IOException {
645- while ( buf .hasRemaining () )
646- sockchannel .write ( buf );
647- }
648-
649- private void writeDirect ( List <ByteBuffer > bufs ) throws IOException {
615+ private void write ( List <ByteBuffer > bufs ) {
650616 for ( ByteBuffer b : bufs ) {
651- channelWriteDirect ( b );
617+ write ( b );
652618 }
653619 }
654620
0 commit comments