Skip to content

Commit 238b7ed

Browse files
committed
fix issue [Bug] Request hangs if request body is not consumed robaho#19
1 parent 2a13b2a commit 238b7ed

File tree

5 files changed

+212
-18
lines changed

5 files changed

+212
-18
lines changed

src/main/java/robaho/net/httpserver/FixedLengthOutputStream.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,21 @@ public void close() throws IOException {
8787
throw new IOException("insufficient bytes written to stream");
8888
}
8989
LeftOverInputStream is = t.getOriginalInputStream();
90-
// if after reading the rest of the known input for this request, there is
91-
// more input available, http pipelining is in effect, so avoid flush, since
92-
// it will be flushed after processing the next request
93-
if(is.getRawInputStream().available()==0) {
94-
flush();
95-
}
90+
9691

9792
if (!is.isClosed()) {
9893
try {
9994
is.close();
10095
} catch (IOException e) {
10196
}
10297
}
98+
99+
// if after reading the rest of the known input for this request, there is
100+
// more input available, http pipelining is in effect, so avoid flush, since
101+
// it will be flushed after processing the next request
102+
if(is.getRawInputStream().available()==0) {
103+
flush();
104+
}
105+
103106
}
104107
}

src/main/java/robaho/net/httpserver/HttpConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class HttpConnection {
5959
volatile long lastActivityTime;
6060
volatile boolean noActivity;
6161
volatile boolean inRequest;
62+
volatile long drainingAt;
6263

6364
public AtomicLong requestCount = new AtomicLong();
6465
private final String connectionId;
@@ -124,6 +125,13 @@ synchronized void close() {
124125
if (socket.isClosed()) {
125126
return;
126127
}
128+
try {
129+
if (os!=null) {
130+
// see issue #19, flush before closing, in case of pending data
131+
os.flush();
132+
}
133+
} catch(IOException ex){}
134+
127135
try {
128136
/* need to ensure temporary selectors are closed */
129137
if (is != null) {
@@ -134,6 +142,7 @@ synchronized void close() {
134142
}
135143
try {
136144
if (os != null) {
145+
os.flush();
137146
os.close();
138147
}
139148
} catch (IOException e) {

src/main/java/robaho/net/httpserver/LeftOverInputStream.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,20 +98,24 @@ public synchronized int read(byte[] b, int off, int len) throws IOException {
9898
* (still bytes to be read)
9999
*/
100100
public boolean drain(long l) throws IOException {
101-
102-
while (l > 0) {
103-
if (server.isFinishing()) {
104-
break;
105-
}
106-
long len = readImpl(drainBuffer, 0, drainBuffer.length);
107-
if (len == -1) {
108-
eof = true;
109-
return true;
110-
} else {
111-
l = l - len;
101+
try {
102+
while (l > 0) {
103+
if (server.isFinishing()) {
104+
break;
105+
}
106+
t.connection.drainingAt = ActivityTimer.now();
107+
long len = readImpl(drainBuffer, 0, drainBuffer.length);
108+
if (len == -1) {
109+
eof = true;
110+
return true;
111+
} else {
112+
l = l - len;
113+
}
112114
}
115+
return false;
116+
} finally {
117+
t.connection.drainingAt = 0;
113118
}
114-
return false;
115119
}
116120
public InputStream getRawInputStream() {
117121
return super.in;

src/main/java/robaho/net/httpserver/ServerImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,9 @@ public void run() {
913913
long now = ActivityTimer.now();
914914

915915
for (var c : allConnections) {
916+
if (c.drainingAt != 0 && now- c.drainingAt >= IDLE_INTERVAL / 2) {
917+
closeConnection(c);
918+
}
916919
if (now- c.lastActivityTime >= IDLE_INTERVAL && !c.inRequest) {
917920
logger.log(Level.DEBUG, "closing idle connection");
918921
stats.idleCloseCount.incrementAndGet();
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package robaho.net.httpserver;
2+
3+
import java.io.BufferedReader;
4+
import java.io.IOException;
5+
import java.io.InputStreamReader;
6+
import java.io.OutputStream;
7+
import java.io.OutputStreamWriter;
8+
import java.io.PrintWriter;
9+
import java.net.InetAddress;
10+
import java.net.InetSocketAddress;
11+
import java.net.Socket;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.ThreadFactory;
15+
import java.util.concurrent.atomic.AtomicLong;
16+
import java.util.logging.Level;
17+
import java.util.logging.Logger;
18+
19+
import com.sun.net.httpserver.HttpExchange;
20+
import com.sun.net.httpserver.HttpHandler;
21+
import com.sun.net.httpserver.HttpServer;
22+
23+
import org.testng.annotations.Test;
24+
25+
import static java.nio.charset.StandardCharsets.*;
26+
27+
/**
28+
* see issue #19
29+
*
30+
* the server attempts to optimize flushing the response stream if there is
31+
* another request in the pipeline, but the bug caused the server to assume the
32+
* data remaining to be read was part of the next request, causing the server to
33+
* hang. Reading even a single character from the request body would have
34+
* prevented the issue since the buffer would have been filled.
35+
*
36+
* The solution is to read the remaining request data, then check if there are
37+
* any characters waiting to be read.
38+
*/
39+
public class PipeliningStallTest {
40+
41+
private static final int msgCode = 200;
42+
private static final String someContext = "/context";
43+
44+
static class ServerThreadFactory implements ThreadFactory {
45+
46+
static final AtomicLong tokens = new AtomicLong();
47+
48+
@Override
49+
public Thread newThread(Runnable r) {
50+
var thread = new Thread(r, "Server-" + tokens.incrementAndGet());
51+
thread.setDaemon(true);
52+
return thread;
53+
}
54+
}
55+
56+
static {
57+
Logger.getLogger("").setLevel(Level.ALL);
58+
Logger.getLogger("").getHandlers()[0].setLevel(Level.ALL);
59+
}
60+
61+
@Test
62+
public void testSendResponse() throws Exception {
63+
System.out.println("testSendResponse()");
64+
InetAddress loopback = InetAddress.getLoopbackAddress();
65+
HttpServer server = HttpServer.create(new InetSocketAddress(loopback, 0), 0);
66+
ExecutorService executor = Executors.newCachedThreadPool(new ServerThreadFactory());
67+
server.setExecutor(executor);
68+
try {
69+
server.createContext(someContext, new HttpHandler() {
70+
@Override
71+
public void handle(HttpExchange exchange) throws IOException {
72+
var length = exchange.getRequestHeaders().getFirst("Content-Length");
73+
74+
var msg = "hi";
75+
var status = 200;
76+
if (Integer.valueOf(length) > 4) {
77+
msg = "oversized";
78+
status = 413;
79+
}
80+
81+
var bytes = msg.getBytes();
82+
83+
// -1 means no content, 0 means unknown content length
84+
var contentLength = bytes.length == 0 ? -1 : bytes.length;
85+
86+
try (OutputStream os = exchange.getResponseBody()) {
87+
exchange.sendResponseHeaders(status, contentLength);
88+
os.write(bytes);
89+
}
90+
}
91+
});
92+
server.start();
93+
System.out.println("Server started at port "
94+
+ server.getAddress().getPort());
95+
96+
runRawSocketHttpClient(loopback, server.getAddress().getPort(), -1);
97+
} finally {
98+
System.out.println("shutting server down");
99+
executor.shutdown();
100+
server.stop(0);
101+
}
102+
System.out.println("Server finished.");
103+
}
104+
105+
static void runRawSocketHttpClient(InetAddress address, int port, int contentLength)
106+
throws Exception {
107+
Socket socket = null;
108+
PrintWriter writer = null;
109+
BufferedReader reader = null;
110+
final String CRLF = "\r\n";
111+
try {
112+
socket = new Socket(address, port);
113+
writer = new PrintWriter(new OutputStreamWriter(
114+
socket.getOutputStream()));
115+
System.out.println("Client connected by socket: " + socket);
116+
String body = "I will send all the data.";
117+
if (contentLength <= 0) {
118+
contentLength = body.getBytes(UTF_8).length;
119+
}
120+
121+
writer.print("GET " + someContext + "/ HTTP/1.1" + CRLF);
122+
writer.print("User-Agent: Java/"
123+
+ System.getProperty("java.version")
124+
+ CRLF);
125+
writer.print("Host: " + address.getHostName() + CRLF);
126+
writer.print("Accept: */*" + CRLF);
127+
writer.print("Content-Length: " + contentLength + CRLF);
128+
writer.print("Connection: keep-alive" + CRLF);
129+
writer.print(CRLF); // Important, else the server will expect that
130+
// there's more into the request.
131+
writer.flush();
132+
System.out.println("Client wrote request to socket: " + socket);
133+
writer.print(body);
134+
writer.flush();
135+
136+
reader = new BufferedReader(new InputStreamReader(
137+
socket.getInputStream()));
138+
System.out.println("Client start reading from server:");
139+
String line = reader.readLine();
140+
for (; line != null; line = reader.readLine()) {
141+
if (line.isEmpty()) {
142+
break;
143+
}
144+
System.out.println("\"" + line + "\"");
145+
}
146+
System.out.println("Client finished reading from server");
147+
} finally {
148+
// give time to the server to try & drain its input stream
149+
Thread.sleep(500);
150+
// closes the client outputstream while the server is draining
151+
// it
152+
if (writer != null) {
153+
writer.close();
154+
}
155+
// give time to the server to trigger its assertion
156+
// error before closing the connection
157+
Thread.sleep(500);
158+
if (reader != null)
159+
try {
160+
reader.close();
161+
} catch (IOException logOrIgnore) {
162+
logOrIgnore.printStackTrace();
163+
}
164+
if (socket != null) {
165+
try {
166+
socket.close();
167+
} catch (IOException logOrIgnore) {
168+
logOrIgnore.printStackTrace();
169+
}
170+
}
171+
}
172+
System.out.println("Client finished.");
173+
}
174+
175+
}

0 commit comments

Comments
 (0)