6262 RESULT_KIND_SET_KEYSPACE , RESULT_KIND_ROWS ,
6363 RESULT_KIND_SCHEMA_CHANGE )
6464from cassandra .metadata import Metadata , protect_name
65- from cassandra .policies import (RoundRobinPolicy , SimpleConvictionPolicy ,
65+ from cassandra .policies import (TokenAwarePolicy , DCAwareRoundRobinPolicy , SimpleConvictionPolicy ,
6666 ExponentialReconnectionPolicy , HostDistance ,
6767 RetryPolicy )
6868from cassandra .pool import (Host , _ReconnectionHandler , _HostReconnectionHandler ,
@@ -169,6 +169,16 @@ def _shutdown_cluster(cluster):
169169 cluster .shutdown ()
170170
171171
172+ # murmur3 implementation required for TokenAware is only available for CPython
173+ import platform
174+ if platform .python_implementation () == 'CPython' :
175+ def default_lbp_factory ():
176+ return TokenAwarePolicy (DCAwareRoundRobinPolicy ())
177+ else :
178+ def default_lbp_factory ():
179+ return DCAwareRoundRobinPolicy ()
180+
181+
172182class Cluster (object ):
173183 """
174184 The main class to use when interacting with a Cassandra cluster.
@@ -193,9 +203,9 @@ class Cluster(object):
193203 Defaults to loopback interface.
194204
195205 Note: When using :class:`.DCAwareLoadBalancingPolicy` with no explicit
196- local_dc set, the DC is chosen from an arbitrary host in contact_points.
197- In this case, contact_points should contain only nodes from a single,
198- local DC.
206+ local_dc set (as is the default) , the DC is chosen from an arbitrary
207+ host in contact_points. In this case, contact_points should contain
208+ only nodes from a single, local DC.
199209 """
200210
201211 port = 9042
@@ -289,7 +299,16 @@ def auth_provider(self, value):
289299 load_balancing_policy = None
290300 """
291301 An instance of :class:`.policies.LoadBalancingPolicy` or
292- one of its subclasses. Defaults to :class:`~.RoundRobinPolicy`.
302+ one of its subclasses.
303+
304+ .. versionchanged:: 2.6.0
305+
306+ Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`).
307+ when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
308+ otherwise. Default local DC will be chosen from contact points.
309+
310+ **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
311+ DC locality and remote nodes.**
293312 """
294313
295314 reconnection_policy = ExponentialReconnectionPolicy (1.0 , 600.0 )
@@ -319,6 +338,8 @@ def auth_provider(self, value):
319338 by the :attr:`~.Cluster.load_balancing_policy` will have a connection
320339 opened to them. Otherwise, they will not have a connection opened to them.
321340
341+ Note that the default load balancing policy ignores remote hosts by default.
342+
322343 .. versionadded:: 2.1.0
323344 """
324345
@@ -503,7 +524,7 @@ def __init__(self,
503524
504525 self .load_balancing_policy = load_balancing_policy
505526 else :
506- self .load_balancing_policy = RoundRobinPolicy ()
527+ self .load_balancing_policy = default_lbp_factory ()
507528
508529 if reconnection_policy is not None :
509530 if isinstance (reconnection_policy , type ):
@@ -2114,20 +2135,21 @@ def _submit(self, *args, **kwargs):
21142135 return None
21152136
21162137 def shutdown (self ):
2138+ # stop trying to reconnect (if we are)
2139+ with self ._reconnection_lock :
2140+ if self ._reconnection_handler :
2141+ self ._reconnection_handler .cancel ()
2142+
21172143 with self ._lock :
21182144 if self ._is_shutdown :
21192145 return
21202146 else :
21212147 self ._is_shutdown = True
21222148
2123- log .debug ("Shutting down control connection" )
2124- # stop trying to reconnect (if we are)
2125- if self ._reconnection_handler :
2126- self ._reconnection_handler .cancel ()
2127-
2128- if self ._connection :
2129- self ._connection .close ()
2130- del self ._connection
2149+ log .debug ("Shutting down control connection" )
2150+ if self ._connection :
2151+ self ._connection .close ()
2152+ del self ._connection
21312153
21322154 def refresh_schema (self , keyspace = None , table = None , usertype = None , function = None ,
21332155 aggregate = None , schema_agreement_wait = None ):
@@ -2545,17 +2567,21 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25452567 return dict ((version , list (nodes )) for version , nodes in six .iteritems (versions ))
25462568
25472569 def _signal_error (self ):
2548- # try just signaling the cluster, as this will trigger a reconnect
2549- # as part of marking the host down
2550- if self ._connection and self ._connection .is_defunct :
2551- host = self ._cluster .metadata .get_host (self ._connection .host )
2552- # host may be None if it's already been removed, but that indicates
2553- # that errors have already been reported, so we're fine
2554- if host :
2555- self ._cluster .signal_connection_failure (
2556- host , self ._connection .last_error , is_host_addition = False )
2570+ with self ._lock :
2571+ if self ._is_shutdown :
25572572 return
25582573
2574+ # try just signaling the cluster, as this will trigger a reconnect
2575+ # as part of marking the host down
2576+ if self ._connection and self ._connection .is_defunct :
2577+ host = self ._cluster .metadata .get_host (self ._connection .host )
2578+ # host may be None if it's already been removed, but that indicates
2579+ # that errors have already been reported, so we're fine
2580+ if host :
2581+ self ._cluster .signal_connection_failure (
2582+ host , self ._connection .last_error , is_host_addition = False )
2583+ return
2584+
25592585 # if the connection is not defunct or the host already left, reconnect
25602586 # manually
25612587 self .reconnect ()
0 commit comments