22
33import java .io .IOException ;
44import java .net .InetSocketAddress ;
5+ import java .net .Socket ;
56import java .nio .ByteBuffer ;
67import java .nio .channels .NotYetConnectedException ;
78import java .nio .channels .SelectionKey ;
8- import java .nio .channels .SocketChannel ;
99import java .util .ArrayList ;
1010import java .util .Collection ;
1111import java .util .List ;
1212import java .util .concurrent .BlockingQueue ;
1313import java .util .concurrent .LinkedBlockingQueue ;
14- import java .util .concurrent .atomic .AtomicLong ;
1514
1615import org .java_websocket .client .WebSocketClient ;
1716import org .java_websocket .drafts .Draft ;
4746 * by your code. However, instances are exposed in <tt>WebSocketServer</tt> through the <i>onClientOpen</i>, <i>onClientClose</i>,
4847 * <i>onClientMessage</i> callbacks.
4948 *
50- * @author Nathan Rajlich
5149 */
5250public final class WebSocketImpl extends WebSocket {
5351
@@ -72,13 +70,7 @@ public final class WebSocketImpl extends WebSocket {
7270 /**
7371 * Queue of buffers that need to be sent to the client.
7472 */
75- private BlockingQueue <ByteBuffer > outQueue ;
76-
77- /**
78- * The amount of bytes still in queue to be sent, at every given time.
79- * It's updated at every send/sent operation.
80- */
81- private AtomicLong bufferQueueTotalAmount = new AtomicLong ( 0l );
73+ public BlockingQueue <ByteBuffer > outQueue ;
8274
8375 private Draft draft = null ;
8476
@@ -92,14 +84,14 @@ public final class WebSocketImpl extends WebSocket {
9284
9385 private ByteBuffer tmpHandshakeBytes ;
9486
95- public SocketChannel sockchannel ;
96-
9787 public BlockingQueue <ByteBuffer > in ;
9888
9989 public volatile WebSocketWorker worker ;
10090
10191 public SelectionKey key ;
10292
93+ public final Socket socket ;
94+
10395 // CONSTRUCTOR /////////////////////////////////////////////////////////////
10496 /**
10597 * Used in {@link WebSocketServer} and {@link WebSocketClient}.
@@ -112,12 +104,10 @@ public final class WebSocketImpl extends WebSocket {
112104 * The {@link WebSocketListener} to notify of events when
113105 * they occur.
114106 */
115- public WebSocketImpl ( WebSocketListener listener , Draft draft , SocketChannel socketchannel ) {
116- init ( listener , draft , socketchannel );
117- }
118107
119- public WebSocketImpl ( WebSocketListener listener , List <Draft > drafts , SocketChannel socketchannel ) {
120- init ( listener , null , socketchannel );
108+
109+ public WebSocketImpl ( WebSocketListener listener , List <Draft > drafts , Socket sock ) {
110+ this ( listener , (Draft ) null , sock );
121111 this .role = Role .SERVER ;
122112 if ( known_drafts == null || known_drafts .isEmpty () ) {
123113 known_drafts = new ArrayList <Draft >( 1 );
@@ -130,21 +120,21 @@ public WebSocketImpl( WebSocketListener listener , List<Draft> drafts , SocketCh
130120 }
131121 }
132122
133- private void init ( WebSocketListener listener , Draft draft , SocketChannel socketchannel ) {
134- this .sockchannel = socketchannel ;
123+ public WebSocketImpl ( WebSocketListener listener , Draft draft , Socket sock ) {
135124 this .outQueue = new LinkedBlockingQueue <ByteBuffer >();
136125 in = new LinkedBlockingQueue <ByteBuffer >();
137126 this .wsl = listener ;
138127 this .role = Role .CLIENT ;
139128 this .draft = draft ;
129+ this .socket = sock ;
140130 }
141131
142132 /**
143133 * Returns whether the buffer has been filled.
144134 *
145135 * @throws IOException
146136 **/
147- public boolean read ( ByteBuffer buf ) throws IOException {
137+ /* public boolean read( final ByteBuffer buf ) throws IOException {
148138 buf.clear();
149139 int read = sockchannel.read( buf );
150140 buf.flip();
@@ -164,7 +154,7 @@ public boolean read( ByteBuffer buf ) throws IOException {
164154 return false;
165155 }
166156 return read != 0;
167- }
157+ }*/
168158 /**
169159 * Should be called when a Selector has a key that is writable for this
170160 * WebSocketImpl's SocketChannel connection.
@@ -426,8 +416,8 @@ public void close( int code, String message ) {
426416 * false means this endpoint decided to send the given code,<br>
427417 * <code>remote</code> may also be true if this endpoint started the closing handshake since the other endpoint may not simply echo the <code>code</code> but close the connection the same time this endpoint does do but with an other <code>code</code>. <br>
428418 **/
429- @ Override
430- public void closeConnection ( int code , String message , boolean remote ) {
419+
420+ protected synchronized void closeConnection ( int code , String message , boolean remote ) {
431421 if ( connectionClosed ) {
432422 return ;
433423 }
@@ -445,11 +435,29 @@ public void closeConnection( int code, String message, boolean remote ) {
445435 handshakerequest = null ;
446436 }
447437
448- @ Override
449- public void closeConnection ( int code , boolean remote ) {
438+ protected void closeConnection ( int code , boolean remote ) {
450439 closeConnection ( code , "" , remote );
451440 }
452441
442+ public void eot ( Exception e ) {
443+ if ( e == null || e instanceof IOException ) {
444+ if ( draft == null ) {
445+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
446+ } else if ( draft .getCloseHandshakeType () == CloseHandshakeType .NONE ) {
447+ closeConnection ( CloseFrame .NORMAL , true );
448+ } else if ( draft .getCloseHandshakeType () == CloseHandshakeType .ONEWAY ) {
449+ if ( role == Role .SERVER )
450+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
451+ else
452+ closeConnection ( CloseFrame .NORMAL , true );
453+ } else {
454+ closeConnection ( CloseFrame .ABNORMAL_CLOSE , true );
455+ }
456+ } else {
457+ closeConnection ( CloseFrame .BUGGYCLOSE , e .toString (), false );
458+ }
459+ }
460+
453461 @ Override
454462 public void close ( int code ) {
455463 close ( code , "" );
@@ -513,32 +521,7 @@ public boolean hasBufferedData() {
513521 return !this .outQueue .isEmpty ();
514522 }
515523
516- /**
517- * The amount of data in Queue, ready to be sent.
518- *
519- * @return Amount of Data still in Queue and not sent yet of the socket
520- */
521- @ Override
522- public long bufferedDataAmount () {
523- return bufferQueueTotalAmount .get ();
524- }
525-
526- /**
527- * Empty the internal buffer, sending all the pending data before continuing.
528- *
529- * @deprecated
530- **/
531-
532- public synchronized void flush () throws IOException {
533- // this method is garbage and must be removed as soon as possible
534- while ( !batch () )
535- try {
536- Thread .sleep ( 10 );
537- } catch ( InterruptedException e ) {
538- }
539- }
540-
541- public boolean batch () throws IOException {
524+ /*public boolean batch() throws IOException {
542525 ByteBuffer buffer = this.outQueue.peek();
543526 while ( buffer != null ) {
544527 int written = sockchannel.write( buffer );
@@ -558,9 +541,9 @@ public boolean batch() throws IOException {
558541 }
559542 }
560543 return true;
561- }
544+ }*/
562545
563- public HandshakeState isFlashEdgeCase ( ByteBuffer request ) throws IncompleteHandshakeException {
546+ private HandshakeState isFlashEdgeCase ( ByteBuffer request ) throws IncompleteHandshakeException {
564547 request .mark ();
565548 if ( request .limit () > Draft .FLASH_POLICY_REQUEST .length ) {
566549 return HandshakeState .NOT_MATCHED ;
@@ -601,9 +584,6 @@ public void startHandshake( ClientHandshakeBuilder handshakedata ) throws Invali
601584 private void write ( ByteBuffer buf ) {
602585 if ( DEBUG )
603586 System .out .println ( "write(" + buf .remaining () + "): {" + ( buf .remaining () > 1000 ? "too big to display" : new String ( buf .array () ) ) + "}" );
604-
605- // add up the number of bytes to the total queued (synchronized over this object)
606- bufferQueueTotalAmount .addAndGet ( buf .remaining () );
607587 try {
608588 outQueue .put ( buf );
609589 } catch ( InterruptedException e ) {
@@ -637,16 +617,6 @@ private void open( Handshakedata d ) throws IOException {
637617 wsl .onWebsocketOpen ( this , d );
638618 }
639619
640- @ Override
641- public InetSocketAddress getRemoteSocketAddress () {
642- return (InetSocketAddress ) sockchannel .socket ().getRemoteSocketAddress ();
643- }
644-
645- @ Override
646- public InetSocketAddress getLocalSocketAddress () {
647- return (InetSocketAddress ) sockchannel .socket ().getLocalSocketAddress ();
648- }
649-
650620 @ Override
651621 public boolean isConnecting () {
652622 return ( !connectionClosed && !closeHandshakeSent && !handshakeComplete );
@@ -699,4 +669,19 @@ public String toString() {
699669 return super .toString (); // its nice to be able to set breakpoints here
700670 }
701671
672+ @ Override
673+ public InetSocketAddress getRemoteSocketAddress () {
674+ return (InetSocketAddress ) socket .getRemoteSocketAddress ();
675+ }
676+
677+ @ Override
678+ public InetSocketAddress getLocalSocketAddress () {
679+ return (InetSocketAddress ) socket .getLocalSocketAddress ();
680+ }
681+
682+ @ Override
683+ public Draft getDraft () {
684+ return draft ;
685+ }
686+
702687}
0 commit comments