Skip to content

Commit aa0798d

Browse files
committed
Merge branch 'master' into 2.0
Conflicts: cassandra/__init__.py tests/integration/__init__.py tests/unit/test_types.py
2 parents b84b602 + ca9a8a6 commit aa0798d

12 files changed

Lines changed: 234 additions & 73 deletions

File tree

CHANGELOG.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ Bug Fixes
1010
closing excess connections
1111
* Avoid handling a node coming up multiple times due to a reconnection attempt
1212
succeeding close to the same time that an UP notification is pushed
13+
* Fix duplicate node-up handling, which could result in multiple reconnectors
14+
being started as well as the executor threads becoming deadlocked, preventing
15+
future node up or node down handling from being executed.
16+
17+
Other
18+
-----
19+
* Don't log at ERROR when a connection is closed during the startup
20+
communications
1321

1422
1.1.1
1523
=====

cassandra/cluster.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -584,11 +584,8 @@ def _on_up_future_completed(self, host, futures, results, lock, finished_future)
584584
for listener in self.listeners:
585585
listener.on_up(host)
586586
finally:
587-
host._handle_node_up_condition.acquire()
588-
if host._currently_handling_node_up:
587+
with host.lock:
589588
host._currently_handling_node_up = False
590-
host._handle_node_up_condition.notify()
591-
host._handle_node_up_condition.release()
592589

593590
# see if there are any pools to add or remove now that the host is marked up
594591
for session in self.sessions:
@@ -601,39 +598,49 @@ def on_up(self, host):
601598
if self.is_shutdown:
602599
return
603600

604-
host._handle_node_up_condition.acquire()
605-
while host._currently_handling_node_up:
606-
host._handle_node_up_condition.wait()
607-
host._currently_handling_node_up = True
608-
host._handle_node_up_condition.release()
601+
log.debug("Waiting to acquire lock for handling up status of node %s", host)
602+
with host.lock:
603+
if host._currently_handling_node_up:
604+
log.debug("Another thread is already handling up status of node %s", host)
605+
return
609606

610-
if host.is_up:
611-
return
607+
if host.is_up:
608+
log.debug("Host %s was already marked up", host)
609+
return
612610

611+
host._currently_handling_node_up = True
612+
log.debug("Starting to handle up status of node %s", host)
613+
614+
have_future = False
613615
futures = set()
614616
try:
615-
log.info("Host %s has been marked up", host)
617+
log.info("Host %s may be up; will prepare queries and open connection pool", host)
616618

617619
reconnector = host.get_and_set_reconnection_handler(None)
618620
if reconnector:
619621
log.debug("Now that host %s is up, cancelling the reconnection handler", host)
620622
reconnector.cancel()
621623

622624
self._prepare_all_queries(host)
623-
log.debug("Done preparing all queries for host %s", host)
625+
log.debug("Done preparing all queries for host %s, ", host)
624626

625627
for session in self.sessions:
626628
session.remove_pool(host)
627629

630+
log.debug("Signalling to load balancing policy that host %s is up", host)
628631
self.load_balancing_policy.on_up(host)
632+
633+
log.debug("Signalling to control connection that host %s is up", host)
629634
self.control_connection.on_up(host)
630635

636+
log.debug("Attempting to open new connection pools for host %s", host)
631637
futures_lock = Lock()
632638
futures_results = []
633639
callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
634640
for session in self.sessions:
635641
future = session.add_or_renew_pool(host, is_host_addition=False)
636642
if future is not None:
643+
have_future = True
637644
future.add_done_callback(callback)
638645
futures.add(future)
639646
except Exception:
@@ -643,11 +650,13 @@ def on_up(self, host):
643650

644651
self._cleanup_failed_on_up_handling(host)
645652

646-
host._handle_node_up_condition.acquire()
647-
host._currently_handling_node_up = False
648-
host._handle_node_up_condition.notify()
649-
host._handle_node_up_condition.release()
653+
with host.lock:
654+
host._currently_handling_node_up = False
650655
raise
656+
else:
657+
if not have_future:
658+
with host.lock:
659+
host._currently_handling_node_up = False
651660

652661
# for testing purposes
653662
return futures
@@ -670,19 +679,19 @@ def _start_reconnector(self, host, is_host_addition):
670679
log.debug("Old host reconnector found for %s, cancelling", host)
671680
old_reconnector.cancel()
672681

673-
log.debug("Staring reconnector for host %s", host)
682+
log.debug("Starting reconnector for host %s", host)
674683
reconnector.start()
675684

676685
@run_in_executor
677-
def on_down(self, host, is_host_addition, force_if_down=False):
686+
def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
678687
"""
679688
Intended for internal use only.
680689
"""
681690
if self.is_shutdown:
682691
return
683692

684693
with host.lock:
685-
if (not (host.is_up or force_if_down)) or host.is_currently_reconnecting():
694+
if (not host.is_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
686695
return
687696

688697
host.set_down()
@@ -772,10 +781,10 @@ def on_remove(self, host):
772781
listener.on_remove(host)
773782
self.control_connection.on_remove(host)
774783

775-
def signal_connection_failure(self, host, connection_exc, is_host_addition):
784+
def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False):
776785
is_down = host.signal_connection_failure(connection_exc)
777786
if is_down:
778-
self.on_down(host, is_host_addition, force_if_down=True)
787+
self.on_down(host, is_host_addition, expect_host_to_be_down)
779788
return is_down
780789

781790
def add_host(self, address, datacenter=None, rack=None, signal=True):
@@ -1226,8 +1235,12 @@ def run_add_or_renew_pool():
12261235
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
12271236
return False
12281237
except Exception as conn_exc:
1229-
log.warn("Failed to create connection pool for new host %s: %s", host, conn_exc)
1230-
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
1238+
log.warn("Failed to create connection pool for new host %s: %s",
1239+
host, conn_exc)
1240+
# the host itself will still be marked down, so we need to pass
1241+
# a special flag to make sure the reconnector is created
1242+
self.cluster.signal_connection_failure(
1243+
host, conn_exc, is_host_addition, expect_host_to_be_down=True)
12311244
return False
12321245

12331246
previous = self._pools.get(host)

cassandra/connection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ def process_msg(self, msg, body_len):
316316
response = decode_response(stream_id, flags, opcode, body, self.decompressor)
317317
except Exception as exc:
318318
log.exception("Error decoding response from Cassandra. "
319-
"opcode: %04x; message contents: %r", opcode, body)
319+
"opcode: %04x; message contents: %r", opcode, msg)
320320
if callback is not None:
321321
callback(exc)
322322
self.defunct(exc)
@@ -429,6 +429,9 @@ def _handle_startup_response(self, startup_response, did_authenticate=False):
429429
raise ConnectionException(
430430
"Failed to initialize new connection to %s: %s"
431431
% (self.host, startup_response.summary_msg()))
432+
elif isinstance(startup_response, ConnectionShutdown):
433+
log.debug("Connection to %s was closed during the startup handshake", (self.host))
434+
raise startup_response
432435
else:
433436
msg = "Unexpected response during Connection setup: %r"
434437
log.error(msg, startup_response)

cassandra/pool.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ class Host(object):
8383
lock = None
8484

8585
_currently_handling_node_up = False
86-
_handle_node_up_condition = None
8786

8887
def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rack=None):
8988
if inet_address is None:
@@ -95,7 +94,6 @@ def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rac
9594
self.conviction_policy = conviction_policy_factory(self)
9695
self.set_location_info(datacenter, rack)
9796
self.lock = RLock()
98-
self._handle_node_up_condition = Condition()
9997

10098
@property
10199
def datacenter(self):
@@ -177,11 +175,9 @@ def __init__(self, scheduler, schedule, callback, *callback_args, **callback_kwa
177175

178176
def start(self):
179177
if self._cancelled:
178+
log.debug("Reconnection handler was cancelled before starting")
180179
return
181180

182-
# TODO cancel previous reconnection handlers? That's probably the job
183-
# of whatever created this.
184-
185181
first_delay = self.schedule.next()
186182
self.scheduler.schedule(first_delay, self.run)
187183

docs/performance.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ be consumed by the list of Futures:
169169
~/python-driver $ python benchmarks/future_full_throttle.py -n 100000 --hosts=127.0.0.1,127.0.0.2,127.0.0.3 --libev-only --threads=8
170170
Average throughput: 2165.29/sec
171171
172-
Callback Chaining (`callbacks_full_pipeline.py <https://github.com/datastax/python-driver/blob/master/benchmarks/callbacks_full_pipeline.py>`_)
172+
Callback Chaining (`callback_full_pipeline.py <https://github.com/datastax/python-driver/blob/master/benchmarks/callback_full_pipeline.py>`_)
173173
-----------------------------------------------------------------------------------------------------------------------------------------------
174174
This pattern is very different from the previous patterns. Here we're taking
175175
advantage of the :meth:`.ResponseFuture.add_callback()` function to start

example.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#!/usr/bin/env python
2+
13
# Copyright 2013-2014 DataStax, Inc.
24
#
35
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,8 +14,6 @@
1214
# See the License for the specific language governing permissions and
1315
# limitations under the License.
1416

15-
#!/usr/bin/env python
16-
1717
import logging
1818

1919
log = logging.getLogger()

tests/integration/long/test_schema.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import logging
1616

17-
from cassandra import ConsistencyLevel
17+
from cassandra import ConsistencyLevel, OperationTimedOut
1818
from cassandra.cluster import Cluster
1919
from cassandra.query import SimpleStatement
2020
from tests.integration import PROTOCOL_VERSION
@@ -64,3 +64,57 @@ def test_recreates(self):
6464
ss = SimpleStatement(statement,
6565
consistency_level=ConsistencyLevel.QUORUM)
6666
session.execute(ss)
67+
68+
def test_for_schema_disagreements_different_keyspaces(self):
69+
cluster = Cluster()
70+
session = cluster.connect()
71+
72+
for i in xrange(30):
73+
try:
74+
session.execute('''
75+
CREATE KEYSPACE test_%s
76+
WITH replication = {'class': 'SimpleStrategy',
77+
'replication_factor': 1}
78+
''' % i)
79+
80+
session.execute('''
81+
CREATE TABLE test_%s.cf (
82+
key int,
83+
value int,
84+
PRIMARY KEY (key))
85+
''' % i)
86+
87+
for j in xrange(100):
88+
session.execute('INSERT INTO test_%s.cf (key, value) VALUES (%s, %s)' % (i, j, j))
89+
90+
session.execute('''
91+
DROP KEYSPACE test_%s
92+
''' % i)
93+
except OperationTimedOut: pass
94+
95+
def test_for_schema_disagreements_same_keyspace(self):
96+
cluster = Cluster()
97+
session = cluster.connect()
98+
99+
for i in xrange(30):
100+
try:
101+
session.execute('''
102+
CREATE KEYSPACE test
103+
WITH replication = {'class': 'SimpleStrategy',
104+
'replication_factor': 1}
105+
''')
106+
107+
session.execute('''
108+
CREATE TABLE test.cf (
109+
key int,
110+
value int,
111+
PRIMARY KEY (key))
112+
''')
113+
114+
for j in xrange(100):
115+
session.execute('INSERT INTO test.cf (key, value) VALUES (%s, %s)' % (j, j))
116+
117+
session.execute('''
118+
DROP KEYSPACE test
119+
''')
120+
except OperationTimedOut: pass

tests/unit/test_connection.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
from cassandra.cluster import Cluster
1415

1516
try:
1617
import unittest2 as unittest
@@ -153,3 +154,18 @@ def test_not_implemented(self):
153154
self.assertRaises(NotImplementedError, c.close)
154155
self.assertRaises(NotImplementedError, c.register_watcher, None, None)
155156
self.assertRaises(NotImplementedError, c.register_watchers, None)
157+
158+
def test_set_keyspace_blocking(self):
159+
c = self.make_connection()
160+
161+
self.assertEqual(c.keyspace, None)
162+
c.set_keyspace_blocking(None)
163+
self.assertEqual(c.keyspace, None)
164+
165+
c.keyspace = 'ks'
166+
c.set_keyspace_blocking('ks')
167+
self.assertEqual(c.keyspace, 'ks')
168+
169+
def test_set_connection_class(self):
170+
cluster = Cluster(connection_class='test')
171+
self.assertEqual('test', cluster.connection_class)

tests/unit/test_marshalling.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
from cassandra.marshal import bitlength
1415

1516
try:
1617
import unittest2 as unittest
@@ -126,3 +127,8 @@ def test_marshalling(self):
126127
self.assertEqual(type(whatwegot), type(serializedval),
127128
msg='Marshaller for %s (%s) gave wrong type (%s instead of %s)'
128129
% (valtype, marshaller, type(whatwegot), type(serializedval)))
130+
131+
def test_bitlength(self):
132+
self.assertEqual(bitlength(9), 4)
133+
self.assertEqual(bitlength(-10), 0)
134+
self.assertEqual(bitlength(0), 0)

tests/unit/test_metadata.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ def test_simple_strategy_make_token_replica_map(self):
147147
self.assertItemsEqual(rf3_replicas[MD5Token(200)], [host3, host1, host2])
148148

149149

150+
def test_ss_equals(self):
151+
self.assertNotEqual(SimpleStrategy(1), NetworkTopologyStrategy(2))
152+
150153
class TestNameEscaping(unittest.TestCase):
151154

152155
def test_protect_name(self):

0 commit comments

Comments
 (0)