2929import com .datastax .oss .driver .internal .core .metadata .DefaultNode ;
3030import com .datastax .oss .driver .internal .core .metadata .DistanceEvent ;
3131import com .datastax .oss .driver .internal .core .metadata .NodeStateEvent ;
32+ import com .datastax .oss .driver .internal .core .metadata .TopologyEvent ;
3233import com .datastax .oss .driver .internal .core .pool .ChannelPool ;
3334import com .datastax .oss .driver .internal .core .pool .ChannelPoolFactory ;
3435import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
@@ -208,6 +209,7 @@ private class SingleThreaded {
208209 private final Object stateListenerKey ;
209210 private final ReplayingEventFilter <NodeStateEvent > stateEventFilter =
210211 new ReplayingEventFilter <>(this ::processStateEvent );
212+ private final Object topologyListenerKey ;
211213 // The pools that we have opened but have not finished initializing yet
212214 private final Map <Node , CompletionStage <ChannelPool >> pending = new HashMap <>();
213215 // If we receive events while a pool is initializing, the last one is stored here
@@ -226,6 +228,11 @@ private SingleThreaded(InternalDriverContext context) {
226228 context
227229 .eventBus ()
228230 .register (NodeStateEvent .class , RunOrSchedule .on (adminExecutor , this ::onStateEvent ));
231+ this .topologyListenerKey =
232+ context
233+ .eventBus ()
234+ .register (
235+ TopologyEvent .class , RunOrSchedule .on (adminExecutor , this ::onTopologyEvent ));
229236 }
230237
231238 private void init () {
@@ -338,39 +345,57 @@ private void processDistanceEvent(DistanceEvent event) {
338345
339346 private void processStateEvent (NodeStateEvent event ) {
340347 assert adminExecutor .inEventLoop ();
341- // no need to check closeWasCalled, because we stop listening for events one closed
348+ // no need to check closeWasCalled, because we stop listening for events once closed
342349 DefaultNode node = event .node ;
350+ NodeState oldState = event .oldState ;
343351 NodeState newState = event .newState ;
344352 if (pending .containsKey (node )) {
345353 pendingStateEvents .put (node , event );
346354 } else if (newState == NodeState .FORCED_DOWN ) {
347355 ChannelPool pool = pools .remove (node );
348356 if (pool != null ) {
349- LOG .debug ("[{}] {} became FORCED_DOWN, destroying pool" , logPrefix , node );
357+ LOG .debug ("[{}] {} was FORCED_DOWN, destroying pool" , logPrefix , node );
350358 pool .closeAsync ()
351359 .exceptionally (
352360 error -> {
353361 LOG .warn ("[{}] Error closing pool" , logPrefix , error );
354362 return null ;
355363 });
356364 }
357- } else if (newState == NodeState .UP && node .getDistance () != NodeDistance .IGNORED ) {
358- ChannelPool pool = pools .get (node );
359- if (pool == null ) {
360- LOG .debug ("[{}] {} came back UP and no pool found, initializing it" , logPrefix , node );
361- CompletionStage <ChannelPool > poolFuture =
362- channelPoolFactory .init (node , keyspace , node .getDistance (), context , logPrefix );
363- pending .put (node , poolFuture );
364- poolFuture
365- .thenAcceptAsync (this ::onPoolInitialized , adminExecutor )
366- .exceptionally (UncaughtExceptions ::log );
367- } else {
368- LOG .debug ("[{}] {} came back UP, triggering pool reconnection" , logPrefix , node );
369- pool .reconnectNow ();
365+ } else if (oldState == NodeState .FORCED_DOWN
366+ && newState == NodeState .UP
367+ && node .getDistance () != NodeDistance .IGNORED ) {
368+ LOG .debug ("[{}] {} was forced back UP, initializing pool" , logPrefix , node );
369+ createOrReconnectPool (node );
370+ }
371+ }
372+
373+ private void onTopologyEvent (TopologyEvent event ) {
374+ assert adminExecutor .inEventLoop ();
375+ if (event .type == TopologyEvent .Type .SUGGEST_UP ) {
376+ Node node = context .metadataManager ().getMetadata ().getNodes ().get (event .address );
377+ if (node .getDistance () != NodeDistance .IGNORED ) {
378+ LOG .debug (
379+ "[{}] Received a SUGGEST_UP event for {}, reconnecting pool now" , logPrefix , node );
380+ createOrReconnectPool (node );
370381 }
371382 }
372383 }
373384
385+ private void createOrReconnectPool (Node node ) {
386+ ChannelPool pool = pools .get (node );
387+ if (pool == null ) {
388+ CompletionStage <ChannelPool > poolFuture =
389+ channelPoolFactory .init (node , keyspace , node .getDistance (), context , logPrefix );
390+ pending .put (node , poolFuture );
391+ poolFuture
392+ .thenAcceptAsync (this ::onPoolInitialized , adminExecutor )
393+ .exceptionally (UncaughtExceptions ::log );
394+ } else {
395+ pool .reconnectNow ();
396+ }
397+ }
398+
374399 private void onPoolInitialized (ChannelPool pool ) {
375400 assert adminExecutor .inEventLoop ();
376401 Node node = pool .getNode ();
@@ -458,6 +483,7 @@ private void close() {
458483 // Stop listening for events
459484 context .eventBus ().unregister (distanceListenerKey , DistanceEvent .class );
460485 context .eventBus ().unregister (stateListenerKey , NodeStateEvent .class );
486+ context .eventBus ().unregister (topologyListenerKey , TopologyEvent .class );
461487
462488 List <CompletionStage <Void >> closePoolStages = new ArrayList <>(pools .size ());
463489 for (ChannelPool pool : pools .values ()) {
0 commit comments