Skip to content

Commit 2ce9916

Browse files
committed
optimized packer buffer interface
1 parent dc1f10b commit 2ce9916

File tree

9 files changed

+306
-200
lines changed

9 files changed

+306
-200
lines changed

msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java

Lines changed: 176 additions & 149 deletions
Large diffs are not rendered by default.

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,21 @@ public class ChannelBufferOutput
3131
private MessageBuffer buffer;
3232

3333
public ChannelBufferOutput(WritableByteChannel channel)
34+
{
35+
this(channel, 8192);
36+
}
37+
38+
public ChannelBufferOutput(WritableByteChannel channel, int bufferSize)
3439
{
3540
this.channel = checkNotNull(channel, "output channel is null");
41+
this.buffer = MessageBuffer.newBuffer(bufferSize);
3642
}
3743

3844
/**
39-
* Reset channel. This method doesn't close the old resource.
45+
* Reset channel. This method doesn't close the old channel.
4046
*
4147
* @param channel new channel
42-
* @return the old resource
48+
* @return the old channel
4349
*/
4450
public WritableByteChannel reset(WritableByteChannel channel)
4551
throws IOException
@@ -50,21 +56,40 @@ public WritableByteChannel reset(WritableByteChannel channel)
5056
}
5157

5258
@Override
53-
public MessageBuffer next(int bufferSize)
59+
public MessageBuffer next(int mimimumSize)
5460
throws IOException
5561
{
56-
if (buffer == null || buffer.size() != bufferSize) {
57-
buffer = MessageBuffer.newBuffer(bufferSize);
62+
if (buffer.size() < mimimumSize) {
63+
buffer = MessageBuffer.newBuffer(mimimumSize);
5864
}
5965
return buffer;
6066
}
6167

6268
@Override
63-
public void flush(MessageBuffer buf)
69+
public void writeBuffer(int length)
70+
throws IOException
71+
{
72+
ByteBuffer bb = buffer.toByteBuffer(0, length);
73+
while (bb.hasRemaining()) {
74+
channel.write(bb);
75+
}
76+
}
77+
78+
@Override
79+
public void write(byte[] buffer, int offset, int length)
6480
throws IOException
6581
{
66-
ByteBuffer bb = buf.toByteBuffer();
67-
channel.write(bb);
82+
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
83+
while (bb.hasRemaining()) {
84+
channel.write(bb);
85+
}
86+
}
87+
88+
@Override
89+
public void add(byte[] buffer, int offset, int length)
90+
throws IOException
91+
{
92+
write(buffer, offset, length);
6893
}
6994

7095
@Override
@@ -73,4 +98,9 @@ public void close()
7398
{
7499
channel.close();
75100
}
101+
102+
@Override
103+
public void flush()
104+
throws IOException
105+
{ }
76106
}

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ public static MessageBuffer wrap(byte[] array)
210210
return newMessageBuffer(array);
211211
}
212212

213+
public static MessageBuffer wrap(byte[] array, int offset, int length)
214+
{
215+
return newMessageBuffer(array).slice(offset, length);
216+
}
217+
213218
public static MessageBuffer wrap(ByteBuffer bb)
214219
{
215220
return newMessageBuffer(bb).slice(bb.position(), bb.remaining());

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferOutput.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,60 @@
1717

1818
import java.io.Closeable;
1919
import java.io.IOException;
20+
import java.io.Flushable;
2021

2122
/**
22-
* Provides a sequence of MessageBuffers for packing the input data
23+
* Provides a buffered output stream for packing objects
2324
*/
2425
public interface MessageBufferOutput
25-
extends Closeable
26+
extends Closeable, Flushable
2627
{
2728
/**
28-
* Retrieves the next buffer for writing message packed data
29+
* Allocates the next buffer for writing message packed data.
30+
* If the previously allocated buffer is not flushed yet, this next method should discard
31+
* it without writing it.
2932
*
30-
* @param bufferSize the buffer size to retrieve
33+
* @param mimimumSize the mimium required buffer size to allocate
3134
* @return
3235
* @throws IOException
3336
*/
34-
public MessageBuffer next(int bufferSize)
37+
public MessageBuffer next(int mimimumSize)
3538
throws IOException;
3639

3740
/**
38-
* Output the buffer contents. If you need to output a part of the
39-
* buffer use {@link MessageBuffer#slice(int, int)}
41+
* Flushes the previously allocated buffer.
42+
* This method is not always called because next method also flushes previously allocated buffer.
43+
* This method is called when write method is called or application wants to control the timing of flush.
4044
*
41-
* @param buf
45+
* @param length the size of buffer to flush
4246
* @throws IOException
4347
*/
44-
public void flush(MessageBuffer buf)
48+
public void writeBuffer(int length)
49+
throws IOException;
50+
51+
/**
52+
* Writes an external payload data.
53+
* This method should follow semantics of OutputStream.
54+
*
55+
* @param buffer the data to write
56+
* @param offset the start offset in the data
57+
* @param length the number of bytes to write
58+
* @return
59+
* @throws IOException
60+
*/
61+
public void write(byte[] buffer, int offset, int length)
62+
throws IOException;
63+
64+
/**
65+
* Writes an external payload data.
66+
* This buffer is given - this MessageBufferOutput owns the buffer and may modify contents of the buffer. Contents of this buffer won't be modified by the caller.
67+
*
68+
* @param buffer the data to add
69+
* @param offset the start offset in the data
70+
* @param length the number of bytes to add
71+
* @return
72+
* @throws IOException
73+
*/
74+
public void add(byte[] buffer, int offset, int length)
4575
throws IOException;
4676
}

msgpack-core/src/main/java/org/msgpack/core/buffer/OutputStreamBufferOutput.java

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,23 @@ public class OutputStreamBufferOutput
2828
{
2929
private OutputStream out;
3030
private MessageBuffer buffer;
31-
private byte[] tmpBuf;
3231

3332
public OutputStreamBufferOutput(OutputStream out)
33+
{
34+
this(out, 8192);
35+
}
36+
37+
public OutputStreamBufferOutput(OutputStream out, int bufferSize)
3438
{
3539
this.out = checkNotNull(out, "output is null");
40+
this.buffer = MessageBuffer.newBuffer(bufferSize);
3641
}
3742

3843
/**
39-
* Reset Stream. This method doesn't close the old resource.
44+
* Reset Stream. This method doesn't close the old stream.
4045
*
4146
* @param out new stream
42-
* @return the old resource
47+
* @return the old stream
4348
*/
4449
public OutputStream reset(OutputStream out)
4550
throws IOException
@@ -50,41 +55,47 @@ public OutputStream reset(OutputStream out)
5055
}
5156

5257
@Override
53-
public MessageBuffer next(int bufferSize)
58+
public MessageBuffer next(int mimimumSize)
5459
throws IOException
5560
{
56-
if (buffer == null || buffer.size != bufferSize) {
57-
buffer = MessageBuffer.newBuffer(bufferSize);
61+
if (buffer.size() < mimimumSize) {
62+
buffer = MessageBuffer.newBuffer(mimimumSize);
5863
}
5964
return buffer;
6065
}
6166

6267
@Override
63-
public void flush(MessageBuffer buf)
68+
public void writeBuffer(int length)
6469
throws IOException
6570
{
66-
int writeLen = buf.size();
67-
if (buf.hasArray()) {
68-
out.write(buf.getArray(), buf.offset(), writeLen);
69-
}
70-
else {
71-
if (tmpBuf == null || tmpBuf.length < writeLen) {
72-
tmpBuf = new byte[writeLen];
73-
}
74-
buf.getBytes(0, tmpBuf, 0, writeLen);
75-
out.write(tmpBuf, 0, writeLen);
76-
}
71+
write(buffer.getArray(), buffer.offset(), length);
72+
}
73+
74+
@Override
75+
public void write(byte[] buffer, int offset, int length)
76+
throws IOException
77+
{
78+
out.write(buffer, offset, length);
79+
}
80+
81+
@Override
82+
public void add(byte[] buffer, int offset, int length)
83+
throws IOException
84+
{
85+
write(buffer, offset, length);
7786
}
7887

7988
@Override
8089
public void close()
8190
throws IOException
8291
{
83-
try {
84-
out.flush();
85-
}
86-
finally {
87-
out.close();
88-
}
92+
out.close();
93+
}
94+
95+
@Override
96+
public void flush()
97+
throws IOException
98+
{
99+
out.flush();
89100
}
90101
}

msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,6 @@ public static void packer()
153153
.packArrayHeader(2)
154154
.packString("xxx-xxxx")
155155
.packString("yyy-yyyy");
156-
157-
// [Advanced] write data using ByteBuffer
158-
ByteBuffer bb = ByteBuffer.wrap(new byte[] {'b', 'i', 'n', 'a', 'r', 'y', 'd', 'a', 't', 'a'});
159-
packer.packBinaryHeader(bb.remaining());
160-
packer.writePayload(bb);
161156
}
162157

163158
/**

msgpack-core/src/test/scala/org/msgpack/core/MessagePackerTest.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,11 @@ class MessagePackerTest
283283

284284
"support read-only buffer" taggedAs ("read-only") in {
285285
val payload = Array[Byte](1)
286-
val buffer = ByteBuffer.wrap(payload).asReadOnlyBuffer()
287286
val out = new
288287
ByteArrayOutputStream()
289288
val packer = MessagePack.newDefaultPacker(out)
290289
.packBinaryHeader(1)
291-
.writePayload(buffer)
290+
.writePayload(payload)
292291
.close()
293292
}
294293
}

msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferOutputTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class MessageBufferOutputTest
4444
def writeIntToBuf(buf: MessageBufferOutput) = {
4545
val mb0 = buf.next(8)
4646
mb0.putInt(0, 42)
47-
buf.flush(mb0)
47+
buf.writeBuffer(4)
4848
buf.close
4949
}
5050

msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackGenerator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,17 @@ else if (v instanceof Integer) {
183183
}
184184
else if (v instanceof ByteBuffer) {
185185
ByteBuffer bb = (ByteBuffer) v;
186-
messagePacker.packBinaryHeader(bb.limit());
187-
messagePacker.writePayload(bb);
186+
int len = bb.remaining();
187+
if (bb.hasArray()) {
188+
messagePacker.packBinaryHeader(len);
189+
messagePacker.writePayload(bb.array(), bb.arrayOffset(), len);
190+
}
191+
else {
192+
byte[] data = new byte[len];
193+
bb.get(data);
194+
messagePacker.packBinaryHeader(len);
195+
messagePacker.addPayload(data);
196+
}
188197
}
189198
else if (v instanceof String) {
190199
messagePacker.packString((String) v);

0 commit comments

Comments
 (0)