Skip to content

Commit 427dfd9

Browse files
committed
Merge pull request apache#300 from datastax/PYTHON-160
PYTHON-160 - Token-, DC-Aware default load balancing
2 parents 8ab437f + 45a09d3 commit 427dfd9

2 files changed

Lines changed: 50 additions & 24 deletions

File tree

cassandra/cluster.py

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
6363
RESULT_KIND_SCHEMA_CHANGE)
6464
from cassandra.metadata import Metadata, protect_name
65-
from cassandra.policies import (RoundRobinPolicy, SimpleConvictionPolicy,
65+
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
6666
ExponentialReconnectionPolicy, HostDistance,
6767
RetryPolicy)
6868
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
@@ -169,6 +169,16 @@ def _shutdown_cluster(cluster):
169169
cluster.shutdown()
170170

171171

172+
# murmur3 implementation required for TokenAware is only available for CPython
173+
import platform
174+
if platform.python_implementation() == 'CPython':
175+
def default_lbp_factory():
176+
return TokenAwarePolicy(DCAwareRoundRobinPolicy())
177+
else:
178+
def default_lbp_factory():
179+
return DCAwareRoundRobinPolicy()
180+
181+
172182
class Cluster(object):
173183
"""
174184
The main class to use when interacting with a Cassandra cluster.
@@ -193,9 +203,9 @@ class Cluster(object):
193203
Defaults to loopback interface.
194204
195205
Note: When using :class:`.DCAwareLoadBalancingPolicy` with no explicit
196-
local_dc set, the DC is chosen from an arbitrary host in contact_points.
197-
In this case, contact_points should contain only nodes from a single,
198-
local DC.
206+
local_dc set (as is the default), the DC is chosen from an arbitrary
207+
host in contact_points. In this case, contact_points should contain
208+
only nodes from a single, local DC.
199209
"""
200210

201211
port = 9042
@@ -289,7 +299,16 @@ def auth_provider(self, value):
289299
load_balancing_policy = None
290300
"""
291301
An instance of :class:`.policies.LoadBalancingPolicy` or
292-
one of its subclasses. Defaults to :class:`~.RoundRobinPolicy`.
302+
one of its subclasses.
303+
304+
.. versionchanged:: 2.6.0
305+
306+
Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`).
307+
when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
308+
otherwise. Default local DC will be chosen from contact points.
309+
310+
**Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
311+
DC locality and remote nodes.**
293312
"""
294313

295314
reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
@@ -319,6 +338,8 @@ def auth_provider(self, value):
319338
by the :attr:`~.Cluster.load_balancing_policy` will have a connection
320339
opened to them. Otherwise, they will not have a connection opened to them.
321340
341+
Note that the default load balancing policy ignores remote hosts by default.
342+
322343
.. versionadded:: 2.1.0
323344
"""
324345

@@ -503,7 +524,7 @@ def __init__(self,
503524

504525
self.load_balancing_policy = load_balancing_policy
505526
else:
506-
self.load_balancing_policy = RoundRobinPolicy()
527+
self.load_balancing_policy = default_lbp_factory()
507528

508529
if reconnection_policy is not None:
509530
if isinstance(reconnection_policy, type):
@@ -2114,20 +2135,21 @@ def _submit(self, *args, **kwargs):
21142135
return None
21152136

21162137
def shutdown(self):
2138+
# stop trying to reconnect (if we are)
2139+
with self._reconnection_lock:
2140+
if self._reconnection_handler:
2141+
self._reconnection_handler.cancel()
2142+
21172143
with self._lock:
21182144
if self._is_shutdown:
21192145
return
21202146
else:
21212147
self._is_shutdown = True
21222148

2123-
log.debug("Shutting down control connection")
2124-
# stop trying to reconnect (if we are)
2125-
if self._reconnection_handler:
2126-
self._reconnection_handler.cancel()
2127-
2128-
if self._connection:
2129-
self._connection.close()
2130-
del self._connection
2149+
log.debug("Shutting down control connection")
2150+
if self._connection:
2151+
self._connection.close()
2152+
del self._connection
21312153

21322154
def refresh_schema(self, keyspace=None, table=None, usertype=None, function=None,
21332155
aggregate=None, schema_agreement_wait=None):
@@ -2545,17 +2567,21 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25452567
return dict((version, list(nodes)) for version, nodes in six.iteritems(versions))
25462568

25472569
def _signal_error(self):
2548-
# try just signaling the cluster, as this will trigger a reconnect
2549-
# as part of marking the host down
2550-
if self._connection and self._connection.is_defunct:
2551-
host = self._cluster.metadata.get_host(self._connection.host)
2552-
# host may be None if it's already been removed, but that indicates
2553-
# that errors have already been reported, so we're fine
2554-
if host:
2555-
self._cluster.signal_connection_failure(
2556-
host, self._connection.last_error, is_host_addition=False)
2570+
with self._lock:
2571+
if self._is_shutdown:
25572572
return
25582573

2574+
# try just signaling the cluster, as this will trigger a reconnect
2575+
# as part of marking the host down
2576+
if self._connection and self._connection.is_defunct:
2577+
host = self._cluster.metadata.get_host(self._connection.host)
2578+
# host may be None if it's already been removed, but that indicates
2579+
# that errors have already been reported, so we're fine
2580+
if host:
2581+
self._cluster.signal_connection_failure(
2582+
host, self._connection.last_error, is_host_addition=False)
2583+
return
2584+
25592585
# if the connection is not defunct or the host already left, reconnect
25602586
# manually
25612587
self.reconnect()

tests/integration/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def teardown_package():
232232
log.exception('Failed to remove cluster: %s' % cluster_name)
233233

234234
except Exception:
235-
log.warn('Did not find cluster: %s' % cluster_name)
235+
log.warning('Did not find cluster: %s' % cluster_name)
236236

237237

238238
def setup_keyspace(ipformat=None):

0 commit comments

Comments
 (0)