|
25 | 25 | import com.google.common.base.Throwables; |
26 | 26 | import com.google.common.collect.*; |
27 | 27 | import com.google.common.util.concurrent.*; |
28 | | -import io.netty.util.concurrent.DefaultThreadFactory; |
29 | 28 | import org.slf4j.Logger; |
30 | 29 | import org.slf4j.LoggerFactory; |
31 | 30 |
|
@@ -71,17 +70,13 @@ public class Cluster implements Closeable { |
71 | 70 |
|
72 | 71 | @VisibleForTesting |
73 | 72 | static final int NEW_NODE_DELAY_SECONDS = SystemProperties.getInt("com.datastax.driver.NEW_NODE_DELAY_SECONDS", 1); |
74 | | - private static final int NON_BLOCKING_EXECUTOR_SIZE = SystemProperties.getInt("com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE", |
75 | | - Runtime.getRuntime().availableProcessors()); |
76 | 73 |
|
77 | 74 | private static final ResourceBundle driverProperties = ResourceBundle.getBundle("com.datastax.driver.core.Driver"); |
78 | 75 |
|
79 | 76 | // Some per-JVM number that allows to generate unique cluster names when |
80 | 77 | // multiple Cluster instance are created in the same JVM. |
81 | 78 | private static final AtomicInteger CLUSTER_ID = new AtomicInteger(0); |
82 | 79 |
|
83 | | - private static final int DEFAULT_THREAD_KEEP_ALIVE = 30; |
84 | | - |
85 | 80 | private static final int NOTIF_LOCK_TIMEOUT_SECONDS = SystemProperties.getInt("com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS", 60); |
86 | 81 |
|
87 | 82 | final Manager manager; |
@@ -1227,6 +1222,19 @@ public Builder withQueryOptions(QueryOptions options) { |
1227 | 1222 | return this; |
1228 | 1223 | } |
1229 | 1224 |
|
| 1225 | + /** |
| 1226 | + * Sets the threading options to use for the newly created Cluster. |
| 1227 | + * <p/> |
| 1228 | + * If no options are set through this method, a new instance of {@link ThreadingOptions} will be used. |
| 1229 | + * |
| 1230 | + * @param options the options. |
| 1231 | + * @return this builder. |
| 1232 | + */ |
| 1233 | + public Builder withThreadingOptions(ThreadingOptions options) { |
| 1234 | + configurationBuilder.withThreadingOptions(options); |
| 1235 | + return this; |
| 1236 | + } |
| 1237 | + |
1230 | 1238 | /** |
1231 | 1239 | * Set the {@link NettyOptions} to use for the newly created Cluster. |
1232 | 1240 | * <p/> |
@@ -1316,20 +1324,15 @@ class Manager implements Connection.DefaultResponseHandler { |
1316 | 1324 |
|
1317 | 1325 | final ConvictionPolicy.Factory convictionPolicyFactory = new ConvictionPolicy.DefaultConvictionPolicy.Factory(); |
1318 | 1326 |
|
1319 | | - ScheduledThreadPoolExecutor reconnectionExecutor; |
1320 | | - ScheduledThreadPoolExecutor scheduledTasksExecutor; |
1321 | | - |
1322 | | - // Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks |
1323 | 1327 | ListeningExecutorService executor; |
1324 | | - |
1325 | | - // Work Queue used by executor. |
1326 | | - LinkedBlockingQueue<Runnable> executorQueue; |
1327 | | - |
1328 | | - // An executor for tasks that might block some time, like creating new connection, but are generally not too critical. |
1329 | 1328 | ListeningExecutorService blockingExecutor; |
| 1329 | + ScheduledExecutorService reconnectionExecutor; |
| 1330 | + ScheduledExecutorService scheduledTasksExecutor; |
1330 | 1331 |
|
1331 | | - // Work Queue used by blockingExecutor. |
1332 | | - LinkedBlockingQueue<Runnable> blockingExecutorQueue; |
| 1332 | + BlockingQueue<Runnable> executorQueue; |
| 1333 | + BlockingQueue<Runnable> blockingExecutorQueue; |
| 1334 | + BlockingQueue<Runnable> reconnectionExecutorQueue; |
| 1335 | + BlockingQueue<Runnable> scheduledTasksExecutorQueue; |
1333 | 1336 |
|
1334 | 1337 | ConnectionReaper reaper; |
1335 | 1338 |
|
@@ -1368,16 +1371,31 @@ synchronized void init() { |
1368 | 1371 |
|
1369 | 1372 | this.configuration.register(this); |
1370 | 1373 |
|
1371 | | - this.executorQueue = new LinkedBlockingQueue<Runnable>(); |
1372 | | - this.executor = makeExecutor(NON_BLOCKING_EXECUTOR_SIZE, "worker", executorQueue); |
1373 | | - this.blockingExecutorQueue = new LinkedBlockingQueue<Runnable>(); |
1374 | | - this.blockingExecutor = makeExecutor(2, "blocking-task-worker", blockingExecutorQueue); |
1375 | | - this.reconnectionExecutor = new ScheduledThreadPoolExecutor(2, threadFactory("reconnection")); |
1376 | | - // scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are |
1377 | | - // applied in the order received. |
1378 | | - this.scheduledTasksExecutor = new ScheduledThreadPoolExecutor(1, threadFactory("scheduled-task-worker")); |
| 1374 | + ThreadingOptions threadingOptions = this.configuration.getThreadingOptions(); |
| 1375 | + |
| 1376 | + // executor |
| 1377 | + ExecutorService tmpExecutor = threadingOptions.createExecutor(clusterName); |
| 1378 | + this.executorQueue = (tmpExecutor instanceof ThreadPoolExecutor) |
| 1379 | + ? ((ThreadPoolExecutor) tmpExecutor).getQueue() : null; |
| 1380 | + this.executor = MoreExecutors.listeningDecorator(tmpExecutor); |
| 1381 | + |
| 1382 | + // blocking executor |
| 1383 | + ExecutorService tmpBlockingExecutor = threadingOptions.createBlockingExecutor(clusterName); |
| 1384 | + this.blockingExecutorQueue = (tmpBlockingExecutor instanceof ThreadPoolExecutor) |
| 1385 | + ? ((ThreadPoolExecutor) tmpBlockingExecutor).getQueue() : null; |
| 1386 | + this.blockingExecutor = MoreExecutors.listeningDecorator(tmpBlockingExecutor); |
1379 | 1387 |
|
1380 | | - this.reaper = new ConnectionReaper(this); |
| 1388 | + // reconnection executor |
| 1389 | + this.reconnectionExecutor = threadingOptions.createReconnectionExecutor(clusterName); |
| 1390 | + this.reconnectionExecutorQueue = (reconnectionExecutor instanceof ThreadPoolExecutor) |
| 1391 | + ? ((ThreadPoolExecutor) reconnectionExecutor).getQueue() : null; |
| 1392 | + |
| 1393 | + // scheduled tasks executor |
| 1394 | + this.scheduledTasksExecutor = threadingOptions.createScheduledTasksExecutor(clusterName); |
| 1395 | + this.scheduledTasksExecutorQueue = (scheduledTasksExecutor instanceof ThreadPoolExecutor) |
| 1396 | + ? ((ThreadPoolExecutor) scheduledTasksExecutor).getQueue() : null; |
| 1397 | + |
| 1398 | + this.reaper = new ConnectionReaper(threadingOptions.createReaperExecutor(clusterName)); |
1381 | 1399 | this.metadata = new Metadata(this); |
1382 | 1400 | this.connectionFactory = new Connection.Factory(this, configuration); |
1383 | 1401 | this.controlConnection = new ControlConnection(this); |
@@ -1544,29 +1562,6 @@ ProtocolVersion protocolVersion() { |
1544 | 1562 | return connectionFactory.protocolVersion; |
1545 | 1563 | } |
1546 | 1564 |
|
1547 | | - ThreadFactory threadFactory(String name) { |
1548 | | - return new ThreadFactoryBuilder() |
1549 | | - .setNameFormat(clusterName + "-" + name + "-%d") |
1550 | | - // Back with Netty's thread factory in order to create FastThreadLocalThread instances. This allows |
1551 | | - // an optimization around ThreadLocals (we could use DefaultThreadFactory directly but it creates |
1552 | | - // slightly different thread names, so keep we keep a ThreadFactoryBuilder wrapper for backward |
1553 | | - // compatibility). |
1554 | | - .setThreadFactory(new DefaultThreadFactory("ignored name")) |
1555 | | - .build(); |
1556 | | - } |
1557 | | - |
1558 | | - private ListeningExecutorService makeExecutor(int threads, String name, LinkedBlockingQueue<Runnable> workQueue) { |
1559 | | - ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, |
1560 | | - threads, |
1561 | | - DEFAULT_THREAD_KEEP_ALIVE, |
1562 | | - TimeUnit.SECONDS, |
1563 | | - workQueue, |
1564 | | - threadFactory(name)); |
1565 | | - |
1566 | | - executor.allowCoreThreadTimeOut(true); |
1567 | | - return MoreExecutors.listeningDecorator(executor); |
1568 | | - } |
1569 | | - |
1570 | 1565 | Cluster getCluster() { |
1571 | 1566 | return Cluster.this; |
1572 | 1567 | } |
@@ -2864,9 +2859,9 @@ public void run() { |
2864 | 2859 | } |
2865 | 2860 | }; |
2866 | 2861 |
|
2867 | | - ConnectionReaper(Cluster.Manager manager) { |
2868 | | - executor = Executors.newScheduledThreadPool(1, manager.threadFactory("connection-reaper")); |
2869 | | - executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS); |
| 2862 | + ConnectionReaper(ScheduledExecutorService executor) { |
| 2863 | + this.executor = executor; |
| 2864 | + this.executor.scheduleWithFixedDelay(reaperTask, INTERVAL_MS, INTERVAL_MS, TimeUnit.MILLISECONDS); |
2870 | 2865 | } |
2871 | 2866 |
|
2872 | 2867 | void register(Connection connection, long terminateTime) { |
|
0 commit comments