Skip to content

Commit 81172ef

Browse files
Jaume Marhuendamambocab
authored andcommitted
Added API and tests for CASSANDRA-10145 (apache#823)
PYTHON-678
1 parent f5694e4 commit 81172ef

8 files changed

Lines changed: 413 additions & 59 deletions

File tree

CHANGELOG.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
3.12.0
22
======
33

4+
Features
5+
--------
6+
* Send keyspace in QUERY, PREPARE, and BATCH messages (PYTHON-678)
7+
48
Bug Fixes
59
---------
610
* Both _set_final_exception/result called for the same ResponseFuture (PYTHON-630)
711
* Use of DCAwareRoundRobinPolicy raises NoHostAvailable exception (PYTHON-781)
812

9-
1013
3.11.0
1114
======
1215
July 24, 2017

build.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ build:
121121
# Run the unit tests, this is not done in travis because
122122
# it takes too much time for the whole matrix to build with cython
123123
if [[ $CYTHON == 'CYTHON' ]]; then
124-
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=1 nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=cqle_results.xml tests/unit/ || true
125-
MONKEY_PATCH_LOOP=1 EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=1 nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=cqle_results.xml tests/unit/io/test_eventletreactor.py || true
126-
MONKEY_PATCH_LOOP=1 EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=1 nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=cqle_results.xml tests/unit/io/test_geventreactor.py || true
124+
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=1 nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=unit_results.xml tests/unit/ || true
125+
MONKEY_PATCH_LOOP=1 EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=1 nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=unit_eventlet_results.xml tests/unit/io/test_eventletreactor.py || true
126+
MONKEY_PATCH_LOOP=1 EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=1 nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=unit_gevent_results.xml tests/unit/io/test_geventreactor.py || true
127127
128128
fi
129129

cassandra/cluster.py

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,19 @@ def set_meta_refresh_enabled(self, enabled):
17211721
self.schema_metadata_enabled = enabled
17221722
self.token_metadata_enabled = enabled
17231723

1724+
@classmethod
1725+
def _send_chunks(cls, connection, host, chunks, set_keyspace=False):
1726+
for ks_chunk in chunks:
1727+
messages = [PrepareMessage(query=s.query_string,
1728+
keyspace=s.keyspace if set_keyspace else None)
1729+
for s in ks_chunk]
1730+
# TODO: make this timeout configurable somehow?
1731+
responses = connection.wait_for_responses(*messages, timeout=5.0, fail_on_error=False)
1732+
for success, response in responses:
1733+
if not success:
1734+
log.debug("Got unexpected response when preparing "
1735+
"statement on host %s: %r", host, response)
1736+
17241737
def _prepare_all_queries(self, host):
17251738
if not self._prepared_statements or not self.reprepare_on_up:
17261739
return
@@ -1730,24 +1743,23 @@ def _prepare_all_queries(self, host):
17301743
try:
17311744
connection = self.connection_factory(host.address)
17321745
statements = self._prepared_statements.values()
1733-
for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace):
1734-
if keyspace is not None:
1735-
connection.set_keyspace_blocking(keyspace)
1736-
1737-
# prepare 10 statements at a time
1738-
ks_statements = list(ks_statements)
1746+
if ProtocolVersion.uses_keyspace_flag(self.protocol_version):
1747+
# V5 protocol and higher, no need to set the keyspace
17391748
chunks = []
1740-
for i in range(0, len(ks_statements), 10):
1741-
chunks.append(ks_statements[i:i + 10])
1742-
1743-
for ks_chunk in chunks:
1744-
messages = [PrepareMessage(query=s.query_string) for s in ks_chunk]
1745-
# TODO: make this timeout configurable somehow?
1746-
responses = connection.wait_for_responses(*messages, timeout=5.0, fail_on_error=False)
1747-
for success, response in responses:
1748-
if not success:
1749-
log.debug("Got unexpected response when preparing "
1750-
"statement on host %s: %r", host, response)
1749+
for i in range(0, len(statements), 10):
1750+
chunks.append(statements[i:i + 10])
1751+
self._send_chunks(connection, host, chunks, True)
1752+
else:
1753+
for keyspace, ks_statements in groupby(statements, lambda s: s.keyspace):
1754+
if keyspace is not None:
1755+
connection.set_keyspace_blocking(keyspace)
1756+
1757+
# prepare 10 statements at a time
1758+
ks_statements = list(ks_statements)
1759+
chunks = []
1760+
for i in range(0, len(ks_statements), 10):
1761+
chunks.append(ks_statements[i:i + 10])
1762+
self._send_chunks(connection, host, chunks)
17511763

17521764
log.debug("Done preparing all known prepared statements against host %s", host)
17531765
except OperationTimedOut as timeout:
@@ -2126,11 +2138,13 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
21262138

21272139
if isinstance(query, SimpleStatement):
21282140
query_string = query.query_string
2141+
statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
21292142
if parameters:
21302143
query_string = bind_params(query_string, parameters, self.encoder)
21312144
message = QueryMessage(
21322145
query_string, cl, serial_cl,
2133-
fetch_size, timestamp=timestamp)
2146+
fetch_size, timestamp=timestamp,
2147+
keyspace=statement_keyspace)
21342148
elif isinstance(query, BoundStatement):
21352149
prepared_statement = query.prepared_statement
21362150
message = ExecuteMessage(
@@ -2143,9 +2157,10 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
21432157
"BatchStatement execution is only supported with protocol version "
21442158
"2 or higher (supported in Cassandra 2.0 and higher). Consider "
21452159
"setting Cluster.protocol_version to 2 to support this operation.")
2160+
statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None
21462161
message = BatchMessage(
21472162
query.batch_type, query._statements_and_parameters, cl,
2148-
serial_cl, timestamp)
2163+
serial_cl, timestamp, statement_keyspace)
21492164

21502165
message.tracing = trace
21512166

@@ -2214,7 +2229,7 @@ def _on_request(self, response_future):
22142229
for fn, args, kwargs in self._request_init_callbacks:
22152230
fn(response_future, *args, **kwargs)
22162231

2217-
def prepare(self, query, custom_payload=None):
2232+
def prepare(self, query, custom_payload=None, keyspace=None):
22182233
"""
22192234
Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
22202235
instance which can be used as follows::
@@ -2237,13 +2252,24 @@ def prepare(self, query, custom_payload=None):
22372252
... bound = prepared.bind((user.id, user.name, user.age))
22382253
... session.execute(bound)
22392254
2255+
Alternatively, if :attr:`~.Cluster.protocol_version` is 5 or higher
2256+
(requires Cassandra 4.0+), the keyspace can be specified as a
2257+
parameter. This will allow you to avoid specifying the keyspace in the
2258+
query without specifying a keyspace in :meth:`~.Cluster.connect`. It
2259+
even will let you prepare and use statements against a keyspace other
2260+
than the one originally specified on connection:
2261+
2262+
>>> analyticskeyspace_prepared = session.prepare(
2263+
... "INSERT INTO user_activity id, last_activity VALUES (?, ?)",
2264+
... keyspace="analyticskeyspace") # note the different keyspace
2265+
22402266
**Important**: PreparedStatements should be prepared only once.
22412267
Preparing the same query more than once will likely affect performance.
22422268
22432269
`custom_payload` is a key value map to be passed along with the prepare
22442270
message. See :ref:`custom_payload`.
22452271
"""
2246-
message = PrepareMessage(query=query)
2272+
message = PrepareMessage(query=query, keyspace=keyspace)
22472273
future = ResponseFuture(self, message, query=None, timeout=self.default_timeout)
22482274
try:
22492275
future.send_request()
@@ -2252,8 +2278,9 @@ def prepare(self, query, custom_payload=None):
22522278
log.exception("Error preparing query:")
22532279
raise
22542280

2281+
prepared_keyspace = keyspace if keyspace else self.keyspace
22552282
prepared_statement = PreparedStatement.from_message(
2256-
query_id, bind_metadata, pk_indexes, self.cluster.metadata, query, self.keyspace,
2283+
query_id, bind_metadata, pk_indexes, self.cluster.metadata, query, prepared_keyspace,
22572284
self._protocol_version, result_metadata)
22582285
prepared_statement.custom_payload = future.custom_payload
22592286

@@ -2262,21 +2289,22 @@ def prepare(self, query, custom_payload=None):
22622289
if self.cluster.prepare_on_all_hosts:
22632290
host = future._current_host
22642291
try:
2265-
self.prepare_on_all_hosts(prepared_statement.query_string, host)
2292+
self.prepare_on_all_hosts(prepared_statement.query_string, host, prepared_keyspace)
22662293
except Exception:
22672294
log.exception("Error preparing query on all hosts:")
22682295

22692296
return prepared_statement
22702297

2271-
def prepare_on_all_hosts(self, query, excluded_host):
2298+
def prepare_on_all_hosts(self, query, excluded_host, keyspace=None):
22722299
"""
22732300
Prepare the given query on all hosts, excluding ``excluded_host``.
22742301
Intended for internal use only.
22752302
"""
22762303
futures = []
22772304
for host in tuple(self._pools.keys()):
22782305
if host != excluded_host and host.is_up:
2279-
future = ResponseFuture(self, PrepareMessage(query=query), None, self.default_timeout)
2306+
future = ResponseFuture(self, PrepareMessage(query=query, keyspace=keyspace),
2307+
None, self.default_timeout)
22802308

22812309
# we don't care about errors preparing against specific hosts,
22822310
# since we can always prepare them as needed when the prepared
@@ -3659,7 +3687,8 @@ def _set_result(self, host, connection, pool, response):
36593687

36603688
current_keyspace = self._connection.keyspace
36613689
prepared_keyspace = prepared_statement.keyspace
3662-
if prepared_keyspace and current_keyspace != prepared_keyspace:
3690+
if not ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) \
3691+
and prepared_keyspace and current_keyspace != prepared_keyspace:
36633692
self._set_final_exception(
36643693
ValueError("The Session's current keyspace (%s) does "
36653694
"not match the keyspace the statement was "
@@ -3669,7 +3698,10 @@ def _set_result(self, host, connection, pool, response):
36693698

36703699
log.debug("Re-preparing unrecognized prepared statement against host %s: %s",
36713700
host, prepared_statement.query_string)
3672-
prepare_message = PrepareMessage(query=prepared_statement.query_string)
3701+
prepared_keyspace = prepared_statement.keyspace \
3702+
if ProtocolVersion.uses_keyspace_flag(self.session.cluster.protocol_version) else None
3703+
prepare_message = PrepareMessage(query=prepared_statement.query_string,
3704+
keyspace=prepared_keyspace)
36733705
# since this might block, run on the executor to avoid hanging
36743706
# the event loop thread
36753707
self.session.submit(self._reprepare, prepare_message, host, connection, pool)

cassandra/protocol.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ def recv_body(cls, f, *args):
508508
_WITH_SERIAL_CONSISTENCY_FLAG = 0x10
509509
_PROTOCOL_TIMESTAMP = 0x20
510510
_WITH_KEYSPACE_FLAG = 0x80
511+
_PREPARED_WITH_KEYSPACE_FLAG = 0x01
511512

512513

513514
class QueryMessage(_MessageType):
@@ -791,7 +792,7 @@ def send_body(self, f, protocol_version):
791792

792793
if self.keyspace is not None:
793794
if ProtocolVersion.uses_keyspace_flag(protocol_version):
794-
flags |= _WITH_KEYSPACE_FLAG
795+
flags |= _PREPARED_WITH_KEYSPACE_FLAG
795796
else:
796797
raise UnsupportedOperation(
797798
"Keyspaces may only be set on queries with protocol version "

tests/integration/__init__.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,19 @@ def set_default_cass_ip():
182182
Cluster.__init__.__func__.__defaults__ = tuple(defaults)
183183

184184

185-
def get_default_protocol():
185+
def set_default_beta_flag_true():
186+
defaults = list(Cluster.__init__.__defaults__)
187+
defaults = defaults[:-3] + [True] + defaults[-2:]
188+
try:
189+
Cluster.__init__.__defaults__ = tuple(defaults)
190+
except:
191+
Cluster.__init__.__func__.__defaults__ = tuple(defaults)
186192

193+
194+
def get_default_protocol():
195+
if Version(CASSANDRA_VERSION) >= Version('4.0'):
196+
set_default_beta_flag_true()
197+
return 5
187198
if Version(CASSANDRA_VERSION) >= Version('2.2'):
188199
return 4
189200
elif Version(CASSANDRA_VERSION) >= Version('2.1'):
@@ -261,6 +272,7 @@ def get_unsupported_upper_protocol():
261272
greaterthanorequalcass36 = unittest.skipUnless(CASSANDRA_VERSION >= '3.6', 'Cassandra version 3.6 or greater required')
262273
greaterthanorequalcass3_10 = unittest.skipUnless(CASSANDRA_VERSION >= '3.10', 'Cassandra version 3.10 or greater required')
263274
greaterthanorequalcass3_11 = unittest.skipUnless(CASSANDRA_VERSION >= '3.11', 'Cassandra version 3.10 or greater required')
275+
greaterthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION >= '4.0', 'Cassandra version 4.0 or greater required')
264276
lessthancass30 = unittest.skipUnless(CASSANDRA_VERSION < '3.0', 'Cassandra version less then 3.0 required')
265277
dseonly = unittest.skipUnless(DSE_VERSION, "Test is only applicalbe to DSE clusters")
266278
pypy = unittest.skipUnless(platform.python_implementation() == "PyPy", "Test is skipped unless it's on PyPy")
@@ -690,7 +702,7 @@ def tearDownClass(cls):
690702
class BasicSharedKeyspaceUnitTestCaseRF1(BasicSharedKeyspaceUnitTestCase):
691703
"""
692704
This is basic unit test case that can be leveraged to scope a keyspace to a specific test class.
693-
creates a keyspace named after the testclass with a rf of 1, and a table named after the class
705+
creates a keyspace named after the testclass with a rf of 1
694706
"""
695707
@classmethod
696708
def setUpClass(self):

0 commit comments

Comments
 (0)