forked from apache/cassandra-python-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
144 lines (107 loc) · 3.74 KB
/
__init__.py
File metadata and controls
144 lines (107 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
try:
import unittest2 as unittest
except ImportError:
import unittest # noqa
import logging
log = logging.getLogger(__name__)
import os
from threading import Event
from cassandra.cluster import Cluster
try:
from ccmlib.cluster import Cluster as CCMCluster
from ccmlib import common
except ImportError as e:
raise unittest.SkipTest('ccm is a dependency for integration tests:', e)
# Name under which the ccm test cluster is created and loaded.
CLUSTER_NAME = 'test_cluster'

# The ccm cluster object once setup_package() has started it; None until then.
CCM_CLUSTER = None

# ccm's working directory: a "ccm" folder that lives next to this file.
path = os.path.join(
    os.path.abspath(os.path.dirname(__file__)),
    'ccm')

# Create the directory on first use so ccm has somewhere to load from/write to.
if not os.path.exists(path):
    os.mkdir(path)
def get_server_versions():
    """
    Probe system.local table to determine Cassandra and CQL version.
    Returns a tuple of (cassandra_version, cql_version).

    Both elements are tuples of ints as produced by ``_tuple_version``
    (``release_version`` and ``cql_version`` columns respectively).
    Any exception raised while querying or parsing propagates to the caller.
    """
    c = Cluster()
    s = c.connect()
    try:
        s.set_keyspace('system')
        row = s.execute('SELECT cql_version, release_version FROM local')[0]
        cass_version = _tuple_version(row.release_version)
        cql_version = _tuple_version(row.cql_version)
    finally:
        # Shut the cluster down even when the probe fails, the same way
        # setup_test_keyspace() does, so a failed query does not leave the
        # Cluster object open.
        c.shutdown()
    return (cass_version, cql_version)
def _tuple_version(version_string):
if '-' in version_string:
version_string = version_string[:version_string.index('-')]
return tuple([int(p) for p in version_string.split('.')])
def get_cluster():
    """
    Return the module-level CCM_CLUSTER.

    This is None until setup_package() has assigned the started ccm cluster.
    """
    return CCM_CLUSTER
def get_node(node_id):
    """
    Look up a node of the ccm test cluster by its id.

    The id is formatted into the key 'node<node_id>' and looked up in
    ``CCM_CLUSTER.nodes``.
    """
    node_name = 'node%s' % node_id
    return CCM_CLUSTER.nodes[node_name]
def setup_package():
    """
    Start the ccm test cluster, publish it as CCM_CLUSTER and create the
    test keyspaces.

    An existing cluster named CLUSTER_NAME under ``path`` is loaded and
    cleared; if that fails for any reason a fresh three-node cluster is
    created instead. Any failure up to and including cluster start is logged
    and re-raised.
    """
    global CCM_CLUSTER

    try:
        try:
            # Reuse a cluster left behind by a previous run, wiping its data.
            ccm_cluster = CCMCluster.load(path, CLUSTER_NAME)
            log.debug("Found existing ccm test cluster, clearing")
            ccm_cluster.clear()
        except Exception:
            # Loading (or clearing) failed: build a new cluster from scratch.
            log.debug("Creating new ccm test cluster")
            ccm_cluster = CCMCluster(path, CLUSTER_NAME, cassandra_version='1.2.9')
            ccm_cluster.set_configuration_options({'start_native_transport': True})
            common.switch_cluster(path, CLUSTER_NAME)
            ccm_cluster.populate(3)

        log.debug("Starting ccm test cluster")
        ccm_cluster.start(wait_for_binary_proto=True)
    except Exception:
        log.exception("Failed to start ccm cluster:")
        raise

    # Only reached when the cluster started successfully.
    CCM_CLUSTER = ccm_cluster
    setup_test_keyspace()
def setup_test_keyspace():
    """
    (Re)create the keyspaces and table used by the integration tests.

    Drops test1rf/test2rf/test3rf if they already exist, then creates
    test3rf, test2rf and test1rf (SimpleStrategy, replication factor 3, 2
    and 1) followed by the table test3rf.test. The Cluster is shut down
    afterwards whether or not the statements succeed.
    """
    # Executed in this order once stale keyspaces have been dropped.
    create_statements = (
        '''
CREATE KEYSPACE test3rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}''',
        '''
CREATE KEYSPACE test2rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'}''',
        '''
CREATE KEYSPACE test1rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}''',
        '''
CREATE TABLE test3rf.test (
k int PRIMARY KEY,
v int )''',
    )

    cluster = Cluster()
    session = cluster.connect()

    try:
        rows = session.execute("SELECT keyspace_name FROM system.schema_keyspaces")
        existing_keyspaces = [row[0] for row in rows]

        # Remove leftovers from a previous run so the CREATEs below succeed.
        for ksname in ('test1rf', 'test2rf', 'test3rf'):
            if ksname in existing_keyspaces:
                session.execute("DROP KEYSPACE %s" % ksname)

        for ddl in create_statements:
            session.execute(ddl)
    finally:
        cluster.shutdown()
def teardown_package():
    """
    Clear the ccm cluster held in CCM_CLUSTER, if one was set.

    A failure while clearing is logged with its traceback and not re-raised.
    """
    if not CCM_CLUSTER:
        # Nothing to clean up.
        return

    try:
        CCM_CLUSTER.clear()
    except Exception:
        log.exception("Failed to clear cluster")
class UpDownWaiter(object):
    """
    Listener that turns host up/down callbacks into waitable Events.

    On construction the waiter registers itself with ``host.monitor``.
    ``on_up`` / ``on_down`` set the matching Event; the ``wait_for_*``
    methods block on it. The events are never cleared by this class, so
    once set, later waits return immediately.
    """

    def __init__(self, host):
        """Create the two events and register this waiter on ``host.monitor``."""
        self.down_event = Event()
        self.up_event = Event()
        host.monitor.register(self)

    def on_up(self, host):
        """Callback: mark the "up" event as set. ``host`` is not used."""
        self.up_event.set()

    def on_down(self, host):
        """Callback: mark the "down" event as set. ``host`` is not used."""
        self.down_event.set()

    def wait_for_down(self, timeout=None):
        """
        Block until ``on_down`` has been called.

        ``timeout`` is an optional number of seconds; the default of None
        waits indefinitely, as before. Returns the result of ``Event.wait``
        (True if the event is set, False on timeout; Python 2.6 returns None).
        """
        return self.down_event.wait(timeout)

    def wait_for_up(self, timeout=None):
        """
        Block until ``on_up`` has been called.

        ``timeout`` is an optional number of seconds; the default of None
        waits indefinitely, as before. Returns the result of ``Event.wait``
        (True if the event is set, False on timeout; Python 2.6 returns None).
        """
        return self.up_event.wait(timeout)