Skip to content

Commit 635c5d0

Browse files
committed
Merged v07-buffer-output
2 parents 7d1edcf + e800df4 commit 635c5d0

File tree

15 files changed

+195
-113
lines changed

15 files changed

+195
-113
lines changed

msgpack-core/src/main/java/org/msgpack/core/MessagePack.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.msgpack.core;
22

3+
import org.msgpack.core.buffer.ArrayBufferInput;
4+
35
import java.nio.charset.Charset;
46
import java.nio.charset.CodingErrorAction;
57

@@ -13,6 +15,12 @@ public class MessagePack {
1315

1416
public static final Charset UTF8 = Charset.forName("UTF-8");
1517

18+
public static MessageUnpacker newUnpacker(byte[] src) {
19+
return new MessageUnpacker(new ArrayBufferInput(src));
20+
}
21+
22+
23+
1624
/**
1725
* Message packer/unpacker configuration object
1826
*/

msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java

Lines changed: 38 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public class MessagePacker implements Closeable {
5757
private final MessagePack.Config config;
5858

5959
private MessageBufferOutput out;
60-
private final MessageBuffer buffer;
60+
private MessageBuffer buffer;
61+
6162
private int position;
6263

6364

@@ -71,11 +72,23 @@ public class MessagePacker implements Closeable {
7172
*/
7273
private CharsetEncoder encoder;
7374

75+
76+
/**
77+
* Create an MessagePacker that outputs the packed data to the specified stream
78+
* @param out
79+
*/
7480
public MessagePacker(OutputStream out) {
81+
// This factory method does not have significant performance overhead.
7582
this(new OutputStreamBufferOutput(out));
7683
}
7784

78-
85+
/**
86+
* Create an MessagePacker that outputs the packed data to the given {@link org.msgpack.core.buffer.MessageBufferOutput}
87+
*
88+
* @param out MessageBufferOutput. Use {@link org.msgpack.core.buffer.OutputStreamBufferOutput}, {@link org.msgpack.core.buffer.ChannelBufferOutput} or
89+
* your own implementation of {@link org.msgpack.core.buffer.MessageBufferOutput} interface.
90+
*
91+
*/
7992
public MessagePacker(MessageBufferOutput out) {
8093
this(out, MessagePack.DEFAULT_CONFIG);
8194
}
@@ -106,14 +119,16 @@ private void prepareEncoder() {
106119
}
107120

108121
public void flush() throws IOException {
109-
out.flush(buffer, 0, position);
122+
if(position == buffer.size()) {
123+
out.flush(buffer);
124+
}
125+
else {
126+
out.flush(buffer.slice(0, position));
127+
}
128+
buffer = out.next(config.getPackerBufferSize());
110129
position = 0;
111130
}
112131

113-
private void flushBuffer(MessageBuffer b) throws IOException {
114-
out.flush(b, 0, b.size());
115-
}
116-
117132
public void close() throws IOException {
118133
try {
119134
flush();
@@ -306,12 +321,12 @@ public MessagePacker packBigInteger(BigInteger bi) throws IOException {
306321
}
307322
return this;
308323
}
309-
324+
310325
public MessagePacker packFloat(float v) throws IOException {
311326
writeByteAndFloat(FLOAT32, v);
312327
return this;
313328
}
314-
329+
315330
public MessagePacker packDouble(double v) throws IOException {
316331
writeByteAndDouble(FLOAT64, v);
317332
return this;
@@ -405,65 +420,41 @@ public MessagePacker packMapHeader(int mapSize) throws IOException {
405420
return this;
406421
}
407422

408-
public MessagePacker pack(Value v) throws IOException {
423+
public MessagePacker packValue(Value v) throws IOException {
409424
v.writeTo(this);
410425
return this;
411426
}
412427

413-
public MessagePacker packExtendedType(int extType, byte[] src, int offset, int len) throws IOException {
414-
return packExtendedTypeHeader(extType, len).writePayload(src, offset, len);
415-
}
416-
417-
public MessagePacker packExtendedType(int extType, byte[] src) throws IOException {
418-
return packExtendedType(extType, src, 0, src.length);
419-
}
420-
421-
public MessagePacker packExtendedType(int extType, ByteBuffer src) throws IOException {
422-
return packExtendedTypeHeader(extType, src.remaining()).writePayload(src);
423-
}
424-
425-
public MessagePacker packExtendedTypeHeader(int extType, int dataLen) throws IOException {
426-
if(dataLen < (1 << 8)) {
427-
if(dataLen > 0 && (dataLen & (dataLen - 1)) == 0) { // check whether dataLen == 2^x
428-
if(dataLen == 1) {
428+
public MessagePacker packExtendedTypeHeader(int extType, int payloadLen) throws IOException {
429+
if(payloadLen < (1 << 8)) {
430+
if(payloadLen > 0 && (payloadLen & (payloadLen - 1)) == 0) { // check whether dataLen == 2^x
431+
if(payloadLen == 1) {
429432
writeByteAndByte(FIXEXT1, (byte) extType);
430-
} else if(dataLen == 2){
433+
} else if(payloadLen == 2){
431434
writeByteAndByte(FIXEXT2, (byte) extType);
432-
} else if(dataLen == 4) {
435+
} else if(payloadLen == 4) {
433436
writeByteAndByte(FIXEXT4, (byte) extType);
434-
} else if(dataLen == 8) {
437+
} else if(payloadLen == 8) {
435438
writeByteAndByte(FIXEXT8, (byte) extType);
436439
} else {
437440
writeByteAndByte(FIXEXT16, (byte) extType);
438441
}
439442
} else {
440-
writeByteAndByte(EXT8, (byte) dataLen);
443+
writeByteAndByte(EXT8, (byte) payloadLen);
441444
writeByte((byte) extType);
442445
}
443-
} else if(dataLen < (1 << 16)) {
444-
writeByteAndShort(EXT16, (short) dataLen);
446+
} else if(payloadLen < (1 << 16)) {
447+
writeByteAndShort(EXT16, (short) payloadLen);
445448
writeByte((byte) extType);
446449
} else {
447-
writeByteAndInt(EXT32, dataLen);
450+
writeByteAndInt(EXT32, payloadLen);
448451
writeByte((byte) extType);
449452

450453
// TODO support dataLen > 2^31 - 1
451454
}
452455
return this;
453456
}
454457

455-
public MessagePacker packBinary(byte[] src) throws IOException {
456-
return packBinary(src, 0, src.length);
457-
}
458-
459-
public MessagePacker packBinary(byte[] src, int offset, int len) throws IOException {
460-
return packBinaryHeader(len).writePayload(src, offset, len);
461-
}
462-
463-
public MessagePacker packBinary(ByteBuffer src) throws IOException {
464-
return packBinaryHeader(src.remaining()).writePayload(src);
465-
}
466-
467458
public MessagePacker packBinaryHeader(int len) throws IOException {
468459
if(len < (1 << 8)) {
469460
writeByteAndByte(BIN8, (byte) len);
@@ -475,10 +466,6 @@ public MessagePacker packBinaryHeader(int len) throws IOException {
475466
return this;
476467
}
477468

478-
public MessagePacker packRawString(ByteBuffer src) throws IOException {
479-
return packRawStringHeader(src.remaining()).writePayload(src);
480-
}
481-
482469
public MessagePacker packRawStringHeader(int len) throws IOException {
483470
if(len < (1 << 5)) {
484471
writeByte((byte) (FIXSTR_PREFIX | len));
@@ -503,7 +490,7 @@ public MessagePacker writePayload(ByteBuffer src) throws IOException {
503490
// Wrap the input source as a MessageBuffer
504491
MessageBuffer wrapped = MessageBuffer.wrap(src).slice(src.position(), src.remaining());
505492
// Then, dump the source data to the output
506-
flushBuffer(wrapped);
493+
out.flush(wrapped);
507494
src.position(src.limit());
508495
}
509496
else {
@@ -533,7 +520,7 @@ public MessagePacker writePayload(byte[] src, int off, int len) throws IOExcepti
533520
// Wrap the input array as a MessageBuffer
534521
MessageBuffer wrapped = MessageBuffer.wrap(src).slice(off, len);
535522
// Dump the source data to the output
536-
flushBuffer(wrapped);
523+
out.flush(wrapped);
537524
}
538525
else {
539526
int cursor = 0;

msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,20 @@ public MessageUnpacker(byte[] arr) {
134134
this(new ArrayBufferInput(arr));
135135
}
136136

137+
/**
138+
* Create an MessageUnpacker that reads data from the given byte array [offset, offset+length)
139+
* @param arr
140+
* @param offset
141+
* @param length
142+
*/
143+
public MessageUnpacker(byte[] arr, int offset, int length) {
144+
this(new ArrayBufferInput(arr, offset, length));
145+
}
146+
137147
/**
138148
* Create an MessageUnpacker that reads data from the given InputStream.
149+
* For reading data efficiently from byte[], use {@link #MessageUnpacker(byte[])} or {@link #MessageUnpacker(byte[], int, int)} instead of this constructor.
150+
*
139151
* @param in
140152
*/
141153
public MessageUnpacker(InputStream in) {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.msgpack.core.annotations;
2+
3+
/**
4+
* Annotates a code which must be used carefully.
5+
*/
6+
public @interface Insecure {
7+
}

msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ public ArrayBufferInput(byte[] arr, int offset, int length) {
2020
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length);
2121
}
2222

23-
2423
@Override
2524
public MessageBuffer next() throws IOException {
2625
if(isRead)

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,23 @@
1212
public class ChannelBufferOutput implements MessageBufferOutput {
1313

1414
private final WritableByteChannel channel;
15+
private MessageBuffer buffer;
1516

1617
public ChannelBufferOutput(WritableByteChannel channel) {
1718
this.channel = checkNotNull(channel, "output channel is null");
1819
}
1920

2021
@Override
21-
public void flush(MessageBuffer buf, int offset, int len) throws IOException {
22-
assert(offset + len <= buf.size());
23-
ByteBuffer bb = buf.toByteBuffer(offset, len);
22+
public MessageBuffer next(int bufferSize) throws IOException {
23+
if(buffer == null || buffer.size() != bufferSize) {
24+
buffer = MessageBuffer.newBuffer(bufferSize);
25+
}
26+
return buffer;
27+
}
28+
29+
@Override
30+
public void flush(MessageBuffer buf) throws IOException {
31+
ByteBuffer bb = buf.toByteBuffer();
2432
channel.write(bb);
2533
}
2634

msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,15 @@
1313
*/
1414
public class InputStreamBufferInput implements MessageBufferInput {
1515

16-
private static Field bufField;
17-
private static Field bufPosField;
18-
private static Field bufCountField;
19-
20-
private static Field getField(String name) {
21-
Field f = null;
22-
try {
23-
f = ByteArrayInputStream.class.getDeclaredField(name);
24-
f.setAccessible(true);
25-
}
26-
catch(Exception e) {
27-
e.printStackTrace();
28-
}
29-
return f;
30-
}
31-
32-
static {
33-
bufField = getField("buf");
34-
bufPosField = getField("pos");
35-
bufCountField = getField("count");
36-
}
37-
3816
private final InputStream in;
3917
private final int bufferSize;
4018
private boolean reachedEOF = false;
4119

4220
public static MessageBufferInput newBufferInput(InputStream in) {
4321
checkNotNull(in, "InputStream is null");
44-
if(in.getClass() == ByteArrayInputStream.class) {
45-
ByteArrayInputStream b = (ByteArrayInputStream) in;
46-
try {
47-
// Extract a raw byte array from the ByteArrayInputStream
48-
byte[] buf = (byte[]) bufField.get(b);
49-
int pos = (Integer) bufPosField.get(b);
50-
int length = (Integer) bufCountField.get(b);
51-
return new ArrayBufferInput(buf, pos, length);
52-
}
53-
catch(Exception e) {
54-
// Failed to retrieve the raw byte array
55-
}
56-
} else if (in instanceof FileInputStream) {
22+
if (in instanceof FileInputStream) {
5723
return new ChannelBufferInput(((FileInputStream) in).getChannel());
5824
}
59-
6025
return new InputStreamBufferInput(in);
6126
}
6227

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.msgpack.core.buffer;
22

3+
import org.msgpack.core.annotations.Insecure;
34
import sun.misc.Unsafe;
45
import sun.nio.ch.DirectBuffer;
56

@@ -251,7 +252,7 @@ else if(bb.hasArray()) {
251252
this.reference = null;
252253
}
253254

254-
protected MessageBuffer(Object base, long address, int length, ByteBuffer reference) {
255+
MessageBuffer(Object base, long address, int length, ByteBuffer reference) {
255256
this.base = base;
256257
this.address = address;
257258
this.size = length;
@@ -397,7 +398,7 @@ public void putByteBuffer(int index, ByteBuffer src, int len) {
397398
* @return
398399
*/
399400
public ByteBuffer toByteBuffer(int index, int length) {
400-
if(base instanceof byte[]) {
401+
if(hasArray()) {
401402
return ByteBuffer.wrap((byte[]) base, (int) ((address-ARRAY_BYTE_BASE_OFFSET) + index), length);
402403
}
403404
try {
@@ -411,16 +412,49 @@ public ByteBuffer toByteBuffer(int index, int length) {
411412
}
412413
}
413414

415+
/**
416+
* Get a ByteBuffer view of this buffer
417+
* @return
418+
*/
414419
public ByteBuffer toByteBuffer() {
415420
return toByteBuffer(0, size());
416421
}
417422

423+
/**
424+
* Get a copy of this buffer
425+
* @return
426+
*/
418427
public byte[] toByteArray() {
419428
byte[] b = new byte[size()];
420429
unsafe.copyMemory(base, address, b, ARRAY_BYTE_BASE_OFFSET, size());
421430
return b;
422431
}
423432

433+
@Insecure
434+
public boolean hasArray() { return base instanceof byte[]; }
435+
436+
@Insecure
437+
public byte[] getArray() { return (byte[]) base; }
438+
439+
@Insecure
440+
public Object getBase() { return base; }
441+
442+
@Insecure
443+
public long getAddress() { return address; }
444+
445+
@Insecure
446+
public int offset() {
447+
if(hasArray()) {
448+
return (int) address - ARRAY_BYTE_BASE_OFFSET;
449+
}
450+
else {
451+
return 0;
452+
}
453+
}
454+
455+
@Insecure
456+
public Object getReference() { return reference; }
457+
424458

425459
public void relocate(int offset, int length, int dst) {
426460
unsafe.copyMemory(base, address + offset, base, address+dst, length);

0 commit comments

Comments
 (0)