Skip to content

Commit 09505e6

Browse files
committed
[PYTHON-238] Added tests for protocol v4 Exceptions, and MISC test cleanup
1 parent ff1d75a commit 09505e6

2 files changed

Lines changed: 285 additions & 2 deletions

File tree

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
# Copyright 2013-2015 DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
try:
17+
import unittest2 as unittest
18+
except ImportError:
19+
import unittest
20+
21+
22+
from cassandra.cluster import Cluster
23+
from cassandra import ConsistencyLevel
24+
from cassandra import WriteFailure, ReadFailure, FunctionFailure
25+
from cassandra.concurrent import execute_concurrent_with_args
26+
from cassandra.query import SimpleStatement
27+
from tests.integration import use_singledc, PROTOCOL_VERSION, get_cluster, setup_keyspace
28+
29+
30+
def setup_module():
31+
"""
32+
We need some custom setup for this module. All unit tests in this module
33+
require protocol >=4. We won't bother going through the setup required unless that is the
34+
protocol version we are using.
35+
"""
36+
37+
# If we aren't at protocol v 4 or greater don't waste time setting anything up, all tests will be skipped
38+
if PROTOCOL_VERSION >= 4:
39+
use_singledc(start=False)
40+
ccm_cluster = get_cluster()
41+
ccm_cluster.stop()
42+
config_options = {'tombstone_failure_threshold': 2000, 'tombstone_warn_threshold': 1000}
43+
ccm_cluster.set_configuration_options(config_options)
44+
ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
45+
setup_keyspace()
46+
47+
48+
def teardown_module():
49+
"""
50+
The rest of the tests don't need custom tombstones
51+
reset the config options so as to not mess with other tests.
52+
"""
53+
if PROTOCOL_VERSION >= 4:
54+
ccm_cluster = get_cluster()
55+
config_options = {}
56+
ccm_cluster.set_configuration_options(config_options)
57+
if ccm_cluster is not None:
58+
ccm_cluster.stop()
59+
60+
61+
class ClientExceptionTests(unittest.TestCase):
62+
63+
def setUp(self):
64+
"""
65+
Test is skipped if run with native protocol version <4
66+
"""
67+
68+
if PROTOCOL_VERSION < 4:
69+
raise unittest.SkipTest(
70+
"Native protocol 4,0+ is required for custom payloads, currently using %r"
71+
% (PROTOCOL_VERSION,))
72+
73+
self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
74+
self.session = self.cluster.connect()
75+
self.nodes_currently_failing = []
76+
self.node1, self.node2, self.node3 = get_cluster().nodes.values()
77+
78+
def tearDown(self):
79+
80+
self.cluster.shutdown()
81+
failing_nodes = []
82+
83+
# Restart the nodes to fully functional again
84+
self.setFailingNodes(failing_nodes, "testksfail")
85+
86+
def setFailingNodes(self, failing_nodes, keyspace):
87+
"""
88+
This method will take in a set of failing nodes, and toggle all of the nodes in the provided list to fail
89+
writes.
90+
@param failing_nodes A definitive list of nodes that should fail writes
91+
@param keyspace The keyspace to enable failures on
92+
93+
"""
94+
95+
# Ensure all of the nodes on the list have failures enabled
96+
for node in failing_nodes:
97+
if node not in self.nodes_currently_failing:
98+
node.stop(wait_other_notice=True, gently=False)
99+
node.start(jvm_args=[" -Dcassandra.test.fail_writes_ks=" + keyspace], wait_for_binary_proto=True,
100+
wait_other_notice=True)
101+
self.nodes_currently_failing.append(node)
102+
103+
# Ensure all nodes not on the list, but that are currently set to failing are enabled
104+
for node in self.nodes_currently_failing:
105+
if node not in failing_nodes:
106+
node.stop(wait_other_notice=True, gently=False)
107+
node.start(wait_for_binary_proto=True, wait_other_notice=True)
108+
self.nodes_currently_failing.remove(node)
109+
110+
def _perform_cql_statement(self, text, consistency_level, expected_exception):
111+
"""
112+
Simple helper method to preform cql statements and check for expected exception
113+
@param text CQl statement to execute
114+
@param consistency_level Consistency level at which it is to be executed
115+
@param expected_exception Exception expected to be throw or none
116+
"""
117+
statement = SimpleStatement(text)
118+
statement.consistency_level = consistency_level
119+
120+
if expected_exception is None:
121+
self.session.execute(statement)
122+
else:
123+
with self.assertRaises(expected_exception):
124+
self.session.execute(statement)
125+
126+
def test_write_failures_from_coordinator(self):
127+
"""
128+
Test to validate that write failures from the coordinator are surfaced appropriately.
129+
130+
test_write_failures_from_coordinator Enable write failures on the various nodes using a custom jvm flag,
131+
cassandra.test.fail_writes_ks. This will cause writes to fail on that specific node. Depending on the replication
132+
factor of the keyspace, and the consistency level, we will expect the coordinator to send WriteFailure, or not.
133+
134+
135+
@since 2.6.0
136+
@jira_ticket PYTHON-238
137+
@expected_result Appropriate write failures from the coordinator
138+
139+
@test_category queries:basic
140+
"""
141+
142+
# Setup temporary keyspace.
143+
self._perform_cql_statement(
144+
"""
145+
CREATE KEYSPACE testksfail
146+
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}
147+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
148+
149+
# create table
150+
self._perform_cql_statement(
151+
"""
152+
CREATE TABLE testksfail.test (
153+
k int PRIMARY KEY,
154+
v int )
155+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
156+
157+
# Disable one node
158+
failing_nodes = [self.node1]
159+
self.setFailingNodes(failing_nodes, "testksfail")
160+
161+
# With one node disabled we would expect a write failure with ConsistencyLevel of all
162+
self._perform_cql_statement(
163+
"""
164+
INSERT INTO testksfail.test (k, v) VALUES (1, 0 )
165+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=WriteFailure)
166+
167+
# We have two nodes left so a write with consistency level of QUORUM should complete as expected
168+
self._perform_cql_statement(
169+
"""
170+
INSERT INTO testksfail.test (k, v) VALUES (1, 0 )
171+
""", consistency_level=ConsistencyLevel.QUORUM, expected_exception=None)
172+
173+
failing_nodes = []
174+
175+
# Restart the nodes to fully functional again
176+
self.setFailingNodes(failing_nodes, "testksfail")
177+
178+
# Drop temporary keyspace
179+
self._perform_cql_statement(
180+
"""
181+
DROP KEYSPACE testksfail
182+
""", consistency_level=ConsistencyLevel.ANY, expected_exception=None)
183+
184+
def test_tombstone_overflow_read_failure(self):
185+
"""
186+
Test to validate that a ReadFailure is returned from the node when a specified threshold of tombstombs is
187+
reached.
188+
189+
test_tombstomb_overflow_read_failure First sets the tombstone failure threshold down to a level that allows it
190+
to be more easily encountered. We then create some wide rows and ensure they are deleted appropriately. This
191+
produces the correct amount of tombstombs. Upon making a simple query we expect to get a read failure back
192+
from the coordinator.
193+
194+
195+
@since 2.6.0
196+
@jira_ticket PYTHON-238
197+
@expected_result Appropriate write failures from the coordinator
198+
199+
@test_category queries:basic
200+
"""
201+
202+
# Setup table for "wide row"
203+
self._perform_cql_statement(
204+
"""
205+
CREATE TABLE test3rf.test2 (
206+
k int,
207+
v0 int,
208+
v1 int, PRIMARY KEY (k,v0))
209+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
210+
211+
statement = self.session.prepare("INSERT INTO test3rf.test2 (k, v0,v1) VALUES (1,?,1)")
212+
parameters = [(x,) for x in range(3000)]
213+
execute_concurrent_with_args(self.session, statement, parameters, concurrency=50)
214+
215+
statement = self.session.prepare("DELETE v1 FROM test3rf.test2 WHERE k = 1 AND v0 =?")
216+
parameters = [(x,) for x in range(2001)]
217+
execute_concurrent_with_args(self.session, statement, parameters, concurrency=50)
218+
219+
self._perform_cql_statement(
220+
"""
221+
SELECT * FROM test3rf.test2 WHERE k = 1
222+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=ReadFailure)
223+
224+
self._perform_cql_statement(
225+
"""
226+
DROP TABLE test3rf.test2;
227+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
228+
229+
def test_user_function_failure(self):
230+
"""
231+
Test to validate that exceptions in user defined function are correctly surfaced by the driver to us.
232+
233+
test_user_function_failure First creates a table to use for testing. Then creates a function that will throw an
234+
exception when invoked. It then invokes the function and expects a FunctionException. Finally it preforms
235+
cleanup operations.
236+
237+
@since 2.6.0
238+
@jira_ticket PYTHON-238
239+
@expected_result Function failures when UDF throws exception
240+
241+
@test_category queries:basic
242+
"""
243+
244+
# create UDF that throws an exception
245+
self._perform_cql_statement(
246+
"""
247+
CREATE FUNCTION test3rf.test_failure(d double)
248+
RETURNS NULL ON NULL INPUT
249+
RETURNS double
250+
LANGUAGE java AS 'throw new RuntimeException("failure");';
251+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
252+
253+
# Create test table
254+
self._perform_cql_statement(
255+
"""
256+
CREATE TABLE test3rf.d (k int PRIMARY KEY , d double);
257+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
258+
259+
# Insert some values
260+
self._perform_cql_statement(
261+
"""
262+
INSERT INTO test3rf.d (k,d) VALUES (0, 5.12);
263+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
264+
265+
# Run the function expect a function failure exception
266+
self._perform_cql_statement(
267+
"""
268+
SELECT test_failure(d) FROM test3rf.d WHERE k = 0;
269+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=FunctionFailure)
270+
271+
self._perform_cql_statement(
272+
"""
273+
DROP FUNCTION test3rf.test_failure;
274+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)
275+
276+
self._perform_cql_statement(
277+
"""
278+
DROP TABLE test3rf.d;
279+
""", consistency_level=ConsistencyLevel.ALL, expected_exception=None)

tests/integration/standard/test_query.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
from tests.integration import use_singledc, PROTOCOL_VERSION
3030

31+
import re
32+
3133

3234
def setup_module():
3335
use_singledc()
@@ -136,15 +138,17 @@ def test_client_ip_in_trace(self):
136138
statement = SimpleStatement(query)
137139
response_future = session.execute_async(statement, trace=True)
138140
response_future.result(timeout=10.0)
139-
current_host = response_future._current_host.address
140141

141142
# Fetch the client_ip from the trace.
142143
trace = response_future.get_query_trace(max_wait=2.0)
143144
client_ip = trace.client
144145

146+
# Ip address should be in the local_host range
147+
pat = re.compile("127.0.0.\d{1,3}")
148+
145149
# Ensure that ip is set
146150
self.assertIsNotNone(client_ip, "Client IP was not set in trace with C* >= 2.2")
147-
self.assertEqual(client_ip, current_host, "Client IP from trace did not match the expected value")
151+
self.assertTrue(pat.match(client_ip), "Client IP from trace did not match the expected value")
148152

149153
cluster.shutdown()
150154

0 commit comments

Comments
 (0)