Skip to content

Commit 9401668

Browse files
committed
added AddressTranlator interface
PYTHON-69
1 parent d4bf402 commit 9401668

2 files changed

Lines changed: 55 additions & 19 deletions

File tree

cassandra/cluster.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
from cassandra.metadata import Metadata, protect_name, murmur3
6666
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
6767
ExponentialReconnectionPolicy, HostDistance,
68-
RetryPolicy)
68+
RetryPolicy, IdentityTranslator)
6969
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
7070
HostConnectionPool, HostConnection,
7171
NoConnectionsAvailable)
@@ -347,6 +347,12 @@ def auth_provider(self, value):
347347
:class:`.policies.SimpleConvictionPolicy`.
348348
"""
349349

350+
address_translator = IdentityTranslator()
351+
"""
352+
:class:`.policies.AddressTranslator` instance to be used in translating server node addresses
353+
to driver connection addresses.
354+
"""
355+
350356
connect_to_remote_hosts = True
351357
"""
352358
If left as :const:`True`, hosts that are considered :attr:`~.HostDistance.REMOTE`
@@ -553,7 +559,8 @@ def __init__(self,
553559
topology_event_refresh_window=10,
554560
connect_timeout=5,
555561
schema_metadata_enabled=True,
556-
token_metadata_enabled=True):
562+
token_metadata_enabled=True,
563+
address_translator=None):
557564
"""
558565
Any of the mutable Cluster attributes may be set as keyword arguments
559566
to the constructor.
@@ -572,28 +579,30 @@ def __init__(self,
572579
if load_balancing_policy is not None:
573580
if isinstance(load_balancing_policy, type):
574581
raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
575-
576582
self.load_balancing_policy = load_balancing_policy
577583
else:
578584
self.load_balancing_policy = default_lbp_factory()
579585

580586
if reconnection_policy is not None:
581587
if isinstance(reconnection_policy, type):
582588
raise TypeError("reconnection_policy should not be a class, it should be an instance of that class")
583-
584589
self.reconnection_policy = reconnection_policy
585590

586591
if default_retry_policy is not None:
587592
if isinstance(default_retry_policy, type):
588593
raise TypeError("default_retry_policy should not be a class, it should be an instance of that class")
589-
590594
self.default_retry_policy = default_retry_policy
591595

592596
if conviction_policy_factory is not None:
593597
if not callable(conviction_policy_factory):
594598
raise ValueError("conviction_policy_factory must be callable")
595599
self.conviction_policy_factory = conviction_policy_factory
596600

601+
if address_translator is not None:
602+
if isinstance(address_translator, type):
603+
raise TypeError("address_translator should not be a class, it should be an instance of that class")
604+
self.address_translator = address_translator
605+
597606
if connection_class is not None:
598607
self.connection_class = connection_class
599608

@@ -2348,10 +2357,7 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
23482357
should_rebuild_token_map = force_token_rebuild or self._cluster.metadata.partitioner is None
23492358
found_hosts = set()
23502359
for row in peers_result:
2351-
addr = row.get("rpc_address")
2352-
2353-
if not addr or addr in ["0.0.0.0", "::"]:
2354-
addr = row.get("peer")
2360+
addr = self._rpc_from_peer_row(row)
23552361

23562362
tokens = row.get("tokens", None)
23572363
if 'tokens' in row and not tokens: # it was selected, but empty
@@ -2413,7 +2419,7 @@ def _delay_for_event_type(self, event_type, delay_window):
24132419

24142420
def _handle_topology_change(self, event):
24152421
change_type = event["change_type"]
2416-
addr, port = event["address"]
2422+
addr = self._translate_address(event["address"][0])
24172423
if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
24182424
if self._topology_event_refresh_window >= 0:
24192425
delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window)
@@ -2424,7 +2430,7 @@ def _handle_topology_change(self, event):
24242430

24252431
def _handle_status_change(self, event):
24262432
change_type = event["change_type"]
2427-
addr, port = event["address"]
2433+
addr = self._translate_address(event["address"][0])
24282434
host = self._cluster.metadata.get_host(addr)
24292435
if change_type == "UP":
24302436
delay = 1 + self._delay_for_event_type('status_change', 0.5) # randomness to avoid thundering herd problem on events
@@ -2442,6 +2448,9 @@ def _handle_status_change(self, event):
24422448
# this will be run by the scheduler
24432449
self._cluster.on_down(host, is_host_addition=False)
24442450

2451+
def _translate_address(self, addr):
2452+
return self._cluster.address_translator.translate(addr)
2453+
24452454
def _handle_schema_change(self, event):
24462455
if self._schema_event_refresh_window < 0:
24472456
return
@@ -2523,11 +2532,7 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25232532
schema_ver = row.get('schema_version')
25242533
if not schema_ver:
25252534
continue
2526-
2527-
addr = row.get("rpc_address")
2528-
if not addr or addr in ["0.0.0.0", "::"]:
2529-
addr = row.get("peer")
2530-
2535+
addr = self._rpc_from_peer_row(row)
25312536
peer = self._cluster.metadata.get_host(addr)
25322537
if peer and peer.is_up:
25332538
versions[schema_ver].add(addr)
@@ -2538,6 +2543,12 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25382543

25392544
return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
25402545

2546+
def _rpc_from_peer_row(self, row):
2547+
addr = row.get("rpc_address")
2548+
if not addr or addr in ["0.0.0.0", "::"]:
2549+
addr = row.get("peer")
2550+
return self._translate_address(addr)
2551+
25412552
def _signal_error(self):
25422553
with self._lock:
25432554
if self._is_shutdown:

cassandra/policies.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,9 @@
1616
import logging
1717
from random import randint
1818
from threading import Lock
19-
import six
2019

2120
from cassandra import ConsistencyLevel
2221

23-
from six.moves import range
24-
2522
log = logging.getLogger(__name__)
2623

2724

@@ -849,3 +846,31 @@ def on_unavailable(self, query, consistency, required_replicas, alive_replicas,
849846
return (self.RETHROW, None)
850847
else:
851848
return self._pick_consistency(alive_replicas)
849+
850+
851+
class AddressTranslator(object):
852+
"""
853+
Interface for translating cluster-defined endpoints.
854+
855+
The driver discovers nodes using server metadata and topology change events. Normally,
856+
the endpoint defined by the server is the right way to connect to a node. In some environments,
857+
these addresses may not be reachable, or not preferred (public vs. private IPs in cloud environments,
858+
suboptimal routing, etc). This interface allows for translating from server defined endpoints to
859+
preferred addresses for driver connections.
860+
861+
*Note:* :attr:`~Cluster.contact_points` provided while creating the :class:`~.Cluster` instance are not
862+
translated using this mechanism -- only addresses received from Cassandra nodes are.
863+
"""
864+
def translate(self, addr):
865+
"""
866+
Accepts the node ip address, and returns a translated address to be used connecting to this node.
867+
"""
868+
raise NotImplementedError
869+
870+
871+
class IdentityTranslator(AddressTranslator):
872+
"""
873+
Returns the endpoint with no translation
874+
"""
875+
def translate(self, addr):
876+
return addr

0 commit comments

Comments
 (0)