Skip to content

Commit 083600a

Browse files
committed
Upgrade Netty to 4.0.26 (JAVA-622).
Includes patch to coalesce frames before flushing them (JAVA-562), submitted by Ariel Weisberg.
1 parent ed94a14 commit 083600a

16 files changed

Lines changed: 543 additions & 270 deletions

File tree

driver-core/CHANGELOG.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ CHANGELOG
2222
- [new feature] Add getObject to BoundStatement and Row (JAVA-584)
2323
- [improvement] Improve connection pool resizing algorithm (JAVA-419)
2424
- [bug] Fix race condition between pool expansion and shutdown (JAVA-599)
25+
- [improvement] Upgrade Netty to 4.0.26 (JAVA-622)
26+
- [improvement] Coalesce frames before flushing them to the connection
27+
(JAVA-562)
2528

2629
Merged from 2.0.9_fixes branch:
2730

driver-core/pom.xml

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<dependencies>
3030
<dependency>
3131
<groupId>io.netty</groupId>
32-
<artifactId>netty</artifactId>
32+
<artifactId>netty-handler</artifactId>
3333
<version>${netty.version}</version>
3434
</dependency>
3535

@@ -62,6 +62,13 @@
6262
<optional>true</optional>
6363
</dependency>
6464

65+
<dependency>
66+
<groupId>io.netty</groupId>
67+
<artifactId>netty-transport-native-epoll</artifactId>
68+
<version>${netty.version}</version>
69+
<optional>true</optional>
70+
</dependency>
71+
6572
<!-- End of compression libraries -->
6673

6774
<dependency>
@@ -162,7 +169,7 @@
162169
<configuration>
163170
<manifestLocation>${project.build.directory}/META-INF-shaded</manifestLocation>
164171
<instructions>
165-
<Import-Package><![CDATA[com.google.common.*;version="[14.0,19)",!org.jboss.netty.*,*]]></Import-Package>
172+
<Import-Package><![CDATA[com.google.common.*;version="[14.0,19)",!io.netty.*,*]]></Import-Package>
166173
<Private-Package>com.datastax.shaded.*</Private-Package>
167174
</instructions>
168175
</configuration>
@@ -180,21 +187,30 @@
180187
<shadedArtifactAttached>true</shadedArtifactAttached>
181188
<artifactSet>
182189
<includes>
183-
<include>io.netty:netty</include>
190+
<include>io.netty:*</include>
184191
</includes>
185192
</artifactSet>
186193
<relocations>
187194
<relocation>
188-
<pattern>org.jboss.netty</pattern>
195+
<pattern>io.netty</pattern>
189196
<shadedPattern>com.datastax.shaded.netty</shadedPattern>
190197
</relocation>
191198
</relocations>
192199
<transformers>
193200
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
194201
<resources>
195202
<resource>META-INF/MANIFEST.MF</resource>
196-
<resource>META-INF/maven/io.netty/netty/pom.properties</resource>
197-
<resource>META-INF/maven/io.netty/netty/pom.xml</resource>
203+
<resource>META-INF/io.netty.versions.properties</resource>
204+
<resource>META-INF/maven/io.netty/netty-buffer/pom.properties</resource>
205+
<resource>META-INF/maven/io.netty/netty-buffer/pom.xml</resource>
206+
<resource>META-INF/maven/io.netty/netty-codec/pom.properties</resource>
207+
<resource>META-INF/maven/io.netty/netty-codec/pom.xml</resource>
208+
<resource>META-INF/maven/io.netty/netty-common/pom.properties</resource>
209+
<resource>META-INF/maven/io.netty/netty-common/pom.xml</resource>
210+
<resource>META-INF/maven/io.netty/netty-handler/pom.properties</resource>
211+
<resource>META-INF/maven/io.netty/netty-handler/pom.xml</resource>
212+
<resource>META-INF/maven/io.netty/netty-transport/pom.properties</resource>
213+
<resource>META-INF/maven/io.netty/netty-transport/pom.xml</resource>
198214
</resources>
199215
</transformer>
200216
<!-- Pick up the alternate manifest that was generated by the alternate execution of the bundle plugin -->

driver-core/src/main/java/com/datastax/driver/core/CBUtil.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@
2222
import java.nio.charset.CharacterCodingException;
2323
import java.util.*;
2424

25-
import org.jboss.netty.buffer.ChannelBuffer;
26-
import org.jboss.netty.util.CharsetUtil;
25+
import io.netty.buffer.ByteBuf;
26+
import io.netty.util.CharsetUtil;
2727

2828
import com.datastax.driver.core.exceptions.DriverInternalError;
2929

3030
/**
31-
* ChannelBuffer utility methods.
31+
* ByteBuf utility methods.
3232
*/
33-
abstract class CBUtil {
33+
abstract class CBUtil { // TODO rename
3434

3535
private CBUtil() {}
3636

37-
private static String readString(ChannelBuffer cb, int length) {
37+
private static String readString(ByteBuf cb, int length) {
3838
try {
3939
String str = cb.toString(cb.readerIndex(), length, CharsetUtil.UTF_8);
4040
cb.readerIndex(cb.readerIndex() + length);
@@ -48,7 +48,7 @@ private static String readString(ChannelBuffer cb, int length) {
4848
}
4949
}
5050

51-
public static String readString(ChannelBuffer cb) {
51+
public static String readString(ByteBuf cb) {
5252
try {
5353
int length = cb.readUnsignedShort();
5454
return readString(cb, length);
@@ -57,7 +57,7 @@ public static String readString(ChannelBuffer cb) {
5757
}
5858
}
5959

60-
public static void writeString(String str, ChannelBuffer cb) {
60+
public static void writeString(String str, ByteBuf cb) {
6161
byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
6262
cb.writeShort(bytes.length);
6363
cb.writeBytes(bytes);
@@ -82,7 +82,7 @@ else if (c > 0x07FF)
8282
return utflen;
8383
}
8484

85-
public static String readLongString(ChannelBuffer cb) {
85+
public static String readLongString(ByteBuf cb) {
8686
try {
8787
int length = cb.readInt();
8888
return readString(cb, length);
@@ -91,7 +91,7 @@ public static String readLongString(ChannelBuffer cb) {
9191
}
9292
}
9393

94-
public static void writeLongString(String str, ChannelBuffer cb) {
94+
public static void writeLongString(String str, ByteBuf cb) {
9595
byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
9696
cb.writeInt(bytes.length);
9797
cb.writeBytes(bytes);
@@ -101,7 +101,7 @@ public static int sizeOfLongString(String str) {
101101
return 4 + str.getBytes(CharsetUtil.UTF_8).length;
102102
}
103103

104-
public static byte[] readBytes(ChannelBuffer cb) {
104+
public static byte[] readBytes(ByteBuf cb) {
105105
try {
106106
int length = cb.readUnsignedShort();
107107
byte[] bytes = new byte[length];
@@ -112,7 +112,7 @@ public static byte[] readBytes(ChannelBuffer cb) {
112112
}
113113
}
114114

115-
public static void writeBytes(byte[] bytes, ChannelBuffer cb) {
115+
public static void writeBytes(byte[] bytes, ByteBuf cb) {
116116
cb.writeShort(bytes.length);
117117
cb.writeBytes(bytes);
118118
}
@@ -121,19 +121,19 @@ public static int sizeOfBytes(byte[] bytes) {
121121
return 2 + bytes.length;
122122
}
123123

124-
public static ConsistencyLevel readConsistencyLevel(ChannelBuffer cb) {
124+
public static ConsistencyLevel readConsistencyLevel(ByteBuf cb) {
125125
return ConsistencyLevel.fromCode(cb.readUnsignedShort());
126126
}
127127

128-
public static void writeConsistencyLevel(ConsistencyLevel consistency, ChannelBuffer cb) {
128+
public static void writeConsistencyLevel(ConsistencyLevel consistency, ByteBuf cb) {
129129
cb.writeShort(consistency.code);
130130
}
131131

132132
public static int sizeOfConsistencyLevel(ConsistencyLevel consistency) {
133133
return 2;
134134
}
135135

136-
public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ChannelBuffer cb) {
136+
public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ByteBuf cb) {
137137
String value = CBUtil.readString(cb);
138138
try {
139139
return Enum.valueOf(enumType, value.toUpperCase());
@@ -142,21 +142,21 @@ public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ChannelBuff
142142
}
143143
}
144144

145-
public static <T extends Enum<T>> void writeEnumValue(T enumValue, ChannelBuffer cb) {
145+
public static <T extends Enum<T>> void writeEnumValue(T enumValue, ByteBuf cb) {
146146
writeString(enumValue.toString(), cb);
147147
}
148148

149149
public static <T extends Enum<T>> int sizeOfEnumValue(T enumValue) {
150150
return sizeOfString(enumValue.toString());
151151
}
152152

153-
public static UUID readUUID(ChannelBuffer cb) {
153+
public static UUID readUUID(ByteBuf cb) {
154154
long msb = cb.readLong();
155155
long lsb = cb.readLong();
156156
return new UUID(msb, lsb);
157157
}
158158

159-
public static void writeUUID(UUID uuid, ChannelBuffer cb) {
159+
public static void writeUUID(UUID uuid, ByteBuf cb) {
160160
cb.writeLong(uuid.getMostSignificantBits());
161161
cb.writeLong(uuid.getLeastSignificantBits());
162162
}
@@ -165,15 +165,15 @@ public static int sizeOfUUID(UUID uuid) {
165165
return 16;
166166
}
167167

168-
public static List<String> readStringList(ChannelBuffer cb) {
168+
public static List<String> readStringList(ByteBuf cb) {
169169
int length = cb.readUnsignedShort();
170170
List<String> l = new ArrayList<String>(length);
171171
for (int i = 0; i < length; i++)
172172
l.add(readString(cb));
173173
return l;
174174
}
175175

176-
public static void writeStringList(List<String> l, ChannelBuffer cb) {
176+
public static void writeStringList(List<String> l, ByteBuf cb) {
177177
cb.writeShort(l.size());
178178
for (String str : l)
179179
writeString(str, cb);
@@ -186,7 +186,7 @@ public static int sizeOfStringList(List<String> l) {
186186
return size;
187187
}
188188

189-
public static Map<String, String> readStringMap(ChannelBuffer cb) {
189+
public static Map<String, String> readStringMap(ByteBuf cb) {
190190
int length = cb.readUnsignedShort();
191191
Map<String, String> m = new HashMap<String, String>(length);
192192
for (int i = 0; i < length; i++) {
@@ -197,7 +197,7 @@ public static Map<String, String> readStringMap(ChannelBuffer cb) {
197197
return m;
198198
}
199199

200-
public static void writeStringMap(Map<String, String> m, ChannelBuffer cb) {
200+
public static void writeStringMap(Map<String, String> m, ByteBuf cb) {
201201
cb.writeShort(m.size());
202202
for (Map.Entry<String, String> entry : m.entrySet()) {
203203
writeString(entry.getKey(), cb);
@@ -214,7 +214,7 @@ public static int sizeOfStringMap(Map<String, String> m) {
214214
return size;
215215
}
216216

217-
public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb) {
217+
public static Map<String, List<String>> readStringToStringListMap(ByteBuf cb) {
218218
int length = cb.readUnsignedShort();
219219
Map<String, List<String>> m = new HashMap<String, List<String>>(length);
220220
for (int i = 0; i < length; i++) {
@@ -225,7 +225,7 @@ public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer
225225
return m;
226226
}
227227

228-
public static void writeStringToStringListMap(Map<String, List<String>> m, ChannelBuffer cb) {
228+
public static void writeStringToStringListMap(Map<String, List<String>> m, ByteBuf cb) {
229229
cb.writeShort(m.size());
230230
for (Map.Entry<String, List<String>> entry : m.entrySet()) {
231231
writeString(entry.getKey(), cb);
@@ -242,12 +242,16 @@ public static int sizeOfStringToStringListMap(Map<String, List<String>> m) {
242242
return size;
243243
}
244244

245-
public static ByteBuffer readValue(ChannelBuffer cb) {
245+
public static ByteBuffer readValue(ByteBuf cb) {
246246
int length = cb.readInt();
247-
return length < 0 ? null : cb.readSlice(length).toByteBuffer();
247+
if (length < 0)
248+
return null;
249+
ByteBuf slice = cb.readSlice(length);
250+
251+
return ByteBuffer.wrap(readRawBytes(slice));
248252
}
249253

250-
public static void writeValue(byte[] bytes, ChannelBuffer cb) {
254+
public static void writeValue(byte[] bytes, ByteBuf cb) {
251255
if (bytes == null) {
252256
cb.writeInt(-1);
253257
return;
@@ -257,7 +261,7 @@ public static void writeValue(byte[] bytes, ChannelBuffer cb) {
257261
cb.writeBytes(bytes);
258262
}
259263

260-
public static void writeValue(ByteBuffer bytes, ChannelBuffer cb) {
264+
public static void writeValue(ByteBuffer bytes, ByteBuf cb) {
261265
if (bytes == null) {
262266
cb.writeInt(-1);
263267
return;
@@ -275,7 +279,7 @@ public static int sizeOfValue(ByteBuffer bytes) {
275279
return 4 + (bytes == null ? 0 : bytes.remaining());
276280
}
277281

278-
public static List<ByteBuffer> readValueList(ChannelBuffer cb) {
282+
public static List<ByteBuffer> readValueList(ByteBuf cb) {
279283
int size = cb.readUnsignedShort();
280284
if (size == 0)
281285
return Collections.<ByteBuffer>emptyList();
@@ -286,7 +290,7 @@ public static List<ByteBuffer> readValueList(ChannelBuffer cb) {
286290
return l;
287291
}
288292

289-
public static void writeValueList(List<ByteBuffer> values, ChannelBuffer cb) {
293+
public static void writeValueList(List<ByteBuffer> values, ByteBuf cb) {
290294
cb.writeShort(values.size());
291295
for (ByteBuffer value : values)
292296
CBUtil.writeValue(value, cb);
@@ -299,7 +303,7 @@ public static int sizeOfValueList(List<ByteBuffer> values) {
299303
return size;
300304
}
301305

302-
public static InetSocketAddress readInet(ChannelBuffer cb) {
306+
public static InetSocketAddress readInet(ByteBuf cb) {
303307
int addrSize = cb.readByte();
304308
byte[] address = new byte[addrSize];
305309
cb.readBytes(address);
@@ -311,7 +315,7 @@ public static InetSocketAddress readInet(ChannelBuffer cb) {
311315
}
312316
}
313317

314-
public static void writeInet(InetSocketAddress inet, ChannelBuffer cb) {
318+
public static void writeInet(InetSocketAddress inet, ByteBuf cb) {
315319
byte[] address = inet.getAddress().getAddress();
316320

317321
cb.writeByte(address.length);
@@ -328,7 +332,7 @@ public static int sizeOfInet(InetSocketAddress inet) {
328332
* Reads *all* readable bytes from {@code cb} and return them.
329333
* If {@code cb} is backed by an array, this will return the underlying array directly, without copy.
330334
*/
331-
public static byte[] readRawBytes(ChannelBuffer cb) {
335+
public static byte[] readRawBytes(ByteBuf cb) {
332336
if (cb.hasArray() && cb.readableBytes() == cb.array().length) {
333337
// Move the readerIndex just so we consistently consume the input
334338
cb.readerIndex(cb.writerIndex());

0 commit comments

Comments
 (0)