Skip to content

Commit 9319d52

Browse files
committed
msgpack#103 Optimization for ByteArrayInputStream
1 parent c0a7af1 commit 9319d52

File tree

4 files changed

+126
-18
lines changed

4 files changed

+126
-18
lines changed

msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public MessageUnpacker(byte[] arr) {
117117
* @param in
118118
*/
119119
public MessageUnpacker(InputStream in) {
120-
this(new InputStreamBufferInput(in));
120+
this(InputStreamBufferInput.newBufferInput(in));
121121
}
122122

123123
/**
@@ -992,6 +992,10 @@ public void readPayload(ByteBuffer dst) throws IOException {
992992
}
993993
}
994994

995+
public void readPayload(byte[] dst) throws IOException {
996+
readPayload(dst, 0, dst.length);
997+
}
998+
995999
/**
9961000
* Read up to len bytes of data into the destination array
9971001
*

msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,33 @@
99
public class ArrayBufferInput implements MessageBufferInput {
1010

1111
private MessageBuffer buffer;
12-
private boolean isRead = false;
12+
private int cursor;
13+
private final int length;
1314

1415
public ArrayBufferInput(byte[] arr) {
16+
this(arr, 0, arr.length);
17+
}
18+
19+
public ArrayBufferInput(byte[] arr, int offset, int length) {
1520
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null"));
21+
this.cursor = offset;
22+
checkArgument(length <= arr.length);
23+
this.length = length;
1624
}
1725

26+
1827
@Override
1928
public MessageBuffer next() throws IOException {
20-
if(isRead) {
29+
if(cursor < length) {
30+
int c = cursor;
31+
cursor = length;
32+
if(c == 0 && length == buffer.size)
33+
return buffer;
34+
else
35+
return buffer.slice(c, length);
36+
}
37+
else {
2138
return null;
22-
} else {
23-
isRead = true;
24-
return buffer;
2539
}
2640
}
2741

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package org.msgpack.core.buffer;
22

3+
import java.io.ByteArrayInputStream;
34
import java.io.IOException;
45
import java.io.InputStream;
6+
import java.lang.reflect.Field;
57

68
import static org.msgpack.core.Preconditions.checkNotNull;
79

@@ -11,28 +13,76 @@
1113
public class InputStreamBufferInput implements MessageBufferInput {
1214

1315
private final InputStream in;
14-
private byte[] buffer = new byte[8192];
16+
private final int bufferSize;
17+
18+
private static Field bufField;
19+
private static Field bufPosField;
20+
private static Field bufCountField;
21+
22+
private static Field getField(String name) {
23+
Field f = null;
24+
try {
25+
f = ByteArrayInputStream.class.getDeclaredField(name);
26+
f.setAccessible(true);
27+
}
28+
catch(Exception e) {
29+
e.printStackTrace();
30+
}
31+
return f;
32+
}
33+
34+
static {
35+
bufField = getField("buf");
36+
bufPosField = getField("pos");
37+
bufCountField = getField("count");
38+
}
39+
40+
public static MessageBufferInput newBufferInput(InputStream in) {
41+
if(in instanceof ByteArrayInputStream) {
42+
ByteArrayInputStream b = (ByteArrayInputStream) in;
43+
try {
44+
// Extract a raw byte array from the ByteArrayInputStream
45+
byte[] buf = (byte[]) bufField.get(b);
46+
int pos = (Integer) bufPosField.get(b);
47+
int length = (Integer) bufCountField.get(b);
48+
return new ArrayBufferInput(buf, pos, length);
49+
}
50+
catch(Exception e) {
51+
// Failed to retrieve the raw byte array
52+
}
53+
}
54+
55+
return new InputStreamBufferInput(in);
56+
}
1557

1658
public InputStreamBufferInput(InputStream in) {
59+
this(in, 8192);
60+
}
61+
62+
public InputStreamBufferInput(InputStream in, int bufferSize) {
1763
this.in = checkNotNull(in, "input is null");
64+
this.bufferSize = 8192;
1865
}
1966

2067
@Override
2168
public MessageBuffer next() throws IOException {
22-
// Manage the allocated buffers
23-
MessageBuffer m = MessageBuffer.newBuffer(buffer.length);
24-
25-
// TODO reduce the number of memory copy
69+
byte[] buffer = null;
2670
int cursor = 0;
27-
while(cursor < buffer.length) {
28-
int readLen = in.read(buffer, cursor, buffer.length - cursor);
71+
while(cursor < bufferSize) {
72+
if(buffer == null)
73+
buffer = new byte[bufferSize];
74+
int readLen = in.read(buffer, cursor, bufferSize - cursor);
2975
if(readLen == -1) {
3076
break;
3177
}
3278
cursor += readLen;
3379
}
34-
m.putBytes(0, buffer, 0, cursor);
35-
return m;
80+
if(buffer == null)
81+
return null;
82+
else {
83+
MessageBuffer m = MessageBuffer.wrap(buffer);
84+
return cursor == bufferSize ? m : m.slice(0, cursor);
85+
}
3686
}
3787

3888
@Override
@@ -41,7 +91,7 @@ public void close() throws IOException {
4191
in.close();
4292
}
4393
finally {
44-
buffer = null;
94+
4595
}
4696
}
4797
}

msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,10 @@ class MessageUnpackerTest extends MessagePackSpec {
304304
t("v7").averageWithoutMinMax should be <= t("v6").averageWithoutMinMax
305305
}
306306

307+
import org.msgpack.`type`.{ValueType=>ValueTypeV6}
307308

308309
"be faster than msgpack-v6 read value" taggedAs("cmp-unpack") in {
309310

310-
import org.msgpack.`type`.{ValueType=>ValueTypeV6}
311-
312311
def readValueV6(unpacker:org.msgpack.unpacker.MessagePackUnpacker) {
313312
val vt = unpacker.getNextType()
314313
vt match {
@@ -409,6 +408,47 @@ class MessageUnpackerTest extends MessagePackSpec {
409408

410409
}
411410

411+
"be faster for reading binary than v6" taggedAs("cmp-binary") in {
412+
413+
val bos = new ByteArrayOutputStream()
414+
val packer = new MessagePacker(bos)
415+
val L = 10000
416+
val R = 100
417+
(0 until R).foreach { i =>
418+
packer.packBinaryHeader(L)
419+
packer.writePayload(new Array[Byte](L))
420+
}
421+
packer.close()
422+
423+
val b = bos.toByteArray
424+
time("unpackBinary", repeat=100) {
425+
block("v6") {
426+
val v6 = new org.msgpack.MessagePack()
427+
val unpacker = new org.msgpack.unpacker.MessagePackUnpacker(v6, new ByteArrayInputStream(b))
428+
var i = 0
429+
while(i < R) {
430+
val out = unpacker.readByteArray()
431+
i += 1
432+
}
433+
unpacker.close()
434+
}
435+
436+
block("v7") {
437+
//val unpacker = new MessageUnpacker(b)
438+
val unpacker = new MessageUnpacker(new ByteArrayInputStream(b))
439+
var i = 0
440+
while(i < R) {
441+
val len = unpacker.unpackBinaryHeader()
442+
val out = new Array[Byte](len)
443+
unpacker.readPayload(out, 0, len)
444+
i += 1
445+
}
446+
unpacker.close()
447+
}
448+
}
449+
450+
}
451+
412452

413453

414454
}

0 commit comments

Comments
 (0)