1010import java .util .List ;
1111import java .util .concurrent .BlockingQueue ;
1212import java .util .concurrent .LinkedBlockingQueue ;
13+ import java .util .concurrent .atomic .AtomicLong ;
1314
1415import org .java_websocket .drafts .Draft ;
1516import org .java_websocket .drafts .Draft .CloseHandshakeType ;
@@ -90,12 +91,13 @@ public enum Role {
9091 * Queue of buffers that need to be sent to the client.
9192 */
9293 private BlockingQueue <ByteBuffer > bufferQueue ;
94+
9395 /**
9496 * The amount of bytes still in queue to be sent, at every given time.
9597 * It's updated at every send/sent operation.
9698 */
97- private Long bufferQueueTotalAmount = ( long ) 0 ;
98-
99+ private AtomicLong bufferQueueTotalAmount = new AtomicLong ( 0l ) ;
100+
99101 private Draft draft = null ;
100102
101103 private Role role ;
@@ -499,7 +501,7 @@ boolean hasBufferedData() {
499501 * @return Amount of Data still in Queue and not sent yet of the socket
500502 */
501503 long bufferedDataAmount () {
502- return bufferQueueTotalAmount ;
504+ return bufferQueueTotalAmount . get () ;
503505 }
504506
505507 /**
@@ -512,10 +514,9 @@ public void flush() throws IOException {
512514 if ( buffer .remaining () > 0 ) {
513515 continue ;
514516 } else {
515- synchronized ( bufferQueueTotalAmount ) {
516- // subtract this amount of data from the total queued (synchronized over this object)
517- bufferQueueTotalAmount -= written ;
518- }
517+ // subtract this amount of data from the total queued (synchronized over this object)
518+ bufferQueueTotalAmount .addAndGet ( -written );
519+
519520 this .bufferQueue .poll (); // Buffer finished. Remove it.
520521 buffer = this .bufferQueue .peek ();
521522 }
@@ -562,10 +563,10 @@ public void startHandshake( ClientHandshakeBuilder handshakedata ) throws Invali
562563 private void channelWrite ( ByteBuffer buf ) throws InterruptedException {
563564 if ( DEBUG )
564565 System .out .println ( "write(" + buf .remaining () + "): {" + ( buf .remaining () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
565- synchronized ( bufferQueueTotalAmount ) {
566- // add up the number of bytes to the total queued (synchronized over this object)
567- bufferQueueTotalAmount += buf .remaining ();
568- }
566+
567+ // add up the number of bytes to the total queued (synchronized over this object)
568+ bufferQueueTotalAmount . addAndGet ( buf .remaining () );
569+
569570 if ( !bufferQueue .offer ( buf ) ) {
570571 try {
571572 flush ();
0 commit comments