1717
1818import com .datastax .oss .driver .api .core .AllNodesFailedException ;
1919import com .datastax .oss .driver .api .core .AsyncAutoCloseable ;
20+ import com .datastax .oss .driver .api .core .loadbalancing .NodeDistance ;
2021import com .datastax .oss .driver .api .core .metadata .Node ;
22+ import com .datastax .oss .driver .api .core .metadata .NodeState ;
2123import com .datastax .oss .driver .internal .core .channel .ChannelEvent ;
2224import com .datastax .oss .driver .internal .core .channel .DriverChannel ;
2325import com .datastax .oss .driver .internal .core .channel .DriverChannelOptions ;
2426import com .datastax .oss .driver .internal .core .channel .EventCallback ;
2527import com .datastax .oss .driver .internal .core .context .InternalDriverContext ;
28+ import com .datastax .oss .driver .internal .core .metadata .DistanceEvent ;
29+ import com .datastax .oss .driver .internal .core .metadata .NodeStateEvent ;
2630import com .datastax .oss .driver .internal .core .metadata .SchemaElementKind ;
2731import com .datastax .oss .driver .internal .core .metadata .TopologyEvent ;
2832import com .datastax .oss .driver .internal .core .metadata .TopologyMonitor ;
4145import java .util .LinkedHashMap ;
4246import java .util .Map ;
4347import java .util .Queue ;
48+ import java .util .WeakHashMap ;
4449import java .util .concurrent .CompletableFuture ;
4550import java .util .concurrent .CompletionStage ;
4651import java .util .function .Consumer ;
@@ -66,7 +71,7 @@ public class ControlConnection implements EventCallback, AsyncAutoCloseable {
6671 private final EventExecutor adminExecutor ;
6772 private final SingleThreaded singleThreaded ;
6873
69- // The single channel used by this connection. This field is accessed currently , but only
74+ // The single channel used by this connection. This field is accessed concurrently , but only
7075 // mutated on adminExecutor (by SingleThreaded methods)
7176 private volatile DriverChannel channel ;
7277
@@ -195,11 +200,21 @@ private class SingleThreaded {
195200 private boolean closeWasCalled ;
196201 private final Reconnection reconnection ;
197202 private DriverChannelOptions channelOptions ;
203+ // The last events received for each node
204+ private final Map <Node , DistanceEvent > lastDistanceEvents = new WeakHashMap <>();
205+ private final Map <Node , NodeStateEvent > lastStateEvents = new WeakHashMap <>();
198206
199207 private SingleThreaded (InternalDriverContext context ) {
200208 this .context = context ;
201209 this .reconnection =
202210 new Reconnection (logPrefix , adminExecutor , context .reconnectionPolicy (), this ::reconnect );
211+
212+ context
213+ .eventBus ()
214+ .register (DistanceEvent .class , RunOrSchedule .on (adminExecutor , this ::onDistanceEvent ));
215+ context
216+ .eventBus ()
217+ .register (NodeStateEvent .class , RunOrSchedule .on (adminExecutor , this ::onStateEvent ));
203218 }
204219
205220 private void init (boolean listenToClusterEvents ) {
@@ -253,6 +268,8 @@ private void connect(
253268 .whenCompleteAsync (
254269 (channel , error ) -> {
255270 try {
271+ DistanceEvent lastDistanceEvent = lastDistanceEvents .get (node );
272+ NodeStateEvent lastStateEvent = lastStateEvents .get (node );
256273 if (error != null ) {
257274 if (closeWasCalled ) {
258275 onSuccess .run (); // abort, we don't really care about the result
@@ -274,6 +291,25 @@ private void connect(
274291 channel );
275292 channel .forceClose ();
276293 onSuccess .run ();
294+ } else if (lastDistanceEvent != null
295+ && lastDistanceEvent .distance == NodeDistance .IGNORED ) {
296+ LOG .debug (
297+ "[{}] New channel opened ({}) but node became ignored, "
298+ + "closing and trying next node" ,
299+ logPrefix ,
300+ channel );
301+ channel .forceClose ();
302+ connect (nodes , errors , onSuccess , onFailure );
303+ } else if (lastStateEvent != null
304+ && (lastStateEvent .newState == null /*(removed)*/
305+ || lastStateEvent .newState == NodeState .FORCED_DOWN )) {
306+ LOG .debug (
307+ "[{}] New channel opened ({}) but node was removed or forced down, "
308+ + "closing and trying next node" ,
309+ logPrefix ,
310+ channel );
311+ channel .forceClose ();
312+ connect (nodes , errors , onSuccess , onFailure );
277313 } else {
278314 LOG .debug ("[{}] Connection established to {}" , logPrefix , node );
279315 // Make sure previous channel gets closed (it may still be open if reconnection was forced)
@@ -344,6 +380,36 @@ private void reconnectNow() {
344380 }
345381 }
346382
383+ private void onDistanceEvent (DistanceEvent event ) {
384+ assert adminExecutor .inEventLoop ();
385+ this .lastDistanceEvents .put (event .node , event );
386+ if (event .distance == NodeDistance .IGNORED
387+ && channel != null
388+ && !channel .closeFuture ().isDone ()
389+ && event .node .getConnectAddress ().equals (channel .address ())) {
390+ LOG .debug (
391+ "[{}] Control node {} became IGNORED, reconnecting to a different node" ,
392+ logPrefix ,
393+ event .node );
394+ reconnectNow ();
395+ }
396+ }
397+
398+ private void onStateEvent (NodeStateEvent event ) {
399+ assert adminExecutor .inEventLoop ();
400+ this .lastStateEvents .put (event .node , event );
401+ if ((event .newState == null /*(removed)*/ || event .newState == NodeState .FORCED_DOWN )
402+ && channel != null
403+ && !channel .closeFuture ().isDone ()
404+ && event .node .getConnectAddress ().equals (channel .address ())) {
405+ LOG .debug (
406+ "[{}] Control node {} was removed or forced down, reconnecting to a different node" ,
407+ logPrefix ,
408+ event .node );
409+ reconnectNow ();
410+ }
411+ }
412+
347413 private void forceClose () {
348414 assert adminExecutor .inEventLoop ();
349415 if (closeWasCalled ) {
0 commit comments