Skip to content

Commit fc42ecb

Browse files
committed
Various fixes on JAVA-594.
- fix log acquisition pattern - increase lock timeout - fix deadlock when a connection created for a suspected host is defunct.
1 parent b83874f commit fc42ecb

2 files changed

Lines changed: 45 additions & 38 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class Cluster implements Closeable {
7474

7575
private static final int DEFAULT_THREAD_KEEP_ALIVE = 30;
7676

77+
private static final int NOTIF_LOCK_TIMEOUT_SECONDS = 60;
78+
7779
final Manager manager;
7880

7981
/**
@@ -1378,12 +1380,12 @@ private void onUp(final Host host, ListeningExecutorService poolCreationExecutor
13781380
return;
13791381
}
13801382

1383+
boolean locked = host.notificationsLock.tryLock(NOTIF_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
1384+
if (!locked) {
1385+
logger.warn("Could not acquire notifications lock within {} seconds, ignoring UP notification for {}", NOTIF_LOCK_TIMEOUT_SECONDS, host);
1386+
return;
1387+
}
13811388
try {
1382-
boolean locked = host.notificationsLock.tryLock(10, TimeUnit.SECONDS);
1383-
if (!locked) {
1384-
logger.warn("Could not acquire notifications lock within 10 seconds, ignoring UP notification for {}", host);
1385-
return;
1386-
}
13871389

13881390
// We don't want to use the public Host.isUp() as this would make us skip the rest for suspected hosts
13891391
if (host.state == Host.State.UP)
@@ -1476,7 +1478,7 @@ public void runMayThrow() throws InterruptedException, ExecutionException {
14761478
});
14771479
}
14781480

1479-
public void onSuspected(final Host host) {
1481+
public void onSuspected(final Host host) throws InterruptedException {
14801482
logger.debug("Host {} is Suspected", host);
14811483

14821484
if (isClosed())
@@ -1490,6 +1492,10 @@ public void onSuspected(final Host host) {
14901492
return;
14911493
}
14921494

1495+
// If we've already mark the node down/suspected, ignore this
1496+
if (host.state == Host.State.SUSPECT || host.reconnectionAttempt.get() != null)
1497+
return;
1498+
14931499
// We need to
14941500
// 1) mark the node suspect if no-one has bitten us to it
14951501
// 2) start the reconnection attempt
@@ -1501,15 +1507,14 @@ public void onSuspected(final Host host) {
15011507
// once, but we also don't want said threads to return from this method before
15021508
// the loadbalancing policy has been informed (otherwise those threads won't
15031509
// consider the host suspect but simply ignore it). So we lock.
1510+
boolean locked = host.notificationsLock.tryLock(NOTIF_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
1511+
if (!locked) {
1512+
logger.warn("Could not acquire notifications lock within {} seconds, ignoring SUSPECTED notification for {}", NOTIF_LOCK_TIMEOUT_SECONDS, host);
1513+
return;
1514+
}
15041515
try {
1505-
boolean locked = host.notificationsLock.tryLock(10, TimeUnit.SECONDS);
1506-
if (!locked) {
1507-
logger.warn("Could not acquire notifications lock within 10 seconds, ignoring SUSPECTED notification for {}", host);
1508-
return;
1509-
}
1510-
1511-
// If we've already mark the node down/suspected, ignore this
1512-
if (!host.setSuspected() || host.reconnectionAttempt.get() != null)
1516+
// Again, exit if someone beat us to suspecting the host
1517+
if (!host.setSuspected() || host.reconnectionAttempt.get() != null)
15131518
return;
15141519

15151520
// Start the initial initial reconnection attempt
@@ -1545,8 +1550,6 @@ public void runMayThrow() throws InterruptedException, ExecutionException {
15451550
for (Host.StateListener listener : listeners)
15461551
listener.onSuspected(host);
15471552

1548-
} catch (InterruptedException e) {
1549-
Thread.currentThread().interrupt();
15501553
} finally {
15511554
host.notificationsLock.unlock();
15521555
}
@@ -1559,12 +1562,12 @@ private void onDown(final Host host, final boolean isHostAddition, final boolean
15591562
if (isClosed())
15601563
return;
15611564

1565+
boolean locked = host.notificationsLock.tryLock(NOTIF_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
1566+
if (!locked) {
1567+
logger.warn("Could not acquire notifications lock within {} seconds, ignoring DOWN notification for {}", NOTIF_LOCK_TIMEOUT_SECONDS, host);
1568+
return;
1569+
}
15621570
try {
1563-
boolean locked = host.notificationsLock.tryLock(10, TimeUnit.SECONDS);
1564-
if (!locked) {
1565-
logger.warn("Could not acquire notifications lock within 10 seconds, ignoring DOWN notification for {}", host);
1566-
return;
1567-
}
15681571

15691572
// If we're SUSPECT and not the task validating the suspicion, then some other task is
15701573
// already checking to verify if the node is really down (or if it's simply that the
@@ -1740,12 +1743,12 @@ private void onAdd(final Host host) throws InterruptedException, ExecutionExcept
17401743
return;
17411744
}
17421745

1746+
boolean locked = host.notificationsLock.tryLock(NOTIF_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
1747+
if (!locked) {
1748+
logger.warn("Could not acquire notifications lock within {} seconds, ignoring ADD notification for {}", NOTIF_LOCK_TIMEOUT_SECONDS, host);
1749+
return;
1750+
}
17431751
try {
1744-
boolean locked = host.notificationsLock.tryLock(10, TimeUnit.SECONDS);
1745-
if (!locked) {
1746-
logger.warn("Could not acquire notifications lock within 10 seconds, ignoring ADD notification for {}", host);
1747-
return;
1748-
}
17491752

17501753
// Adds to the load balancing first and foremost, as doing so might change the decision
17511754
// it will make for distance() on that node (not likely but we leave that possibility).
@@ -1835,12 +1838,12 @@ private void onRemove(Host host) throws InterruptedException, ExecutionException
18351838
if (isClosed())
18361839
return;
18371840

1841+
boolean locked = host.notificationsLock.tryLock(NOTIF_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
1842+
if (!locked) {
1843+
logger.warn("Could not acquire notifications lock within {} seconds, ignoring REMOVE notification for {}", NOTIF_LOCK_TIMEOUT_SECONDS, host);
1844+
return;
1845+
}
18381846
try {
1839-
boolean locked = host.notificationsLock.tryLock(10, TimeUnit.SECONDS);
1840-
if (!locked) {
1841-
logger.warn("Could not acquire notifications lock within 10 seconds, ignoring REMOVE notification for {}", host);
1842-
return;
1843-
}
18441847

18451848
host.setDown();
18461849

@@ -1868,11 +1871,16 @@ public boolean signalConnectionFailure(Host host, ConnectionException exception,
18681871
if (isHostAddition || !markSuspected) {
18691872
triggerOnDown(host, isHostAddition);
18701873
} else {
1871-
// Note that we do want to call onSuspected on the current thread, as the whole point is
1872-
// that by the time this method return, the host initialReconnectionAttempt will have been
1873-
// set and the load balancing policy informed of the suspection. We know that onSuspected
1874-
// does little work (and non blocking one) itself however.
1875-
onSuspected(host);
1874+
try {
1875+
// Note that we do want to call onSuspected on the current thread, as the whole point is
1876+
// that by the time this method return, the host initialReconnectionAttempt will have been
1877+
// set and the load balancing policy informed of the suspection. We know that onSuspected
1878+
// does little work (and non blocking one) itself however.
1879+
onSuspected(host);
1880+
} catch (InterruptedException e) {
1881+
// This is most likely due to shutdown
1882+
logger.warn("Interrupted while trying to set host SUSPECT, aborting");
1883+
}
18761884
}
18771885
}
18781886
return isDown;

driver-core/src/main/java/com/datastax/driver/core/Host.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.net.InetAddress;
1919
import java.net.InetSocketAddress;
2020
import java.util.concurrent.atomic.AtomicReference;
21-
import java.util.concurrent.locks.Lock;
2221
import java.util.concurrent.locks.ReentrantLock;
2322

2423
import com.google.common.collect.ImmutableList;
@@ -42,7 +41,7 @@ public class Host {
4241
enum State { ADDED, DOWN, SUSPECT, UP }
4342
volatile State state;
4443
/** Ensures state change notifications for that host are handled serially */
45-
final Lock notificationsLock = new ReentrantLock();
44+
final ReentrantLock notificationsLock = new ReentrantLock(true);
4645

4746
private final ConvictionPolicy policy;
4847
private final Cluster.Manager manager;

0 commit comments

Comments
 (0)