Skip to content

Commit bc4beab

Browse files
committed
Merge branch '2.0' into 2.1
2 parents bec78e2 + 0bef855 commit bc4beab

9 files changed

Lines changed: 242 additions & 48 deletions

File tree

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ The driver contains the following modules:
5151
- driver-examples: example applications using the other modules which are
5252
only meant for demonstration purposes.
5353

54-
Please refer to the README of each module for more information.
55-
5654
**Community:**
5755

5856
- JIRA: https://datastax-oss.atlassian.net/browse/JAVA

changelog/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,18 @@ Merged from 2.0 branch: everything up to 2.0.3 (included), and the following.
219219
- [bug] Make metadata parsing more lenient (JAVA-377, JAVA-391)
220220

221221

222+
### 2.0.11 (in progress)
223+
224+
- [bug] Fix SpeculativeExecutionPolicy.init() and close() are never called (JAVA-796)
225+
226+
Merged from 2.0.10_fixes branch:
227+
228+
- [improvement] Use Netty's pooled ByteBufAllocator by default (JAVA-756)
229+
- [improvement] Expose "unsafe" paging state API (JAVA-759)
230+
- [bug] Fix getObject by name (JAVA-767)
231+
- [bug] Prevent race during pool initialization (JAVA-768)
232+
233+
222234
### 2.0.10.1
223235

224236
- [improvement] Use Netty's pooled ByteBufAllocator by default (JAVA-756)

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,16 @@ public Session connect(String keyspace) {
281281
long timeout = getConfiguration().getSocketOptions().getConnectTimeoutMillis();
282282
Session session = connect();
283283
try {
284-
ResultSetFuture future = session.executeAsync("USE " + keyspace);
285-
// Note: using the connection timeout isn't perfectly correct, we should probably change that someday
286-
Uninterruptibles.getUninterruptibly(future, timeout, TimeUnit.MILLISECONDS);
287-
return session;
288-
} catch (TimeoutException e) {
289-
throw new DriverInternalError(String.format("No responses after %d milliseconds while setting current keyspace. This should not happen, unless you have setup a very low connection timeout.", timeout));
290-
} catch (ExecutionException e) {
291-
throw DefaultResultSetFuture.extractCauseFromExecutionException(e);
284+
try {
285+
ResultSetFuture future = session.executeAsync("USE " + keyspace);
286+
// Note: using the connection timeout isn't perfectly correct, we should probably change that someday
287+
Uninterruptibles.getUninterruptibly(future, timeout, TimeUnit.MILLISECONDS);
288+
return session;
289+
} catch (TimeoutException e) {
290+
throw new DriverInternalError(String.format("No responses after %d milliseconds while setting current keyspace. This should not happen, unless you have setup a very low connection timeout.", timeout));
291+
} catch (ExecutionException e) {
292+
throw DefaultResultSetFuture.extractCauseFromExecutionException(e);
293+
}
292294
} catch (RuntimeException e) {
293295
session.close();
294296
throw e;
@@ -789,6 +791,38 @@ public Builder addContactPoints(String... addresses) {
789791
return this;
790792
}
791793

794+
/**
795+
* Adds a contact point - or many if it host resolves to multiple <code>InetAddress</code>s (A records).
796+
* <p>
797+
*
798+
* If the host name points to a dns records with multiple a-records, all InetAddresses
799+
* returned will be used. Make sure that all resulting <code>InetAddress</code>s returned
800+
* points to the same cluster and datacenter.
801+
* <p>
802+
* See {@link Builder#addContactPoint} for more details on contact
803+
* points and thrown exceptions
804+
*
805+
* @param address address of the nodes to look up InetAddresses from to add as contact points.
806+
* @return this Builder.
807+
*
808+
*
809+
* @see Builder#addContactPoint
810+
*/
811+
public Builder addContactPoints(String address) {
812+
// We explicitely check for nulls because InetAdress.getByName() will happily
813+
// accept it and use localhost (while a null here almost likely mean a user error,
814+
// not "connect to localhost")
815+
if (address == null)
816+
throw new NullPointerException();
817+
818+
try {
819+
addContactPoints(InetAddress.getAllByName(address));
820+
} catch (UnknownHostException e) {
821+
throw new IllegalArgumentException(e.getMessage());
822+
}
823+
return this;
824+
}
825+
792826
/**
793827
* Adds contact points.
794828
* <p>
@@ -1292,6 +1326,7 @@ synchronized void init() {
12921326
// Now that the control connection is ready, we have all the information we need about the nodes (datacenter,
12931327
// rack...) to initialize the load balancing policy
12941328
loadBalancingPolicy().init(Cluster.this, contactPointHosts);
1329+
speculativeRetryPolicy().init(Cluster.this);
12951330
for (Host host : downContactPointHosts) {
12961331
loadBalancingPolicy().onDown(host);
12971332
for (Host.StateListener listener : listeners)
@@ -1417,6 +1452,8 @@ private CloseFuture close() {
14171452
if (loadBalancingPolicy instanceof CloseableLoadBalancingPolicy)
14181453
((CloseableLoadBalancingPolicy)loadBalancingPolicy).close();
14191454

1455+
speculativeRetryPolicy().close();
1456+
14201457
AddressTranslater translater = configuration.getPolicies().getAddressTranslater();
14211458
if (translater instanceof CloseableAddressTranslater)
14221459
((CloseableAddressTranslater)translater).close();

driver-core/src/main/java/com/datastax/driver/core/QueryTrace.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ private void doFetchTrace() {
232232
}
233233

234234
if (tries > MAX_TRIES)
235-
throw new TraceRetrievalException(String.format("Unable to retrieve complete query trace after %d tries", MAX_TRIES));
235+
throw new TraceRetrievalException(String.format("Unable to retrieve complete query trace for id %s after %d tries", traceId, MAX_TRIES));
236236
}
237237

238238
/**

driver-core/src/main/java/com/datastax/driver/core/SessionManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,11 @@ Message.Request makeRequestMessage(Statement statement, ByteBuffer pagingState)
471471
defaultTimestamp = cluster.getConfiguration().getPolicies().getTimestampGenerator().next();
472472
}
473473

474-
return makeRequestMessage(statement, consistency, serialConsistency, pagingState, defaultTimestamp);
474+
Message.Request request = makeRequestMessage(statement, consistency, serialConsistency, pagingState, defaultTimestamp);
475+
if (statement.isTracing())
476+
request.setTracingRequested();
477+
478+
return request;
475479
}
476480

477481
Message.Request makeRequestMessage(Statement statement, ConsistencyLevel cl, ConsistencyLevel scl, ByteBuffer pagingState, long defaultTimestamp) {
@@ -591,8 +595,6 @@ private void prepare(String query, InetSocketAddress toExclude) throws Interrupt
591595
}
592596

593597
ResultSetFuture executeQuery(Message.Request msg, Statement statement) {
594-
if (statement.isTracing())
595-
msg.setTracingRequested();
596598

597599
DefaultResultSetFuture future = new DefaultResultSetFuture(this, configuration().getProtocolOptions().getProtocolVersionEnum(), msg);
598600
execute(future, statement);

driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public class CCMBridge {
8383
*/
8484
private static final Map<String,String> ENVIRONMENT_MAP;
8585

86+
/**
87+
* The command to use to launch CCM
88+
*/
89+
private static final String CCM_COMMAND;
90+
8691
static {
8792
// Inherit the current environment.
8893
Map<String,String> envMap = Maps.newHashMap(new ProcessBuilder().environment());
@@ -93,8 +98,15 @@ public class CCMBridge {
9398
if(existingPath == null) {
9499
existingPath = "";
95100
}
96-
envMap.put("PATH", ccmPath + ":" + existingPath);
101+
envMap.put("PATH", ccmPath + File.pathSeparator + existingPath);
97102
}
103+
104+
if (isWindows()) {
105+
CCM_COMMAND = "cmd /c ccm.py";
106+
} else {
107+
CCM_COMMAND = "ccm";
108+
}
109+
98110
// If ccm.java.home is set, override the JAVA_HOME variable with it.
99111
String ccmJavaHome = System.getProperty("ccm.java.home");
100112
if(ccmJavaHome != null) {
@@ -103,6 +115,19 @@ public class CCMBridge {
103115
ENVIRONMENT_MAP = ImmutableMap.copyOf(envMap);
104116
}
105117

118+
/**
119+
* Checks if the operating system is a Windows one
120+
* @return <code>true</code> if the operating system is a Windows one, <code>false</code> otherwise.
121+
*/
122+
private static boolean isWindows() {
123+
124+
String osName = System.getProperty("os.name");
125+
if (osName == null) {
126+
return false;
127+
}
128+
return osName.startsWith("Windows");
129+
}
130+
106131
static final File CASSANDRA_DIR;
107132
static final String CASSANDRA_VERSION;
108133

@@ -170,31 +195,31 @@ public static CCMBridge create(String name, String... options) {
170195
checkArgument(!"current".equals(name.toLowerCase()),
171196
"cluster can't be called \"current\"");
172197
CCMBridge bridge = new CCMBridge();
173-
bridge.execute("ccm create %s -b -i %s %s " + Joiner.on(" ").join(options), name, IP_PREFIX, CASSANDRA_VERSION);
198+
bridge.execute(CCM_COMMAND + " create %s -b -i %s %s " + Joiner.on(" ").join(options), name, IP_PREFIX, CASSANDRA_VERSION);
174199
return bridge;
175200
}
176201

177202
public static CCMBridge create(String name, int nbNodes, String... options) {
178203
checkArgument(!"current".equals(name.toLowerCase()),
179204
"cluster can't be called \"current\"");
180205
CCMBridge bridge = new CCMBridge();
181-
bridge.execute("ccm create %s -n %d -s -i %s -b %s " + Joiner.on(" ").join(options), name, nbNodes, IP_PREFIX, CASSANDRA_VERSION);
206+
bridge.execute(CCM_COMMAND + " create %s -n %d -s -i %s -b %s " + Joiner.on(" ").join(options), name, nbNodes, IP_PREFIX, CASSANDRA_VERSION);
182207
return bridge;
183208
}
184209

185210
public static CCMBridge createWithCustomVersion(String name, int nbNodes, String cassandraVersion) {
186211
checkArgument(!"current".equals(name.toLowerCase()),
187212
"cluster can't be called \"current\"");
188213
CCMBridge bridge = new CCMBridge();
189-
bridge.execute("ccm create %s -n %d -s -i %s -b -v %s ", name, nbNodes, IP_PREFIX, cassandraVersion);
214+
bridge.execute(CCM_COMMAND + " create %s -n %d -s -i %s -b -v %s ", name, nbNodes, IP_PREFIX, cassandraVersion);
190215
return bridge;
191216
}
192217

193218
public static CCMBridge create(String name, int nbNodesDC1, int nbNodesDC2) {
194219
checkArgument(!"current".equals(name.toLowerCase()),
195220
"cluster can't be called \"current\"");
196221
CCMBridge bridge = new CCMBridge();
197-
bridge.execute("ccm create %s -n %d:%d -s -i %s -b %s", name, nbNodesDC1, nbNodesDC2, IP_PREFIX, CASSANDRA_VERSION);
222+
bridge.execute(CCM_COMMAND + " create %s -n %d:%d -s -i %s -b %s", name, nbNodesDC1, nbNodesDC2, IP_PREFIX, CASSANDRA_VERSION);
198223
return bridge;
199224
}
200225

@@ -207,55 +232,55 @@ public static CCMBridge.CCMCluster buildCluster(int nbNodesDC1, int nbNodesDC2,
207232
}
208233

209234
public void start() {
210-
execute("ccm start --wait-other-notice --wait-for-binary-proto");
235+
execute(CCM_COMMAND + " start --wait-other-notice --wait-for-binary-proto");
211236
}
212237

213238
public void stop() {
214-
execute("ccm stop");
239+
execute(CCM_COMMAND + " stop");
215240
}
216241

217242
public void forceStop() {
218-
execute("ccm stop --not-gently");
243+
execute(CCM_COMMAND + " stop --not-gently");
219244
}
220245

221246
public void start(int n) {
222247
logger.info("Starting: " + IP_PREFIX + n);
223-
execute("ccm node%d start --wait-other-notice --wait-for-binary-proto", n);
248+
execute(CCM_COMMAND + " node%d start --wait-other-notice --wait-for-binary-proto", n);
224249
}
225250

226251
public void start(int n, String option) {
227252
logger.info("Starting: " + IP_PREFIX + n + " with " + option);
228-
execute("ccm node%d start --wait-other-notice --wait-for-binary-proto --jvm_arg=%s", n, option);
253+
execute(CCM_COMMAND + " node%d start --wait-other-notice --wait-for-binary-proto --jvm_arg=%s", n, option);
229254
}
230255

231256
public void stop(int n) {
232257
logger.info("Stopping: " + IP_PREFIX + n);
233-
execute("ccm node%d stop", n);
258+
execute(CCM_COMMAND + " node%d stop", n);
234259
}
235260

236261
public void stop(String clusterName) {
237262
logger.info("Stopping Cluster : "+clusterName);
238-
execute("ccm stop "+clusterName);
263+
execute(CCM_COMMAND + " stop "+clusterName);
239264
}
240265

241266
public void forceStop(int n) {
242267
logger.info("Force stopping: " + IP_PREFIX + n);
243-
execute("ccm node%d stop --not-gently", n);
268+
execute(CCM_COMMAND + " node%d stop --not-gently", n);
244269
}
245270

246271
public void remove() {
247272
stop();
248-
execute("ccm remove");
273+
execute(CCM_COMMAND + " remove");
249274
}
250275

251276
public void remove(String clusterName) {
252277
stop(clusterName);
253-
execute("ccm remove " + clusterName);
278+
execute(CCM_COMMAND + " remove " + clusterName);
254279
}
255280

256281
public void remove(int n) {
257282
logger.info("Removing: " + IP_PREFIX + n);
258-
execute("ccm node%d remove", n);
283+
execute(CCM_COMMAND + " node%d remove", n);
259284
}
260285

261286
public void bootstrapNode(int n) {
@@ -264,24 +289,24 @@ public void bootstrapNode(int n) {
264289

265290
public void bootstrapNode(int n, String dc) {
266291
if (dc == null)
267-
execute("ccm add node%d -i %s%d -j %d -r %d -b -s", n, IP_PREFIX, n, 7000 + 100 * n, 8000 + 100 * n);
292+
execute(CCM_COMMAND + " add node%d -i %s%d -j %d -r %d -b -s", n, IP_PREFIX, n, 7000 + 100 * n, 8000 + 100 * n);
268293
else
269-
execute("ccm add node%d -i %s%d -j %d -b -d %s -s", n, IP_PREFIX, n, 7000 + 100 * n, dc);
270-
execute("ccm node%d start --wait-other-notice --wait-for-binary-proto", n);
294+
execute(CCM_COMMAND + " add node%d -i %s%d -j %d -b -d %s -s", n, IP_PREFIX, n, 7000 + 100 * n, dc);
295+
execute(CCM_COMMAND + " node%d start --wait-other-notice --wait-for-binary-proto", n);
271296
}
272297

273298
public void bootstrapNodeWithPorts(int n, int thriftPort, int storagePort, int binaryPort, int jmxPort, int remoteDebugPort) {
274299
String thriftItf = IP_PREFIX + n + ":" + thriftPort;
275300
String storageItf = IP_PREFIX + n + ":" + storagePort;
276301
String binaryItf = IP_PREFIX + n + ":" + binaryPort;
277302
String remoteLogItf = IP_PREFIX + n + ":" + remoteDebugPort;
278-
execute("ccm add node%d -i %s%d -b -t %s -l %s --binary-itf %s -j %d -r %s -s",
303+
execute(CCM_COMMAND + " add node%d -i %s%d -b -t %s -l %s --binary-itf %s -j %d -r %s -s",
279304
n, IP_PREFIX, n, thriftItf, storageItf, binaryItf, jmxPort, remoteLogItf);
280-
execute("ccm node%d start --wait-other-notice --wait-for-binary-proto", n);
305+
execute(CCM_COMMAND + " node%d start --wait-other-notice --wait-for-binary-proto", n);
281306
}
282307

283308
public void decommissionNode(int n) {
284-
execute("ccm node%d decommission", n);
309+
execute(CCM_COMMAND + " node%d decommission", n);
285310
}
286311

287312
public void updateConfig(String name, String value) {
@@ -293,16 +318,17 @@ public void updateConfig(Map<String, String> configs) {
293318
for (Map.Entry<String, String> entry : configs.entrySet()) {
294319
confStr.append(entry.getKey() + ":" + entry.getValue() + " ");
295320
}
296-
execute("ccm updateconf " + confStr);
321+
execute(CCM_COMMAND + " updateconf " + confStr);
297322
}
298323

299324
public void populate(int n) {
300-
execute("ccm populate -n %d -i %s", n, IP_PREFIX);
325+
execute(CCM_COMMAND + " populate -n %d -i %s", n, IP_PREFIX);
301326
}
302327

303328
private void execute(String command, Object... args) {
329+
330+
String fullCommand = String.format(command, args) + " --config-dir=" + ccmDir;
304331
try {
305-
String fullCommand = String.format(command, args) + " --config-dir=" + ccmDir;
306332
logger.debug("Executing: " + fullCommand);
307333
CommandLine cli = CommandLine.parse(fullCommand);
308334
Executor executor = new DefaultExecutor();
@@ -327,7 +353,7 @@ private void execute(String command, Object... args) {
327353
throw new RuntimeException();
328354
}
329355
} catch (IOException e) {
330-
throw new RuntimeException(e);
356+
throw new RuntimeException(String.format("The command %s failed to execute", fullCommand), e);
331357
}
332358
}
333359

0 commit comments

Comments
 (0)