@@ -1721,6 +1721,19 @@ def set_meta_refresh_enabled(self, enabled):
17211721 self .schema_metadata_enabled = enabled
17221722 self .token_metadata_enabled = enabled
17231723
1724+ @classmethod
1725+ def _send_chunks (cls , connection , host , chunks , set_keyspace = False ):
1726+ for ks_chunk in chunks :
1727+ messages = [PrepareMessage (query = s .query_string ,
1728+ keyspace = s .keyspace if set_keyspace else None )
1729+ for s in ks_chunk ]
1730+ # TODO: make this timeout configurable somehow?
1731+ responses = connection .wait_for_responses (* messages , timeout = 5.0 , fail_on_error = False )
1732+ for success , response in responses :
1733+ if not success :
1734+ log .debug ("Got unexpected response when preparing "
1735+ "statement on host %s: %r" , host , response )
1736+
17241737 def _prepare_all_queries (self , host ):
17251738 if not self ._prepared_statements or not self .reprepare_on_up :
17261739 return
@@ -1730,24 +1743,23 @@ def _prepare_all_queries(self, host):
17301743 try :
17311744 connection = self .connection_factory (host .address )
17321745 statements = self ._prepared_statements .values ()
1733- for keyspace , ks_statements in groupby (statements , lambda s : s .keyspace ):
1734- if keyspace is not None :
1735- connection .set_keyspace_blocking (keyspace )
1736-
1737- # prepare 10 statements at a time
1738- ks_statements = list (ks_statements )
1746+ if ProtocolVersion .uses_keyspace_flag (self .protocol_version ):
1747+ # V5 protocol and higher, no need to set the keyspace
17391748 chunks = []
1740- for i in range (0 , len (ks_statements ), 10 ):
1741- chunks .append (ks_statements [i :i + 10 ])
1742-
1743- for ks_chunk in chunks :
1744- messages = [PrepareMessage (query = s .query_string ) for s in ks_chunk ]
1745- # TODO: make this timeout configurable somehow?
1746- responses = connection .wait_for_responses (* messages , timeout = 5.0 , fail_on_error = False )
1747- for success , response in responses :
1748- if not success :
1749- log .debug ("Got unexpected response when preparing "
1750- "statement on host %s: %r" , host , response )
1749+ for i in range (0 , len (statements ), 10 ):
1750+ chunks .append (statements [i :i + 10 ])
1751+ self ._send_chunks (connection , host , chunks , True )
1752+ else :
1753+ for keyspace , ks_statements in groupby (statements , lambda s : s .keyspace ):
1754+ if keyspace is not None :
1755+ connection .set_keyspace_blocking (keyspace )
1756+
1757+ # prepare 10 statements at a time
1758+ ks_statements = list (ks_statements )
1759+ chunks = []
1760+ for i in range (0 , len (ks_statements ), 10 ):
1761+ chunks .append (ks_statements [i :i + 10 ])
1762+ self ._send_chunks (connection , host , chunks )
17511763
17521764 log .debug ("Done preparing all known prepared statements against host %s" , host )
17531765 except OperationTimedOut as timeout :
@@ -2126,11 +2138,13 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
21262138
21272139 if isinstance (query , SimpleStatement ):
21282140 query_string = query .query_string
2141+ statement_keyspace = query .keyspace if ProtocolVersion .uses_keyspace_flag (self ._protocol_version ) else None
21292142 if parameters :
21302143 query_string = bind_params (query_string , parameters , self .encoder )
21312144 message = QueryMessage (
21322145 query_string , cl , serial_cl ,
2133- fetch_size , timestamp = timestamp )
2146+ fetch_size , timestamp = timestamp ,
2147+ keyspace = statement_keyspace )
21342148 elif isinstance (query , BoundStatement ):
21352149 prepared_statement = query .prepared_statement
21362150 message = ExecuteMessage (
@@ -2143,9 +2157,10 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
21432157 "BatchStatement execution is only supported with protocol version "
21442158 "2 or higher (supported in Cassandra 2.0 and higher). Consider "
21452159 "setting Cluster.protocol_version to 2 to support this operation." )
2160+ statement_keyspace = query .keyspace if ProtocolVersion .uses_keyspace_flag (self ._protocol_version ) else None
21462161 message = BatchMessage (
21472162 query .batch_type , query ._statements_and_parameters , cl ,
2148- serial_cl , timestamp )
2163+ serial_cl , timestamp , statement_keyspace )
21492164
21502165 message .tracing = trace
21512166
@@ -2214,7 +2229,7 @@ def _on_request(self, response_future):
22142229 for fn , args , kwargs in self ._request_init_callbacks :
22152230 fn (response_future , * args , ** kwargs )
22162231
2217- def prepare (self , query , custom_payload = None ):
2232+ def prepare (self , query , custom_payload = None , keyspace = None ):
22182233 """
22192234 Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
22202235 instance which can be used as follows::
@@ -2237,13 +2252,24 @@ def prepare(self, query, custom_payload=None):
22372252 ... bound = prepared.bind((user.id, user.name, user.age))
22382253 ... session.execute(bound)
22392254
2255+ Alternatively, if :attr:`~.Cluster.protocol_version` is 5 or higher
2256+ (requires Cassandra 4.0+), the keyspace can be specified as a
2257+ parameter. This will allow you to avoid specifying the keyspace in the
2258+ query without specifying a keyspace in :meth:`~.Cluster.connect`. It
2259+ even will let you prepare and use statements against a keyspace other
2260+ than the one originally specified on connection:
2261+
2262+ >>> analyticskeyspace_prepared = session.prepare(
2263+ ... "INSERT INTO user_activity id, last_activity VALUES (?, ?)",
2264+ ... keyspace="analyticskeyspace") # note the different keyspace
2265+
22402266 **Important**: PreparedStatements should be prepared only once.
22412267 Preparing the same query more than once will likely affect performance.
22422268
22432269 `custom_payload` is a key value map to be passed along with the prepare
22442270 message. See :ref:`custom_payload`.
22452271 """
2246- message = PrepareMessage (query = query )
2272+ message = PrepareMessage (query = query , keyspace = keyspace )
22472273 future = ResponseFuture (self , message , query = None , timeout = self .default_timeout )
22482274 try :
22492275 future .send_request ()
@@ -2252,8 +2278,9 @@ def prepare(self, query, custom_payload=None):
22522278 log .exception ("Error preparing query:" )
22532279 raise
22542280
2281+ prepared_keyspace = keyspace if keyspace else self .keyspace
22552282 prepared_statement = PreparedStatement .from_message (
2256- query_id , bind_metadata , pk_indexes , self .cluster .metadata , query , self . keyspace ,
2283+ query_id , bind_metadata , pk_indexes , self .cluster .metadata , query , prepared_keyspace ,
22572284 self ._protocol_version , result_metadata )
22582285 prepared_statement .custom_payload = future .custom_payload
22592286
@@ -2262,21 +2289,22 @@ def prepare(self, query, custom_payload=None):
22622289 if self .cluster .prepare_on_all_hosts :
22632290 host = future ._current_host
22642291 try :
2265- self .prepare_on_all_hosts (prepared_statement .query_string , host )
2292+ self .prepare_on_all_hosts (prepared_statement .query_string , host , prepared_keyspace )
22662293 except Exception :
22672294 log .exception ("Error preparing query on all hosts:" )
22682295
22692296 return prepared_statement
22702297
2271- def prepare_on_all_hosts (self , query , excluded_host ):
2298+ def prepare_on_all_hosts (self , query , excluded_host , keyspace = None ):
22722299 """
22732300 Prepare the given query on all hosts, excluding ``excluded_host``.
22742301 Intended for internal use only.
22752302 """
22762303 futures = []
22772304 for host in tuple (self ._pools .keys ()):
22782305 if host != excluded_host and host .is_up :
2279- future = ResponseFuture (self , PrepareMessage (query = query ), None , self .default_timeout )
2306+ future = ResponseFuture (self , PrepareMessage (query = query , keyspace = keyspace ),
2307+ None , self .default_timeout )
22802308
22812309 # we don't care about errors preparing against specific hosts,
22822310 # since we can always prepare them as needed when the prepared
@@ -3659,7 +3687,8 @@ def _set_result(self, host, connection, pool, response):
36593687
36603688 current_keyspace = self ._connection .keyspace
36613689 prepared_keyspace = prepared_statement .keyspace
3662- if prepared_keyspace and current_keyspace != prepared_keyspace :
3690+ if not ProtocolVersion .uses_keyspace_flag (self .session .cluster .protocol_version ) \
3691+ and prepared_keyspace and current_keyspace != prepared_keyspace :
36633692 self ._set_final_exception (
36643693 ValueError ("The Session's current keyspace (%s) does "
36653694 "not match the keyspace the statement was "
@@ -3669,7 +3698,10 @@ def _set_result(self, host, connection, pool, response):
36693698
36703699 log .debug ("Re-preparing unrecognized prepared statement against host %s: %s" ,
36713700 host , prepared_statement .query_string )
3672- prepare_message = PrepareMessage (query = prepared_statement .query_string )
3701+ prepared_keyspace = prepared_statement .keyspace \
3702+ if ProtocolVersion .uses_keyspace_flag (self .session .cluster .protocol_version ) else None
3703+ prepare_message = PrepareMessage (query = prepared_statement .query_string ,
3704+ keyspace = prepared_keyspace )
36733705 # since this might block, run on the executor to avoid hanging
36743706 # the event loop thread
36753707 self .session .submit (self ._reprepare , prepare_message , host , connection , pool )
0 commit comments