Skip to content

Commit 7f2a463

Browse files
author
bjmb
committed
Added simulacron to test harness, refactores some tests using it
1 parent da017f1 commit 7f2a463

11 files changed

Lines changed: 512 additions & 469 deletions

File tree

build.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,21 @@ build:
103103
python setup.py build_ext --inplace --no-cython
104104
fi
105105
106+
echo "Installing simulacron"
107+
pushd ~
108+
git clone git@github.com:datastax/simulacron.git
109+
cd simulacron
110+
git clone git@github.com:datastax/native-protocol.git
111+
cd native-protocol
112+
mvn clean install
113+
cd ..
114+
mvn clean install -DskipTests=true
115+
ls standalone/target
116+
SIMULACRON_JAR=`find \`pwd\` -name "simulacron-standalone-*.jar"`
117+
echo "SIMULACRON_JAR: $SIMULACRON_JAR"
118+
119+
popd
120+
106121
# Run the unit tests, this is not done in travis because
107122
# it takes too much time for the whole matrix to build with cython
108123
if [[ $CYTHON == 'CYTHON' ]]; then
@@ -112,6 +127,10 @@ build:
112127
113128
fi
114129
130+
echo "Running with event loop manager: $EVENT_LOOP_MANAGER"
131+
echo "==========RUNNING SIMULACRON TESTS=========="
132+
SIMULACRON_JAR=$SIMULACRON_JAR EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_DIR=$CCM_INSTALL_DIR DSE_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=simulacron_results.xml tests/integration/simulacron/ || true
133+
115134
echo "Running with event loop manager: $EVENT_LOOP_MANAGER"
116135
echo "==========RUNNING CQLENGINE TESTS=========="
117136
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=cqle_results.xml tests/integration/cqlengine/ || true

tests/integration/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def _tuple_version(version_string):
114114

115115
USE_CASS_EXTERNAL = bool(os.getenv('USE_CASS_EXTERNAL', False))
116116
KEEP_TEST_CLUSTER = bool(os.getenv('KEEP_TEST_CLUSTER', False))
117-
117+
SIMULACRON_JAR = os.getenv('SIMULACRON_JAR', None)
118118

119119
# If set to to true this will force the Cython tests to run regardless of whether they are installed
120120
cython_env = os.getenv('VERIFY_CYTHON', "False")
@@ -265,6 +265,7 @@ def get_unsupported_upper_protocol():
265265
dseonly = unittest.skipUnless(DSE_VERSION, "Test is only applicalbe to DSE clusters")
266266
pypy = unittest.skipUnless(platform.python_implementation() == "PyPy", "Test is skipped unless it's on PyPy")
267267
notpy3 = unittest.skipIf(sys.version_info >= (3, 0), "Test not applicable for Python 3.x runtime")
268+
ifsimulacron = unittest.skipIf(SIMULACRON_JAR is None, "Simulacron jar hasn't been specified")
268269

269270

270271
def wait_for_node_socket(node, timeout):

tests/integration/scassandra/__init__.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

tests/integration/scassandra/client.py

Lines changed: 0 additions & 157 deletions
This file was deleted.

tests/integration/scassandra/server.py

Lines changed: 0 additions & 27 deletions
This file was deleted.

tests/integration/simulacron/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License
14+
15+
from tests.integration.simulacron.utils import stop_simulacron
16+
17+
def teardown_package():
18+
stop_simulacron()
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright 2013-2017 DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
try:
15+
import unittest2 as unittest
16+
except ImportError:
17+
import unittest # noqa
18+
19+
import time
20+
import logging
21+
22+
from cassandra import OperationTimedOut
23+
from cassandra.cluster import Cluster, _Scheduler, EXEC_PROFILE_DEFAULT, ExecutionProfile
24+
from cassandra.policies import RoundRobinPolicy, HostStateListener
25+
from concurrent.futures import ThreadPoolExecutor
26+
27+
from tests.integration import ifsimulacron, CASSANDRA_VERSION
28+
from tests.integration.simulacron.utils import start_and_prime_cluster_defaults, prime_query, stop_simulacron, \
29+
prime_request, PrimeOptions, NO_THEN
30+
31+
import time
32+
33+
class TrackDownListener(HostStateListener):
34+
hosts_marked_down = []
35+
36+
def on_down(self, host):
37+
self.hosts_marked_down.append(host)
38+
39+
class ThreadTracker(ThreadPoolExecutor):
40+
called_functions = []
41+
42+
def submit(self, fn, *args, **kwargs):
43+
self.called_functions.append(fn.__name__)
44+
return super(ThreadTracker, self).submit(fn, *args, **kwargs)
45+
46+
@ifsimulacron
47+
class ConnectionTest(unittest.TestCase):
48+
49+
def test_heart_beat_timeout(self):
50+
"""
51+
Test to ensure the hosts are marked as down after a OTO is received.
52+
Also to ensure this happens within the expected timeout
53+
@since 3.10
54+
@jira_ticket PYTHON-762
55+
@expected_result all the hosts have been marked as down at some point
56+
57+
@test_category metadata
58+
"""
59+
number_of_dcs = 3
60+
nodes_per_dc = 100
61+
62+
query_to_prime = "INSERT INTO test3rf.test (k, v) VALUES (0, 1);"
63+
64+
idle_heartbeat_timeout = 5
65+
idle_heartbeat_interval = 1
66+
67+
start_and_prime_cluster_defaults(number_of_dcs, nodes_per_dc, CASSANDRA_VERSION)
68+
69+
listener = TrackDownListener()
70+
executor = ThreadTracker(max_workers=16)
71+
72+
# We need to disable compression since it's not supported in simulacron
73+
cluster = Cluster(compression=False,
74+
idle_heartbeat_interval=idle_heartbeat_interval,
75+
idle_heartbeat_timeout=idle_heartbeat_timeout,
76+
executor_threads=16,
77+
execution_profiles={
78+
EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=RoundRobinPolicy())})
79+
80+
cluster.scheduler.shutdown()
81+
cluster.executor = executor
82+
cluster.scheduler = _Scheduler(executor)
83+
84+
session = cluster.connect(wait_for_all_pools=True)
85+
cluster.register_listener(listener)
86+
log = logging.getLogger()
87+
log.setLevel('CRITICAL')
88+
89+
self.addCleanup(cluster.shutdown)
90+
self.addCleanup(stop_simulacron)
91+
self.addCleanup(log.setLevel, "DEBUG")
92+
93+
prime_query(query_to_prime, then=NO_THEN)
94+
95+
futures = []
96+
for _ in range(number_of_dcs * nodes_per_dc):
97+
future = session.execute_async(query_to_prime)
98+
futures.append(future)
99+
100+
for f in futures:
101+
f._event.wait()
102+
self.assertIsInstance(f._final_exception, OperationTimedOut)
103+
104+
prime_request(PrimeOptions(then=NO_THEN))
105+
106+
# We allow from some extra time for all the hosts to be to on_down
107+
# The callbacks should start happening after idle_heartbeat_timeout + idle_heartbeat_interval
108+
time.sleep((idle_heartbeat_timeout + idle_heartbeat_interval)*2)
109+
110+
for host in cluster.metadata.all_hosts():
111+
self.assertIn(host, listener.hosts_marked_down)
112+
113+
# In this case HostConnection._replace shouldn't be called
114+
self.assertNotIn("_replace", executor.called_functions)

0 commit comments

Comments
 (0)