@@ -1122,12 +1122,12 @@ private static String generateClusterName() {
11221122 return "cluster" + CLUSTER_ID .incrementAndGet ();
11231123 }
11241124
1125- private static ListeningExecutorService makeExecutor (int threads , String name ) {
1125+ private static ListeningExecutorService makeExecutor (int threads , String name , LinkedBlockingQueue < Runnable > workQueue ) {
11261126 ThreadPoolExecutor executor = new ThreadPoolExecutor (threads ,
11271127 threads ,
11281128 DEFAULT_THREAD_KEEP_ALIVE ,
11291129 TimeUnit .SECONDS ,
1130- new LinkedBlockingQueue < Runnable >() ,
1130+ workQueue ,
11311131 threadFactory (name ));
11321132
11331133 executor .allowCoreThreadTimeOut (true );
@@ -1160,17 +1160,23 @@ class Manager implements Connection.DefaultResponseHandler {
11601160
11611161 final ConvictionPolicy .Factory convictionPolicyFactory = new ConvictionPolicy .Simple .Factory ();
11621162
1163- final ScheduledExecutorService reconnectionExecutor = Executors . newScheduledThreadPool (2 , threadFactory ("Reconnection-%d" ));
1163+ final ScheduledThreadPoolExecutor reconnectionExecutor = new ScheduledThreadPoolExecutor (2 , threadFactory ("Reconnection-%d" ));
11641164 // scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are
11651165 // applied in the order received.
1166- final ScheduledExecutorService scheduledTasksExecutor = Executors . newScheduledThreadPool (1 , threadFactory ("Scheduled Tasks-%d" ));
1166+ final ScheduledThreadPoolExecutor scheduledTasksExecutor = new ScheduledThreadPoolExecutor (1 , threadFactory ("Scheduled Tasks-%d" ));
11671167
11681168 // Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks
11691169 final ListeningExecutorService executor ;
11701170
1171+ // Work Queue used by executor.
1172+ final LinkedBlockingQueue <Runnable > executorQueue = new LinkedBlockingQueue <Runnable >();
1173+
11711174 // An executor for tasks that might block some time, like creating new connection, but are generally not too critical.
11721175 final ListeningExecutorService blockingExecutor ;
11731176
1177+ // Work Queue used by blockingExecutor.
1178+ final LinkedBlockingQueue <Runnable > blockingExecutorQueue = new LinkedBlockingQueue <Runnable >();
1179+
11741180 final ConnectionReaper reaper ;
11751181
11761182 final AtomicReference <CloseFuture > closeFuture = new AtomicReference <CloseFuture >();
@@ -1191,8 +1197,8 @@ private Manager(String clusterName, List<InetSocketAddress> contactPoints, Confi
11911197 this .configuration = configuration ;
11921198 this .configuration .register (this );
11931199
1194- this .executor = makeExecutor (NON_BLOCKING_EXECUTOR_SIZE , "Cassandra Java Driver worker-%d" );
1195- this .blockingExecutor = makeExecutor (2 , "Cassandra Java Driver blocking tasks worker-%d" );
1200+ this .executor = makeExecutor (NON_BLOCKING_EXECUTOR_SIZE , "Cassandra Java Driver worker-%d" , executorQueue );
1201+ this .blockingExecutor = makeExecutor (2 , "Cassandra Java Driver blocking tasks worker-%d" , blockingExecutorQueue );
11961202
11971203 this .reaper = new ConnectionReaper ();
11981204
0 commit comments