Skip to content

Commit 8192552

Browse files
committed
Merge pull request msgpack#237 from xerial/fix-inputstream-hang
Fixes msgpack#222: Check available() before calling blocking read
2 parents 97e0bce + af428de commit 8192552

File tree

2 files changed

+35
-17
lines changed

2 files changed

+35
-17
lines changed

msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,13 @@ public MessageBuffer next() throws IOException {
5454
if(reachedEOF)
5555
return null;
5656

57-
byte[] buffer = null;
58-
int cursor = 0;
59-
while(!reachedEOF && cursor < bufferSize) {
60-
if(buffer == null)
61-
buffer = new byte[bufferSize];
62-
63-
int readLen = in.read(buffer, cursor, bufferSize - cursor);
64-
if(readLen == -1) {
65-
reachedEOF = true;
66-
break;
67-
}
68-
cursor += readLen;
57+
byte[] buffer = new byte[bufferSize];
58+
int readLen = in.read(buffer);
59+
if(readLen == -1) {
60+
reachedEOF = true;
61+
return null;
6962
}
70-
71-
return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor);
63+
return MessageBuffer.wrap(buffer).slice(0, readLen);
7264
}
7365

7466
@Override

msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package org.msgpack.core.buffer
22

33
import org.msgpack.core.{MessageUnpacker, MessagePack, MessagePackSpec}
44
import java.io._
5-
import xerial.core.io.IOUtil
5+
import xerial.core.io.IOUtil._
66
import scala.util.Random
77
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
88
import java.nio.ByteBuffer
9-
import org.msgpack.unpacker.MessagePackUnpacker
9+
1010

1111
/**
1212
* Created on 5/30/14.
@@ -49,7 +49,7 @@ class MessageBufferInputTest extends MessagePackSpec {
4949
val tmp = File.createTempFile("testbuf", ".dat", new File("target"))
5050
tmp.getParentFile.mkdirs()
5151
tmp.deleteOnExit()
52-
IOUtil.withResource(new FileOutputStream(tmp)) { out =>
52+
withResource(new FileOutputStream(tmp)) { out =>
5353
out.write(b)
5454
}
5555
tmp
@@ -135,6 +135,32 @@ class MessageBufferInputTest extends MessagePackSpec {
135135
buf.reset(in1)
136136
readInt(buf) shouldBe 42
137137
}
138+
139+
"be non-blocking" taggedAs("non-blocking") in {
140+
141+
withResource(new PipedOutputStream()) { pipedOutputStream =>
142+
withResource(new PipedInputStream()) { pipedInputStream =>
143+
pipedInputStream.connect(pipedOutputStream)
144+
145+
val packer = MessagePack.newDefaultPacker(pipedOutputStream)
146+
.packArrayHeader(2)
147+
.packLong(42)
148+
.packString("hello world")
149+
150+
packer.flush
151+
152+
val unpacker = MessagePack.newDefaultUnpacker(pipedInputStream)
153+
unpacker.hasNext() shouldBe true
154+
unpacker.unpackArrayHeader() shouldBe 2
155+
unpacker.unpackLong() shouldBe 42L
156+
unpacker.unpackString() shouldBe "hello world"
157+
158+
packer.close
159+
unpacker.close
160+
}
161+
}
162+
}
163+
138164
}
139165

140166
"ChannelBufferInput" should {

0 commit comments

Comments
 (0)