Skip to content

Commit 17cdbd3

Browse files
committed
Merge branch '2.0' into 2.1
2 parents 7578132 + c796a80 commit 17cdbd3

16 files changed

Lines changed: 135 additions & 382 deletions

driver-core/CHANGELOG.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Merged from 2.0 branch:
1414
- [bug] Avoid deadlock when multiple connections to the same host get write
1515
errors (JAVA-557)
1616
- [improvement] Make shuffle=true the default for TokenAwarePolicy (JAVA-504)
17+
- [bug] Fix bug when SUSPECT reconnection succeeds, but one of the pooled
18+
connections fails while bringing the node back up (JAVA-577)
19+
- [bug] Prevent faulty control connection from ignoring reconnecting hosts
20+
(JAVA-587)
1721

1822

1923
2.1.3:
@@ -167,7 +171,6 @@ Merged from 2.0 branch: everything up to 2.0.3 (included), and the following.
167171
- [improvement] Shuffle the replicas in TokenAwarePolicy.newQueryPlan (JAVA-504)
168172
- [improvement] Make schema agreement wait tuneable (JAVA-507)
169173
- [improvement] Document how to inject the driver metrics into another registry (JAVA-494)
170-
- [improvement] Add idle timeout to the connection pool (JAVA-419)
171174
- [bug] LatencyAwarePolicy does not shutdown executor on invocation of close (JAVA-516)
172175
- [improvement] Throw an exception when DCAwareRoundRobinPolicy is built with
173176
an explicit but null or empty local datacenter (JAVA-451).

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.atomic.AtomicReference;
2727

2828
import com.google.common.annotations.VisibleForTesting;
29+
import com.google.common.base.Function;
2930
import com.google.common.base.Predicates;
3031
import com.google.common.collect.*;
3132
import com.google.common.util.concurrent.*;
@@ -1281,9 +1282,6 @@ synchronized void init() {
12811282
close();
12821283
throw e;
12831284
}
1284-
1285-
if (connectionFactory.protocolVersion.compareTo(ProtocolVersion.V3) < 0)
1286-
this.scheduledTasksExecutor.scheduleWithFixedDelay(new TrashIdleConnectionsTask(), 10, 10, TimeUnit.SECONDS);
12871285
}
12881286

12891287
ProtocolVersion protocolVersion() {
@@ -1539,15 +1537,23 @@ public void onSuspected(final Host host) {
15391537
host.initialReconnectionAttempt.set(executor.submit(new ExceptionCatchingRunnable() {
15401538
@Override
15411539
public void runMayThrow() throws InterruptedException, ExecutionException {
1540+
boolean success;
15421541
try {
15431542
// TODO: as for the ReconnectionHandler, we could avoid "wasting" this connection
15441543
connectionFactory.open(host).closeAsync();
15451544
// Note that we want to do the pool creation on this thread because we want that
15461545
// when onUp return, the host is ready for querying
15471546
onUp(host, MoreExecutors.sameThreadExecutor());
1547+
// If one of the connections in onUp failed, it signaled the error and triggerd onDown,
1548+
// but onDown aborted because this reconnection attempt was in progress (JAVA-577).
1549+
// Test the state now to check than onUp succeeded (we know it's up-to-date since onUp was
1550+
// executed synchronously).
1551+
success = host.state == Host.State.UP;
15481552
} catch (Exception e) {
1549-
onDown(host, false, true);
1553+
success = false;
15501554
}
1555+
if (!success)
1556+
onDown(host, false, true);
15511557
}
15521558
}));
15531559

@@ -1569,19 +1575,23 @@ private void onDown(final Host host, final boolean isHostAddition, final boolean
15691575
if (isClosed())
15701576
return;
15711577

1572-
// If we're SUSPECT and not the task validating the suspection, then some other task is
1578+
// If we're SUSPECT and not the task validating the suspicion, then some other task is
15731579
// already checking to verify if the node is really down (or if it's simply that the
15741580
// connections where broken). So just skip this in that case.
1575-
if (!isSuspectedVerification && host.state == Host.State.SUSPECT)
1581+
if (!isSuspectedVerification && host.state == Host.State.SUSPECT) {
1582+
logger.debug("Aborting onDown because a reconnection is running on SUSPECT host {}", host);
15761583
return;
1584+
}
15771585

15781586
// Note: we don't want to skip that method if !host.isUp() because we set isUp
15791587
// late in onUp, and so we can rely on isUp if there is an error during onUp.
15801588
// But if there is a reconnection attempt in progress already, then we know
15811589
// we've already gone through that method since the last successful onUp(), so
15821590
// we're good skipping it.
1583-
if (host.reconnectionAttempt.get() != null)
1591+
if (host.reconnectionAttempt.get() != null) {
1592+
logger.debug("Aborting onDown because a reconnection is running on DOWN host {}", host);
15841593
return;
1594+
}
15851595

15861596
// Remember if we care about this node at all. We must call this before
15871597
// we've signalled the load balancing policy, since most policy will always
@@ -2225,19 +2235,6 @@ public void run() {
22252235
}).start();
22262236
}
22272237
}
2228-
2229-
private class TrashIdleConnectionsTask implements Runnable {
2230-
@Override public void run() {
2231-
try {
2232-
long now = System.currentTimeMillis();
2233-
for (SessionManager session : sessions) {
2234-
session.trashIdleConnections(now);
2235-
}
2236-
} catch (Exception e) {
2237-
logger.warn("Error while trashing idle connections", e);
2238-
}
2239-
}
2240-
}
22412238
}
22422239

22432240
/**

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,8 @@ <E extends Exception> E defunct(E e) {
265265
// sure the "suspected" mechanism work as expected
266266
Host host = factory.manager.metadata.getHost(address);
267267
if (host != null) {
268-
// If we get an error on a host that was already DOWN or SUSPECTED, this is a reconnection attempt.
269-
// We don't want to signal, because that would invoke triggerOnDown unnecessarily (the host's bad
270-
// condition is already taken care of by the reattempt in progress)
271-
boolean isReconnectionAttempt = (host.state == Host.State.DOWN || host.state == Host.State.SUSPECT)
272-
&& !(this instanceof PooledConnection);
273-
if (!isReconnectionAttempt) {
274-
boolean isDown = factory.manager.signalConnectionFailure(host, ce, host.wasJustAdded(), isInitialized);
275-
notifyOwnerWhenDefunct(isDown);
276-
}
268+
boolean isDown = factory.manager.signalConnectionFailure(host, ce, host.wasJustAdded(), isInitialized);
269+
notifyOwnerWhenDefunct(isDown);
277270
}
278271

279272
// Force the connection to close to make sure the future completes. Otherwise force() might never get called and

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -407,39 +407,23 @@ private static InetSocketAddress addressToUseForPeerHost(Row peersRow, InetSocke
407407
return cluster.translateAddress(addr);
408408
}
409409

410-
private Row fetchNodeInfo(Host host, Connection c) {
411-
try {
412-
boolean isConnectedHost = c.address.equals(host.getSocketAddress());
413-
if (isConnectedHost || host.listenAddress != null) {
414-
DefaultResultSetFuture future = isConnectedHost
415-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL))
416-
: new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS + " WHERE peer='" + host.listenAddress.getHostAddress() + '\''));
417-
c.write(future);
418-
return future.get().one();
419-
}
420-
421-
// We have to fetch the whole peers table and find the host we're looking for
422-
DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
410+
private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
411+
boolean isConnectedHost = c.address.equals(host.getSocketAddress());
412+
if (isConnectedHost || host.listenAddress != null) {
413+
DefaultResultSetFuture future = isConnectedHost
414+
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL))
415+
: new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS + " WHERE peer='" + host.listenAddress.getHostAddress() + '\''));
423416
c.write(future);
424-
for (Row row : future.get()) {
425-
InetSocketAddress addr = addressToUseForPeerHost(row, c.address, cluster, true);
426-
if (addr != null && addr.equals(host.getSocketAddress()))
427-
return row;
428-
}
429-
} catch (ConnectionException e) {
430-
logger.debug("[Control connection] Connection error while refreshing node info ({})", e.getMessage());
431-
signalError();
432-
} catch (ExecutionException e) {
433-
// If we're being shutdown during refresh, this can happen. That's fine so don't scare the user.
434-
if (!isShutdown)
435-
logger.debug("[Control connection] Unexpected error while refreshing node info", e);
436-
signalError();
437-
} catch (BusyConnectionException e) {
438-
logger.debug("[Control connection] Connection is busy, reconnecting");
439-
signalError();
440-
} catch (InterruptedException e) {
441-
Thread.currentThread().interrupt();
442-
logger.debug("[Control connection] Interrupted while refreshing node list and token map, skipping it.");
417+
return future.get().one();
418+
}
419+
420+
// We have to fetch the whole peers table and find the host we're looking for
421+
DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS));
422+
c.write(future);
423+
for (Row row : future.get()) {
424+
InetSocketAddress addr = addressToUseForPeerHost(row, c.address, cluster, true);
425+
if (addr != null && addr.equals(host.getSocketAddress()))
426+
return row;
443427
}
444428
return null;
445429
}
@@ -455,26 +439,49 @@ public boolean refreshNodeInfo(Host host) {
455439
return true;
456440

457441
logger.debug("[Control connection] Refreshing node info on {}", host);
458-
Row row = fetchNodeInfo(host, c);
459-
if (row == null) {
460-
if (c.isDefunct()) {
461-
logger.debug("Control connection is down, could not refresh node info");
462-
// Keep going with what we currently know about the node, otherwise we will ignore all nodes
463-
// until the control connection is back up (which leads to a catch-22 if there is only one)
464-
return true;
465-
} else {
466-
logger.error("No row found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
442+
try {
443+
Row row = fetchNodeInfo(host, c);
444+
if (row == null) {
445+
if (c.isDefunct()) {
446+
logger.debug("Control connection is down, could not refresh node info");
447+
// Keep going with what we currently know about the node, otherwise we will ignore all nodes
448+
// until the control connection is back up (which leads to a catch-22 if there is only one)
449+
return true;
450+
} else {
451+
logger.error("No row found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
452+
return false;
453+
}
454+
// Ignore hosts with a null rpc_address, as this is most likely a phantom row in system.peers (JAVA-428).
455+
// Don't test this for the control host since we're already connected to it anyway, and we read the info from system.local
456+
// which doesn't have an rpc_address column (JAVA-546).
457+
} else if (!c.address.equals(host.getSocketAddress()) && row.getInet("rpc_address") == null) {
458+
logger.error("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
467459
return false;
468460
}
469-
// Ignore hosts with a null rpc_address, as this is most likely a phantom row in system.peers (JAVA-428).
470-
// Don't test this for the control host since we're already connected to it anyway, and we read the info from system.local
471-
// which doesn't have an rpc_address column (JAVA-546).
472-
} else if (!c.address.equals(host.getSocketAddress()) && row.getInet("rpc_address") == null) {
473-
logger.error("No rpc_address found for host {} in {}'s peers system table. {} will be ignored.", host.getAddress(), c.address, host.getAddress());
474-
return false;
475-
}
476461

477-
updateInfo(host, row, cluster);
462+
updateInfo(host, row, cluster);
463+
return true;
464+
465+
} catch (ConnectionException e) {
466+
logger.debug("[Control connection] Connection error while refreshing node info ({})", e.getMessage());
467+
signalError();
468+
} catch (ExecutionException e) {
469+
// If we're being shutdown during refresh, this can happen. That's fine so don't scare the user.
470+
if (!isShutdown)
471+
logger.debug("[Control connection] Unexpected error while refreshing node info", e);
472+
signalError();
473+
} catch (BusyConnectionException e) {
474+
logger.debug("[Control connection] Connection is busy, reconnecting");
475+
signalError();
476+
} catch (InterruptedException e) {
477+
Thread.currentThread().interrupt();
478+
logger.debug("[Control connection] Interrupted while refreshing node info, skipping it.");
479+
} catch (Exception e) {
480+
logger.debug("[Control connection] Unexpected error while refreshing node info", e);
481+
signalError();
482+
}
483+
// If we got an exception, always return true. Otherwise a faulty control connection would cause
484+
// reconnected hosts to be ignored permanently.
478485
return true;
479486
}
480487

driver-core/src/main/java/com/datastax/driver/core/DynamicConnectionPool.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -255,25 +255,15 @@ public void returnConnection(PooledConnection connection) {
255255
close(connection);
256256
} else {
257257
if (connections.size() > options().getCoreConnectionsPerHost(hostDistance) && inFlight <= options().getMinSimultaneousRequestsPerConnectionThreshold(hostDistance)) {
258-
connection.setTrashTimeIn(options().getIdleTimeoutSeconds());
258+
trashConnection(connection);
259259
} else if (connection.maxAvailableStreams() < MIN_AVAILABLE_STREAMS) {
260260
replaceConnection(connection);
261261
} else {
262-
connection.cancelTrashTime();
263262
signalAvailableConnection();
264263
}
265264
}
266265
}
267266

268-
@Override
269-
void trashIdleConnections(long now) {
270-
for (PooledConnection connection : connections) {
271-
if (connection.getTrashTime() < now) {
272-
trashConnection(connection);
273-
}
274-
}
275-
}
276-
277267
// Trash the connection and create a new one, but we don't call trashConnection
278268
// directly because we want to make sure the connection is always trashed.
279269
private void replaceConnection(PooledConnection connection) {
@@ -290,7 +280,6 @@ private boolean trashConnection(PooledConnection connection) {
290280
int opened = open.get();
291281
if (opened <= options().getCoreConnectionsPerHost(hostDistance)) {
292282
connection.markForTrash.set(false);
293-
connection.cancelTrashTime();
294283
return false;
295284
}
296285

@@ -300,9 +289,13 @@ private boolean trashConnection(PooledConnection connection) {
300289

301290
doTrashConnection(connection);
302291
}
303-
// If compareAndSet failed, it means we raced with another thread that will execute doTrashConnection.
304-
// Since trashConnection is called from a scheduled task, we're sure that the current thread did not modify
305-
// inFlight, so the other thread will take care of closing the connection if necessary (i.e. if inFlight == 0).
292+
// If compareAndSet failed, it means we raced and another thread will execute doTrashConnection.
293+
// If the connection needs to be closed (inFlight == 0), we don't need to do it here because the other thread will necessarily do
294+
// it:
295+
// - the current thread decremented inFlight in returnConnection
296+
// - we know it did it before the connection was trashed, because otherwise it would have entered `if (trash.contains(connection))`
297+
// in returnConnection and not arrived here.
298+
// - so the other thread will see the up-to-date value of inFlight and take appropriate action.
306299
return true;
307300
}
308301

driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ protected HostConnectionPool(Host host, HostDistance hostDistance, SessionManage
5959

6060
abstract void replaceDefunctConnection(final PooledConnection connection);
6161

62-
abstract void trashIdleConnections(long now);
63-
6462
abstract int opened();
6563

6664
abstract int inFlightQueriesCount();

driver-core/src/main/java/com/datastax/driver/core/PooledConnection.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@ class PooledConnection extends Connection {
2525

2626
private final HostConnectionPool pool;
2727

28-
/** The instant when the connection should be trashed after being idle for too long */
29-
private volatile long trashTime = Long.MAX_VALUE;
30-
31-
/** Used in {@link DynamicConnectionPool} to handle races between two threads trying to trash the same connection */
28+
/** Used in {@link HostConnectionPool} to handle races between two threads trying to trash the same connection */
3229
final AtomicBoolean markForTrash = new AtomicBoolean();
3330

3431
PooledConnection(String name, InetSocketAddress address, Factory factory, HostConnectionPool pool) throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
@@ -63,16 +60,4 @@ protected void notifyOwnerWhenDefunct(boolean hostIsDown) {
6360
pool.replaceDefunctConnection(this);
6461
}
6562
}
66-
67-
long getTrashTime() {
68-
return trashTime;
69-
}
70-
71-
void cancelTrashTime() {
72-
trashTime = Long.MAX_VALUE;
73-
}
74-
75-
void setTrashTimeIn(int timeoutSeconds) {
76-
trashTime = System.currentTimeMillis() + 1000 * timeoutSeconds;
77-
}
7863
}

0 commit comments

Comments
 (0)