Skip to content

Commit 5a1bb55

Browse files
committed
Merge branch '2.0' into 2.1
2 parents 99e0d1a + a313058 commit 5a1bb55

26 files changed

Lines changed: 747 additions & 231 deletions

.travis.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
language: java
2+
jdk:
3+
- openjdk6
4+
- oraclejdk7
5+
- oraclejdk8

driver-core/pom.xml

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,19 @@
3030
<dependency>
3131
<groupId>io.netty</groupId>
3232
<artifactId>netty</artifactId>
33-
<version>3.9.0.Final</version>
33+
<version>${netty.version}</version>
3434
</dependency>
3535

3636
<dependency>
3737
<groupId>com.google.guava</groupId>
3838
<artifactId>guava</artifactId>
39-
<!--
40-
We use an old version of Guava to be compatible with Spark 1.1.
41-
Check with the spark-cassandra-connector team before upgrading this.
42-
-->
43-
<version>14.0.1</version>
39+
<version>${guava.version}</version>
4440
</dependency>
4541

4642
<dependency>
4743
<groupId>com.codahale.metrics</groupId>
4844
<artifactId>metrics-core</artifactId>
49-
<version>3.0.2</version>
45+
<version>${metrics.version}</version>
5046
</dependency>
5147

5248
<!-- Compression libraries for the protocol. -->
@@ -55,14 +51,14 @@
5551
<dependency>
5652
<groupId>org.xerial.snappy</groupId>
5753
<artifactId>snappy-java</artifactId>
58-
<version>1.0.5</version>
54+
<version>${snappy.version}</version>
5955
<optional>true</optional>
6056
</dependency>
6157

6258
<dependency>
6359
<groupId>net.jpountz.lz4</groupId>
6460
<artifactId>lz4</artifactId>
65-
<version>1.2.0</version>
61+
<version>${lz4.version}</version>
6662
<optional>true</optional>
6763
</dependency>
6864

@@ -71,29 +67,28 @@
7167
<dependency>
7268
<groupId>org.testng</groupId>
7369
<artifactId>testng</artifactId>
74-
<version>6.8.8</version>
70+
<version>${testng.version}</version>
7571
<scope>test</scope>
7672
</dependency>
7773

7874
<dependency>
7975
<groupId>org.assertj</groupId>
8076
<artifactId>assertj-core</artifactId>
81-
<version>1.7.0</version>
77+
<version>${assertj.version}</version>
8278
<scope>test</scope>
8379
</dependency>
8480

8581
<dependency>
8682
<groupId>org.mockito</groupId>
8783
<artifactId>mockito-all</artifactId>
88-
<version>1.10.8</version>
84+
<version>${mockito.version}</version>
8985
<scope>test</scope>
9086
</dependency>
9187

9288
<dependency>
9389
<groupId>org.scassandra</groupId>
9490
<artifactId>java-client</artifactId>
95-
<!-- N.B. later versions of scassandra require JDK 7 -->
96-
<version>0.4.1</version>
91+
<version>${scassandra.version}</version>
9792
<scope>test</scope>
9893
<exclusions>
9994
<exclusion>

driver-core/src/main/java/com/datastax/driver/core/AbstractReconnectionHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,7 @@ public void run() {
141141
currentAttempt.compareAndSet(handlerFuture, null);
142142
}
143143
} catch (InterruptedException e) {
144-
// If interrupted, skip this attempt but still skip scheduling reconnections
145144
Thread.currentThread().interrupt();
146-
reschedule(schedule.nextDelayMs());
147145
} catch (UnsupportedProtocolVersionException e) {
148146
logger.error(e.getMessage());
149147
long nextDelay = schedule.nextDelayMs();

driver-core/src/main/java/com/datastax/driver/core/BatchStatement.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public class BatchStatement extends Statement {
4848
public enum Type {
4949
/**
5050
* A logged batch: Cassandra will first write the batch to its distributed batch log
51-
* to ensure the atomicity of the batch.
51+
* to ensure the atomicity of the batch (atomicity meaning that if any statement in
52+
* the batch succeeds, all will eventually succeed).
5253
*/
5354
LOGGED,
5455

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,12 +1122,12 @@ private static String generateClusterName() {
11221122
return "cluster" + CLUSTER_ID.incrementAndGet();
11231123
}
11241124

1125-
private static ListeningExecutorService makeExecutor(int threads, String name) {
1125+
private static ListeningExecutorService makeExecutor(int threads, String name, LinkedBlockingQueue<Runnable> workQueue) {
11261126
ThreadPoolExecutor executor = new ThreadPoolExecutor(threads,
11271127
threads,
11281128
DEFAULT_THREAD_KEEP_ALIVE,
11291129
TimeUnit.SECONDS,
1130-
new LinkedBlockingQueue<Runnable>(),
1130+
workQueue,
11311131
threadFactory(name));
11321132

11331133
executor.allowCoreThreadTimeOut(true);
@@ -1160,17 +1160,23 @@ class Manager implements Connection.DefaultResponseHandler {
11601160

11611161
final ConvictionPolicy.Factory convictionPolicyFactory = new ConvictionPolicy.Simple.Factory();
11621162

1163-
final ScheduledExecutorService reconnectionExecutor = Executors.newScheduledThreadPool(2, threadFactory("Reconnection-%d"));
1163+
final ScheduledThreadPoolExecutor reconnectionExecutor = new ScheduledThreadPoolExecutor(2, threadFactory("Reconnection-%d"));
11641164
// scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are
11651165
// applied in the order received.
1166-
final ScheduledExecutorService scheduledTasksExecutor = Executors.newScheduledThreadPool(1, threadFactory("Scheduled Tasks-%d"));
1166+
final ScheduledThreadPoolExecutor scheduledTasksExecutor = new ScheduledThreadPoolExecutor(1, threadFactory("Scheduled Tasks-%d"));
11671167

11681168
// Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks
11691169
final ListeningExecutorService executor;
11701170

1171+
// Work Queue used by executor.
1172+
final LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<Runnable>();
1173+
11711174
// An executor for tasks that might block some time, like creating new connection, but are generally not too critical.
11721175
final ListeningExecutorService blockingExecutor;
11731176

1177+
// Work Queue used by blockingExecutor.
1178+
final LinkedBlockingQueue<Runnable> blockingExecutorQueue = new LinkedBlockingQueue<Runnable>();
1179+
11741180
final ConnectionReaper reaper;
11751181

11761182
final AtomicReference<CloseFuture> closeFuture = new AtomicReference<CloseFuture>();
@@ -1191,8 +1197,8 @@ private Manager(String clusterName, List<InetSocketAddress> contactPoints, Confi
11911197
this.configuration = configuration;
11921198
this.configuration.register(this);
11931199

1194-
this.executor = makeExecutor(NON_BLOCKING_EXECUTOR_SIZE, "Cassandra Java Driver worker-%d");
1195-
this.blockingExecutor = makeExecutor(2, "Cassandra Java Driver blocking tasks worker-%d");
1200+
this.executor = makeExecutor(NON_BLOCKING_EXECUTOR_SIZE, "Cassandra Java Driver worker-%d", executorQueue);
1201+
this.blockingExecutor = makeExecutor(2, "Cassandra Java Driver blocking tasks worker-%d", blockingExecutorQueue);
11961202

11971203
this.reaper = new ConnectionReaper();
11981204

driver-core/src/main/java/com/datastax/driver/core/Metrics.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,34 @@ public Integer getValue() {
7070
}
7171
});
7272

73+
private final Gauge<Integer> executorQueueDepth = registry.register("executor-queue-depth", new Gauge<Integer>() {
74+
@Override
75+
public Integer getValue() {
76+
return manager.executorQueue.size();
77+
}
78+
});
79+
80+
private final Gauge<Integer> blockingExecutorQueueDepth = registry.register("blocking-executor-queue-depth", new Gauge<Integer>() {
81+
@Override
82+
public Integer getValue() {
83+
return manager.blockingExecutorQueue.size();
84+
}
85+
});
86+
87+
private final Gauge<Integer> reconnectionSchedulerQueueSize= registry.register("reconnection-scheduler-task-count", new Gauge<Integer>() {
88+
@Override
89+
public Integer getValue() {
90+
return manager.reconnectionExecutor.getQueue().size();
91+
}
92+
});
93+
94+
private final Gauge<Integer> taskSchedulerQueueSize = registry.register("task-scheduler-task-count", new Gauge<Integer>() {
95+
@Override
96+
public Integer getValue() {
97+
return manager.scheduledTasksExecutor.getQueue().size();
98+
}
99+
});
100+
73101
Metrics(Cluster.Manager manager) {
74102
this.manager = manager;
75103
if (manager.configuration.getMetricsOptions().isJMXReportingEnabled()) {
@@ -177,6 +205,36 @@ public Gauge<Integer> getOpenConnections() {
177205
return openConnections;
178206
}
179207

208+
/**
209+
* @return The number of queued up tasks in the non-blocking executor (Cassandra Java Driver workers).
210+
*/
211+
public Gauge<Integer> getExecutorQueueDepth() {
212+
return executorQueueDepth;
213+
}
214+
215+
/**
216+
* @return The number of queued up tasks in the blocking executor (Cassandra Java Driver blocking tasks worker).
217+
*/
218+
public Gauge<Integer> getBlockingExecutorQueueDepth() {
219+
return blockingExecutorQueueDepth;
220+
}
221+
222+
/**
223+
* @return The size of the work queue for the reconnection scheduler (Reconnection). A queue size > 0 does not
224+
* necessarily indicate a backlog as some tasks may not have been scheduled to execute yet.
225+
*/
226+
public Gauge<Integer> getReconnectionSchedulerQueueSize() {
227+
return reconnectionSchedulerQueueSize;
228+
}
229+
230+
/**
231+
* @return The size of the work queue for the task scheduler (Scheduled Tasks). A queue size > 0 does not
232+
* necessarily indicate a backlog as some tasks may not have been scheduled to execute yet.
233+
*/
234+
public Gauge<Integer> getTaskSchedulerQueueSize() {
235+
return taskSchedulerQueueSize;
236+
}
237+
180238
void shutdown() {
181239
if (jmxReporter != null)
182240
jmxReporter.stop();

driver-core/src/main/java/com/datastax/driver/core/WriteType.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ public enum WriteType
2626
{
2727
/** A write to a single partition key. Such writes are guaranteed to be atomic and isolated. */
2828
SIMPLE,
29-
/** A write to a multiple partition key that used the distributed batch log to ensure atomicity. */
29+
/**
30+
* A write to a multiple partition key that used the distributed batch log to ensure atomicity
31+
* (atomicity meaning that if any statement in the batch succeeds, all will eventually succeed).
32+
*/
3033
BATCH,
3134
/** A write to a multiple partition key that doesn't use the distributed batch log. Atomicity for such writes is not guaranteed */
3235
UNLOGGED_BATCH,

0 commit comments

Comments
 (0)