@@ -759,13 +759,13 @@ def signal_connection_failure(self, host, connection_exc, is_host_addition):
759759 self .on_down (host , is_host_addition , force_if_down = True )
760760 return is_down
761761
762- def add_host (self , address , signal ):
762+ def add_host (self , address , datacenter = None , rack = None , signal = True ):
763763 """
764764 Called when adding initial contact points and when the control
765765 connection subsequently discovers a new node. Intended for internal
766766 use only.
767767 """
768- new_host = self .metadata .add_host (address )
768+ new_host = self .metadata .add_host (address , datacenter , rack )
769769 if new_host and signal :
770770 log .info ("New Cassandra host %s added" , address )
771771 self .on_add (new_host )
@@ -1580,10 +1580,13 @@ def _refresh_node_list_and_token_map(self, connection):
15801580 found_hosts .add (addr )
15811581
15821582 host = self ._cluster .metadata .get_host (addr )
1583+ datacenter = row .get ("data_center" )
1584+ rack = row .get ("rack" )
15831585 if host is None :
15841586 log .debug ("[control connection] Found new host to connect to: %s" , addr )
1585- host = self ._cluster .add_host (addr , signal = True )
1586- host .set_location_info (row .get ("data_center" ), row .get ("rack" ))
1587+ host = self ._cluster .add_host (addr , datacenter , rack , signal = True )
1588+ else :
1589+ self ._update_location_info (host , datacenter , rack )
15871590
15881591 tokens = row .get ("tokens" )
15891592 if partitioner and tokens :
@@ -1600,11 +1603,22 @@ def _refresh_node_list_and_token_map(self, connection):
16001603 log .debug ("[control connection] Fetched ring info, rebuilding metadata" )
16011604 self ._cluster .metadata .rebuild_token_map (partitioner , token_map )
16021605
1606+ def _update_location_info (self , host , datacenter , rack ):
1607+ if host .datacenter == datacenter and host .rack == rack :
1608+ return
1609+
1610+ # If the dc/rack information changes, we need to update the load balancing policy.
1611+ # For that, we remove and re-add the node against the policy. Not the most elegant, and assumes
1612+ # that the policy will update correctly, but in practice this should work.
1613+ self ._cluster .load_balancing_policy .on_down (host )
1614+ host .set_location_info (datacenter , rack )
1615+ self ._cluster .load_balancing_policy .on_up (host )
1616+
16031617 def _handle_topology_change (self , event ):
16041618 change_type = event ["change_type" ]
16051619 addr , port = event ["address" ]
16061620 if change_type == "NEW_NODE" :
1607- self ._cluster .scheduler .schedule (10 , self ._cluster . add_host , addr , signal = True )
1621+ self ._cluster .scheduler .schedule (10 , self .refresh_node_list_and_token_map )
16081622 elif change_type == "REMOVED_NODE" :
16091623 host = self ._cluster .metadata .get_host (addr )
16101624 self ._cluster .scheduler .schedule (0 , self ._cluster .remove_host , host )
@@ -1618,7 +1632,7 @@ def _handle_status_change(self, event):
16181632 if change_type == "UP" :
16191633 if host is None :
16201634 # this is the first time we've seen the node
1621- self ._cluster .scheduler .schedule (1 , self ._cluster . add_host , addr , signal = True )
1635+ self ._cluster .scheduler .schedule (1 , self .refresh_node_list_and_token_map )
16221636 else :
16231637 # this will be run by the scheduler
16241638 self ._cluster .scheduler .schedule (1 , self ._cluster .on_up , host )
0 commit comments