Skip to content

Commit 50627fd

Browse files
committed
Merge branch '3.0.x' into 3.1.x
2 parents 4b1d801 + 712f2b0 commit 50627fd

12 files changed

Lines changed: 589 additions & 86 deletions

File tree

changelog/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
## Changelog
22

3+
### 3.1.2 (in progress)
4+
5+
Merged from 3.0.x:
6+
7+
- [bug] JAVA-1312: QueryBuilder modifies selected columns when manually selected.
8+
- [improvement] JAVA-1303: Add missing BoundStatement.setRoutingKey(ByteBuffer...)
9+
- [improvement] JAVA-262: Make internal executors customizable
10+
11+
312
### 3.1.1
413

514
- [bug] JAVA-1284: ClockFactory should check system property before attempting to load Native class.
@@ -95,6 +104,32 @@ Merged from 2.1 branch:
95104
- [improvement] JAVA-444: Add Java process information to UUIDs.makeNode() hash.
96105

97106

107+
### 3.0.5
108+
109+
- [bug] JAVA-1312: QueryBuilder modifies selected columns when manually selected.
110+
- [improvement] JAVA-1303: Add missing BoundStatement.setRoutingKey(ByteBuffer...)
111+
- [improvement] JAVA-262: Make internal executors customizable
112+
113+
114+
### 3.0.4
115+
116+
- [improvement] JAVA-1246: Driver swallows the real exception in a few cases
117+
- [improvement] JAVA-1261: Throw error when attempting to page in I/O thread.
118+
- [bug] JAVA-1258: Regression: Mapper cannot map a materialized view after JAVA-1126.
119+
- [bug] JAVA-1101: Batch and BatchStatement should consider inner statements to determine query idempotence
120+
- [improvement] JAVA-1262: Use ParseUtils for quoting & unquoting.
121+
- [improvement] JAVA-1275: Use Netty's default thread factory
122+
- [bug] JAVA-1285: QueryBuilder routing key auto-discovery should handle case-sensitive column names.
123+
- [bug] JAVA-1283: Don't cache failed query preparations in the mapper.
124+
- [improvement] JAVA-1277: Expose AbstractSession.checkNotInEventLoop.
125+
- [bug] JAVA-1272: BuiltStatement not able to print its query string if it contains mapped UDTs.
126+
- [bug] JAVA-1292: 'Adjusted frame length' error breaks driver's ability to read data.
127+
- [improvement] JAVA-1293: Make DecoderForStreamIdSize.MAX_FRAME_LENGTH configurable.
128+
- [improvement] JAVA-1053: Add a metric for authentication errors
129+
- [improvement] JAVA-1263: Eliminate unnecessary memory copies in FrameCompressor implementations.
130+
- [improvement] JAVA-893: Make connection pool non-blocking
131+
132+
98133
### 3.0.3
99134

100135
- [improvement] JAVA-1147: Upgrade Netty to 4.0.37.

driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,24 @@ public BoundStatement setRoutingKey(ByteBuffer routingKey) {
270270
return this;
271271
}
272272

273+
/**
274+
* Sets the routing key for this bound statement, when the query partition key is composite and the routing key must
275+
* be built from multiple values.
276+
* <p/>
277+
* This is useful when the routing key can neither be set on the {@code PreparedStatement} this bound statement
278+
* was built from, nor automatically computed from bound variables. In particular, this is the case if the
279+
* partition key is composite and only some of its components are bound.
280+
*
281+
* @param routingKeyComponents the raw (binary) values to compose to obtain
282+
* the routing key.
283+
* @return this {@code BoundStatement} object.
284+
* @see BoundStatement#getRoutingKey
285+
*/
286+
public BoundStatement setRoutingKey(ByteBuffer... routingKeyComponents) {
287+
this.routingKey = SimpleStatement.compose(routingKeyComponents);
288+
return this;
289+
}
290+
273291
/**
274292
* {@inheritDoc}
275293
*/

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.common.base.Throwables;
2626
import com.google.common.collect.*;
2727
import com.google.common.util.concurrent.*;
28-
import io.netty.util.concurrent.DefaultThreadFactory;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

@@ -71,17 +70,13 @@ public class Cluster implements Closeable {
7170

7271
@VisibleForTesting
7372
static final int NEW_NODE_DELAY_SECONDS = SystemProperties.getInt("com.datastax.driver.NEW_NODE_DELAY_SECONDS", 1);
74-
private static final int NON_BLOCKING_EXECUTOR_SIZE = SystemProperties.getInt("com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE",
75-
Runtime.getRuntime().availableProcessors());
7673

7774
private static final ResourceBundle driverProperties = ResourceBundle.getBundle("com.datastax.driver.core.Driver");
7875

7976
// Some per-JVM number that allows to generate unique cluster names when
8077
// multiple Cluster instance are created in the same JVM.
8178
private static final AtomicInteger CLUSTER_ID = new AtomicInteger(0);
8279

83-
private static final int DEFAULT_THREAD_KEEP_ALIVE = 30;
84-
8580
private static final int NOTIF_LOCK_TIMEOUT_SECONDS = SystemProperties.getInt("com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS", 60);
8681

8782
final Manager manager;
@@ -1227,6 +1222,19 @@ public Builder withQueryOptions(QueryOptions options) {
12271222
return this;
12281223
}
12291224

1225+
/**
1226+
* Sets the threading options to use for the newly created Cluster.
1227+
* <p/>
1228+
* If no options are set through this method, a new instance of {@link ThreadingOptions} will be used.
1229+
*
1230+
* @param options the options.
1231+
* @return this builder.
1232+
*/
1233+
public Builder withThreadingOptions(ThreadingOptions options) {
1234+
configurationBuilder.withThreadingOptions(options);
1235+
return this;
1236+
}
1237+
12301238
/**
12311239
* Set the {@link NettyOptions} to use for the newly created Cluster.
12321240
* <p/>
@@ -1316,20 +1324,15 @@ class Manager implements Connection.DefaultResponseHandler {
13161324

13171325
final ConvictionPolicy.Factory convictionPolicyFactory = new ConvictionPolicy.DefaultConvictionPolicy.Factory();
13181326

1319-
ScheduledThreadPoolExecutor reconnectionExecutor;
1320-
ScheduledThreadPoolExecutor scheduledTasksExecutor;
1321-
1322-
// Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks
13231327
ListeningExecutorService executor;
1324-
1325-
// Work Queue used by executor.
1326-
LinkedBlockingQueue<Runnable> executorQueue;
1327-
1328-
// An executor for tasks that might block some time, like creating new connection, but are generally not too critical.
13291328
ListeningExecutorService blockingExecutor;
1329+
ScheduledExecutorService reconnectionExecutor;
1330+
ScheduledExecutorService scheduledTasksExecutor;
13301331

1331-
// Work Queue used by blockingExecutor.
1332-
LinkedBlockingQueue<Runnable> blockingExecutorQueue;
1332+
BlockingQueue<Runnable> executorQueue;
1333+
BlockingQueue<Runnable> blockingExecutorQueue;
1334+
BlockingQueue<Runnable> reconnectionExecutorQueue;
1335+
BlockingQueue<Runnable> scheduledTasksExecutorQueue;
13331336

13341337
ConnectionReaper reaper;
13351338

@@ -1368,16 +1371,31 @@ synchronized void init() {
13681371

13691372
this.configuration.register(this);
13701373

1371-
this.executorQueue = new LinkedBlockingQueue<Runnable>();
1372-
this.executor = makeExecutor(NON_BLOCKING_EXECUTOR_SIZE, "worker", executorQueue);
1373-
this.blockingExecutorQueue = new LinkedBlockingQueue<Runnable>();
1374-
this.blockingExecutor = makeExecutor(2, "blocking-task-worker", blockingExecutorQueue);
1375-
this.reconnectionExecutor = new ScheduledThreadPoolExecutor(2, threadFactory("reconnection"));
1376-
// scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are
1377-
// applied in the order received.
1378-
this.scheduledTasksExecutor = new ScheduledThreadPoolExecutor(1, threadFactory("scheduled-task-worker"));
1374+
ThreadingOptions threadingOptions = this.configuration.getThreadingOptions();
1375+
1376+
// executor
1377+
ExecutorService tmpExecutor = threadingOptions.createExecutor(clusterName);
1378+
this.executorQueue = (tmpExecutor instanceof ThreadPoolExecutor)
1379+
? ((ThreadPoolExecutor) tmpExecutor).getQueue() : null;
1380+
this.executor = MoreExecutors.listeningDecorator(tmpExecutor);
1381+
1382+
// blocking executor
1383+
ExecutorService tmpBlockingExecutor = threadingOptions.createBlockingExecutor(clusterName);
1384+
this.blockingExecutorQueue = (tmpBlockingExecutor instanceof ThreadPoolExecutor)
1385+
? ((ThreadPoolExecutor) tmpBlockingExecutor).getQueue() : null;
1386+
this.blockingExecutor = MoreExecutors.listeningDecorator(tmpBlockingExecutor);
13791387

1380-
this.reaper = new ConnectionReaper(this);
1388+
// reconnection executor
1389+
this.reconnectionExecutor = threadingOptions.createReconnectionExecutor(clusterName);
1390+
this.reconnectionExecutorQueue = (reconnectionExecutor instanceof ThreadPoolExecutor)
1391+
? ((ThreadPoolExecutor) reconnectionExecutor).getQueue() : null;
1392+
1393+
// scheduled tasks executor
1394+
this.scheduledTasksExecutor = threadingOptions.createScheduledTasksExecutor(clusterName);
1395+
this.scheduledTasksExecutorQueue = (scheduledTasksExecutor instanceof ThreadPoolExecutor)
1396+
? ((ThreadPoolExecutor) scheduledTasksExecutor).getQueue() : null;
1397+
1398+
this.reaper = new ConnectionReaper(threadingOptions.createReaperExecutor(clusterName));
13811399
this.metadata = new Metadata(this);
13821400
this.connectionFactory = new Connection.Factory(this, configuration);
13831401
this.controlConnection = new ControlConnection(this);
@@ -1544,29 +1562,6 @@ ProtocolVersion protocolVersion() {
15441562
return connectionFactory.protocolVersion;
15451563
}
15461564

1547-
ThreadFactory threadFactory(String name) {
1548-
return new ThreadFactoryBuilder()
1549-
.setNameFormat(clusterName + "-" + name + "-%d")
1550-
// Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows
1551-
// an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates
1552-
// slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward
1553-
// compatibility).
1554-
.setThreadFactory(new DefaultThreadFactory("ignored name"))
1555-
.build();
1556-
}
1557-
1558-
private ListeningExecutorService makeExecutor(int threads, String name, LinkedBlockingQueue<Runnable> workQueue) {
1559-
ThreadPoolExecutor executor = new ThreadPoolExecutor(threads,
1560-
threads,
1561-
DEFAULT_THREAD_KEEP_ALIVE,
1562-
TimeUnit.SECONDS,
1563-
workQueue,
1564-
threadFactory(name));
1565-
1566-
executor.allowCoreThreadTimeOut(true);
1567-
return MoreExecutors.listeningDecorator(executor);
1568-
}
1569-
15701565
Cluster getCluster() {
15711566
return Cluster.this;
15721567
}
@@ -2864,9 +2859,9 @@ public void run() {
28642859
}
28652860
};
28662861

2867-
ConnectionReaper(Cluster.Manager manager) {
2868-
executor = Executors.newScheduledThreadPool(1, manager.threadFactory("connection-reaper"));
2869-
executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
2862+
ConnectionReaper(ScheduledExecutorService executor) {
2863+
this.executor = executor;
2864+
this.executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS);
28702865
}
28712866

28722867
void register(Connection connection, long terminateTime) {

driver-core/src/main/java/com/datastax/driver/core/Configuration.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static Builder builder() {
5151
private final SocketOptions socketOptions;
5252
private final MetricsOptions metricsOptions;
5353
private final QueryOptions queryOptions;
54+
private final ThreadingOptions threadingOptions;
5455
private final NettyOptions nettyOptions;
5556
private final CodecRegistry codecRegistry;
5657

@@ -60,6 +61,7 @@ private Configuration(Policies policies,
6061
SocketOptions socketOptions,
6162
MetricsOptions metricsOptions,
6263
QueryOptions queryOptions,
64+
ThreadingOptions threadingOptions,
6365
NettyOptions nettyOptions,
6466
CodecRegistry codecRegistry) {
6567
this.policies = policies;
@@ -68,6 +70,7 @@ private Configuration(Policies policies,
6870
this.socketOptions = socketOptions;
6971
this.metricsOptions = metricsOptions;
7072
this.queryOptions = queryOptions;
73+
this.threadingOptions = threadingOptions;
7174
this.nettyOptions = nettyOptions;
7275
this.codecRegistry = codecRegistry;
7376
}
@@ -85,6 +88,7 @@ protected Configuration(Configuration toCopy) {
8588
toCopy.getSocketOptions(),
8689
toCopy.getMetricsOptions(),
8790
toCopy.getQueryOptions(),
91+
toCopy.getThreadingOptions(),
8892
toCopy.getNettyOptions(),
8993
toCopy.getCodecRegistry()
9094
);
@@ -153,6 +157,13 @@ public QueryOptions getQueryOptions() {
153157
return queryOptions;
154158
}
155159

160+
/**
161+
* @return the threading options for this configuration.
162+
*/
163+
public ThreadingOptions getThreadingOptions() {
164+
return threadingOptions;
165+
}
166+
156167
/**
157168
* Returns the {@link NettyOptions} instance for this configuration.
158169
*
@@ -186,6 +197,7 @@ public static class Builder {
186197
private SocketOptions socketOptions;
187198
private MetricsOptions metricsOptions;
188199
private QueryOptions queryOptions;
200+
private ThreadingOptions threadingOptions;
189201
private NettyOptions nettyOptions;
190202
private CodecRegistry codecRegistry;
191203

@@ -260,6 +272,17 @@ public Builder withQueryOptions(QueryOptions queryOptions) {
260272
return this;
261273
}
262274

275+
/**
276+
* Sets the threading options for this cluster.
277+
*
278+
* @param threadingOptions the threading options to set.
279+
* @return this builder.
280+
*/
281+
public Builder withThreadingOptions(ThreadingOptions threadingOptions) {
282+
this.threadingOptions = threadingOptions;
283+
return this;
284+
}
285+
263286
/**
264287
* Sets the Netty options for this cluster.
265288
*
@@ -297,6 +320,7 @@ public Configuration build() {
297320
socketOptions != null ? socketOptions : new SocketOptions(),
298321
metricsOptions != null ? metricsOptions : new MetricsOptions(),
299322
queryOptions != null ? queryOptions : new QueryOptions(),
323+
threadingOptions != null ? threadingOptions : new ThreadingOptions(),
300324
nettyOptions != null ? nettyOptions : NettyOptions.DEFAULT_INSTANCE,
301325
codecRegistry != null ? codecRegistry : CodecRegistry.DEFAULT_INSTANCE);
302326
}

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -766,9 +766,11 @@ static class Factory {
766766
this.authProvider = configuration.getProtocolOptions().getAuthProvider();
767767
this.protocolVersion = configuration.getProtocolOptions().initialProtocolVersion;
768768
this.nettyOptions = configuration.getNettyOptions();
769-
this.eventLoopGroup = nettyOptions.eventLoopGroup(manager.threadFactory("nio-worker"));
769+
this.eventLoopGroup = nettyOptions.eventLoopGroup(
770+
manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "nio-worker"));
770771
this.channelClass = nettyOptions.channelClass();
771-
this.timer = nettyOptions.timer(manager.threadFactory("timeouter"));
772+
this.timer = nettyOptions.timer(
773+
manager.configuration.getThreadingOptions().createThreadFactory(manager.clusterName, "timeouter"));
772774
}
773775

774776
int getPort() {

0 commit comments

Comments
 (0)