Skip to content

Commit 88d33c7

Browse files
authored
Merge pull request apache#658 from datastax/628
PYTHON-628 - fix race adding connection pool to session while handling keyspace change
2 parents 87dd9df + 8bbe7ce commit 88d33c7

2 files changed

Lines changed: 37 additions & 11 deletions

File tree

cassandra/cluster.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2320,7 +2320,27 @@ def run_add_or_renew_pool():
23202320
return False
23212321

23222322
previous = self._pools.get(host)
2323-
self._pools[host] = new_pool
2323+
with self._lock:
2324+
while new_pool._keyspace != self.keyspace:
2325+
self._lock.release()
2326+
set_keyspace_event = Event()
2327+
errors_returned = []
2328+
2329+
def callback(pool, errors):
2330+
errors_returned.extend(errors)
2331+
set_keyspace_event.set()
2332+
2333+
new_pool._set_keyspace_for_all_conns(self.keyspace, callback)
2334+
set_keyspace_event.wait(self.cluster.connect_timeout)
2335+
if not set_keyspace_event.is_set() or errors_returned:
2336+
log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned)
2337+
self.cluster.on_down(host, is_host_addition)
2338+
new_pool.shutdown()
2339+
self._lock.acquire()
2340+
return False
2341+
self._lock.acquire()
2342+
self._pools[host] = new_pool
2343+
23242344
log.debug("Added pool for host %s to session", host)
23252345
if previous:
23262346
previous.shutdown()
@@ -2397,9 +2417,9 @@ def _set_keyspace_for_all_pools(self, keyspace, callback):
23972417
called with a dictionary of all errors that occurred, keyed
23982418
by the `Host` that they occurred against.
23992419
"""
2400-
self.keyspace = keyspace
2401-
2402-
remaining_callbacks = set(self._pools.values())
2420+
with self._lock:
2421+
self.keyspace = keyspace
2422+
remaining_callbacks = set(self._pools.values())
24032423
errors = {}
24042424

24052425
if not remaining_callbacks:

cassandra/pool.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ class HostConnection(object):
307307
_session = None
308308
_connection = None
309309
_lock = None
310+
_keyspace = None
310311

311312
def __init__(self, host, host_distance, session):
312313
self.host = host
@@ -326,8 +327,9 @@ def __init__(self, host, host_distance, session):
326327

327328
log.debug("Initializing connection for host %s", self.host)
328329
self._connection = session.cluster.connection_factory(host.address)
329-
if session.keyspace:
330-
self._connection.set_keyspace_blocking(session.keyspace)
330+
self._keyspace = session.keyspace
331+
if self._keyspace:
332+
self._connection.set_keyspace_blocking(self._keyspace)
331333
log.debug("Finished initializing connection for host %s", self.host)
332334

333335
def borrow_connection(self, timeout):
@@ -381,8 +383,8 @@ def _replace(self, connection):
381383
log.debug("Replacing connection (%s) to %s", id(connection), self.host)
382384
try:
383385
conn = self._session.cluster.connection_factory(self.host.address)
384-
if self._session.keyspace:
385-
conn.set_keyspace_blocking(self._session.keyspace)
386+
if self._keyspace:
387+
conn.set_keyspace_blocking(self._keyspace)
386388
self._connection = conn
387389
except Exception:
388390
log.warning("Failed reconnecting %s. Retrying." % (self.host.address,))
@@ -412,6 +414,7 @@ def connection_finished_setting_keyspace(conn, error):
412414
errors = [] if not error else [error]
413415
callback(self, errors)
414416

417+
self._keyspace = keyspace
415418
self._connection.set_keyspace_async(keyspace, connection_finished_setting_keyspace)
416419

417420
def get_connections(self):
@@ -445,6 +448,7 @@ class HostConnectionPool(object):
445448
open_count = 0
446449
_scheduled_for_creation = 0
447450
_next_trash_allowed_at = 0
451+
_keyspace = None
448452

449453
def __init__(self, host, host_distance, session):
450454
self.host = host
@@ -459,9 +463,10 @@ def __init__(self, host, host_distance, session):
459463
self._connections = [session.cluster.connection_factory(host.address)
460464
for i in range(core_conns)]
461465

462-
if session.keyspace:
466+
self._keyspace = session.keyspace
467+
if self._keyspace:
463468
for conn in self._connections:
464-
conn.set_keyspace_blocking(session.keyspace)
469+
conn.set_keyspace_blocking(self._keyspace)
465470

466471
self._trash = set()
467472
self._next_trash_allowed_at = time.time()
@@ -560,7 +565,7 @@ def _add_conn_if_under_max(self):
560565
log.debug("Going to open new connection to host %s", self.host)
561566
try:
562567
conn = self._session.cluster.connection_factory(self.host.address)
563-
if self._session.keyspace:
568+
if self._keyspace:
564569
conn.set_keyspace_blocking(self._session.keyspace)
565570
self._next_trash_allowed_at = time.time() + _MIN_TRASH_INTERVAL
566571
with self._lock:
@@ -761,6 +766,7 @@ def connection_finished_setting_keyspace(conn, error):
761766
if not remaining_callbacks:
762767
callback(self, errors)
763768

769+
self._keyspace = keyspace
764770
for conn in self._connections:
765771
conn.set_keyspace_async(keyspace, connection_finished_setting_keyspace)
766772

0 commit comments

Comments
 (0)