Skip to content

Commit 6aa2792

Browse files
author
Alexandre Dutra
committed
Merge branch '3.1.x' into 3.x
Conflicts: changelog/README.md driver-core/src/test/java/com/datastax/driver/core/SpeculativeExecutionIntegrationTest.java
2 parents a0b86e9 + 638be38 commit 6aa2792

25 files changed

Lines changed: 442 additions & 233 deletions

build.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ schedules:
2121
include: ["/\\d+(\\.[\\dx]+)+/"]
2222
env_vars: |
2323
TEST_GROUP="long"
24+
adhoc:
25+
# Adhoc job for non-primary braches that doesn't have a schedule but may be used to run all configs.
26+
schedule: adhoc
27+
branches:
28+
# regex matches primary branch format (2.1, 3.x, 3.0.x, 3.1.x, etc).
29+
exclude: ["/\\d+(\\.[\\dx]+)+/"]
30+
env_vars: |
31+
TEST_GROUP="long"
2432
java:
2533
- openjdk6
2634
- oraclejdk7

changelog/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@
77
- [new feature] JAVA-1362: Send query options flags as [int] for Protocol V5+.
88
- [improvement] JAVA-1367: Make protocol negotiation more resilient.
99

10+
Merged from 3.1.x branch:
11+
12+
- [bug] JAVA-1371: Reintroduce connection pool timeout.
13+
- [bug] JAVA-1313: Copy SerialConsistencyLevel to PreparedStatement.
14+
- [documentation] JAVA-1334: Clarify documentation of method `addContactPoints`.
15+
- [improvement] JAVA-1357: Document that getReplicas only returns replicas of the last token in range.
16+
1017

1118
### 3.1.3
1219

driver-core/src/main/java/com/datastax/driver/core/AbstractSession.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ public PreparedStatement apply(PreparedStatement prepared) {
144144
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);
145145
if (routingKey != null)
146146
prepared.setRoutingKey(routingKey);
147-
prepared.setConsistencyLevel(statement.getConsistencyLevel());
147+
if (statement.getConsistencyLevel() != null)
148+
prepared.setConsistencyLevel(statement.getConsistencyLevel());
149+
if (statement.getSerialConsistencyLevel() != null)
150+
prepared.setSerialConsistencyLevel(statement.getSerialConsistencyLevel());
148151
if (statement.isTracing())
149152
prepared.enableTracing();
150153
prepared.setRetryPolicy(statement.getRetryPolicy());

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ public Builder withProtocolVersion(ProtocolVersion version) {
845845
}
846846

847847
/**
848-
* Adds a contact point - or many if it host resolves to multiple
848+
* Adds a contact point - or many if the given address resolves to multiple
849849
* <code>InetAddress</code>s (A records).
850850
* <p/>
851851
* Contact points are addresses of Cassandra nodes that the driver uses
@@ -856,7 +856,7 @@ public Builder withProtocolVersion(ProtocolVersion version) {
856856
* the driver cannot initialize itself correctly.
857857
* <p/>
858858
* Note that by default (that is, unless you use the {@link #withLoadBalancingPolicy})
859-
* method of this builder), the first succesfully contacted host will be used
859+
* method of this builder), the first successfully contacted host will be used
860860
* to define the local data-center for the client. If follows that if you are
861861
* running Cassandra in a multiple data-center setting, it is a good idea to
862862
* only provide contact points that are in the same datacenter than the client,
@@ -866,15 +866,15 @@ public Builder withProtocolVersion(ProtocolVersion version) {
866866
* returned will be used. Make sure that all resulting <code>InetAddress</code>s returned
867867
* point to the same cluster and datacenter.
868868
*
869-
* @param address the address of the node(s) to connect to
869+
* @param address the address of the node(s) to connect to.
870870
* @return this Builder.
871-
* @throws IllegalArgumentException if no IP address for {@code address}
872-
* could be found
871+
* @throws IllegalArgumentException if the given {@code address}
872+
* could not be resolved.
873873
* @throws SecurityException if a security manager is present and
874874
* permission to resolve the host name is denied.
875875
*/
876876
public Builder addContactPoint(String address) {
877-
// We explicitely check for nulls because InetAdress.getByName() will happily
877+
// We explicitly check for nulls because InetAdress.getByName() will happily
878878
// accept it and use localhost (while a null here almost likely mean a user error,
879879
// not "connect to localhost")
880880
if (address == null)
@@ -893,11 +893,14 @@ public Builder addContactPoint(String address) {
893893
* <p/>
894894
* See {@link Builder#addContactPoint} for more details on contact
895895
* points.
896+
* <p/>
897+
* Note that all contact points must be resolvable;
898+
* if <em>any</em> of them cannot be resolved, this method will fail.
896899
*
897-
* @param addresses addresses of the nodes to add as contact point.
900+
* @param addresses addresses of the nodes to add as contact points.
898901
* @return this Builder.
899-
* @throws IllegalArgumentException if no IP address for at least one
900-
* of {@code addresses} could be found
902+
* @throws IllegalArgumentException if any of the given {@code addresses}
903+
* could not be resolved.
901904
* @throws SecurityException if a security manager is present and
902905
* permission to resolve the host name is denied.
903906
* @see Builder#addContactPoint
@@ -913,9 +916,16 @@ public Builder addContactPoints(String... addresses) {
913916
* <p/>
914917
* See {@link Builder#addContactPoint} for more details on contact
915918
* points.
919+
* <p/>
920+
* Note that all contact points must be resolvable;
921+
* if <em>any</em> of them cannot be resolved, this method will fail.
916922
*
917-
* @param addresses addresses of the nodes to add as contact point.
923+
* @param addresses addresses of the nodes to add as contact points.
918924
* @return this Builder.
925+
* @throws IllegalArgumentException if any of the given {@code addresses}
926+
* could not be resolved.
927+
* @throws SecurityException if a security manager is present and
928+
* permission to resolve the host name is denied.
919929
* @see Builder#addContactPoint
920930
*/
921931
public Builder addContactPoints(InetAddress... addresses) {
@@ -929,7 +939,7 @@ public Builder addContactPoints(InetAddress... addresses) {
929939
* See {@link Builder#addContactPoint} for more details on contact
930940
* points.
931941
*
932-
* @param addresses addresses of the nodes to add as contact point
942+
* @param addresses addresses of the nodes to add as contact points.
933943
* @return this Builder
934944
* @see Builder#addContactPoint
935945
*/
@@ -943,19 +953,19 @@ public Builder addContactPoints(Collection<InetAddress> addresses) {
943953
* <p/>
944954
* See {@link Builder#addContactPoint} for more details on contact
945955
* points. Contrarily to other {@code addContactPoints} methods, this method
946-
* allow to provide a different port for each contact points. Since Cassandra
947-
* nodes must always all listen on the same port, this is rarelly what you
956+
* allows to provide a different port for each contact point. Since Cassandra
957+
* nodes must always all listen on the same port, this is rarely what you
948958
* want and most users should prefer other {@code addContactPoints} methods to
949959
* this one. However, this can be useful if the Cassandra nodes are behind
950960
* a router and are not accessed directly. Note that if you are in this
951961
* situation (Cassandra nodes are behind a router, not directly accessible),
952-
* you almost surely want to provide a specific {@code AddressTranslator}
962+
* you almost surely want to provide a specific {@link AddressTranslator}
953963
* (through {@link #withAddressTranslator}) to translate actual Cassandra node
954964
* addresses to the addresses the driver should use, otherwise the driver
955965
* will not be able to auto-detect new nodes (and will generally not function
956966
* optimally).
957967
*
958-
* @param addresses addresses of the nodes to add as contact point
968+
* @param addresses addresses of the nodes to add as contact points.
959969
* @return this Builder
960970
* @see Builder#addContactPoint
961971
*/
@@ -969,19 +979,19 @@ public Builder addContactPointsWithPorts(InetSocketAddress... addresses) {
969979
* <p/>
970980
* See {@link Builder#addContactPoint} for more details on contact
971981
* points. Contrarily to other {@code addContactPoints} methods, this method
972-
* allow to provide a different port for each contact points. Since Cassandra
973-
* nodes must always all listen on the same port, this is rarelly what you
982+
* allows to provide a different port for each contact point. Since Cassandra
983+
* nodes must always all listen on the same port, this is rarely what you
974984
* want and most users should prefer other {@code addContactPoints} methods to
975985
* this one. However, this can be useful if the Cassandra nodes are behind
976986
* a router and are not accessed directly. Note that if you are in this
977987
* situation (Cassandra nodes are behind a router, not directly accessible),
978-
* you almost surely want to provide a specific {@code AddressTranslator}
988+
* you almost surely want to provide a specific {@link AddressTranslator}
979989
* (through {@link #withAddressTranslator}) to translate actual Cassandra node
980990
* addresses to the addresses the driver should use, otherwise the driver
981991
* will not be able to auto-detect new nodes (and will generally not function
982992
* optimally).
983993
*
984-
* @param addresses addresses of the nodes to add as contact point
994+
* @param addresses addresses of the nodes to add as contact points.
985995
* @return this Builder
986996
* @see Builder#addContactPoint
987997
*/

driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.base.Throwables;
2525
import com.google.common.collect.Lists;
2626
import com.google.common.util.concurrent.*;
27+
import io.netty.util.concurrent.EventExecutor;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

@@ -57,14 +58,16 @@ class HostConnectionPool implements Connection.Owner {
5758
@VisibleForTesting
5859
final Set<Connection> trash = new CopyOnWriteArraySet<Connection>();
5960

60-
private final Queue<SettableFuture<Connection>> pendingBorrows = new ConcurrentLinkedQueue<SettableFuture<Connection>>();
61+
private final Queue<PendingBorrow> pendingBorrows = new ConcurrentLinkedQueue<PendingBorrow>();
6162
private final AtomicInteger pendingBorrowCount = new AtomicInteger();
6263

6364
private final Runnable newConnectionTask;
6465

6566
private final AtomicInteger scheduledForCreation = new AtomicInteger();
6667

67-
protected final AtomicReference<CloseFuture> closeFuture = new AtomicReference<CloseFuture>();
68+
private final EventExecutor timeoutsExecutor;
69+
70+
private final AtomicReference<CloseFuture> closeFuture = new AtomicReference<CloseFuture>();
6871

6972
private enum Phase {INITIALIZING, READY, INIT_FAILED, CLOSING}
7073

@@ -93,6 +96,8 @@ public void run() {
9396
this.open = new AtomicInteger();
9497

9598
this.minAllowedStreams = options().getMaxRequestsPerConnection(hostDistance) * 3 / 4;
99+
100+
this.timeoutsExecutor = manager.getCluster().manager.connectionFactory.eventLoopGroup.next();
96101
}
97102

98103
/**
@@ -190,7 +195,7 @@ private PoolingOptions options() {
190195
return manager.configuration().getPoolingOptions();
191196
}
192197

193-
ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
198+
ListenableFuture<Connection> borrowConnection(long timeout, TimeUnit unit, int maxQueueSize) {
194199
Phase phase = this.phase.get();
195200
if (phase != Phase.READY)
196201
return Futures.immediateFailedFuture(new ConnectionException(host.getSocketAddress(), "Pool is " + phase));
@@ -207,7 +212,7 @@ ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
207212
manager.blockingExecutor().submit(newConnectionTask);
208213
}
209214
}
210-
return enqueue(maxQueueSize);
215+
return enqueue(timeout, unit, maxQueueSize);
211216
}
212217
}
213218

@@ -228,13 +233,13 @@ ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
228233
// This might maybe happen if the number of core connections per host is 0 and a connection was trashed between
229234
// the previous check to connections and now. But in that case, the line above will have trigger the creation of
230235
// a new connection, so just wait that connection and move on
231-
return enqueue(maxQueueSize);
236+
return enqueue(timeout, unit, maxQueueSize);
232237
} else {
233238
while (true) {
234239
int inFlight = leastBusy.inFlight.get();
235240

236241
if (inFlight >= Math.min(leastBusy.maxAvailableStreams(), options().getMaxRequestsPerConnection(hostDistance))) {
237-
return enqueue(maxQueueSize);
242+
return enqueue(timeout, unit, maxQueueSize);
238243
}
239244

240245
if (leastBusy.inFlight.compareAndSet(inFlight, inFlight + 1))
@@ -264,8 +269,8 @@ ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
264269
return leastBusy.setKeyspaceAsync(manager.poolsState.keyspace);
265270
}
266271

267-
private ListenableFuture<Connection> enqueue(int maxQueueSize) {
268-
if (maxQueueSize == 0) {
272+
private ListenableFuture<Connection> enqueue(long timeout, TimeUnit unit, int maxQueueSize) {
273+
if (timeout == 0 || maxQueueSize == 0) {
269274
return Futures.immediateFailedFuture(new BusyPoolException(host.getSocketAddress(), 0));
270275
}
271276

@@ -279,16 +284,16 @@ private ListenableFuture<Connection> enqueue(int maxQueueSize) {
279284
}
280285
}
281286

282-
SettableFuture<Connection> future = SettableFuture.create();
283-
pendingBorrows.add(future);
287+
PendingBorrow pendingBorrow = new PendingBorrow(timeout, unit, timeoutsExecutor);
288+
pendingBorrows.add(pendingBorrow);
284289

285290
// If we raced with shutdown, make sure the future will be completed. This has no effect if it was properly
286291
// handled in closeAsync.
287292
if (phase.get() == Phase.CLOSING) {
288-
future.setException(new ConnectionException(host.getSocketAddress(), "Pool is shutdown"));
293+
pendingBorrow.setException(new ConnectionException(host.getSocketAddress(), "Pool is shutdown"));
289294
}
290295

291-
return future;
296+
return pendingBorrow.future;
292297
}
293298

294299
void returnConnection(Connection connection) {
@@ -316,7 +321,7 @@ void returnConnection(Connection connection) {
316321
}
317322

318323
// When a connection gets returned to the pool, check if there are pending borrows that can be completed with it.
319-
private void dequeue(Connection connection) {
324+
private void dequeue(final Connection connection) {
320325
while (!pendingBorrows.isEmpty()) {
321326

322327
// We can only reuse the connection if it's under its maximum number of inFlight requests.
@@ -333,34 +338,43 @@ private void dequeue(Connection connection) {
333338
}
334339
}
335340

336-
final SettableFuture<Connection> pendingBorrow = pendingBorrows.poll();
341+
final PendingBorrow pendingBorrow = pendingBorrows.poll();
337342
if (pendingBorrow == null) {
338343
// Another thread has emptied the queue since our last check, restore the count
339344
connection.inFlight.decrementAndGet();
340345
} else {
341-
totalInFlight.incrementAndGet();
342346
pendingBorrowCount.decrementAndGet();
343347
// Ensure that the keyspace set on the connection is the one set on the pool state, in the general case it will be.
344348
ListenableFuture<Connection> setKeyspaceFuture = connection.setKeyspaceAsync(manager.poolsState.keyspace);
345349
// Slight optimization, if the keyspace was already correct the future will be complete, so simply complete it here.
346350
if (setKeyspaceFuture.isDone()) {
347351
try {
348-
pendingBorrow.set(Uninterruptibles.getUninterruptibly(setKeyspaceFuture));
352+
if (pendingBorrow.set(Uninterruptibles.getUninterruptibly(setKeyspaceFuture))) {
353+
totalInFlight.incrementAndGet();
354+
} else {
355+
connection.inFlight.decrementAndGet();
356+
}
349357
} catch (ExecutionException e) {
350358
pendingBorrow.setException(e.getCause());
359+
connection.inFlight.decrementAndGet();
351360
}
352361
} else {
353362
// Otherwise the keyspace did need to be set, tie the pendingBorrow future to the set keyspace completion.
354363
Futures.addCallback(setKeyspaceFuture, new FutureCallback<Connection>() {
355364

356365
@Override
357366
public void onSuccess(Connection c) {
358-
pendingBorrow.set(c);
367+
if (pendingBorrow.set(c)) {
368+
totalInFlight.incrementAndGet();
369+
} else {
370+
connection.inFlight.decrementAndGet();
371+
}
359372
}
360373

361374
@Override
362375
public void onFailure(Throwable t) {
363376
pendingBorrow.setException(t);
377+
connection.inFlight.decrementAndGet();
364378
}
365379
});
366380
}
@@ -592,7 +606,7 @@ final CloseFuture closeAsync() {
592606

593607
phase.set(Phase.CLOSING);
594608

595-
for (SettableFuture<Connection> pendingBorrow : pendingBorrows) {
609+
for (PendingBorrow pendingBorrow : pendingBorrows) {
596610
pendingBorrow.setException(new ConnectionException(host.getSocketAddress(), "Pool is shutdown"));
597611
}
598612

@@ -664,4 +678,31 @@ void setKeyspace(String keyspace) {
664678
this.keyspace = keyspace;
665679
}
666680
}
681+
682+
private class PendingBorrow {
683+
final SettableFuture<Connection> future;
684+
final Future<?> timeoutTask;
685+
686+
PendingBorrow(final long timeout, final TimeUnit unit, EventExecutor timeoutsExecutor) {
687+
this.future = SettableFuture.create();
688+
this.timeoutTask = timeoutsExecutor.schedule(new Runnable() {
689+
@Override
690+
public void run() {
691+
future.setException(
692+
new BusyPoolException(host.getSocketAddress(), timeout, unit));
693+
}
694+
}, timeout, unit);
695+
}
696+
697+
boolean set(Connection connection) {
698+
boolean succeeded = this.future.set(connection);
699+
this.timeoutTask.cancel(false);
700+
return succeeded;
701+
}
702+
703+
void setException(Throwable exception) {
704+
this.future.setException(exception);
705+
this.timeoutTask.cancel(false);
706+
}
707+
}
667708
}

driver-core/src/main/java/com/datastax/driver/core/Metadata.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,12 @@ public Set<Host> getReplicas(String keyspace, ByteBuffer partitionKey) {
305305
/**
306306
* Returns the set of hosts that are replica for a given token range.
307307
* <p/>
308-
* Note that this information is refreshed asynchronously by the control
308+
* Note that it is assumed that the input range does not overlap across multiple host ranges.
309+
* If the range extends over multiple hosts, it only returns the replicas for those hosts
310+
* that are replicas for the last token of the range. This behavior may change in a future
311+
* release, see <a href="https://datastax-oss.atlassian.net/browse/JAVA-1355">JAVA-1355</a>.
312+
* <p/>
313+
* Also note that this information is refreshed asynchronously by the control
309314
* connection, when schema or ring topology changes. It might occasionally
310315
* be stale (or even empty).
311316
*

0 commit comments

Comments
 (0)