Skip to content

Commit 8b78f37

Browse files
committed
Avoid deadlock when nodes go up and down
1 parent 4834683 commit 8b78f37

10 files changed

Lines changed: 334 additions & 238 deletions

cassandra/cluster.py

Lines changed: 243 additions & 84 deletions
Large diffs are not rendered by default.

cassandra/metadata.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,6 @@ def add_host(self, address):
306306
else:
307307
return None
308308

309-
new_host.monitor.register(cluster)
310309
return new_host
311310

312311
def remove_host(self, host):

cassandra/policies.py

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,29 @@ class HostDistance(object):
4242
"""
4343

4444

45-
class LoadBalancingPolicy(object):
45+
class HostStateListener(object):
46+
47+
def on_up(self, host):
48+
""" Called when a node is marked up. """
49+
raise NotImplementedError()
50+
51+
def on_down(self, host):
52+
""" Called when a node is marked down. """
53+
raise NotImplementedError()
54+
55+
def on_add(self, host):
56+
"""
57+
Called when a node is added to the cluster. The newly added node
58+
should be considered up.
59+
"""
60+
raise NotImplementedError()
61+
62+
def on_remove(self, host):
63+
""" Called when a node is removed from the cluster. """
64+
raise NotImplementedError()
65+
66+
67+
class LoadBalancingPolicy(HostStateListener):
4668
"""
4769
Load balancing policies are used to decide how to distribute
4870
requests among all possible coordinator nodes in the cluster.
@@ -87,36 +109,6 @@ def make_query_plan(self, working_keyspace=None, query=None):
87109
"""
88110
raise NotImplementedError()
89111

90-
def on_up(self, host):
91-
"""
92-
Called when a :class:`~.pool.Host`'s :class:`~.HealthMonitor`
93-
marks the node up.
94-
"""
95-
raise NotImplementedError()
96-
97-
def on_down(self, host):
98-
"""
99-
Called when a :class:`~.pool.Host`'s :class:`~.HealthMonitor`
100-
marks the node down.
101-
"""
102-
raise NotImplementedError()
103-
104-
def on_add(self, host):
105-
"""
106-
Called when a :class:`.Cluster` instance is first created and
107-
the initial contact points are added as well as when a new
108-
:class:`~.pool.Host` is discovered in the cluster, which may
109-
happen the first time the ring topology is examined or when
110-
a new node joins the cluster.
111-
"""
112-
raise NotImplementedError()
113-
114-
def on_remove(self, host):
115-
"""
116-
Called when a :class:`~.pool.Host` leaves the cluster.
117-
"""
118-
raise NotImplementedError()
119-
120112

121113
class RoundRobinPolicy(LoadBalancingPolicy):
122114
"""
@@ -300,7 +292,7 @@ def make_query_plan(self, working_keyspace=None, query=None):
300292
else:
301293
replicas = self._cluster_metadata.get_replicas(keyspace, routing_key)
302294
for replica in replicas:
303-
if replica.monitor.is_up and \
295+
if replica.is_up and \
304296
child.distance(replica) == HostDistance.LOCAL:
305297
yield replica
306298

cassandra/pool.py

Lines changed: 39 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import logging
66
import time
7-
from threading import Lock, RLock, Condition
7+
from threading import RLock, Condition
88
import weakref
99
try:
1010
from weakref import WeakSet
@@ -35,15 +35,18 @@ class Host(object):
3535
The IP address or hostname of the node.
3636
"""
3737

38-
monitor = None
38+
conviction_policy = None
3939
"""
40-
A :class:`.HealthMonitor` instance that tracks whether this node is
41-
up or down.
40+
A :class:`ConvictionPolicy` instance for determining when this node should
41+
be marked up or down.
4242
"""
4343

44+
is_up = None
45+
4446
_datacenter = None
4547
_rack = None
4648
_reconnection_handler = None
49+
lock = None
4750

4851
def __init__(self, inet_address, conviction_policy_factory):
4952
if inet_address is None:
@@ -52,9 +55,8 @@ def __init__(self, inet_address, conviction_policy_factory):
5255
raise ValueError("conviction_policy_factory may not be None")
5356

5457
self.address = inet_address
55-
self.monitor = HealthMonitor(conviction_policy_factory(self))
56-
57-
self._reconnection_lock = Lock()
58+
self.conviction_policy = conviction_policy_factory(self)
59+
self.lock = RLock()
5860

5961
@property
6062
def datacenter(self):
@@ -75,12 +77,25 @@ def set_location_info(self, datacenter, rack):
7577
self._datacenter = datacenter
7678
self._rack = rack
7779

80+
def set_up(self):
81+
self.conviction_policy.reset()
82+
self.is_up = True
83+
84+
def set_down(self):
85+
self.is_up = False
86+
87+
def signal_connection_failure(self, connection_exc):
88+
return self.conviction_policy.add_failure(connection_exc)
89+
90+
def is_currently_reconnecting(self):
91+
return self._reconnection_handler is not None
92+
7893
def get_and_set_reconnection_handler(self, new_handler):
7994
"""
8095
Atomically replaces the reconnection handler for this
8196
host. Intended for internal use only.
8297
"""
83-
with self._reconnection_lock:
98+
with self.lock:
8499
old = self._reconnection_handler
85100
self._reconnection_handler = new_handler
86101
return old
@@ -175,94 +190,35 @@ def on_exception(self, exc, next_delay):
175190

176191
class _HostReconnectionHandler(_ReconnectionHandler):
177192

178-
def __init__(self, host, connection_factory, *args, **kwargs):
193+
def __init__(self, host, connection_factory, is_host_addition, on_add, on_up, *args, **kwargs):
179194
_ReconnectionHandler.__init__(self, *args, **kwargs)
195+
self.is_host_addition = is_host_addition
196+
self.on_add = on_add
197+
self.on_up = on_up
180198
self.host = host
181199
self.connection_factory = connection_factory
182200

183201
def try_reconnect(self):
184202
return self.connection_factory()
185203

186204
def on_reconnection(self, connection):
187-
self.host.monitor.reset()
205+
connection.close()
206+
log.info("Successful reconnection to %s, marking node up", self.host)
207+
if self.is_host_addition:
208+
self.on_add(self.host)
209+
else:
210+
self.on_up(self.host)
188211

189212
def on_exception(self, exc, next_delay):
190213
if isinstance(exc, AuthenticationFailed):
191214
return False
192215
else:
193-
log.warn("Error attempting to reconnect to %s: %s", self.host, exc)
216+
log.warn("Error attempting to reconnect to %s, scheduling retry in %f seconds: %s",
217+
self.host, next_delay, exc)
194218
log.debug("Reconnection error details", exc_info=True)
195219
return True
196220

197221

198-
class HealthMonitor(object):
199-
"""
200-
Monitors whether a particular host is marked as up or down.
201-
This class is primarily intended for internal use, although
202-
applications may find it useful to check whether a given node
203-
is up or down.
204-
"""
205-
206-
is_up = True
207-
"""
208-
A boolean representing the current state of the node.
209-
"""
210-
211-
def __init__(self, conviction_policy):
212-
self._conviction_policy = conviction_policy
213-
self._host = conviction_policy.host
214-
# self._listeners will hold, among other things, references to
215-
# Cluster objects. To allow those to be GC'ed (and shutdown) even
216-
# though we've implemented __del__, use weak references.
217-
self._listeners = WeakSet()
218-
self._lock = RLock()
219-
220-
def register(self, listener):
221-
with self._lock:
222-
self._listeners.add(listener)
223-
224-
def unregister(self, listener):
225-
with self._lock:
226-
self._listeners.remove(listener)
227-
228-
def set_up(self):
229-
if self.is_up:
230-
return
231-
232-
self._conviction_policy.reset()
233-
log.info("Host %s is considered up", self._host)
234-
235-
with self._lock:
236-
listeners = self._listeners.copy()
237-
238-
for listener in listeners:
239-
listener.on_up(self._host)
240-
241-
self.is_up = True
242-
243-
def set_down(self):
244-
if not self.is_up:
245-
return
246-
247-
self.is_up = False
248-
log.info("Host %s is considered down", self._host)
249-
250-
with self._lock:
251-
listeners = self._listeners.copy()
252-
253-
for listener in listeners:
254-
listener.on_down(self._host)
255-
256-
def reset(self):
257-
return self.set_up()
258-
259-
def signal_connection_failure(self, connection_exc):
260-
is_down = self._conviction_policy.add_failure(connection_exc)
261-
if is_down:
262-
self.set_down()
263-
return is_down
264-
265-
266222
_MAX_SIMULTANEOUS_CREATION = 1
267223
_NEW_CONNECTION_GRACE_PERIOD = 5
268224

@@ -295,6 +251,7 @@ def __init__(self, host, host_distance, session):
295251

296252
self._trash = set()
297253
self.open_count = core_conns
254+
log.debug("Finished initializing new connection pool for host %s", self.host)
298255

299256
def borrow_connection(self, timeout):
300257
if self.is_shutdown:
@@ -395,7 +352,7 @@ def _add_conn_if_under_max(self):
395352
log.exception("Failed to add new connection to pool for host %s", self.host)
396353
with self._lock:
397354
self.open_count -= 1
398-
if self.host.monitor.signal_connection_failure(exc):
355+
if self._session.cluster.signal_connection_failure(self.host, exc, is_host_addition=False):
399356
self.shutdown()
400357
return False
401358
except AuthenticationFailed:
@@ -448,7 +405,8 @@ def return_connection(self, connection):
448405
if connection.is_defunct or connection.is_closed:
449406
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
450407
"marking host %s as down", id(connection), self.host)
451-
is_down = self.host.monitor.signal_connection_failure(connection.last_error)
408+
is_down = self._session.cluster.signal_connection_failure(
409+
self.host, connection.last_error, is_host_addition=False)
452410
if is_down:
453411
self.shutdown()
454412
else:

tests/integration/test_cluster.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def test_default_connections(self):
7474
Ensure errors are not thrown when using non-default policies
7575
"""
7676

77-
cluster = Cluster(
77+
Cluster(
7878
load_balancing_policy=RoundRobinPolicy(),
7979
reconnection_policy=ExponentialReconnectionPolicy(1.0, 600.0),
8080
default_retry_policy=RetryPolicy(),
@@ -177,28 +177,6 @@ def test_submit_schema_refresh(self):
177177

178178
self.assertIn("newkeyspace", cluster.metadata.keyspaces)
179179

180-
def test_on_down_and_up(self):
181-
"""
182-
Test on_down and on_up handling
183-
"""
184-
185-
cluster = Cluster()
186-
session = cluster.connect()
187-
host = cluster.metadata.all_hosts()[0]
188-
host.monitor.signal_connection_failure(None)
189-
cluster.on_down(host)
190-
self.assertNotIn(host, session._pools)
191-
host_reconnector = host._reconnection_handler
192-
self.assertNotEqual(None, host_reconnector)
193-
194-
host.monitor.is_up = True
195-
196-
cluster.on_up(host)
197-
198-
self.assertEqual(None, host._reconnection_handler)
199-
self.assertTrue(host_reconnector._cancelled)
200-
self.assertIn(host, session._pools)
201-
202180
def test_trace(self):
203181
"""
204182
Ensure trace can be requested for async and non-async queries

tests/unit/io/test_libevreactor.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
from cassandra.connection import (PROTOCOL_VERSION,
1313
HEADER_DIRECTION_TO_CLIENT,
14-
ProtocolError,
1514
ConnectionException)
1615

1716
from cassandra.decoder import (write_stringmultimap, write_int, write_string,
@@ -87,15 +86,15 @@ def test_protocol_error(self, *args):
8786
c.handle_write(None, None)
8887

8988
# read in a SupportedMessage response
90-
header = self.make_header_prefix(SupportedMessage, version=0x04)
89+
header = self.make_header_prefix(SupportedMessage, version=0xa4)
9190
options = self.make_options_body()
9291
c._socket.recv.return_value = self.make_msg(header, options)
9392
c.handle_read(None, None)
9493

9594
# make sure it errored correctly
9695
self.assertTrue(c.is_defunct)
9796
self.assertTrue(c.connected_event.is_set())
98-
self.assertIsInstance(c.last_error, ProtocolError)
97+
self.assertIsInstance(c.last_error, ConnectionException)
9998

10099
def test_error_message_on_startup(self, *args):
101100
c = self.make_connection()

0 commit comments

Comments
 (0)