4242
4343from cassandra import (ConsistencyLevel , AuthenticationFailed ,
4444 OperationTimedOut , UnsupportedOperation ,
45- SchemaTargetType , DriverException , ProtocolVersion )
45+ SchemaTargetType , DriverException , ProtocolVersion ,
46+ InvalidRequest )
4647from cassandra .connection import (ConnectionException , ConnectionShutdown ,
4748 ConnectionHeartbeat , ProtocolVersionUnsupported )
4849from cassandra .cqltypes import UserType
@@ -1275,13 +1276,16 @@ def _session_register_user_types(self, session):
12751276 for udt_name , klass in six .iteritems (type_map ):
12761277 session .user_type_registered (keyspace , udt_name , klass )
12771278
1278- def _cleanup_failed_on_up_handling (self , host ):
1279+ def _cleanup_failed_on_up_handling (self , host , set_keyspace_failed = False ):
12791280 self .profile_manager .on_down (host )
12801281 self .control_connection .on_down (host )
1282+ keyspaces = []
12811283 for session in tuple (self .sessions ):
12821284 session .remove_pool (host )
1285+ if set_keyspace_failed and session .keyspace :
1286+ keyspaces .append (session .keyspace )
12831287
1284- self ._start_reconnector (host , is_host_addition = False )
1288+ self ._start_reconnector (host , is_host_addition = False , keyspaces = keyspaces )
12851289
12861290 def _on_up_future_completed (self , host , futures , results , lock , finished_future ):
12871291 with lock :
@@ -1299,7 +1303,10 @@ def _on_up_future_completed(self, host, futures, results, lock, finished_future)
12991303 # all futures have completed at this point
13001304 for exc in [f for f in results if isinstance (f , Exception )]:
13011305 log .error ("Unexpected failure while marking node %s up:" , host , exc_info = exc )
1302- self ._cleanup_failed_on_up_handling (host )
1306+ set_keyspace_failed = False
1307+ if isinstance (exc , InvalidRequest ):
1308+ set_keyspace_failed = True
1309+ self ._cleanup_failed_on_up_handling (host , set_keyspace_failed = set_keyspace_failed )
13031310 return
13041311
13051312 if not all (results ):
@@ -1392,7 +1399,7 @@ def on_up(self, host):
13921399 # for testing purposes
13931400 return futures
13941401
1395- def _start_reconnector (self , host , is_host_addition ):
1402+ def _start_reconnector (self , host , is_host_addition , keyspaces ):
13961403 if self .profile_manager .distance (host ) == HostDistance .IGNORED :
13971404 return
13981405
@@ -1405,7 +1412,7 @@ def _start_reconnector(self, host, is_host_addition):
14051412
14061413 reconnector = _HostReconnectionHandler (
14071414 host , conn_factory , is_host_addition , self .on_add , self .on_up ,
1408- self .scheduler , schedule , host .get_and_set_reconnection_handler ,
1415+ keyspaces , self .scheduler , schedule , host .get_and_set_reconnection_handler ,
14091416 new_handler = None )
14101417
14111418 old_reconnector = host .get_and_set_reconnection_handler (reconnector )
@@ -1453,7 +1460,7 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
14531460 for listener in self .listeners :
14541461 listener .on_down (host )
14551462
1456- self ._start_reconnector (host , is_host_addition )
1463+ self ._start_reconnector (host , is_host_addition , None )
14571464
14581465 def on_add (self , host , refresh_nodes = True ):
14591466 if self .is_shutdown :
@@ -1765,7 +1772,12 @@ def _prepare_all_queries(self, host):
17651772 else :
17661773 for keyspace , ks_statements in groupby (statements , lambda s : s .keyspace ):
17671774 if keyspace is not None :
1768- connection .set_keyspace_blocking (keyspace )
1775+ try :
1776+ connection .set_keyspace_blocking (keyspace )
1777+ except InvalidRequest :
1778+ log .warning ("Error trying to prepare statements on "
1779+ "host %s, keyspace % s doesn't exist" , host , keyspace )
1780+ continue
17691781
17701782 # prepare 10 statements at a time
17711783 ks_statements = list (ks_statements )
@@ -2012,10 +2024,10 @@ def __init__(self, cluster, hosts, keyspace=None):
20122024 self ._initial_connect_futures .add (future )
20132025
20142026 futures = wait_futures (self ._initial_connect_futures , return_when = FIRST_COMPLETED )
2015- while futures .not_done and not any (f .result () for f in futures .done ):
2027+ while futures .not_done and not any (f .result () for f in futures .done if not f . exception () ):
20162028 futures = wait_futures (futures .not_done , return_when = FIRST_COMPLETED )
20172029
2018- if not any (f .result () for f in self ._initial_connect_futures ):
2030+ if not any (f .result () for f in self ._initial_connect_futures if not f . exception () ):
20192031 msg = "Unable to connect to any servers"
20202032 if self .keyspace :
20212033 msg += " using keyspace '%s'" % self .keyspace
@@ -2391,15 +2403,15 @@ def run_add_or_renew_pool():
23912403 except AuthenticationFailed as auth_exc :
23922404 conn_exc = ConnectionException (str (auth_exc ), host = host )
23932405 self .cluster .signal_connection_failure (host , conn_exc , is_host_addition )
2394- return False
2406+ raise
23952407 except Exception as conn_exc :
23962408 log .warning ("Failed to create connection pool for new host %s:" ,
23972409 host , exc_info = conn_exc )
23982410 # the host itself will still be marked down, so we need to pass
23992411 # a special flag to make sure the reconnector is created
24002412 self .cluster .signal_connection_failure (
24012413 host , conn_exc , is_host_addition , expect_host_to_be_down = True )
2402- return False
2414+ raise
24032415
24042416 previous = self ._pools .get (host )
24052417 with self ._lock :
@@ -2411,15 +2423,18 @@ def run_add_or_renew_pool():
24112423 def callback (pool , errors ):
24122424 errors_returned .extend (errors )
24132425 set_keyspace_event .set ()
2414-
2415- new_pool ._set_keyspace_for_all_conns (self .keyspace , callback )
2426+ try :
2427+ new_pool ._set_keyspace_for_all_conns (self .keyspace , callback )
2428+ except ConnectionShutdown :
2429+ self ._lock .acquire ()
2430+ raise
24162431 set_keyspace_event .wait (self .cluster .connect_timeout )
24172432 if not set_keyspace_event .is_set () or errors_returned :
24182433 log .warning ("Failed setting keyspace for pool after keyspace changed during connect: %s" , errors_returned )
24192434 self .cluster .on_down (host , is_host_addition )
24202435 new_pool .shutdown ()
24212436 self ._lock .acquire ()
2422- return False
2437+ raise errors_returned
24232438 self ._lock .acquire ()
24242439 self ._pools [host ] = new_pool
24252440
0 commit comments