@@ -584,11 +584,8 @@ def _on_up_future_completed(self, host, futures, results, lock, finished_future)
584584 for listener in self .listeners :
585585 listener .on_up (host )
586586 finally :
587- host ._handle_node_up_condition .acquire ()
588- if host ._currently_handling_node_up :
587+ with host .lock :
589588 host ._currently_handling_node_up = False
590- host ._handle_node_up_condition .notify ()
591- host ._handle_node_up_condition .release ()
592589
593590 # see if there are any pools to add or remove now that the host is marked up
594591 for session in self .sessions :
@@ -601,39 +598,49 @@ def on_up(self, host):
601598 if self .is_shutdown :
602599 return
603600
604- host . _handle_node_up_condition . acquire ( )
605- while host ._currently_handling_node_up :
606- host ._handle_node_up_condition . wait ()
607- host . _currently_handling_node_up = True
608- host . _handle_node_up_condition . release ()
601+ log . debug ( "Waiting to acquire lock for handling up status of node %s" , host )
602+ with host .lock :
603+ if host ._currently_handling_node_up :
604+ log . debug ( "Another thread is already handling up status of node %s" , host )
605+ return
609606
610- if host .is_up :
611- return
607+ if host .is_up :
608+ log .debug ("Host %s was already marked up" , host )
609+ return
612610
611+ host ._currently_handling_node_up = True
612+ log .debug ("Starting to handle up status of node %s" , host )
613+
614+ have_future = False
613615 futures = set ()
614616 try :
615- log .info ("Host %s has been marked up " , host )
617+ log .info ("Host %s may be up; will prepare queries and open connection pool " , host )
616618
617619 reconnector = host .get_and_set_reconnection_handler (None )
618620 if reconnector :
619621 log .debug ("Now that host %s is up, cancelling the reconnection handler" , host )
620622 reconnector .cancel ()
621623
622624 self ._prepare_all_queries (host )
623- log .debug ("Done preparing all queries for host %s" , host )
625+ log .debug ("Done preparing all queries for host %s, " , host )
624626
625627 for session in self .sessions :
626628 session .remove_pool (host )
627629
630+ log .debug ("Signalling to load balancing policy that host %s is up" , host )
628631 self .load_balancing_policy .on_up (host )
632+
633+ log .debug ("Signalling to control connection that host %s is up" , host )
629634 self .control_connection .on_up (host )
630635
636+ log .debug ("Attempting to open new connection pools for host %s" , host )
631637 futures_lock = Lock ()
632638 futures_results = []
633639 callback = partial (self ._on_up_future_completed , host , futures , futures_results , futures_lock )
634640 for session in self .sessions :
635641 future = session .add_or_renew_pool (host , is_host_addition = False )
636642 if future is not None :
643+ have_future = True
637644 future .add_done_callback (callback )
638645 futures .add (future )
639646 except Exception :
@@ -643,11 +650,13 @@ def on_up(self, host):
643650
644651 self ._cleanup_failed_on_up_handling (host )
645652
646- host ._handle_node_up_condition .acquire ()
647- host ._currently_handling_node_up = False
648- host ._handle_node_up_condition .notify ()
649- host ._handle_node_up_condition .release ()
653+ with host .lock :
654+ host ._currently_handling_node_up = False
650655 raise
656+ else :
657+ if not have_future :
658+ with host .lock :
659+ host ._currently_handling_node_up = False
651660
652661 # for testing purposes
653662 return futures
@@ -670,19 +679,19 @@ def _start_reconnector(self, host, is_host_addition):
670679 log .debug ("Old host reconnector found for %s, cancelling" , host )
671680 old_reconnector .cancel ()
672681
673- log .debug ("Staring reconnector for host %s" , host )
682+ log .debug ("Starting reconnector for host %s" , host )
674683 reconnector .start ()
675684
676685 @run_in_executor
677- def on_down (self , host , is_host_addition , force_if_down = False ):
686+ def on_down (self , host , is_host_addition , expect_host_to_be_down = False ):
678687 """
679688 Intended for internal use only.
680689 """
681690 if self .is_shutdown :
682691 return
683692
684693 with host .lock :
685- if (not ( host .is_up or force_if_down ) ) or host .is_currently_reconnecting ():
694+ if (not host .is_up and not expect_host_to_be_down ) or host .is_currently_reconnecting ():
686695 return
687696
688697 host .set_down ()
@@ -772,10 +781,10 @@ def on_remove(self, host):
772781 listener .on_remove (host )
773782 self .control_connection .on_remove (host )
774783
775- def signal_connection_failure (self , host , connection_exc , is_host_addition ):
784+ def signal_connection_failure (self , host , connection_exc , is_host_addition , expect_host_to_be_down = False ):
776785 is_down = host .signal_connection_failure (connection_exc )
777786 if is_down :
778- self .on_down (host , is_host_addition , force_if_down = True )
787+ self .on_down (host , is_host_addition , expect_host_to_be_down )
779788 return is_down
780789
781790 def add_host (self , address , datacenter = None , rack = None , signal = True ):
@@ -1226,8 +1235,12 @@ def run_add_or_renew_pool():
12261235 self .cluster .signal_connection_failure (host , conn_exc , is_host_addition )
12271236 return False
12281237 except Exception as conn_exc :
1229- log .warn ("Failed to create connection pool for new host %s: %s" , host , conn_exc )
1230- self .cluster .signal_connection_failure (host , conn_exc , is_host_addition )
1238+ log .warn ("Failed to create connection pool for new host %s: %s" ,
1239+ host , conn_exc )
1240+ # the host itself will still be marked down, so we need to pass
1241+ # a special flag to make sure the reconnector is created
1242+ self .cluster .signal_connection_failure (
1243+ host , conn_exc , is_host_addition , expect_host_to_be_down = True )
12311244 return False
12321245
12331246 previous = self ._pools .get (host )
0 commit comments