Skip to content

Commit f0daa87

Browse files
committed
Don't set keyspace in HostConnection if it doesn't exit
1 parent 08ba052 commit f0daa87

3 files changed

Lines changed: 43 additions & 21 deletions

File tree

cassandra/cluster.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242

4343
from cassandra import (ConsistencyLevel, AuthenticationFailed,
4444
OperationTimedOut, UnsupportedOperation,
45-
SchemaTargetType, DriverException, ProtocolVersion)
45+
SchemaTargetType, DriverException, ProtocolVersion,
46+
InvalidRequest)
4647
from cassandra.connection import (ConnectionException, ConnectionShutdown,
4748
ConnectionHeartbeat, ProtocolVersionUnsupported)
4849
from cassandra.cqltypes import UserType
@@ -1275,13 +1276,16 @@ def _session_register_user_types(self, session):
12751276
for udt_name, klass in six.iteritems(type_map):
12761277
session.user_type_registered(keyspace, udt_name, klass)
12771278

1278-
def _cleanup_failed_on_up_handling(self, host):
1279+
def _cleanup_failed_on_up_handling(self, host, set_keyspace_failed=False):
12791280
self.profile_manager.on_down(host)
12801281
self.control_connection.on_down(host)
1282+
keyspaces = []
12811283
for session in tuple(self.sessions):
12821284
session.remove_pool(host)
1285+
if set_keyspace_failed and session.keyspace:
1286+
keyspaces.append(session.keyspace)
12831287

1284-
self._start_reconnector(host, is_host_addition=False)
1288+
self._start_reconnector(host, is_host_addition=False, keyspaces=keyspaces)
12851289

12861290
def _on_up_future_completed(self, host, futures, results, lock, finished_future):
12871291
with lock:
@@ -1299,7 +1303,10 @@ def _on_up_future_completed(self, host, futures, results, lock, finished_future)
12991303
# all futures have completed at this point
13001304
for exc in [f for f in results if isinstance(f, Exception)]:
13011305
log.error("Unexpected failure while marking node %s up:", host, exc_info=exc)
1302-
self._cleanup_failed_on_up_handling(host)
1306+
set_keyspace_failed = False
1307+
if isinstance(exc, InvalidRequest):
1308+
set_keyspace_failed = True
1309+
self._cleanup_failed_on_up_handling(host, set_keyspace_failed=set_keyspace_failed)
13031310
return
13041311

13051312
if not all(results):
@@ -1392,7 +1399,7 @@ def on_up(self, host):
13921399
# for testing purposes
13931400
return futures
13941401

1395-
def _start_reconnector(self, host, is_host_addition):
1402+
def _start_reconnector(self, host, is_host_addition, keyspaces):
13961403
if self.profile_manager.distance(host) == HostDistance.IGNORED:
13971404
return
13981405

@@ -1405,7 +1412,7 @@ def _start_reconnector(self, host, is_host_addition):
14051412

14061413
reconnector = _HostReconnectionHandler(
14071414
host, conn_factory, is_host_addition, self.on_add, self.on_up,
1408-
self.scheduler, schedule, host.get_and_set_reconnection_handler,
1415+
keyspaces, self.scheduler, schedule, host.get_and_set_reconnection_handler,
14091416
new_handler=None)
14101417

14111418
old_reconnector = host.get_and_set_reconnection_handler(reconnector)
@@ -1453,7 +1460,7 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
14531460
for listener in self.listeners:
14541461
listener.on_down(host)
14551462

1456-
self._start_reconnector(host, is_host_addition)
1463+
self._start_reconnector(host, is_host_addition, None)
14571464

14581465
def on_add(self, host, refresh_nodes=True):
14591466
if self.is_shutdown:
@@ -1765,7 +1772,12 @@ def _prepare_all_queries(self, host):
17651772
else:
17661773
for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace):
17671774
if keyspace is not None:
1768-
connection.set_keyspace_blocking(keyspace)
1775+
try:
1776+
connection.set_keyspace_blocking(keyspace)
1777+
except InvalidRequest:
1778+
log.warning("Error trying to prepare statements on "
1779+
"host %s, keyspace % s doesn't exist", host, keyspace)
1780+
continue
17691781

17701782
# prepare 10 statements at a time
17711783
ks_statements = list(ks_statements)
@@ -2012,10 +2024,10 @@ def __init__(self, cluster, hosts, keyspace=None):
20122024
self._initial_connect_futures.add(future)
20132025

20142026
futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
2015-
while futures.not_done and not any(f.result() for f in futures.done):
2027+
while futures.not_done and not any(f.result() for f in futures.done if not f.exception()):
20162028
futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)
20172029

2018-
if not any(f.result() for f in self._initial_connect_futures):
2030+
if not any(f.result() for f in self._initial_connect_futures if not f.exception()):
20192031
msg = "Unable to connect to any servers"
20202032
if self.keyspace:
20212033
msg += " using keyspace '%s'" % self.keyspace
@@ -2391,15 +2403,15 @@ def run_add_or_renew_pool():
23912403
except AuthenticationFailed as auth_exc:
23922404
conn_exc = ConnectionException(str(auth_exc), host=host)
23932405
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
2394-
return False
2406+
raise
23952407
except Exception as conn_exc:
23962408
log.warning("Failed to create connection pool for new host %s:",
23972409
host, exc_info=conn_exc)
23982410
# the host itself will still be marked down, so we need to pass
23992411
# a special flag to make sure the reconnector is created
24002412
self.cluster.signal_connection_failure(
24012413
host, conn_exc, is_host_addition, expect_host_to_be_down=True)
2402-
return False
2414+
raise
24032415

24042416
previous = self._pools.get(host)
24052417
with self._lock:
@@ -2411,15 +2423,18 @@ def run_add_or_renew_pool():
24112423
def callback(pool, errors):
24122424
errors_returned.extend(errors)
24132425
set_keyspace_event.set()
2414-
2415-
new_pool._set_keyspace_for_all_conns(self.keyspace, callback)
2426+
try:
2427+
new_pool._set_keyspace_for_all_conns(self.keyspace, callback)
2428+
except ConnectionShutdown:
2429+
self._lock.acquire()
2430+
raise
24162431
set_keyspace_event.wait(self.cluster.connect_timeout)
24172432
if not set_keyspace_event.is_set() or errors_returned:
24182433
log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned)
24192434
self.cluster.on_down(host, is_host_addition)
24202435
new_pool.shutdown()
24212436
self._lock.acquire()
2422-
return False
2437+
raise errors_returned
24232438
self._lock.acquire()
24242439
self._pools[host] = new_pool
24252440

cassandra/connection.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
else:
3838
from six.moves.queue import Queue, Empty # noqa
3939

40-
from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut, ProtocolVersion
40+
from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut, InvalidRequest, ProtocolVersion
4141
from cassandra.marshal import int32_pack
4242
from cassandra.protocol import (ReadyMessage, AuthenticateMessage, OptionsMessage,
4343
StartupMessage, ErrorMessage, CredentialsMessage,
@@ -515,6 +515,8 @@ def wait_for_responses(self, *msgs, **kwargs):
515515
return waiter.deliver(timeout)
516516
except OperationTimedOut:
517517
raise
518+
except InvalidRequest:
519+
raise
518520
except Exception as exc:
519521
self.defunct(exc)
520522
raise
@@ -799,9 +801,9 @@ def set_keyspace_blocking(self, keyspace):
799801
consistency_level=ConsistencyLevel.ONE)
800802
try:
801803
result = self.wait_for_response(query)
802-
except InvalidRequestException as ire:
804+
except InvalidRequest as ire:
803805
# the keyspace probably doesn't exist
804-
raise ire.to_exception()
806+
raise ire
805807
except Exception as exc:
806808
conn_exc = ConnectionException(
807809
"Problem while setting keyspace: %r" % (exc,), self.host)

cassandra/pool.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
except ImportError:
2828
from cassandra.util import WeakSet # NOQA
2929

30-
from cassandra import AuthenticationFailed
30+
from cassandra import AuthenticationFailed, InvalidRequest
3131
from cassandra.connection import ConnectionException
3232
from cassandra.policies import HostDistance
3333

@@ -267,16 +267,21 @@ def on_exception(self, exc, next_delay):
267267

268268
class _HostReconnectionHandler(_ReconnectionHandler):
269269

270-
def __init__(self, host, connection_factory, is_host_addition, on_add, on_up, *args, **kwargs):
270+
def __init__(self, host, connection_factory, is_host_addition, on_add, on_up, keyspaces, *args, **kwargs):
271271
_ReconnectionHandler.__init__(self, *args, **kwargs)
272272
self.is_host_addition = is_host_addition
273273
self.on_add = on_add
274274
self.on_up = on_up
275275
self.host = host
276276
self.connection_factory = connection_factory
277+
self.keyspaces = keyspaces
277278

278279
def try_reconnect(self):
279-
return self.connection_factory()
280+
conn = self.connection_factory()
281+
if self.keyspaces:
282+
for keyspace in self.keyspaces:
283+
conn.set_keyspace_blocking(keyspace)
284+
return conn
280285

281286
def on_reconnection(self, connection):
282287
log.info("Successful reconnection to %s, marking node up if it isn't already", self.host)

0 commit comments

Comments
 (0)