1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- from collections import namedtuple , defaultdict
15+ from collections import defaultdict
1616import logging
1717import six
1818import threading
1919
2020from cassandra .cluster import Cluster , _NOT_SET , NoHostAvailable , UserTypeDoesNotExist
21- from cassandra .query import SimpleStatement , Statement , dict_factory
21+ from cassandra .query import SimpleStatement , dict_factory
2222
2323from cassandra .cqlengine import CQLEngineException
2424from cassandra .cqlengine .statements import BaseCQLStatement
2828
2929NOT_SET = _NOT_SET # required for passing timeout to Session.execute
3030
31- Host = namedtuple ('Host' , ['name' , 'port' ])
32-
3331cluster = None
3432session = None
35- lazy_connect_args = None
36- lazy_connect_lock = threading .RLock ()
3733
34+ # connections registry
35+ DEFAULT_CONNECTION = object ()
36+ _connections = {}
3837
3938# Because type models may be registered before a connection is present,
4039# and because sessions may be replaced, we must register UDTs here, in order
4140# to have them registered when a new session is established.
4241udt_by_keyspace = defaultdict (dict )
4342
4443
44+ def format_log_context (msg , connection = None , keyspace = None ):
45+ """Format log message to add keyspace and connection context"""
46+ connection_info = connection or 'DEFAULT_CONNECTION'
47+
48+ if keyspace :
49+ msg = '[Connection: {0}, Keyspace: {1}] {2}' .format (connection_info , keyspace , msg )
50+ else :
51+ msg = '[Connection: {0}] {1}' .format (connection_info , msg )
52+ return msg
53+
54+
4555class UndefinedKeyspaceException (CQLEngineException ):
4656 pass
4757
4858
59+ class Connection (object ):
60+ """CQLEngine Connection"""
61+
62+ name = None
63+ hosts = None
64+
65+ consistency = None
66+ retry_connect = False
67+ lazy_connect = False
68+ lazy_connect_lock = None
69+ cluster_options = None
70+
71+ cluster = None
72+ session = None
73+
74+ def __init__ (self , name , hosts , consistency = None ,
75+ lazy_connect = False , retry_connect = False , cluster_options = None ):
76+ self .hosts = hosts
77+ self .name = name
78+ self .consistency = consistency
79+ self .lazy_connect = lazy_connect
80+ self .retry_connect = retry_connect
81+ self .cluster_options = cluster_options if cluster_options else {}
82+ self .lazy_connect_lock = threading .RLock ()
83+
84+ def setup (self ):
85+ """Setup the connection"""
86+ global cluster , session
87+
88+ if 'username' in self .cluster_options or 'password' in self .cluster_options :
89+ raise CQLEngineException ("Username & Password are now handled by using the native driver's auth_provider" )
90+
91+ if self .lazy_connect :
92+ return
93+
94+ self .cluster = Cluster (self .hosts , ** self .cluster_options )
95+ try :
96+ self .session = self .cluster .connect ()
97+ log .debug (format_log_context ("connection initialized with internally created session" , connection = self .name ))
98+ except NoHostAvailable :
99+ if self .retry_connect :
100+ log .warning (format_log_context ("connect failed, setting up for re-attempt on first use" , connection = self .name ))
101+ self .lazy_connect = True
102+ raise
103+
104+ if self .consistency is not None :
105+ self .session .default_consistency_level = self .consistency
106+
107+ if DEFAULT_CONNECTION in _connections and _connections [DEFAULT_CONNECTION ] == self :
108+ cluster = _connections [DEFAULT_CONNECTION ].cluster
109+ session = _connections [DEFAULT_CONNECTION ].session
110+
111+ self .setup_session ()
112+
113+ def setup_session (self ):
114+ self .session .row_factory = dict_factory
115+ enc = self .session .encoder
116+ enc .mapping [tuple ] = enc .cql_encode_tuple
117+ _register_known_types (self .session .cluster )
118+
119+ def handle_lazy_connect (self ):
120+
121+ # if lazy_connect is False, it means the cluster is setup and ready
122+ # No need to acquire the lock
123+ if not self .lazy_connect :
124+ return
125+
126+ with self .lazy_connect_lock :
127+ # lazy_connect might have been set to False by another thread while waiting the lock
128+ # In this case, do nothing.
129+ if self .lazy_connect :
130+ log .debug (format_log_context ("Lazy connect for connection" , connection = self .name ))
131+ self .lazy_connect = False
132+ self .setup ()
133+
134+
135+ def register_connection (name , hosts , consistency = None , lazy_connect = False ,
136+ retry_connect = False , cluster_options = None , default = False ):
137+
138+ if name in _connections :
139+ log .warning ("Registering connection '{0}' when it already exists." .format (name ))
140+
141+ conn = Connection (name , hosts , consistency = consistency ,lazy_connect = lazy_connect ,
142+ retry_connect = retry_connect , cluster_options = cluster_options )
143+
144+ _connections [name ] = conn
145+
146+ if default :
147+ set_default_connection (name )
148+
149+ conn .setup ()
150+ return conn
151+
152+
153+ def unregister_connection (name ):
154+ global cluster , session
155+
156+ if name not in _connections :
157+ return
158+
159+ if DEFAULT_CONNECTION in _connections and _connections [name ] == _connections [DEFAULT_CONNECTION ]:
160+ del _connections [DEFAULT_CONNECTION ]
161+ cluster = None
162+ session = None
163+ log .warning ("Unregistering default connection '{0}'. Use set_default_connection to set a new one." .format (name ))
164+
165+ log .debug ("Connection '{0}' has been removed from the registry." .format (name ))
166+ del _connections [name ]
167+
168+
169+ def set_default_connection (name ):
170+ global cluster , session
171+
172+ if name not in _connections :
173+ raise CQLEngineException ("Connection '{0}' doesn't exist." .format (name ))
174+
175+ log .debug ("Connection '{0}' has been set as default." .format (name ))
176+ _connections [DEFAULT_CONNECTION ] = _connections [name ]
177+ cluster = _connections [name ].cluster
178+ session = _connections [name ].session
179+
180+
181+ def get_connection (name = None ):
182+
183+ if not name :
184+ name = DEFAULT_CONNECTION
185+
186+ if name not in _connections :
187+ raise CQLEngineException ("Connection name '{0}' doesn't exist in the registry." .format (name ))
188+
189+ conn = _connections [name ]
190+ conn .handle_lazy_connect ()
191+
192+ return conn
193+
194+
49195def default ():
50196 """
51- Configures the global mapper connection to localhost, using the driver defaults
197+ Configures the default connection to localhost, using the driver defaults
52198 (except for row_factory)
53199 """
54- global cluster , session
55200
56- if session :
57- log .warning ("configuring new connection for cqlengine when one was already set" )
58-
59- cluster = Cluster ()
60- session = cluster .connect ()
201+ try :
202+ conn = get_connection ()
203+ if conn .session :
204+ log .warning ("configuring new default connection for cqlengine when one was already set" )
205+ except :
206+ pass
61207
62- _setup_session (session )
208+ conn = register_connection ('default' , hosts = None , default = True )
209+ conn .setup ()
63210
64211 log .debug ("cqlengine connection initialized with default session to localhost" )
65212
66213
67214def set_session (s ):
68215 """
69- Configures the global mapper connection with a preexisting :class:`cassandra.cluster.Session`
216+ Configures the default connection with a preexisting :class:`cassandra.cluster.Session`
70217
71218 Note: the mapper presently requires a Session :attr:`~.row_factory` set to ``dict_factory``.
72219 This may be relaxed in the future
73220 """
74- global cluster , session
75221
76- if session :
77- log .warning ("configuring new connection for cqlengine when one was already set" )
222+ conn = get_connection ()
223+
224+ if conn .session :
225+ log .warning ("configuring new default connection for cqlengine when one was already set" )
78226
79227 if s .row_factory is not dict_factory :
80228 raise CQLEngineException ("Failed to initialize: 'Session.row_factory' must be 'dict_factory'." )
81- session = s
82- cluster = s .cluster
229+ conn . session = s
230+ conn . cluster = s .cluster
83231
84232 # Set default keyspace from given session's keyspace
85- if session .keyspace :
233+ if conn . session .keyspace :
86234 from cassandra .cqlengine import models
87- models .DEFAULT_KEYSPACE = session .keyspace
235+ models .DEFAULT_KEYSPACE = conn . session .keyspace
88236
89- _setup_session ( session )
237+ conn . setup_session ( )
90238
91- log .debug ("cqlengine connection initialized with %s" , s )
239+ log .debug ("cqlengine default connection initialized with %s" , s )
92240
93241
94242def setup (
@@ -108,53 +256,19 @@ def setup(
108256 :param bool retry_connect: True if we should retry to connect even if there was a connection failure initially
109257 :param \*\*kwargs: Pass-through keyword arguments for :class:`cassandra.cluster.Cluster`
110258 """
111- global cluster , session , lazy_connect_args
112-
113- if 'username' in kwargs or 'password' in kwargs :
114- raise CQLEngineException ("Username & Password are now handled by using the native driver's auth_provider" )
115259
116260 from cassandra .cqlengine import models
117261 models .DEFAULT_KEYSPACE = default_keyspace
118262
119- if lazy_connect :
120- kwargs ['default_keyspace' ] = default_keyspace
121- kwargs ['consistency' ] = consistency
122- kwargs ['lazy_connect' ] = False
123- kwargs ['retry_connect' ] = retry_connect
124- lazy_connect_args = (hosts , kwargs )
125- return
126-
127- cluster = Cluster (hosts , ** kwargs )
128- try :
129- session = cluster .connect ()
130- log .debug ("cqlengine connection initialized with internally created session" )
131- except NoHostAvailable :
132- if retry_connect :
133- log .warning ("connect failed, setting up for re-attempt on first use" )
134- kwargs ['default_keyspace' ] = default_keyspace
135- kwargs ['consistency' ] = consistency
136- kwargs ['lazy_connect' ] = False
137- kwargs ['retry_connect' ] = retry_connect
138- lazy_connect_args = (hosts , kwargs )
139- raise
140- if consistency is not None :
141- session .default_consistency_level = consistency
142-
143- _setup_session (session )
144-
263+ register_connection ('default' , hosts = hosts , consistency = consistency , lazy_connect = lazy_connect ,
264+ retry_connect = retry_connect , cluster_options = kwargs , default = True )
145265
146- def _setup_session (session ):
147- session .row_factory = dict_factory
148- enc = session .encoder
149- enc .mapping [tuple ] = enc .cql_encode_tuple
150- _register_known_types (session .cluster )
151266
267+ def execute (query , params = None , consistency_level = None , timeout = NOT_SET , connection = None ):
152268
153- def execute ( query , params = None , consistency_level = None , timeout = NOT_SET ):
269+ conn = get_connection ( connection )
154270
155- handle_lazy_connect ()
156-
157- if not session :
271+ if not conn .session :
158272 raise CQLEngineException ("It is required to setup() cqlengine before executing queries" )
159273
160274 if isinstance (query , SimpleStatement ):
@@ -165,47 +279,29 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
165279 elif isinstance (query , six .string_types ):
166280 query = SimpleStatement (query , consistency_level = consistency_level )
167281
168- log .debug (query .query_string )
282+ log .debug (format_log_context ( query .query_string , connection = connection ) )
169283
170- result = session .execute (query , params , timeout = timeout )
284+ result = conn . session .execute (query , params , timeout = timeout )
171285
172286 return result
173287
174288
175- def get_session ():
176- handle_lazy_connect ( )
177- return session
289+ def get_session (connection = None ):
290+ conn = get_connection ( connection )
291+ return conn . session
178292
179293
180- def get_cluster ():
181- handle_lazy_connect ( )
182- if not cluster :
294+ def get_cluster (connection = None ):
295+ conn = get_connection ( connection )
296+ if not conn . cluster :
183297 raise CQLEngineException ("%s.cluster is not configured. Call one of the setup or default functions first." % __name__ )
184- return cluster
185-
186-
187- def handle_lazy_connect ():
188- global lazy_connect_args
189-
190- # if lazy_connect_args is None, it means the cluster is setup and ready
191- # No need to acquire the lock
192- if not lazy_connect_args :
193- return
194-
195- with lazy_connect_lock :
196- # lazy_connect_args might have been set to None by another thread while waiting the lock
197- # In this case, do nothing.
198- if lazy_connect_args :
199- log .debug ("lazy connect" )
200- hosts , kwargs = lazy_connect_args
201- setup (hosts , ** kwargs )
202- lazy_connect_args = None
298+ return conn .cluster
203299
204300
205- def register_udt (keyspace , type_name , klass ):
301+ def register_udt (keyspace , type_name , klass , connection = None ):
206302 udt_by_keyspace [keyspace ][type_name ] = klass
207303
208- global cluster
304+ cluster = get_cluster ( connection )
209305 if cluster :
210306 try :
211307 cluster .register_user_type (keyspace , type_name , klass )
0 commit comments