1010import java .util .List ;
1111import java .util .concurrent .BlockingQueue ;
1212import java .util .concurrent .LinkedBlockingQueue ;
13+ import java .util .concurrent .atomic .AtomicLong ;
1314
1415import org .java_websocket .drafts .Draft ;
1516import org .java_websocket .drafts .Draft .CloseHandshakeType ;
@@ -90,12 +91,13 @@ public enum Role {
9091 * Queue of buffers that need to be sent to the client.
9192 */
9293 private BlockingQueue <ByteBuffer > bufferQueue ;
94+
9395 /**
9496 * The amount of bytes still in queue to be sent, at every given time.
9597 * It's updated at every send/sent operation.
9698 */
97- private Long bufferQueueTotalAmount = ( long ) 0 ;
98-
99+ private AtomicLong bufferQueueTotalAmount = new AtomicLong ( 0l ) ;
100+
99101 private Draft draft = null ;
100102
101103 private Role role ;
@@ -495,7 +497,7 @@ boolean hasBufferedData() {
495497 * @return Amount of Data still in Queue and not sent yet of the socket
496498 */
497499 long bufferedDataAmount () {
498- return bufferQueueTotalAmount ;
500+ return bufferQueueTotalAmount . get () ;
499501 }
500502
501503 /**
@@ -508,10 +510,9 @@ public void flush() throws IOException {
508510 if ( buffer .remaining () > 0 ) {
509511 continue ;
510512 } else {
511- synchronized ( bufferQueueTotalAmount ) {
512- // subtract this amount of data from the total queued (synchronized over this object)
513- bufferQueueTotalAmount -= buffer .limit ();
514- }
513+ // subtract this amount of data from the total queued (synchronized over this object)
514+ bufferQueueTotalAmount .addAndGet (-buffer .limit ());
515+
515516 this .bufferQueue .poll (); // Buffer finished. Remove it.
516517 buffer = this .bufferQueue .peek ();
517518 }
@@ -559,10 +560,10 @@ private void channelWrite( ByteBuffer buf ) throws InterruptedException {
559560 if ( DEBUG )
560561 System .out .println ( "write(" + buf .limit () + "): {" + ( buf .limit () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
561562 buf .rewind (); // TODO rewinding should not be nessesary
562- synchronized ( bufferQueueTotalAmount ) {
563- // add up the number of bytes to the total queued (synchronized over this object)
564- bufferQueueTotalAmount += buf .limit ();
565- }
563+
564+ // add up the number of bytes to the total queued (synchronized over this object)
565+ bufferQueueTotalAmount . addAndGet ( buf .limit () );
566+
566567 if ( !bufferQueue .offer ( buf ) ) {
567568 try {
568569 flush ();
0 commit comments