Skip to content

Commit 67e4734

Browse files
committed
Added upgrade tests
1 parent 22571af commit 67e4734

4 files changed

Lines changed: 499 additions & 13 deletions

File tree

build.yaml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@ schedules:
5858
env_vars: |
5959
EVENT_LOOP_MANAGER='twisted'
6060
61+
upgrade_tests:
62+
schedule: adhoc
63+
branches:
64+
include: [master, python-546]
65+
env_vars: |
66+
EVENT_LOOP_MANAGER='libev'
67+
JUST_UPGRADE=True
68+
matrix:
69+
exclude:
70+
- python: [3.4, 3.6]
71+
- cassandra: ['2.0', '2.1', '2.2', '3.0']
72+
6173
python:
6274
- 2.7
6375
- 3.4
@@ -91,6 +103,7 @@ build:
91103
92104
pip install -r test-requirements.txt
93105
pip install nose-ignore-docstring
106+
pip install nose-exclude
94107
FORCE_CYTHON=False
95108
if [[ $CYTHON == 'CYTHON' ]]; then
96109
FORCE_CYTHON=True
@@ -118,6 +131,14 @@ build:
118131
119132
popd
120133
134+
135+
echo "JUST_UPGRADE: $JUST_UPGRADE"
136+
if [[ $JUST_UPGRADE == 'True' ]]; then
137+
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=upgrade_results.xml tests/integration/long/upgrade || true
138+
exit 0
139+
fi
140+
141+
121142
# Run the unit tests, this is not done in travis because
122143
# it takes too much time for the whole matrix to build with cython
123144
if [[ $CYTHON == 'CYTHON' ]]; then
@@ -139,6 +160,6 @@ build:
139160
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=standard_results.xml tests/integration/standard/ || true
140161
141162
echo "==========RUNNING LONG INTEGRATION TESTS=========="
142-
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=long_results.xml tests/integration/long/ || true
163+
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --exclude-dir=tests/integration/long/upgrade --with-ignore-docstrings --with-xunit --xunit-file=long_results.xml tests/integration/long/ || true
143164
- xunit:
144165
- "*_results.xml"

tests/integration/__init__.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
from threading import Event
3232
from subprocess import call
3333
from itertools import groupby
34+
import six
3435

3536
from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure, AlreadyExists, \
3637
InvalidRequest
38+
from cassandra.cluster import NoHostAvailable
3739

3840
from cassandra.protocol import ConfigurationException
3941

@@ -339,9 +341,14 @@ def is_current_cluster(cluster_name, node_counts):
339341
return False
340342

341343

342-
def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
344+
def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[], ccm_options=None,
345+
configuration_options={}):
343346
set_default_cass_ip()
344347

348+
if ccm_options is None:
349+
ccm_options = CCM_KWARGS
350+
cassandra_version = ccm_options["version"] if "version" in ccm_options else CASSANDRA_VERSION
351+
345352
global CCM_CLUSTER
346353
if USE_CASS_EXTERNAL:
347354
if CCM_CLUSTER:
@@ -362,27 +369,25 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
362369
CCM_CLUSTER = CCMClusterFactory.load(path, cluster_name)
363370
log.debug("Found existing CCM cluster, {0}; clearing.".format(cluster_name))
364371
CCM_CLUSTER.clear()
365-
CCM_CLUSTER.set_install_dir(**CCM_KWARGS)
372+
CCM_CLUSTER.set_install_dir(**ccm_options)
366373
except Exception:
367374
ex_type, ex, tb = sys.exc_info()
368375
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
369376
del tb
370377

371-
log.debug("Creating new CCM cluster, {0}, with args {1}".format(cluster_name, CCM_KWARGS))
378+
log.debug("Creating new CCM cluster, {0}, with args {1}".format(cluster_name, ccm_options))
372379
if DSE_VERSION:
373380
log.error("creating dse cluster")
374-
CCM_CLUSTER = DseCluster(path, cluster_name, **CCM_KWARGS)
381+
CCM_CLUSTER = DseCluster(path, cluster_name, **ccm_options)
375382
else:
376-
CCM_CLUSTER = CCMCluster(path, cluster_name, **CCM_KWARGS)
383+
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
377384
CCM_CLUSTER.set_configuration_options({'start_native_transport': True})
378-
if CASSANDRA_VERSION >= '2.2':
385+
if cassandra_version >= '2.2':
379386
CCM_CLUSTER.set_configuration_options({'enable_user_defined_functions': True})
380-
if CASSANDRA_VERSION >= '3.0':
387+
if cassandra_version >= '3.0':
381388
CCM_CLUSTER.set_configuration_options({'enable_scripted_user_defined_functions': True})
382-
if 'spark' in workloads:
383-
config_options = {"initial_spark_worker_resources": 0.1}
384-
CCM_CLUSTER.set_dse_configuration_options(config_options)
385389
common.switch_cluster(path, cluster_name)
390+
CCM_CLUSTER.set_configuration_options(configuration_options)
386391
CCM_CLUSTER.populate(nodes, ipformat=ipformat)
387392
try:
388393
jvm_args = []
@@ -400,18 +405,24 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
400405
# Added to wait for slow nodes to start up
401406
for node in CCM_CLUSTER.nodes.values():
402407
wait_for_node_socket(node, 120)
403-
setup_keyspace(ipformat=ipformat)
408+
try:
409+
setup_keyspace(ipformat=ipformat)
410+
# This could happen if authentication was set on the server
411+
# through configuration_options
412+
except NoHostAvailable:
413+
pass
404414
except Exception:
405415
log.exception("Failed to start CCM cluster; removing cluster.")
406416

407417
if os.name == "nt":
408418
if CCM_CLUSTER:
409-
for node in CCM_CLUSTER.nodes.itervalues():
419+
for node in six.itervalues(CCM_CLUSTER.nodes):
410420
os.system("taskkill /F /PID " + str(node.pid))
411421
else:
412422
call(["pkill", "-9", "-f", ".ccm"])
413423
remove_cluster()
414424
raise
425+
return CCM_CLUSTER
415426

416427

417428
def teardown_package():
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# Copyright 2013-2017 DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from tests.integration import CCM_KWARGS, use_cluster, remove_cluster, MockLoggingHandler
17+
from tests.integration import setup_keyspace
18+
19+
from cassandra.cluster import Cluster
20+
from cassandra import cluster
21+
22+
from collections import namedtuple
23+
from functools import wraps
24+
from threading import Thread, Event
25+
from ccmlib.node import TimeoutError
26+
import time
27+
import logging
28+
29+
try:
30+
import unittest2 as unittest
31+
except ImportError:
32+
import unittest # noqa
33+
34+
35+
def setup_module():
36+
remove_cluster()
37+
38+
39+
UPGRADE_CLUSTER = "upgrade_cluster"
40+
UpgradePath = namedtuple('UpgradePath', ('name', 'starting_version', 'upgrade_version', 'configuration_options'))
41+
42+
43+
class upgrade_paths(object):
44+
"""
45+
Decorator used to specify the upgrade paths for a particular method
46+
"""
47+
def __init__(self, paths):
48+
self.paths = paths
49+
50+
def __call__(self, method):
51+
@wraps(method)
52+
def wrapper(*args, **kwargs):
53+
for path in self.paths:
54+
self_from_decorated = args[0]
55+
self_from_decorated.UPGRADE_PATH = path
56+
self_from_decorated._setUp()
57+
method(*args, **kwargs)
58+
self_from_decorated._tearDown()
59+
return wrapper
60+
61+
62+
class UpgradeBase(unittest.TestCase):
63+
"""
64+
Base class for the upgrade tests. The _setup method
65+
will clean the environment and start the appropriate C* version according
66+
to the upgrade path. The upgrade can be done in a different thread using the
67+
start_upgrade upgrade_method (this would be the most realistic scenario)
68+
or node by node, waiting for the upgrade to happen, using _upgrade_one_node method
69+
"""
70+
UPGRADE_PATH = None
71+
start_cluster = True
72+
73+
@classmethod
74+
def setUpClass(cls):
75+
cls.logger_handler = MockLoggingHandler()
76+
logger = logging.getLogger(cluster.__name__)
77+
logger.addHandler(cls.logger_handler)
78+
79+
def _setUp(self):
80+
"""
81+
This is not the regular _setUp method because it will be called from
82+
the decorator instead of letting nose handle it.
83+
This setup method will start a cluster with the right version according
84+
to the variable UPGRADE_PATH.
85+
"""
86+
remove_cluster()
87+
self.cluster = use_cluster(UPGRADE_CLUSTER + self.UPGRADE_PATH.name, [3],
88+
ccm_options=self.UPGRADE_PATH.starting_version,
89+
configuration_options=self.UPGRADE_PATH.configuration_options)
90+
self.nodes = self.cluster.nodelist()
91+
self.last_node_upgraded = None
92+
self.upgrade_done = Event()
93+
self.upgrade_thread = None
94+
95+
if self.start_cluster:
96+
setup_keyspace()
97+
98+
self.cluster_driver = Cluster()
99+
self.session = self.cluster_driver.connect()
100+
self.logger_handler.reset()
101+
102+
def _tearDown(self):
103+
"""
104+
special tearDown method called by the decorator after the method has ended
105+
"""
106+
if self.upgrade_thread:
107+
self.upgrade_thread.join(timeout=5)
108+
self.upgrade_thread = None
109+
110+
if self.start_cluster:
111+
self.cluster_driver.shutdown()
112+
113+
def start_upgrade(self, time_node_upgrade):
114+
"""
115+
Starts the upgrade in a different thread
116+
"""
117+
self.upgrade_thread = Thread(target=self._upgrade, args=(time_node_upgrade,))
118+
self.upgrade_thread.start()
119+
120+
def _upgrade(self, time_node_upgrade):
121+
"""
122+
Starts the upgrade in the same thread
123+
"""
124+
start_time = time.time()
125+
while self._upgrade_one_node():
126+
end_time = time.time()
127+
time_to_upgrade = end_time - start_time
128+
if time_node_upgrade > time_to_upgrade:
129+
time.sleep(time_node_upgrade - time_to_upgrade)
130+
self.upgrade_done.set()
131+
132+
def is_upgraded(self):
133+
"""
134+
Returns True if the upgrade has finished and False otherwise
135+
"""
136+
return self.upgrade_done.is_set()
137+
138+
def wait_for_upgrade(self, timeout=None):
139+
"""
140+
Waits until the upgrade has completed
141+
"""
142+
self.upgrade_done.wait(timeout=timeout)
143+
144+
def _upgrade_one_node(self):
145+
"""
146+
Upgrades only one node. Return True if the upgrade
147+
has finished and False otherwise
148+
"""
149+
if self.last_node_upgraded is None:
150+
node_to_upgrade = self.nodes[0]
151+
self.last_node_upgraded = 0
152+
else:
153+
if len(self.nodes) - 1 == self.last_node_upgraded:
154+
return False
155+
self.last_node_upgraded += 1
156+
node_to_upgrade = self.nodes[self.last_node_upgraded]
157+
158+
node_to_upgrade.drain()
159+
node_to_upgrade.stop(gently=True)
160+
161+
node_to_upgrade.set_install_dir(**self.UPGRADE_PATH.upgrade_version)
162+
163+
# There must be a cleaner way of doing this, but it's necessary here
164+
# to call the private method from cluster __update_topology_files
165+
self.cluster._Cluster__update_topology_files()
166+
try:
167+
node_to_upgrade.start(wait_for_binary_proto=True, wait_other_notice=True)
168+
except TimeoutError:
169+
self.fail("Error starting C* node while upgrading")
170+
171+
return True
172+
173+
174+
class UpgradeBaseAuth(UpgradeBase):
175+
"""
176+
Base class of authentication test, the authentication parameters for
177+
C* still have to be specified within the upgrade path variable
178+
"""
179+
start_cluster = False

0 commit comments

Comments
 (0)