6565from cassandra .metadata import Metadata , protect_name , murmur3
6666from cassandra .policies import (TokenAwarePolicy , DCAwareRoundRobinPolicy , SimpleConvictionPolicy ,
6767 ExponentialReconnectionPolicy , HostDistance ,
68- RetryPolicy )
68+ RetryPolicy , IdentityTranslator )
6969from cassandra .pool import (Host , _ReconnectionHandler , _HostReconnectionHandler ,
7070 HostConnectionPool , HostConnection ,
7171 NoConnectionsAvailable )
@@ -347,6 +347,12 @@ def auth_provider(self, value):
347347 :class:`.policies.SimpleConvictionPolicy`.
348348 """
349349
350+ address_translator = IdentityTranslator ()
351+ """
352+ :class:`.policies.AddressTranslator` instance to be used in translating server node addresses
353+ to driver connection addresses.
354+ """
355+
350356 connect_to_remote_hosts = True
351357 """
352358 If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE`
@@ -553,7 +559,8 @@ def __init__(self,
553559 topology_event_refresh_window = 10 ,
554560 connect_timeout = 5 ,
555561 schema_metadata_enabled = True ,
556- token_metadata_enabled = True ):
562+ token_metadata_enabled = True ,
563+ address_translator = None ):
557564 """
558565 Any of the mutable Cluster attributes may be set as keyword arguments
559566 to the constructor.
@@ -572,28 +579,30 @@ def __init__(self,
572579 if load_balancing_policy is not None :
573580 if isinstance (load_balancing_policy , type ):
574581 raise TypeError ("load_balancing_policy should not be a class, it should be an instance of that class" )
575-
576582 self .load_balancing_policy = load_balancing_policy
577583 else :
578584 self .load_balancing_policy = default_lbp_factory ()
579585
580586 if reconnection_policy is not None :
581587 if isinstance (reconnection_policy , type ):
582588 raise TypeError ("reconnection_policy should not be a class, it should be an instance of that class" )
583-
584589 self .reconnection_policy = reconnection_policy
585590
586591 if default_retry_policy is not None :
587592 if isinstance (default_retry_policy , type ):
588593 raise TypeError ("default_retry_policy should not be a class, it should be an instance of that class" )
589-
590594 self .default_retry_policy = default_retry_policy
591595
592596 if conviction_policy_factory is not None :
593597 if not callable (conviction_policy_factory ):
594598 raise ValueError ("conviction_policy_factory must be callable" )
595599 self .conviction_policy_factory = conviction_policy_factory
596600
601+ if address_translator is not None :
602+ if isinstance (address_translator , type ):
603+ raise TypeError ("address_translator should not be a class, it should be an instance of that class" )
604+ self .address_translator = address_translator
605+
597606 if connection_class is not None :
598607 self .connection_class = connection_class
599608
@@ -2348,10 +2357,7 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
23482357 should_rebuild_token_map = force_token_rebuild or self ._cluster .metadata .partitioner is None
23492358 found_hosts = set ()
23502359 for row in peers_result :
2351- addr = row .get ("rpc_address" )
2352-
2353- if not addr or addr in ["0.0.0.0" , "::" ]:
2354- addr = row .get ("peer" )
2360+ addr = self ._rpc_from_peer_row (row )
23552361
23562362 tokens = row .get ("tokens" , None )
23572363 if 'tokens' in row and not tokens : # it was selected, but empty
@@ -2413,7 +2419,7 @@ def _delay_for_event_type(self, event_type, delay_window):
24132419
24142420 def _handle_topology_change (self , event ):
24152421 change_type = event ["change_type" ]
2416- addr , port = event ["address" ]
2422+ addr = self . _translate_address ( event ["address" ][ 0 ])
24172423 if change_type == "NEW_NODE" or change_type == "MOVED_NODE" :
24182424 if self ._topology_event_refresh_window >= 0 :
24192425 delay = self ._delay_for_event_type ('topology_change' , self ._topology_event_refresh_window )
@@ -2424,7 +2430,7 @@ def _handle_topology_change(self, event):
24242430
24252431 def _handle_status_change (self , event ):
24262432 change_type = event ["change_type" ]
2427- addr , port = event ["address" ]
2433+ addr = self . _translate_address ( event ["address" ][ 0 ])
24282434 host = self ._cluster .metadata .get_host (addr )
24292435 if change_type == "UP" :
24302436 delay = 1 + self ._delay_for_event_type ('status_change' , 0.5 ) # randomness to avoid thundering herd problem on events
@@ -2442,6 +2448,9 @@ def _handle_status_change(self, event):
24422448 # this will be run by the scheduler
24432449 self ._cluster .on_down (host , is_host_addition = False )
24442450
2451+ def _translate_address (self , addr ):
2452+ return self ._cluster .address_translator .translate (addr )
2453+
24452454 def _handle_schema_change (self , event ):
24462455 if self ._schema_event_refresh_window < 0 :
24472456 return
@@ -2523,11 +2532,7 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25232532 schema_ver = row .get ('schema_version' )
25242533 if not schema_ver :
25252534 continue
2526-
2527- addr = row .get ("rpc_address" )
2528- if not addr or addr in ["0.0.0.0" , "::" ]:
2529- addr = row .get ("peer" )
2530-
2535+ addr = self ._rpc_from_peer_row (row )
25312536 peer = self ._cluster .metadata .get_host (addr )
25322537 if peer and peer .is_up :
25332538 versions [schema_ver ].add (addr )
@@ -2538,6 +2543,12 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25382543
25392544 return dict ((version , list (nodes )) for version , nodes in six .iteritems (versions ))
25402545
2546+ def _rpc_from_peer_row (self , row ):
2547+ addr = row .get ("rpc_address" )
2548+ if not addr or addr in ["0.0.0.0" , "::" ]:
2549+ addr = row .get ("peer" )
2550+ return self ._translate_address (addr )
2551+
25412552 def _signal_error (self ):
25422553 with self ._lock :
25432554 if self ._is_shutdown :
0 commit comments