Skip to content

Commit 16a38b0

Browse files
committed
Fixes msgpack#222: Check whthere available() == 0 before calling blocking read()
1 parent 05142ee commit 16a38b0

2 files changed

Lines changed: 35 additions & 6 deletions

File tree

msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@ public MessageBuffer next() throws IOException {
5757
byte[] buffer = null;
5858
int cursor = 0;
5959
while(!reachedEOF && cursor < bufferSize) {
60-
if(buffer == null)
60+
if(buffer == null) {
6161
buffer = new byte[bufferSize];
62+
}
6263

63-
int readLen = in.read(buffer, cursor, bufferSize - cursor);
64-
if(readLen == -1) {
64+
int readLen = -1;
65+
// available() == 0 means, it reached the end of the stream
66+
if(in.available() == 0 ||
67+
(readLen = in.read(buffer, cursor, bufferSize - cursor)) == -1) {
6568
reachedEOF = true;
6669
break;
6770
}

msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package org.msgpack.core.buffer
22

33
import org.msgpack.core.{MessageUnpacker, MessagePack, MessagePackSpec}
44
import java.io._
5-
import xerial.core.io.IOUtil
5+
import xerial.core.io.IOUtil._
66
import scala.util.Random
77
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
88
import java.nio.ByteBuffer
9-
import org.msgpack.unpacker.MessagePackUnpacker
9+
1010

1111
/**
1212
* Created on 5/30/14.
@@ -49,7 +49,7 @@ class MessageBufferInputTest extends MessagePackSpec {
4949
val tmp = File.createTempFile("testbuf", ".dat", new File("target"))
5050
tmp.getParentFile.mkdirs()
5151
tmp.deleteOnExit()
52-
IOUtil.withResource(new FileOutputStream(tmp)) { out =>
52+
withResource(new FileOutputStream(tmp)) { out =>
5353
out.write(b)
5454
}
5555
tmp
@@ -135,6 +135,32 @@ class MessageBufferInputTest extends MessagePackSpec {
135135
buf.reset(in1)
136136
readInt(buf) shouldBe 42
137137
}
138+
139+
"be non-blocking" taggedAs("non-blocking") in {
140+
141+
withResource(new PipedOutputStream()) { pipedOutputStream =>
142+
withResource(new PipedInputStream()) { pipedInputStream =>
143+
pipedInputStream.connect(pipedOutputStream)
144+
145+
val packer = MessagePack.newDefaultPacker(pipedOutputStream)
146+
.packArrayHeader(2)
147+
.packLong(42)
148+
.packString("hello world")
149+
150+
packer.flush
151+
152+
val unpacker = MessagePack.newDefaultUnpacker(pipedInputStream)
153+
unpacker.hasNext() shouldBe true
154+
unpacker.unpackArrayHeader() shouldBe 2
155+
unpacker.unpackLong() shouldBe 42L
156+
unpacker.unpackString() shouldBe "hello world"
157+
158+
packer.close
159+
unpacker.close
160+
}
161+
}
162+
}
163+
138164
}
139165

140166
"ChannelBufferInput" should {

0 commit comments

Comments
 (0)