Skip to content

Commit fc9c671

Browse files
committed
http2: input stream should be closed if the output stream is closed
1 parent 4f23456 commit fc9c671

File tree

1 file changed

+24
-24
lines changed

1 file changed

+24
-24
lines changed

src/main/java/robaho/net/httpserver/http2/HTTP2Stream.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ public class HTTP2Stream {
4848

4949
private volatile Thread thread;
5050
private volatile boolean streamOpen = true;
51+
// halfClosed is set when a END_STREAM is received. The streams are bidirectional.
5152
private volatile boolean halfClosed = false;
52-
private volatile boolean streamOutClosed = false;
53+
// streamOutputClosed is when the handler, either via close(), or sendResponseHeaders(code,-1) closes the output
54+
private volatile boolean streamOutputClosed = false;
5355
private volatile AtomicBoolean handlingRequest = new AtomicBoolean(false);
5456

5557
private long dataInSize = 0;
@@ -96,7 +98,7 @@ public String toString() {
9698
}
9799

98100
public void debug() {
99-
logger.log(Level.INFO,connection.toString()+", stream "+streamId+" open "+streamOpen+" half closed "+halfClosed+", thread "+thread);
101+
logger.log(Level.INFO,connection.toString()+", stream "+streamId+" open "+streamOpen+" half closed "+halfClosed+", streamOutputClosed "+streamOutputClosed+", thread "+thread);
100102
logger.log(Level.INFO,connection.toString()+", stream "+streamId+" data in size "+dataInSize+" expected "+expectedSize());
101103
logger.log(Level.INFO,""+Arrays.toString(thread.getStackTrace()));
102104
}
@@ -117,11 +119,11 @@ private long expectedSize() {
117119

118120
public void close() {
119121
streamOpen = false;
120-
halfClosed = true;
121122

122123
if(connection.http2Streams.put(streamId,null)==null) {
123124
return;
124125
}
126+
125127
logger.log(Level.TRACE,() -> "closing stream "+streamId);
126128

127129
try {
@@ -146,8 +148,8 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
146148
if(halfClosed) {
147149
throw new HTTP2Exception(HTTP2ErrorCode.STREAM_CLOSED);
148150
}
149-
150-
performRequest(frame.getHeader().getFlags().contains(FrameFlag.END_STREAM));
151+
halfClosed = frame.getHeader().getFlags().contains(FrameFlag.END_STREAM);
152+
performRequest();
151153
break;
152154
case DATA:
153155
DataFrame dataFrame = (DataFrame) frame;
@@ -167,8 +169,8 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
167169
close();
168170
break;
169171
}
170-
dataIn.close();
171172
halfClosed = true;
173+
dataIn.wakeupReader();
172174
}
173175
break;
174176
case PRIORITY:
@@ -179,6 +181,7 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
179181
case RST_STREAM:
180182
ResetStreamFrame resetFrame = (ResetStreamFrame) frame;
181183
logger.log(Level.DEBUG,"received reset stream "+resetFrame.errorCode+", on stream "+streamId);
184+
halfClosed = true;
182185
close();
183186
break;
184187
case WINDOW_UPDATE:
@@ -194,7 +197,7 @@ public void processFrame(BaseFrame frame) throws HTTP2Exception, IOException {
194197
}
195198
}
196199

197-
private void performRequest(boolean halfClosed) throws IOException, HTTP2Exception {
200+
private void performRequest() throws IOException, HTTP2Exception {
198201
if(!handlingRequest.compareAndSet(false, true)) {
199202
throw new HTTP2Exception(HTTP2ErrorCode.PROTOCOL_ERROR,"already received headers for stream "+streamId);
200203
}
@@ -203,11 +206,6 @@ private void performRequest(boolean halfClosed) throws IOException, HTTP2Excepti
203206
connection.stats.activeStreams.incrementAndGet();
204207

205208
InputStream in = halfClosed ? InputStream.nullInputStream() : dataIn;
206-
207-
if(halfClosed) {
208-
this.halfClosed = true;
209-
dataIn.close();
210-
}
211209

212210
handler.getExecutor().execute(() -> {
213211
thread = Thread.currentThread();
@@ -231,7 +229,7 @@ public void writeResponseHeaders(boolean closeStream) throws IOException {
231229
try {
232230
HPackContext.writeHeaderFrame(responseHeaders, connection.outputStream, streamId, closeStream);
233231
if (closeStream) {
234-
streamOutClosed = true;
232+
streamOutputClosed = true;
235233
}
236234
} finally {
237235
connection.unlock();
@@ -279,7 +277,7 @@ public void write(byte[] b, int off, int len) throws IOException {
279277
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
280278
}
281279
writeResponseHeaders(false);
282-
if(streamOutClosed) {
280+
if(streamOutputClosed) {
283281
throw new IOException("output stream was closed during headers send");
284282
}
285283
while(len>0) {
@@ -340,7 +338,7 @@ public void close() throws IOException {
340338
connection.lock();
341339
boolean lastRequest = connection.requestsInProgress.decrementAndGet() == 0;
342340
try {
343-
if(!streamOutClosed) {
341+
if(!streamOutputClosed) {
344342
FrameHeader.writeTo(connection.outputStream, 0, FrameType.DATA, END_STREAM, streamId);
345343
connection.stats.framesSent.incrementAndGet();
346344
}
@@ -351,10 +349,7 @@ public void close() throws IOException {
351349
} finally {
352350
connection.unlock();
353351
}
354-
// same as http1, read all incoming frames when closing the output stream.
355-
// TODO review this, as the http2 stream is bidirectional and the spec may allow the server to continue to process inbound frames
356-
// after closing the outbound stream - similar to a http2 client
357-
dataIn.readAllBytes();
352+
dataIn.close();
358353
} finally {
359354
connection.stats.activeStreams.decrementAndGet();
360355
closed=true;
@@ -369,21 +364,26 @@ private class DataIn extends InputStream {
369364
private volatile Thread reader;
370365
/** offset into the top of the queue array */
371366
private int offset = 0;
372-
private volatile boolean closed;
373367

374368
public DataIn() {
375369
}
376370

377371
void enqueue(byte[] data) {
378-
if(closed) return;
379372
queue.add(data);
380373
LockSupport.unpark(reader);
381374
}
375+
376+
void wakeupReader() {
377+
LockSupport.unpark(reader);
378+
}
382379

383380
@Override
384381
public void close() throws IOException {
385-
closed=true;
386-
LockSupport.unpark(reader);
382+
if(Thread.currentThread()==reader || reader == null) {
383+
readAllBytes();
384+
} else {
385+
LockSupport.unpark(reader);
386+
}
387387
}
388388

389389
private final byte[] single = new byte[1];
@@ -405,7 +405,7 @@ public int read(byte[] b, int off, int len) throws IOException {
405405
if(read>0) {
406406
return read;
407407
}
408-
if(closed) return -1;
408+
if(halfClosed) return -1;
409409
LockSupport.park();
410410
if(Thread.interrupted()) {
411411
throw new IOException("interrupted");

0 commit comments

Comments
 (0)