Skip to content

Commit 046c367

Browse files
committed
JAVA-1494: Implement Snappy and LZ4 compression
1 parent 812913a commit 046c367

15 files changed

Lines changed: 681 additions & 4 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.0.0-alpha2 (in progress)
66

7+
- [new feature] JAVA-1494: Implement Snappy and LZ4 compression
78
- [new feature] JAVA-1514: Port Uuids utility class
89
- [new feature] JAVA-1520: Add node state listeners
910
- [new feature] JAVA-1493: Handle schema metadata

core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@
6363
<groupId>com.github.jnr</groupId>
6464
<artifactId>jnr-posix</artifactId>
6565
</dependency>
66+
<dependency>
67+
<groupId>org.xerial.snappy</groupId>
68+
<artifactId>snappy-java</artifactId>
69+
<optional>true</optional>
70+
</dependency>
71+
<dependency>
72+
<groupId>net.jpountz.lz4</groupId>
73+
<artifactId>lz4</artifactId>
74+
<optional>true</optional>
75+
</dependency>
6676
<dependency>
6777
<groupId>org.slf4j</groupId>
6878
<artifactId>slf4j-api</artifactId>

core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public enum CoreDriverOption implements DriverOption {
2525

2626
PROTOCOL_VERSION("protocol.version", false),
2727
PROTOCOL_MAX_FRAME_LENGTH("protocol.max-frame-length", true),
28+
PROTOCOL_COMPRESSOR("protocol.compressor", false),
2829

2930
CLUSTER_NAME("cluster-name", false),
3031
CONFIG_RELOAD_INTERVAL("config-reload-interval", false),

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ String describe() {
140140
Message getRequest() {
141141
switch (step) {
142142
case STARTUP:
143-
return new Startup();
143+
return new Startup(internalDriverContext.compressor().algorithm());
144144
case GET_CLUSTER_NAME:
145145
return CLUSTER_NAME_QUERY;
146146
case SET_KEYSPACE:

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,11 @@ protected EventBus buildEventBus() {
217217
return new EventBus(clusterName());
218218
}
219219

220+
@SuppressWarnings("unchecked")
220221
protected Compressor<ByteBuf> buildCompressor() {
221-
// TODO build alternate implementation if specified in conf
222-
return Compressor.none();
222+
return (Compressor<ByteBuf>)
223+
Reflection.buildFromConfig(this, CoreDriverOption.PROTOCOL_COMPRESSOR, Compressor.class)
224+
.orElse(Compressor.none());
223225
}
224226

225227
protected FrameCodec<ByteBuf> buildFrameCodec() {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (C) 2017-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.protocol;
17+
18+
import com.datastax.oss.protocol.internal.Compressor;
19+
import io.netty.buffer.ByteBuf;
20+
import java.nio.ByteBuffer;
21+
22+
public abstract class ByteBufCompressor implements Compressor<ByteBuf> {
23+
24+
@Override
25+
public ByteBuf compress(ByteBuf uncompressed) {
26+
return uncompressed.isDirect() ? compressDirect(uncompressed) : compressHeap(uncompressed);
27+
}
28+
29+
protected abstract ByteBuf compressDirect(ByteBuf input);
30+
31+
protected abstract ByteBuf compressHeap(ByteBuf input);
32+
33+
@Override
34+
public ByteBuf decompress(ByteBuf compressed) {
35+
return compressed.isDirect() ? decompressDirect(compressed) : decompressHeap(compressed);
36+
}
37+
38+
protected abstract ByteBuf decompressDirect(ByteBuf input);
39+
40+
protected abstract ByteBuf decompressHeap(ByteBuf input);
41+
42+
protected static ByteBuffer inputNioBuffer(ByteBuf buf) {
43+
// Using internalNioBuffer(...) as we only hold the reference in this method and so can
44+
// reduce Object allocations.
45+
int index = buf.readerIndex();
46+
int len = buf.readableBytes();
47+
return buf.nioBufferCount() == 1
48+
? buf.internalNioBuffer(index, len)
49+
: buf.nioBuffer(index, len);
50+
}
51+
52+
protected static ByteBuffer outputNioBuffer(ByteBuf buf) {
53+
int index = buf.writerIndex();
54+
int len = buf.writableBytes();
55+
return buf.nioBufferCount() == 1
56+
? buf.internalNioBuffer(index, len)
57+
: buf.nioBuffer(index, len);
58+
}
59+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (C) 2017-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.protocol;
17+
18+
import com.datastax.oss.driver.api.core.config.DriverOption;
19+
import com.datastax.oss.driver.api.core.context.DriverContext;
20+
import com.datastax.oss.protocol.internal.Compressor;
21+
import io.netty.buffer.ByteBuf;
22+
import java.nio.ByteBuffer;
23+
import net.jpountz.lz4.LZ4Compressor;
24+
import net.jpountz.lz4.LZ4Factory;
25+
import net.jpountz.lz4.LZ4FastDecompressor;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
public class Lz4Compressor extends ByteBufCompressor {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(Lz4Compressor.class);
32+
33+
private final LZ4Compressor compressor;
34+
private final LZ4FastDecompressor decompressor;
35+
36+
public Lz4Compressor(DriverContext context, @SuppressWarnings("unused") DriverOption configRoot) {
37+
try {
38+
LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
39+
LOG.info("[{}] Using {}", context.clusterName(), lz4Factory.toString());
40+
this.compressor = lz4Factory.fastCompressor();
41+
this.decompressor = lz4Factory.fastDecompressor();
42+
} catch (NoClassDefFoundError e) {
43+
throw new IllegalStateException(
44+
"Error initializing compressor, make sure that the LZ4 library is in the classpath "
45+
+ "(the driver declares it as an optional dependency, "
46+
+ "so you need to declare it explicitly)",
47+
e);
48+
}
49+
}
50+
51+
@Override
52+
public String algorithm() {
53+
return "lz4";
54+
}
55+
56+
@Override
57+
protected ByteBuf compressDirect(ByteBuf input) {
58+
int maxCompressedLength = compressor.maxCompressedLength(input.readableBytes());
59+
// If the input is direct we will allocate a direct output buffer as well as this will allow us
60+
// to use LZ4Compressor.compress and so eliminate memory copies.
61+
ByteBuf output = input.alloc().directBuffer(4 + maxCompressedLength);
62+
try {
63+
ByteBuffer in = inputNioBuffer(input);
64+
// Increase reader index.
65+
input.readerIndex(input.writerIndex());
66+
67+
output.writeInt(in.remaining());
68+
69+
ByteBuffer out = outputNioBuffer(output);
70+
int written =
71+
compressor.compress(
72+
in, in.position(), in.remaining(), out, out.position(), out.remaining());
73+
// Set the writer index so the amount of written bytes is reflected
74+
output.writerIndex(output.writerIndex() + written);
75+
} catch (Exception e) {
76+
// release output buffer so we not leak and rethrow exception.
77+
output.release();
78+
throw e;
79+
}
80+
return output;
81+
}
82+
83+
@Override
84+
protected ByteBuf compressHeap(ByteBuf input) {
85+
int maxCompressedLength = compressor.maxCompressedLength(input.readableBytes());
86+
87+
// Not a direct buffer so use byte arrays...
88+
int inOffset = input.arrayOffset() + input.readerIndex();
89+
byte[] in = input.array();
90+
int len = input.readableBytes();
91+
// Increase reader index.
92+
input.readerIndex(input.writerIndex());
93+
94+
// Allocate a heap buffer from the ByteBufAllocator as we may use a PooledByteBufAllocator and
95+
// so can eliminate the overhead of allocate a new byte[].
96+
ByteBuf output = input.alloc().heapBuffer(4 + maxCompressedLength);
97+
try {
98+
output.writeInt(len);
99+
// calculate the correct offset.
100+
int offset = output.arrayOffset() + output.writerIndex();
101+
byte[] out = output.array();
102+
int written = compressor.compress(in, inOffset, len, out, offset);
103+
104+
// Set the writer index so the amount of written bytes is reflected
105+
output.writerIndex(output.writerIndex() + written);
106+
} catch (Exception e) {
107+
// release output buffer so we not leak and rethrow exception.
108+
output.release();
109+
throw e;
110+
}
111+
return output;
112+
}
113+
114+
@Override
115+
protected ByteBuf decompressDirect(ByteBuf input) {
116+
// If the input is direct we will allocate a direct output buffer as well as this will allow us
117+
// to use LZ4Compressor.decompress and so eliminate memory copies.
118+
int readable = input.readableBytes();
119+
int uncompressedLength = input.readInt();
120+
ByteBuffer in = inputNioBuffer(input);
121+
// Increase reader index.
122+
input.readerIndex(input.writerIndex());
123+
ByteBuf output = input.alloc().directBuffer(uncompressedLength);
124+
try {
125+
ByteBuffer out = outputNioBuffer(output);
126+
int read = decompressor.decompress(in, in.position(), out, out.position(), out.remaining());
127+
if (read != readable - 4) {
128+
throw new IllegalArgumentException("Compressed lengths mismatch");
129+
}
130+
131+
// Set the writer index so the amount of written bytes is reflected
132+
output.writerIndex(output.writerIndex() + uncompressedLength);
133+
} catch (Exception e) {
134+
// release output buffer so we not leak and rethrow exception.
135+
output.release();
136+
throw e;
137+
}
138+
return output;
139+
}
140+
141+
@Override
142+
protected ByteBuf decompressHeap(ByteBuf input) {
143+
// Not a direct buffer so use byte arrays...
144+
byte[] in = input.array();
145+
int len = input.readableBytes();
146+
int uncompressedLength = input.readInt();
147+
int inOffset = input.arrayOffset() + input.readerIndex();
148+
// Increase reader index.
149+
input.readerIndex(input.writerIndex());
150+
151+
// Allocate a heap buffer from the ByteBufAllocator as we may use a PooledByteBufAllocator and
152+
// so can eliminate the overhead of allocate a new byte[].
153+
ByteBuf output = input.alloc().heapBuffer(uncompressedLength);
154+
try {
155+
int offset = output.arrayOffset() + output.writerIndex();
156+
byte out[] = output.array();
157+
int read = decompressor.decompress(in, inOffset, out, offset, uncompressedLength);
158+
if (read != len - 4) {
159+
throw new IllegalArgumentException("Compressed lengths mismatch");
160+
}
161+
162+
// Set the writer index so the amount of written bytes is reflected
163+
output.writerIndex(output.writerIndex() + uncompressedLength);
164+
} catch (Exception e) {
165+
// release output buffer so we not leak and rethrow exception.
166+
output.release();
167+
throw e;
168+
}
169+
return output;
170+
}
171+
}

0 commit comments

Comments
 (0)