Skip to content

Commit 87dd9df

Browse files
authored
Merge pull request apache#656 from datastax/613
CLQEngine multiple sessions/keyspaces support
2 parents 0ba2dc9 + eb9cb9e commit 87dd9df

10 files changed

Lines changed: 906 additions & 193 deletions

File tree

cassandra/cqlengine/connection.py

Lines changed: 188 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from collections import namedtuple, defaultdict
15+
from collections import defaultdict
1616
import logging
1717
import six
1818
import threading
1919

2020
from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist
21-
from cassandra.query import SimpleStatement, Statement, dict_factory
21+
from cassandra.query import SimpleStatement, dict_factory
2222

2323
from cassandra.cqlengine import CQLEngineException
2424
from cassandra.cqlengine.statements import BaseCQLStatement
@@ -28,67 +28,215 @@
2828

2929
NOT_SET = _NOT_SET # required for passing timeout to Session.execute
3030

31-
Host = namedtuple('Host', ['name', 'port'])
32-
3331
cluster = None
3432
session = None
35-
lazy_connect_args = None
36-
lazy_connect_lock = threading.RLock()
3733

34+
# connections registry
35+
DEFAULT_CONNECTION = object()
36+
_connections = {}
3837

3938
# Because type models may be registered before a connection is present,
4039
# and because sessions may be replaced, we must register UDTs here, in order
4140
# to have them registered when a new session is established.
4241
udt_by_keyspace = defaultdict(dict)
4342

4443

44+
def format_log_context(msg, connection=None, keyspace=None):
45+
"""Format log message to add keyspace and connection context"""
46+
connection_info = connection or 'DEFAULT_CONNECTION'
47+
48+
if keyspace:
49+
msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg)
50+
else:
51+
msg = '[Connection: {0}] {1}'.format(connection_info, msg)
52+
return msg
53+
54+
4555
class UndefinedKeyspaceException(CQLEngineException):
4656
pass
4757

4858

59+
class Connection(object):
60+
"""CQLEngine Connection"""
61+
62+
name = None
63+
hosts = None
64+
65+
consistency = None
66+
retry_connect = False
67+
lazy_connect = False
68+
lazy_connect_lock = None
69+
cluster_options = None
70+
71+
cluster = None
72+
session = None
73+
74+
def __init__(self, name, hosts, consistency=None,
75+
lazy_connect=False, retry_connect=False, cluster_options=None):
76+
self.hosts = hosts
77+
self.name = name
78+
self.consistency = consistency
79+
self.lazy_connect = lazy_connect
80+
self.retry_connect = retry_connect
81+
self.cluster_options = cluster_options if cluster_options else {}
82+
self.lazy_connect_lock = threading.RLock()
83+
84+
def setup(self):
85+
"""Setup the connection"""
86+
global cluster, session
87+
88+
if 'username' in self.cluster_options or 'password' in self.cluster_options:
89+
raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider")
90+
91+
if self.lazy_connect:
92+
return
93+
94+
self.cluster = Cluster(self.hosts, **self.cluster_options)
95+
try:
96+
self.session = self.cluster.connect()
97+
log.debug(format_log_context("connection initialized with internally created session", connection=self.name))
98+
except NoHostAvailable:
99+
if self.retry_connect:
100+
log.warning(format_log_context("connect failed, setting up for re-attempt on first use", connection=self.name))
101+
self.lazy_connect = True
102+
raise
103+
104+
if self.consistency is not None:
105+
self.session.default_consistency_level = self.consistency
106+
107+
if DEFAULT_CONNECTION in _connections and _connections[DEFAULT_CONNECTION] == self:
108+
cluster = _connections[DEFAULT_CONNECTION].cluster
109+
session = _connections[DEFAULT_CONNECTION].session
110+
111+
self.setup_session()
112+
113+
def setup_session(self):
114+
self.session.row_factory = dict_factory
115+
enc = self.session.encoder
116+
enc.mapping[tuple] = enc.cql_encode_tuple
117+
_register_known_types(self.session.cluster)
118+
119+
def handle_lazy_connect(self):
120+
121+
# if lazy_connect is False, it means the cluster is setup and ready
122+
# No need to acquire the lock
123+
if not self.lazy_connect:
124+
return
125+
126+
with self.lazy_connect_lock:
127+
# lazy_connect might have been set to False by another thread while waiting the lock
128+
# In this case, do nothing.
129+
if self.lazy_connect:
130+
log.debug(format_log_context("Lazy connect for connection", connection=self.name))
131+
self.lazy_connect = False
132+
self.setup()
133+
134+
135+
def register_connection(name, hosts, consistency=None, lazy_connect=False,
136+
retry_connect=False, cluster_options=None, default=False):
137+
138+
if name in _connections:
139+
log.warning("Registering connection '{0}' when it already exists.".format(name))
140+
141+
conn = Connection(name, hosts, consistency=consistency,lazy_connect=lazy_connect,
142+
retry_connect=retry_connect, cluster_options=cluster_options)
143+
144+
_connections[name] = conn
145+
146+
if default:
147+
set_default_connection(name)
148+
149+
conn.setup()
150+
return conn
151+
152+
153+
def unregister_connection(name):
154+
global cluster, session
155+
156+
if name not in _connections:
157+
return
158+
159+
if DEFAULT_CONNECTION in _connections and _connections[name] == _connections[DEFAULT_CONNECTION]:
160+
del _connections[DEFAULT_CONNECTION]
161+
cluster = None
162+
session = None
163+
log.warning("Unregistering default connection '{0}'. Use set_default_connection to set a new one.".format(name))
164+
165+
log.debug("Connection '{0}' has been removed from the registry.".format(name))
166+
del _connections[name]
167+
168+
169+
def set_default_connection(name):
170+
global cluster, session
171+
172+
if name not in _connections:
173+
raise CQLEngineException("Connection '{0}' doesn't exist.".format(name))
174+
175+
log.debug("Connection '{0}' has been set as default.".format(name))
176+
_connections[DEFAULT_CONNECTION] = _connections[name]
177+
cluster = _connections[name].cluster
178+
session = _connections[name].session
179+
180+
181+
def get_connection(name=None):
182+
183+
if not name:
184+
name = DEFAULT_CONNECTION
185+
186+
if name not in _connections:
187+
raise CQLEngineException("Connection name '{0}' doesn't exist in the registry.".format(name))
188+
189+
conn = _connections[name]
190+
conn.handle_lazy_connect()
191+
192+
return conn
193+
194+
49195
def default():
50196
"""
51-
Configures the global mapper connection to localhost, using the driver defaults
197+
Configures the default connection to localhost, using the driver defaults
52198
(except for row_factory)
53199
"""
54-
global cluster, session
55200

56-
if session:
57-
log.warning("configuring new connection for cqlengine when one was already set")
58-
59-
cluster = Cluster()
60-
session = cluster.connect()
201+
try:
202+
conn = get_connection()
203+
if conn.session:
204+
log.warning("configuring new default connection for cqlengine when one was already set")
205+
except:
206+
pass
61207

62-
_setup_session(session)
208+
conn = register_connection('default', hosts=None, default=True)
209+
conn.setup()
63210

64211
log.debug("cqlengine connection initialized with default session to localhost")
65212

66213

67214
def set_session(s):
68215
"""
69-
Configures the global mapper connection with a preexisting :class:`cassandra.cluster.Session`
216+
Configures the default connection with a preexisting :class:`cassandra.cluster.Session`
70217
71218
Note: the mapper presently requires a Session :attr:`~.row_factory` set to ``dict_factory``.
72219
This may be relaxed in the future
73220
"""
74-
global cluster, session
75221

76-
if session:
77-
log.warning("configuring new connection for cqlengine when one was already set")
222+
conn = get_connection()
223+
224+
if conn.session:
225+
log.warning("configuring new default connection for cqlengine when one was already set")
78226

79227
if s.row_factory is not dict_factory:
80228
raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.")
81-
session = s
82-
cluster = s.cluster
229+
conn.session = s
230+
conn.cluster = s.cluster
83231

84232
# Set default keyspace from given session's keyspace
85-
if session.keyspace:
233+
if conn.session.keyspace:
86234
from cassandra.cqlengine import models
87-
models.DEFAULT_KEYSPACE = session.keyspace
235+
models.DEFAULT_KEYSPACE = conn.session.keyspace
88236

89-
_setup_session(session)
237+
conn.setup_session()
90238

91-
log.debug("cqlengine connection initialized with %s", s)
239+
log.debug("cqlengine default connection initialized with %s", s)
92240

93241

94242
def setup(
@@ -108,53 +256,19 @@ def setup(
108256
:param bool retry_connect: True if we should retry to connect even if there was a connection failure initially
109257
:param \*\*kwargs: Pass-through keyword arguments for :class:`cassandra.cluster.Cluster`
110258
"""
111-
global cluster, session, lazy_connect_args
112-
113-
if 'username' in kwargs or 'password' in kwargs:
114-
raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider")
115259

116260
from cassandra.cqlengine import models
117261
models.DEFAULT_KEYSPACE = default_keyspace
118262

119-
if lazy_connect:
120-
kwargs['default_keyspace'] = default_keyspace
121-
kwargs['consistency'] = consistency
122-
kwargs['lazy_connect'] = False
123-
kwargs['retry_connect'] = retry_connect
124-
lazy_connect_args = (hosts, kwargs)
125-
return
126-
127-
cluster = Cluster(hosts, **kwargs)
128-
try:
129-
session = cluster.connect()
130-
log.debug("cqlengine connection initialized with internally created session")
131-
except NoHostAvailable:
132-
if retry_connect:
133-
log.warning("connect failed, setting up for re-attempt on first use")
134-
kwargs['default_keyspace'] = default_keyspace
135-
kwargs['consistency'] = consistency
136-
kwargs['lazy_connect'] = False
137-
kwargs['retry_connect'] = retry_connect
138-
lazy_connect_args = (hosts, kwargs)
139-
raise
140-
if consistency is not None:
141-
session.default_consistency_level = consistency
142-
143-
_setup_session(session)
144-
263+
register_connection('default', hosts=hosts, consistency=consistency, lazy_connect=lazy_connect,
264+
retry_connect=retry_connect, cluster_options=kwargs, default=True)
145265

146-
def _setup_session(session):
147-
session.row_factory = dict_factory
148-
enc = session.encoder
149-
enc.mapping[tuple] = enc.cql_encode_tuple
150-
_register_known_types(session.cluster)
151266

267+
def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connection=None):
152268

153-
def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
269+
conn = get_connection(connection)
154270

155-
handle_lazy_connect()
156-
157-
if not session:
271+
if not conn.session:
158272
raise CQLEngineException("It is required to setup() cqlengine before executing queries")
159273

160274
if isinstance(query, SimpleStatement):
@@ -165,47 +279,29 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
165279
elif isinstance(query, six.string_types):
166280
query = SimpleStatement(query, consistency_level=consistency_level)
167281

168-
log.debug(query.query_string)
282+
log.debug(format_log_context(query.query_string, connection=connection))
169283

170-
result = session.execute(query, params, timeout=timeout)
284+
result = conn.session.execute(query, params, timeout=timeout)
171285

172286
return result
173287

174288

175-
def get_session():
176-
handle_lazy_connect()
177-
return session
289+
def get_session(connection=None):
290+
conn = get_connection(connection)
291+
return conn.session
178292

179293

180-
def get_cluster():
181-
handle_lazy_connect()
182-
if not cluster:
294+
def get_cluster(connection=None):
295+
conn = get_connection(connection)
296+
if not conn.cluster:
183297
raise CQLEngineException("%s.cluster is not configured. Call one of the setup or default functions first." % __name__)
184-
return cluster
185-
186-
187-
def handle_lazy_connect():
188-
global lazy_connect_args
189-
190-
# if lazy_connect_args is None, it means the cluster is setup and ready
191-
# No need to acquire the lock
192-
if not lazy_connect_args:
193-
return
194-
195-
with lazy_connect_lock:
196-
# lazy_connect_args might have been set to None by another thread while waiting the lock
197-
# In this case, do nothing.
198-
if lazy_connect_args:
199-
log.debug("lazy connect")
200-
hosts, kwargs = lazy_connect_args
201-
setup(hosts, **kwargs)
202-
lazy_connect_args = None
298+
return conn.cluster
203299

204300

205-
def register_udt(keyspace, type_name, klass):
301+
def register_udt(keyspace, type_name, klass, connection=None):
206302
udt_by_keyspace[keyspace][type_name] = klass
207303

208-
global cluster
304+
cluster = get_cluster(connection)
209305
if cluster:
210306
try:
211307
cluster.register_user_type(keyspace, type_name, klass)

0 commit comments

Comments
 (0)