Skip to content

Commit 74d506d

Browse files
tejksatmarcuslinke
authored andcommitted
Fix http response input stream resource leak (docker-java#633)
there are several problems in the original code: - not releasing `ByteBuf` queued; - queuing `ByteBuf` until all data is read what effectively leads to `OutOfDirectMemoryError` if the data read is too big. This happens if you try to copy too big file or folder from a container. To fix this `ResponseCallback` (which in `awaitResult()` waits in fact for `readComplete()` channel method invocation) replaced with `AsyncResultCallback`; - actually queuing `ByteBuf` anyway leads to `OutOfDirectMemoryError` if data consumption is slower (the next point) than obtaining it; - poor performance because of the byte-by-byte reads.
1 parent 5a0c696 commit 74d506d

File tree

4 files changed

+249
-39
lines changed

4 files changed

+249
-39
lines changed

src/main/java/com/github/dockerjava/netty/InvocationBuilder.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.InputStream;
2828
import java.util.HashMap;
2929
import java.util.Map;
30+
import java.util.concurrent.CountDownLatch;
3031

3132
import com.fasterxml.jackson.core.JsonProcessingException;
3233
import com.fasterxml.jackson.core.type.TypeReference;
@@ -75,6 +76,57 @@ public void onNext(Void object) {
7576
}
7677
}
7778

79+
/**
80+
* Implementation of {@link ResultCallback} with the single result event expected.
81+
*/
82+
public static class AsyncResultCallback<A_RES_T>
83+
extends ResultCallbackTemplate<AsyncResultCallback<A_RES_T>, A_RES_T> {
84+
85+
private A_RES_T result = null;
86+
87+
private final CountDownLatch resultReady = new CountDownLatch(1);
88+
89+
@Override
90+
public void onNext(A_RES_T object) {
91+
onResult(object);
92+
}
93+
94+
private void onResult(A_RES_T object) {
95+
if (resultReady.getCount() == 0) {
96+
throw new IllegalStateException("Result has already been set");
97+
}
98+
99+
try {
100+
result = object;
101+
} finally {
102+
resultReady.countDown();
103+
}
104+
}
105+
106+
@Override
107+
public void close() throws IOException {
108+
try {
109+
super.close();
110+
} finally {
111+
resultReady.countDown();
112+
}
113+
}
114+
115+
/**
116+
* Blocks until {@link ResultCallback#onNext(Object)} was called for the first time
117+
*/
118+
@SuppressWarnings("unchecked")
119+
public A_RES_T awaitResult() {
120+
try {
121+
resultReady.await();
122+
} catch (InterruptedException e) {
123+
throw new RuntimeException(e);
124+
}
125+
getFirstError();
126+
return result;
127+
}
128+
}
129+
78130
private ChannelProvider channelProvider;
79131

80132
private String resource;
@@ -203,7 +255,7 @@ public InputStream post(final Object entity) {
203255

204256
Channel channel = getChannel();
205257

206-
ResponseCallback<InputStream> callback = new ResponseCallback<InputStream>();
258+
AsyncResultCallback<InputStream> callback = new AsyncResultCallback<>();
207259

208260
HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, callback);
209261
HttpResponseStreamHandler streamHandler = new HttpResponseStreamHandler(callback);
@@ -454,7 +506,7 @@ public InputStream get() {
454506

455507
Channel channel = getChannel();
456508

457-
ResponseCallback<InputStream> resultCallback = new ResponseCallback<InputStream>();
509+
AsyncResultCallback<InputStream> resultCallback = new AsyncResultCallback<>();
458510

459511
HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, resultCallback);
460512

src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java

Lines changed: 107 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66

77
import java.io.IOException;
88
import java.io.InputStream;
9-
import java.util.concurrent.LinkedTransferQueue;
10-
import java.util.concurrent.TimeUnit;
11-
import java.util.concurrent.atomic.AtomicBoolean;
129

1310
import com.github.dockerjava.api.async.ResultCallback;
1411

@@ -19,45 +16,87 @@
1916
*/
2017
public class HttpResponseStreamHandler extends SimpleChannelInboundHandler<ByteBuf> {
2118

22-
private HttpResponseInputStream stream = new HttpResponseInputStream();
19+
private ResultCallback<InputStream> resultCallback;
20+
21+
private final HttpResponseInputStream stream = new HttpResponseInputStream();
2322

2423
public HttpResponseStreamHandler(ResultCallback<InputStream> resultCallback) {
25-
resultCallback.onNext(stream);
24+
this.resultCallback = resultCallback;
2625
}
2726

2827
@Override
2928
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
29+
invokeCallbackOnFirstRead();
30+
3031
stream.write(msg.copy());
3132
}
3233

34+
private void invokeCallbackOnFirstRead() {
35+
if (resultCallback != null) {
36+
resultCallback.onNext(stream);
37+
resultCallback = null;
38+
}
39+
}
40+
3341
@Override
34-
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
35-
stream.close();
36-
super.channelReadComplete(ctx);
42+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
43+
stream.writeComplete();
44+
45+
super.channelInactive(ctx);
3746
}
3847

3948
public static class HttpResponseInputStream extends InputStream {
4049

41-
private AtomicBoolean closed = new AtomicBoolean(false);
50+
private boolean writeCompleted = false;
4251

43-
private LinkedTransferQueue<ByteBuf> queue = new LinkedTransferQueue<ByteBuf>();
52+
private boolean closed = false;
4453

4554
private ByteBuf current = null;
4655

47-
public void write(ByteBuf byteBuf) {
48-
queue.put(byteBuf);
56+
private final Object lock = new Object();
57+
58+
public void write(ByteBuf byteBuf) throws InterruptedException {
59+
synchronized (lock) {
60+
if (closed) {
61+
return;
62+
}
63+
while (current != null) {
64+
lock.wait();
65+
66+
if (closed) {
67+
return;
68+
}
69+
}
70+
current = byteBuf;
71+
72+
lock.notifyAll();
73+
}
74+
}
75+
76+
public void writeComplete() {
77+
synchronized (lock) {
78+
writeCompleted = true;
79+
80+
lock.notifyAll();
81+
}
4982
}
5083

5184
@Override
5285
public void close() throws IOException {
53-
closed.set(true);
54-
super.close();
86+
synchronized (lock) {
87+
closed = true;
88+
releaseCurrent();
89+
90+
lock.notifyAll();
91+
}
5592
}
5693

5794
@Override
5895
public int available() throws IOException {
59-
poll();
60-
return readableBytes();
96+
synchronized (lock) {
97+
poll(0);
98+
return readableBytes();
99+
}
61100
}
62101

63102
private int readableBytes() {
@@ -66,34 +105,72 @@ private int readableBytes() {
66105
} else {
67106
return 0;
68107
}
69-
70108
}
71109

72110
@Override
73111
public int read() throws IOException {
112+
byte[] b = new byte[1];
113+
int n = read(b, 0, 1);
114+
return n != -1 ? b[0] : -1;
115+
}
74116

75-
poll();
117+
@Override
118+
public int read(byte[] b, int off, int len) throws IOException {
119+
synchronized (lock) {
120+
off = poll(off);
76121

77-
if (readableBytes() == 0) {
78-
if (closed.get()) {
122+
if (current == null) {
79123
return -1;
124+
} else {
125+
int availableBytes = Math.min(len, current.readableBytes() - off);
126+
current.readBytes(b, off, availableBytes);
127+
return availableBytes;
80128
}
81129
}
130+
}
82131

83-
if (current != null && current.readableBytes() > 0) {
84-
return current.readByte() & 0xff;
85-
} else {
86-
return read();
132+
private int poll(int off) throws IOException {
133+
synchronized (lock) {
134+
while (readableBytes() <= off) {
135+
try {
136+
if (closed) {
137+
throw new IOException("Stream closed");
138+
}
139+
140+
off -= releaseCurrent();
141+
if (writeCompleted) {
142+
return off;
143+
}
144+
while (current == null) {
145+
lock.wait();
146+
147+
if (closed) {
148+
throw new IOException("Stream closed");
149+
}
150+
if (writeCompleted && current == null) {
151+
return off;
152+
}
153+
}
154+
} catch (InterruptedException e) {
155+
throw new RuntimeException(e);
156+
}
157+
}
158+
return off;
87159
}
88160
}
89161

90-
private void poll() {
91-
if (readableBytes() == 0) {
92-
try {
93-
current = queue.poll(50, TimeUnit.MILLISECONDS);
94-
} catch (InterruptedException e) {
95-
throw new RuntimeException(e);
162+
private int releaseCurrent() {
163+
synchronized (lock) {
164+
if (current != null) {
165+
int n = current.readableBytes();
166+
current.release();
167+
current = null;
168+
169+
lock.notifyAll();
170+
171+
return n;
96172
}
173+
return 0;
97174
}
98175
}
99176
}

src/test/java/com/github/dockerjava/netty/exec/SaveImageCmdExecTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ public void afterMethod(ITestResult result) {
4747
@Test
4848
public void saveImage() throws Exception {
4949

50-
InputStream image = IOUtils.toBufferedInputStream(dockerClient.saveImageCmd("busybox").exec());
51-
assertThat(image.available(), greaterThan(0));
50+
try (InputStream image = dockerClient.saveImageCmd("busybox").exec()) {
51+
assertThat(image.available(), greaterThan(0));
52+
}
5253

5354
}
5455

0 commit comments

Comments
 (0)