@@ -48,8 +48,10 @@ public class HTTP2Stream {
4848
4949 private volatile Thread thread ;
5050 private volatile boolean streamOpen = true ;
51+ // halfClosed is set when a END_STREAM is received. The streams are bidirectional.
5152 private volatile boolean halfClosed = false ;
52- private volatile boolean streamOutClosed = false ;
53+ // streamOutputClosed is when the handler, either via close(), or sendResponseHeaders(code,-1) closes the output
54+ private volatile boolean streamOutputClosed = false ;
5355 private volatile AtomicBoolean handlingRequest = new AtomicBoolean (false );
5456
5557 private long dataInSize = 0 ;
@@ -96,7 +98,7 @@ public String toString() {
9698 }
9799
98100 public void debug () {
99- logger .log (Level .INFO ,connection .toString ()+", stream " +streamId +" open " +streamOpen +" half closed " +halfClosed +", thread " +thread );
101+ logger .log (Level .INFO ,connection .toString ()+", stream " +streamId +" open " +streamOpen +" half closed " +halfClosed +", streamOutputClosed " + streamOutputClosed + ", thread " +thread );
100102 logger .log (Level .INFO ,connection .toString ()+", stream " +streamId +" data in size " +dataInSize +" expected " +expectedSize ());
101103 logger .log (Level .INFO ,"" +Arrays .toString (thread .getStackTrace ()));
102104 }
@@ -117,11 +119,11 @@ private long expectedSize() {
117119
118120 public void close () {
119121 streamOpen = false ;
120- halfClosed = true ;
121122
122123 if (connection .http2Streams .put (streamId ,null )==null ) {
123124 return ;
124125 }
126+
125127 logger .log (Level .TRACE ,() -> "closing stream " +streamId );
126128
127129 try {
@@ -146,8 +148,8 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
146148 if (halfClosed ) {
147149 throw new HTTP2Exception (HTTP2ErrorCode .STREAM_CLOSED );
148150 }
149-
150- performRequest (frame . getHeader (). getFlags (). contains ( FrameFlag . END_STREAM ) );
151+ halfClosed = frame . getHeader (). getFlags (). contains ( FrameFlag . END_STREAM );
152+ performRequest ();
151153 break ;
152154 case DATA :
153155 DataFrame dataFrame = (DataFrame ) frame ;
@@ -167,8 +169,8 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
167169 close ();
168170 break ;
169171 }
170- dataIn .close ();
171172 halfClosed = true ;
173+ dataIn .wakeupReader ();
172174 }
173175 break ;
174176 case PRIORITY :
@@ -179,6 +181,7 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
179181 case RST_STREAM :
180182 ResetStreamFrame resetFrame = (ResetStreamFrame ) frame ;
181183 logger .log (Level .DEBUG ,"received reset stream " +resetFrame .errorCode +", on stream " +streamId );
184+ halfClosed = true ;
182185 close ();
183186 break ;
184187 case WINDOW_UPDATE :
@@ -194,7 +197,7 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
194197 }
195198 }
196199
197- private void performRequest (boolean halfClosed ) throws IOException , HTTP2Exception {
200+ private void performRequest () throws IOException , HTTP2Exception {
198201 if (!handlingRequest .compareAndSet (false , true )) {
199202 throw new HTTP2Exception (HTTP2ErrorCode .PROTOCOL_ERROR ,"already received headers for stream " +streamId );
200203 }
@@ -203,11 +206,6 @@ private void performRequest(boolean halfClosed) throws IOException, HTTP2Excepti
203206 connection .stats .activeStreams .incrementAndGet ();
204207
205208 InputStream in = halfClosed ? InputStream .nullInputStream () : dataIn ;
206-
207- if (halfClosed ) {
208- this .halfClosed = true ;
209- dataIn .close ();
210- }
211209
212210 handler .getExecutor ().execute (() -> {
213211 thread = Thread .currentThread ();
@@ -231,7 +229,7 @@ public void writeResponseHeaders(boolean closeStream) throws IOException {
231229 try {
232230 HPackContext .writeHeaderFrame (responseHeaders , connection .outputStream , streamId , closeStream );
233231 if (closeStream ) {
234- streamOutClosed = true ;
232+ streamOutputClosed = true ;
235233 }
236234 } finally {
237235 connection .unlock ();
@@ -279,7 +277,7 @@ public void write(byte[] b, int off, int len) throws IOException {
279277 LockSupport .parkNanos (TimeUnit .MILLISECONDS .toNanos (1 ));
280278 }
281279 writeResponseHeaders (false );
282- if (streamOutClosed ) {
280+ if (streamOutputClosed ) {
283281 throw new IOException ("output stream was closed during headers send" );
284282 }
285283 while (len >0 ) {
@@ -340,7 +338,7 @@ public void close() throws IOException {
340338 connection .lock ();
341339 boolean lastRequest = connection .requestsInProgress .decrementAndGet () == 0 ;
342340 try {
343- if (!streamOutClosed ) {
341+ if (!streamOutputClosed ) {
344342 FrameHeader .writeTo (connection .outputStream , 0 , FrameType .DATA , END_STREAM , streamId );
345343 connection .stats .framesSent .incrementAndGet ();
346344 }
@@ -351,10 +349,7 @@ public void close() throws IOException {
351349 } finally {
352350 connection .unlock ();
353351 }
354- // same as http1, read all incoming frames when closing the output stream.
355- // TODO review this, as the http2 stream is bidirectional and the spec may allow the server to continue to process inbound frames
356- // after closing the outbound stream - similar to a http2 client
357- dataIn .readAllBytes ();
352+ dataIn .close ();
358353 } finally {
359354 connection .stats .activeStreams .decrementAndGet ();
360355 closed =true ;
@@ -369,21 +364,26 @@ private class DataIn extends InputStream {
369364 private volatile Thread reader ;
370365 /** offset into the top of the queue array */
371366 private int offset = 0 ;
372- private volatile boolean closed ;
373367
374368 public DataIn () {
375369 }
376370
377371 void enqueue (byte [] data ) {
378- if (closed ) return ;
379372 queue .add (data );
380373 LockSupport .unpark (reader );
381374 }
375+
376+ void wakeupReader () {
377+ LockSupport .unpark (reader );
378+ }
382379
383380 @ Override
384381 public void close () throws IOException {
385- closed =true ;
386- LockSupport .unpark (reader );
382+ if (Thread .currentThread ()==reader || reader == null ) {
383+ readAllBytes ();
384+ } else {
385+ LockSupport .unpark (reader );
386+ }
387387 }
388388
389389 private final byte [] single = new byte [1 ];
@@ -405,7 +405,7 @@ public int read(byte[] b, int off, int len) throws IOException {
405405 if (read >0 ) {
406406 return read ;
407407 }
408- if (closed ) return -1 ;
408+ if (halfClosed ) return -1 ;
409409 LockSupport .park ();
410410 if (Thread .interrupted ()) {
411411 throw new IOException ("interrupted" );
0 commit comments