Skip to content

Commit a74d163

Browse files
committed
msgpack#117: Add next(buffer size) method to enable future optimization, such as bulk buffer outputs
1 parent 9a5cbcb commit a74d163

5 files changed

Lines changed: 62 additions & 29 deletions

File tree

msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class MessagePacker implements Closeable {
5757
private final MessagePack.Config config;
5858

5959
private final MessageBufferOutput out;
60-
private final MessageBuffer buffer;
60+
private MessageBuffer buffer;
6161
private int position;
6262

6363

@@ -96,14 +96,16 @@ private void prepareEncoder() {
9696
}
9797

9898
public void flush() throws IOException {
99-
out.flush(buffer, 0, position);
99+
if(position == buffer.size()) {
100+
out.flush(buffer);
101+
}
102+
else {
103+
out.flush(buffer.slice(0, position));
104+
}
105+
buffer = out.next(config.getPackerBufferSize());
100106
position = 0;
101107
}
102108

103-
private void flushBuffer(MessageBuffer b) throws IOException {
104-
out.flush(b, 0, b.size());
105-
}
106-
107109
public void close() throws IOException {
108110
try {
109111
flush();
@@ -296,12 +298,12 @@ public MessagePacker packBigInteger(BigInteger bi) throws IOException {
296298
}
297299
return this;
298300
}
299-
301+
300302
public MessagePacker packFloat(float v) throws IOException {
301303
writeByteAndFloat(FLOAT32, v);
302304
return this;
303305
}
304-
306+
305307
public MessagePacker packDouble(double v) throws IOException {
306308
writeByteAndDouble(FLOAT64, v);
307309
return this;
@@ -493,7 +495,7 @@ public MessagePacker writePayload(ByteBuffer src) throws IOException {
493495
// Wrap the input source as a MessageBuffer
494496
MessageBuffer wrapped = MessageBuffer.wrap(src).slice(src.position(), src.remaining());
495497
// Then, dump the source data to the output
496-
flushBuffer(wrapped);
498+
out.flush(wrapped);
497499
src.position(src.limit());
498500
}
499501
else {
@@ -523,7 +525,7 @@ public MessagePacker writePayload(byte[] src, int off, int len) throws IOExcepti
523525
// Wrap the input array as a MessageBuffer
524526
MessageBuffer wrapped = MessageBuffer.wrap(src).slice(off, len);
525527
// Dump the source data to the output
526-
flushBuffer(wrapped);
528+
out.flush(wrapped);
527529
}
528530
else {
529531
int cursor = 0;

msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ public ArrayBufferInput(byte[] arr, int offset, int length) {
2020
this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length);
2121
}
2222

23-
2423
@Override
2524
public MessageBuffer next() throws IOException {
2625
if(isRead)

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,23 @@
1212
public class ChannelBufferOutput implements MessageBufferOutput {
1313

1414
private final WritableByteChannel channel;
15+
private MessageBuffer buffer;
1516

1617
public ChannelBufferOutput(WritableByteChannel channel) {
1718
this.channel = checkNotNull(channel, "output channel is null");
1819
}
1920

2021
@Override
21-
public void flush(MessageBuffer buf, int offset, int len) throws IOException {
22-
assert(offset + len <= buf.size());
23-
ByteBuffer bb = buf.toByteBuffer(offset, len);
22+
public MessageBuffer next(int bufferSize) throws IOException {
23+
if(buffer == null || buffer.size() != bufferSize) {
24+
buffer = MessageBuffer.newBuffer(bufferSize);
25+
}
26+
return buffer;
27+
}
28+
29+
@Override
30+
public void flush(MessageBuffer buf) throws IOException {
31+
ByteBuffer bb = buf.toByteBuffer();
2432
channel.write(bb);
2533
}
2634

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferOutput.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,22 @@
99
*/
1010
public interface MessageBufferOutput extends Closeable {
1111

12-
// /**
13-
// * Retrieves the next buffer for writing message packed data
14-
// * @return
15-
// * @throws IOException
16-
// */
17-
// public MessageBuffer next() throws IOException;
18-
19-
20-
public void flush(MessageBuffer buf, int offset, int len) throws IOException;
12+
/**
13+
* Retrieves the next buffer for writing message packed data
14+
*
15+
* @param bufferSize the buffer size to retrieve
16+
* @return
17+
* @throws IOException
18+
*/
19+
public MessageBuffer next(int bufferSize) throws IOException;
20+
21+
/**
22+
* Output the buffer contents. If you need to output a part of the
23+
* buffer use {@link MessageBuffer#slice(int, int)}
24+
* @param buf
25+
* @throws IOException
26+
*/
27+
public void flush(MessageBuffer buf) throws IOException;
2128

2229
}
2330

msgpack-core/src/main/java/org/msgpack/core/buffer/OutputStreamBufferOutput.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,36 @@
1111
public class OutputStreamBufferOutput implements MessageBufferOutput {
1212

1313
private final OutputStream out;
14+
private MessageBuffer buffer;
15+
private byte[] tmpBuf;
1416

1517
public OutputStreamBufferOutput(OutputStream out) {
1618
this.out = checkNotNull(out, "output is null");
1719
}
1820

1921
@Override
20-
public void flush(MessageBuffer buf, int offset, int len) throws IOException {
21-
assert(offset + len <= buf.size());
22+
public MessageBuffer next(int bufferSize) throws IOException {
23+
if(buffer == null || buffer.size != bufferSize) {
24+
return buffer = MessageBuffer.newBuffer(bufferSize);
25+
}
26+
else {
27+
return buffer;
28+
}
29+
}
2230

23-
// TODO reuse the allocated buffer
24-
byte[] in = new byte[len];
25-
buf.getBytes(offset, in, 0, len);
26-
out.write(in, 0, len);
31+
@Override
32+
public void flush(MessageBuffer buf) throws IOException {
33+
int writeLen = buf.size();
34+
if(buf.hasArray()) {
35+
out.write(buf.getArray(), buf.offset(), writeLen);
36+
}
37+
else {
38+
if(tmpBuf == null || tmpBuf.length < writeLen) {
39+
tmpBuf = new byte[writeLen];
40+
}
41+
buf.getBytes(0, tmpBuf, 0, writeLen);
42+
out.write(tmpBuf, 0, writeLen);
43+
}
2744
}
2845

2946
@Override

0 commit comments

Comments
 (0)