@@ -74,6 +74,8 @@ public class Cluster implements Closeable {
7474
7575 private static final int DEFAULT_THREAD_KEEP_ALIVE = 30 ;
7676
77+ private static final int NOTIF_LOCK_TIMEOUT_SECONDS = 60 ;
78+
7779 final Manager manager ;
7880
7981 /**
@@ -1378,12 +1380,12 @@ private void onUp(final Host host, ListeningExecutorService poolCreationExecutor
13781380 return ;
13791381 }
13801382
1383+ boolean locked = host .notificationsLock .tryLock (NOTIF_LOCK_TIMEOUT_SECONDS , TimeUnit .SECONDS );
1384+ if (!locked ) {
1385+ logger .warn ("Could not acquire notifications lock within {} seconds, ignoring UP notification for {}" , NOTIF_LOCK_TIMEOUT_SECONDS , host );
1386+ return ;
1387+ }
13811388 try {
1382- boolean locked = host .notificationsLock .tryLock (10 , TimeUnit .SECONDS );
1383- if (!locked ) {
1384- logger .warn ("Could not acquire notifications lock within 10 seconds, ignoring UP notification for {}" , host );
1385- return ;
1386- }
13871389
13881390 // We don't want to use the public Host.isUp() as this would make us skip the rest for suspected hosts
13891391 if (host .state == Host .State .UP )
@@ -1476,7 +1478,7 @@ public void runMayThrow() throws InterruptedException, ExecutionException {
14761478 });
14771479 }
14781480
1479- public void onSuspected (final Host host ) {
1481+ public void onSuspected (final Host host ) throws InterruptedException {
14801482 logger .debug ("Host {} is Suspected" , host );
14811483
14821484 if (isClosed ())
@@ -1490,6 +1492,10 @@ public void onSuspected(final Host host) {
14901492 return ;
14911493 }
14921494
1495+ // If we've already mark the node down/suspected, ignore this
1496+ if (host .state == Host .State .SUSPECT || host .reconnectionAttempt .get () != null )
1497+ return ;
1498+
14931499 // We need to
14941500 // 1) mark the node suspect if no-one has bitten us to it
14951501 // 2) start the reconnection attempt
@@ -1501,15 +1507,14 @@ public void onSuspected(final Host host) {
15011507 // once, but we also don't want said threads to return from this method before
15021508 // the loadbalancing policy has been informed (otherwise those threads won't
15031509 // consider the host suspect but simply ignore it). So we lock.
1510+ boolean locked = host .notificationsLock .tryLock (NOTIF_LOCK_TIMEOUT_SECONDS , TimeUnit .SECONDS );
1511+ if (!locked ) {
1512+ logger .warn ("Could not acquire notifications lock within {} seconds, ignoring SUSPECTED notification for {}" , NOTIF_LOCK_TIMEOUT_SECONDS , host );
1513+ return ;
1514+ }
15041515 try {
1505- boolean locked = host .notificationsLock .tryLock (10 , TimeUnit .SECONDS );
1506- if (!locked ) {
1507- logger .warn ("Could not acquire notifications lock within 10 seconds, ignoring SUSPECTED notification for {}" , host );
1508- return ;
1509- }
1510-
1511- // If we've already mark the node down/suspected, ignore this
1512- if (!host .setSuspected () || host .reconnectionAttempt .get () != null )
1516+ // Again, exit if someone beat us to suspecting the host
1517+ if (!host .setSuspected () || host .reconnectionAttempt .get () != null )
15131518 return ;
15141519
15151520 // Start the initial initial reconnection attempt
@@ -1545,8 +1550,6 @@ public void runMayThrow() throws InterruptedException, ExecutionException {
15451550 for (Host .StateListener listener : listeners )
15461551 listener .onSuspected (host );
15471552
1548- } catch (InterruptedException e ) {
1549- Thread .currentThread ().interrupt ();
15501553 } finally {
15511554 host .notificationsLock .unlock ();
15521555 }
@@ -1559,12 +1562,12 @@ private void onDown(final Host host, final boolean isHostAddition, final boolean
15591562 if (isClosed ())
15601563 return ;
15611564
1565+ boolean locked = host .notificationsLock .tryLock (NOTIF_LOCK_TIMEOUT_SECONDS , TimeUnit .SECONDS );
1566+ if (!locked ) {
1567+ logger .warn ("Could not acquire notifications lock within {} seconds, ignoring DOWN notification for {}" , NOTIF_LOCK_TIMEOUT_SECONDS , host );
1568+ return ;
1569+ }
15621570 try {
1563- boolean locked = host .notificationsLock .tryLock (10 , TimeUnit .SECONDS );
1564- if (!locked ) {
1565- logger .warn ("Could not acquire notifications lock within 10 seconds, ignoring DOWN notification for {}" , host );
1566- return ;
1567- }
15681571
15691572 // If we're SUSPECT and not the task validating the suspicion, then some other task is
15701573 // already checking to verify if the node is really down (or if it's simply that the
@@ -1740,12 +1743,12 @@ private void onAdd(final Host host) throws InterruptedException, ExecutionExcept
17401743 return ;
17411744 }
17421745
1746+ boolean locked = host .notificationsLock .tryLock (NOTIF_LOCK_TIMEOUT_SECONDS , TimeUnit .SECONDS );
1747+ if (!locked ) {
1748+ logger .warn ("Could not acquire notifications lock within {} seconds, ignoring ADD notification for {}" , NOTIF_LOCK_TIMEOUT_SECONDS , host );
1749+ return ;
1750+ }
17431751 try {
1744- boolean locked = host .notificationsLock .tryLock (10 , TimeUnit .SECONDS );
1745- if (!locked ) {
1746- logger .warn ("Could not acquire notifications lock within 10 seconds, ignoring ADD notification for {}" , host );
1747- return ;
1748- }
17491752
17501753 // Adds to the load balancing first and foremost, as doing so might change the decision
17511754 // it will make for distance() on that node (not likely but we leave that possibility).
@@ -1835,12 +1838,12 @@ private void onRemove(Host host) throws InterruptedException, ExecutionException
18351838 if (isClosed ())
18361839 return ;
18371840
1841+ boolean locked = host .notificationsLock .tryLock (NOTIF_LOCK_TIMEOUT_SECONDS , TimeUnit .SECONDS );
1842+ if (!locked ) {
1843+ logger .warn ("Could not acquire notifications lock within {} seconds, ignoring REMOVE notification for {}" , NOTIF_LOCK_TIMEOUT_SECONDS , host );
1844+ return ;
1845+ }
18381846 try {
1839- boolean locked = host .notificationsLock .tryLock (10 , TimeUnit .SECONDS );
1840- if (!locked ) {
1841- logger .warn ("Could not acquire notifications lock within 10 seconds, ignoring REMOVE notification for {}" , host );
1842- return ;
1843- }
18441847
18451848 host .setDown ();
18461849
@@ -1868,11 +1871,16 @@ public boolean signalConnectionFailure(Host host, ConnectionException exception,
18681871 if (isHostAddition || !markSuspected ) {
18691872 triggerOnDown (host , isHostAddition );
18701873 } else {
1871- // Note that we do want to call onSuspected on the current thread, as the whole point is
1872- // that by the time this method return, the host initialReconnectionAttempt will have been
1873- // set and the load balancing policy informed of the suspection. We know that onSuspected
1874- // does little work (and non blocking one) itself however.
1875- onSuspected (host );
1874+ try {
1875+ // Note that we do want to call onSuspected on the current thread, as the whole point is
1876+ // that by the time this method return, the host initialReconnectionAttempt will have been
1877+ // set and the load balancing policy informed of the suspection. We know that onSuspected
1878+ // does little work (and non blocking one) itself however.
1879+ onSuspected (host );
1880+ } catch (InterruptedException e ) {
1881+ // This is most likely due to shutdown
1882+ logger .warn ("Interrupted while trying to set host SUSPECT, aborting" );
1883+ }
18761884 }
18771885 }
18781886 return isDown ;
0 commit comments