Skip to content

Commit c963037

Browse files
committed
Added test cases for reading various types of MessageBufferInputs, and fixed some errors
1 parent b71b05a commit c963037

7 files changed

Lines changed: 178 additions & 27 deletions

File tree

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.msgpack.core.buffer;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import static org.msgpack.core.Preconditions.*;
6+
7+
/**
8+
* {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer}
9+
*/
10+
public class ByteBufferInput implements MessageBufferInput {
11+
12+
private final ByteBuffer input;
13+
private boolean isRead = false;
14+
15+
public ByteBufferInput(ByteBuffer input) {
16+
this.input = checkNotNull(input, "input ByteBuffer is null");
17+
}
18+
19+
20+
@Override
21+
public MessageBuffer next() throws IOException {
22+
if(isRead)
23+
return null;
24+
25+
isRead = true;
26+
return MessageBuffer.wrap(input);
27+
}
28+
29+
30+
@Override
31+
public void close() throws IOException {
32+
// Nothing to do
33+
}
34+
}

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,25 +32,15 @@ public MessageBuffer next() throws IOException {
3232
return null;
3333

3434
MessageBuffer m = MessageBuffer.newBuffer(bufferSize);
35-
ByteBuffer b = m.toByteBuffer(0, m.size);
36-
while(b.remaining() > 0) {
37-
int ret = 0;
38-
ret = channel.read(b);
35+
ByteBuffer b = m.toByteBuffer();
36+
while(!reachedEOF && b.remaining() > 0) {
37+
int ret = channel.read(b);
3938
if(ret == -1) {
4039
reachedEOF = true;
41-
break;
4240
}
4341
}
4442
b.flip();
45-
if(b.remaining() == 0) {
46-
return null;
47-
}
48-
else if(b.limit() < m.size) {
49-
return m.slice(0, b.limit());
50-
}
51-
else {
52-
return m;
53-
}
43+
return b.remaining() == 0 ? null : m.slice(0, b.limit());
5444
}
5545

5646
@Override

msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
*/
1414
public class InputStreamBufferInput implements MessageBufferInput {
1515

16-
private final InputStream in;
17-
private final int bufferSize;
1816

1917
private static Field bufField;
2018
private static Field bufPosField;
@@ -38,6 +36,10 @@ private static Field getField(String name) {
3836
bufCountField = getField("count");
3937
}
4038

39+
private final InputStream in;
40+
private final int bufferSize;
41+
private boolean reachedEOF = false;
42+
4143
public static MessageBufferInput newBufferInput(InputStream in) {
4244
if(in instanceof ByteArrayInputStream) {
4345
ByteArrayInputStream b = (ByteArrayInputStream) in;
@@ -69,23 +71,24 @@ public InputStreamBufferInput(InputStream in, int bufferSize) {
6971

7072
@Override
7173
public MessageBuffer next() throws IOException {
74+
if(reachedEOF)
75+
return null;
76+
7277
byte[] buffer = null;
7378
int cursor = 0;
74-
while(cursor < bufferSize) {
79+
while(!reachedEOF && cursor < bufferSize) {
7580
if(buffer == null)
7681
buffer = new byte[bufferSize];
82+
7783
int readLen = in.read(buffer, cursor, bufferSize - cursor);
7884
if(readLen == -1) {
85+
reachedEOF = true;
7986
break;
8087
}
8188
cursor += readLen;
8289
}
83-
if(buffer == null)
84-
return null;
85-
else {
86-
MessageBuffer m = MessageBuffer.wrap(buffer);
87-
return cursor == bufferSize ? m : m.slice(0, cursor);
88-
}
90+
91+
return buffer == null ? null : MessageBuffer.wrap(buffer).slice(0, cursor);
8992
}
9093

9194
@Override

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,10 @@ private MessageBuffer(Object base, long address, int length, ByteBuffer referenc
259259

260260
public MessageBuffer slice(int offset, int length) {
261261
// TODO ensure deleting this slice does not collapse this MessageBuffer
262-
return new MessageBuffer(base, address + offset, length, reference);
262+
if(offset == 0 && length == size())
263+
return this;
264+
else
265+
return new MessageBuffer(base, address + offset, length, reference);
263266
}
264267

265268
public byte getByte(int index) {
@@ -388,6 +391,17 @@ public ByteBuffer toByteBuffer(int index, int length) {
388391
}
389392
}
390393

394+
public ByteBuffer toByteBuffer() {
395+
return toByteBuffer(0, size());
396+
}
397+
398+
public byte[] toByteArray() {
399+
byte[] b = new byte[size()];
400+
unsafe.copyMemory(base, address, b, ARRAY_BYTE_BASE_OFFSET, size());
401+
return b;
402+
}
403+
404+
391405
public void relocate(int offset, int length, int dst) {
392406
unsafe.copyMemory(base, address + offset, base, address+dst, length);
393407
}

msgpack-core/src/test/scala/org/msgpack/core/MessagePackSpec.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,17 @@ import org.scalatest._
44
import xerial.core.log.{LogLevel, Logger}
55
import xerial.core.util.{TimeReport, Timer}
66
import scala.language.implicitConversions
7-
8-
trait MessagePackSpec extends WordSpec with Matchers with GivenWhenThen with OptionValues with BeforeAndAfter with Benchmark with Logger {
7+
import org.scalatest.prop.PropertyChecks
8+
9+
trait MessagePackSpec
10+
extends WordSpec
11+
with Matchers
12+
with GivenWhenThen
13+
with OptionValues
14+
with BeforeAndAfter
15+
with PropertyChecks
16+
with Benchmark
17+
with Logger {
918

1019
implicit def toTag(s:String) : Tag = Tag(s)
1120

msgpack-core/src/test/scala/org/msgpack/core/MessagePackTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import java.math.BigInteger
99
/**
1010
* Created on 2014/05/07.
1111
*/
12-
class MessagePackTest extends MessagePackSpec with PropertyChecks {
12+
class MessagePackTest extends MessagePackSpec {
1313

1414
"MessagePack" should {
1515
"detect fixint values" in {
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package org.msgpack.core.buffer
2+
3+
import org.msgpack.core.MessagePackSpec
4+
import java.io._
5+
import xerial.core.io.IOUtil
6+
import scala.util.Random
7+
import java.util.zip.{GZIPOutputStream, GZIPInputStream}
8+
import java.nio.ByteBuffer
9+
10+
/**
11+
* Created on 5/30/14.
12+
*/
13+
class MessageBufferInputTest extends MessagePackSpec {
14+
15+
val targetInputSize = Seq(0, 10, 500, 1000, 2000, 4000, 8000, 10000, 30000, 50000, 100000)
16+
17+
def testData(size:Int) = {
18+
//debug(s"test data size: ${size}")
19+
val b = new Array[Byte](size)
20+
Random.nextBytes(b)
21+
b
22+
}
23+
24+
def testDataSet = {
25+
targetInputSize.map(testData)
26+
}
27+
28+
def runTest(factory:Array[Byte] => MessageBufferInput) {
29+
for(b <- testDataSet) {
30+
checkInputData(b, factory(b))
31+
}
32+
}
33+
34+
implicit class InputData(b:Array[Byte]) {
35+
def compress = {
36+
val compressed = new ByteArrayOutputStream()
37+
val out = new GZIPOutputStream(compressed)
38+
out.write(b)
39+
out.close()
40+
compressed.toByteArray
41+
}
42+
43+
def toByteBuffer = {
44+
ByteBuffer.wrap(b)
45+
}
46+
47+
def saveToTmpFile : File = {
48+
val tmp = File.createTempFile("testbuf", ".dat", new File("target"))
49+
tmp.getParentFile.mkdirs()
50+
tmp.deleteOnExit()
51+
IOUtil.withResource(new FileOutputStream(tmp)) { out =>
52+
out.write(b)
53+
}
54+
tmp
55+
}
56+
}
57+
58+
59+
60+
def checkInputData(inputData:Array[Byte], in:MessageBufferInput) {
61+
When(s"input data size = ${inputData.length}")
62+
var cursor = 0
63+
for(m <- Iterator.continually(in.next).takeWhile(_ != null)) {
64+
m.toByteArray() shouldBe inputData.slice(cursor, cursor + m.size())
65+
cursor += m.size()
66+
}
67+
cursor shouldBe inputData.length
68+
69+
}
70+
71+
72+
"MessageBufferInput" should {
73+
"support byte arrays" in {
74+
runTest(new ArrayBufferInput(_))
75+
}
76+
77+
"support ByteBuffers" in {
78+
runTest(b => new ByteBufferInput(b.toByteBuffer))
79+
}
80+
81+
"support InputStreams" taggedAs("is") in {
82+
runTest(b =>
83+
new InputStreamBufferInput(
84+
new GZIPInputStream(new ByteArrayInputStream(b.compress)))
85+
)
86+
}
87+
88+
"support file input channel" taggedAs("fc") in {
89+
runTest { b =>
90+
val tmp = b.saveToTmpFile
91+
try {
92+
InputStreamBufferInput.newBufferInput(new FileInputStream(tmp))
93+
}
94+
finally {
95+
tmp.delete()
96+
}
97+
}
98+
}
99+
100+
}
101+
}

0 commit comments

Comments
 (0)