3131from threading import Event
3232from subprocess import call
3333from itertools import groupby
34+ import six
3435
3536from cassandra import OperationTimedOut , ReadTimeout , ReadFailure , WriteTimeout , WriteFailure , \
3637 AlreadyExists , InvalidRequest
38+
3739from cassandra .protocol import ConfigurationException
3840
3941try :
4042 from ccmlib .cluster import Cluster as CCMCluster
41- from ccmlib .dse_cluster import DseCluster
4243 from ccmlib .cluster_factory import ClusterFactory as CCMClusterFactory
4344 from ccmlib import common
4445except ImportError as e :
@@ -104,30 +105,9 @@ def _tuple_version(version_string):
104105
105106default_cassandra_version = '2.2.0'
106107
107-
108- def _get_cass_version_from_dse (dse_version ):
109- if dse_version .startswith ('4.6' ) or dse_version .startswith ('4.5' ):
110- raise Exception ("Cassandra Version 2.0 not supported anymore" )
111- elif dse_version .startswith ('4.7' ) or dse_version .startswith ('4.8' ):
112- cass_ver = "2.1"
113- elif dse_version .startswith ('5.0' ):
114- cass_ver = "3.0"
115- elif dse_version .startswith ("5.1" ):
116- cass_ver = "3.10"
117- else :
118- log .error ("Unknown dse version found {0}, defaulting to 2.1" .format (dse_version ))
119- cass_ver = "2.1"
120-
121- return cass_ver
122-
123108CASSANDRA_IP = os .getenv ('CASSANDRA_IP' , '127.0.0.1' )
124109CASSANDRA_DIR = os .getenv ('CASSANDRA_DIR' , None )
125- DSE_VERSION = os .getenv ('DSE_VERSION' , None )
126- DSE_CRED = os .getenv ('DSE_CREDS' , None )
127- if DSE_VERSION :
128- CASSANDRA_VERSION = _get_cass_version_from_dse (DSE_VERSION )
129- else :
130- CASSANDRA_VERSION = os .getenv ('CASSANDRA_VERSION' , default_cassandra_version )
110+ CASSANDRA_VERSION = os .getenv ('CASSANDRA_VERSION' , default_cassandra_version )
131111
132112CCM_KWARGS = {}
133113if CASSANDRA_DIR :
@@ -138,15 +118,6 @@ def _get_cass_version_from_dse(dse_version):
138118 log .info ('Using Cassandra version: %s' , CASSANDRA_VERSION )
139119 CCM_KWARGS ['version' ] = CASSANDRA_VERSION
140120
141- if DSE_VERSION :
142- log .info ('Using DSE version: %s' , DSE_VERSION )
143- if not CASSANDRA_DIR :
144- CCM_KWARGS ['version' ] = DSE_VERSION
145- if DSE_CRED :
146- log .info ("Using DSE credentials file located at {0}" .format (DSE_CRED ))
147- CCM_KWARGS ['dse_credentials_file' ] = DSE_CRED
148-
149-
150121#This changes the default contact_point parameter in Cluster
151122def set_default_cass_ip ():
152123 if CASSANDRA_IP .startswith ("127.0.0." ):
@@ -208,7 +179,6 @@ def get_supported_protocol_versions():
208179 else :
209180 raise Exception ("Cassandra Version not supported anymore" )
210181
211-
212182def get_unsupported_lower_protocol ():
213183 """
214184 This is used to determine the lowest protocol version that is NOT
@@ -255,7 +225,7 @@ def get_unsupported_upper_protocol():
255225notpy3 = unittest .skipIf (sys .version_info >= (3 , 0 ), "Test not applicable for Python 3.x runtime" )
256226requiresmallclockgranularity = unittest .skipIf ("Windows" in platform .system () or "async" in EVENT_LOOP_MANAGER ,
257227 "This test is not suitible for environments with large clock granularity" )
258- requiressimulacron = unittest .skipIf (SIMULACRON_JAR is None , "Simulacron jar hasn't been specified" )
228+ requiressimulacron = unittest .skipIf (SIMULACRON_JAR is None or CASSANDRA_VERSION < "2.1" , "Simulacron jar hasn't been specified or C* version is 2.0 " )
259229
260230
261231def wait_for_node_socket (node , timeout ):
@@ -333,16 +303,22 @@ def is_current_cluster(cluster_name, node_counts):
333303 return False
334304
335305
336- def use_cluster (cluster_name , nodes , ipformat = None , start = True , workloads = []):
306+ def use_cluster (cluster_name , nodes , ipformat = None , start = True , workloads = [], set_keyspace = True , ccm_options = None ,
307+ configuration_options = {}):
337308 set_default_cass_ip ()
338309
310+ if ccm_options is None :
311+ ccm_options = CCM_KWARGS
312+ cassandra_version = ccm_options .get ('version' , CASSANDRA_VERSION )
313+
339314 global CCM_CLUSTER
340315 if USE_CASS_EXTERNAL :
341316 if CCM_CLUSTER :
342317 log .debug ("Using external CCM cluster {0}" .format (CCM_CLUSTER .name ))
343318 else :
344319 log .debug ("Using unnamed external cluster" )
345- setup_keyspace (ipformat = ipformat , wait = False )
320+ if set_keyspace and start :
321+ setup_keyspace (ipformat = ipformat , wait = False )
346322 return
347323
348324 if is_current_cluster (cluster_name , nodes ):
@@ -356,27 +332,22 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
356332 CCM_CLUSTER = CCMClusterFactory .load (path , cluster_name )
357333 log .debug ("Found existing CCM cluster, {0}; clearing." .format (cluster_name ))
358334 CCM_CLUSTER .clear ()
359- CCM_CLUSTER .set_install_dir (** CCM_KWARGS )
335+ CCM_CLUSTER .set_install_dir (** ccm_options )
336+ CCM_CLUSTER .set_configuration_options (configuration_options )
360337 except Exception :
361338 ex_type , ex , tb = sys .exc_info ()
362339 log .warn ("{0}: {1} Backtrace: {2}" .format (ex_type .__name__ , ex , traceback .extract_tb (tb )))
363340 del tb
364341
365- log .debug ("Creating new CCM cluster, {0}, with args {1}" .format (cluster_name , CCM_KWARGS ))
366- if DSE_VERSION :
367- log .error ("creating dse cluster" )
368- CCM_CLUSTER = DseCluster (path , cluster_name , ** CCM_KWARGS )
369- else :
370- CCM_CLUSTER = CCMCluster (path , cluster_name , ** CCM_KWARGS )
342+ log .debug ("Creating new CCM cluster, {0}, with args {1}" .format (cluster_name , ccm_options ))
343+ CCM_CLUSTER = CCMCluster (path , cluster_name , ** ccm_options )
371344 CCM_CLUSTER .set_configuration_options ({'start_native_transport' : True })
372- if CASSANDRA_VERSION >= '2.2' :
345+ if cassandra_version >= '2.2' :
373346 CCM_CLUSTER .set_configuration_options ({'enable_user_defined_functions' : True })
374- if CASSANDRA_VERSION >= '3.0' :
347+ if cassandra_version >= '3.0' :
375348 CCM_CLUSTER .set_configuration_options ({'enable_scripted_user_defined_functions' : True })
376- if 'spark' in workloads :
377- config_options = {"initial_spark_worker_resources" : 0.1 }
378- CCM_CLUSTER .set_dse_configuration_options (config_options )
379349 common .switch_cluster (path , cluster_name )
350+ CCM_CLUSTER .set_configuration_options (configuration_options )
380351 CCM_CLUSTER .populate (nodes , ipformat = ipformat )
381352 try :
382353 jvm_args = []
@@ -394,18 +365,20 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
394365 # Added to wait for slow nodes to start up
395366 for node in CCM_CLUSTER .nodes .values ():
396367 wait_for_node_socket (node , 120 )
397- setup_keyspace (ipformat = ipformat )
368+ if set_keyspace :
369+ setup_keyspace (ipformat = ipformat )
398370 except Exception :
399371 log .exception ("Failed to start CCM cluster; removing cluster." )
400372
401373 if os .name == "nt" :
402374 if CCM_CLUSTER :
403- for node in CCM_CLUSTER . nodes . itervalues ():
375+ for node in six . itervalues (CCM_CLUSTER . nodes ):
404376 os .system ("taskkill /F /PID " + str (node .pid ))
405377 else :
406378 call (["pkill" , "-9" , "-f" , ".ccm" ])
407379 remove_cluster ()
408380 raise
381+ return CCM_CLUSTER
409382
410383
411384def teardown_package ():
0 commit comments