-
Notifications
You must be signed in to change notification settings - Fork 582
CLQEngine multiple sessions/keyspaces support #656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
687f3a3
f4a82ee
0b1d012
e93fbcc
894b9be
a0f86f1
da10c63
e5d27eb
d873937
5dbd435
5de8b57
71d822b
9152b4a
d1d5ef4
07aea03
83e8d66
d648bfe
eb9cb9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,13 +12,13 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from collections import namedtuple, defaultdict | ||
| from collections import defaultdict | ||
| import logging | ||
| import six | ||
| import threading | ||
|
|
||
| from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist | ||
| from cassandra.query import SimpleStatement, Statement, dict_factory | ||
| from cassandra.query import SimpleStatement, dict_factory | ||
|
|
||
| from cassandra.cqlengine import CQLEngineException | ||
| from cassandra.cqlengine.statements import BaseCQLStatement | ||
|
|
@@ -28,67 +28,215 @@ | |
|
|
||
| NOT_SET = _NOT_SET # required for passing timeout to Session.execute | ||
|
|
||
| Host = namedtuple('Host', ['name', 'port']) | ||
|
|
||
| cluster = None | ||
| session = None | ||
| lazy_connect_args = None | ||
| lazy_connect_lock = threading.RLock() | ||
|
|
||
| # connections registry | ||
| DEFAULT_CONNECTION = object() | ||
| _connections = {} | ||
|
|
||
| # Because type models may be registered before a connection is present, | ||
| # and because sessions may be replaced, we must register UDTs here, in order | ||
| # to have them registered when a new session is established. | ||
| udt_by_keyspace = defaultdict(dict) | ||
|
|
||
|
|
||
| def format_log_context(msg, connection=None, keyspace=None): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. based on our discussion, is it OK to keep this as is, since the Connection objects are kindly private in cqlengine.connection ? |
||
| """Format log message to add keyspace and connection context""" | ||
| connection_info = connection or 'DEFAULT_CONNECTION' | ||
|
|
||
| if keyspace: | ||
| msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg) | ||
| else: | ||
| msg = '[Connection: {0}] {1}'.format(connection_info, msg) | ||
| return msg | ||
|
|
||
|
|
||
| class UndefinedKeyspaceException(CQLEngineException): | ||
| pass | ||
|
|
||
|
|
||
| class Connection(object): | ||
| """CQLEngine Connection""" | ||
|
|
||
| name = None | ||
| hosts = None | ||
|
|
||
| consistency = None | ||
| retry_connect = False | ||
| lazy_connect = False | ||
| lazy_connect_lock = None | ||
| cluster_options = None | ||
|
|
||
| cluster = None | ||
| session = None | ||
|
|
||
| def __init__(self, name, hosts, consistency=None, | ||
| lazy_connect=False, retry_connect=False, cluster_options=None): | ||
| self.hosts = hosts | ||
| self.name = name | ||
| self.consistency = consistency | ||
| self.lazy_connect = lazy_connect | ||
| self.retry_connect = retry_connect | ||
| self.cluster_options = cluster_options if cluster_options else {} | ||
| self.lazy_connect_lock = threading.RLock() | ||
|
|
||
| def setup(self): | ||
| """Setup the connection""" | ||
| global cluster, session | ||
|
|
||
| if 'username' in self.cluster_options or 'password' in self.cluster_options: | ||
| raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider") | ||
|
|
||
| if self.lazy_connect: | ||
| return | ||
|
|
||
| self.cluster = Cluster(self.hosts, **self.cluster_options) | ||
| try: | ||
| self.session = self.cluster.connect() | ||
| log.debug(format_log_context("connection initialized with internally created session", connection=self.name)) | ||
| except NoHostAvailable: | ||
| if self.retry_connect: | ||
| log.warning(format_log_context("connect failed, setting up for re-attempt on first use", connection=self.name)) | ||
| self.lazy_connect = True | ||
| raise | ||
|
|
||
| if self.consistency is not None: | ||
| self.session.default_consistency_level = self.consistency | ||
|
|
||
| if DEFAULT_CONNECTION in _connections and _connections[DEFAULT_CONNECTION] == self: | ||
| cluster = _connections[DEFAULT_CONNECTION].cluster | ||
| session = _connections[DEFAULT_CONNECTION].session | ||
|
|
||
| self.setup_session() | ||
|
|
||
| def setup_session(self): | ||
| self.session.row_factory = dict_factory | ||
| enc = self.session.encoder | ||
| enc.mapping[tuple] = enc.cql_encode_tuple | ||
| _register_known_types(self.session.cluster) | ||
|
|
||
| def handle_lazy_connect(self): | ||
|
|
||
| # if lazy_connect is False, it means the cluster is setup and ready | ||
| # No need to acquire the lock | ||
| if not self.lazy_connect: | ||
| return | ||
|
|
||
| with self.lazy_connect_lock: | ||
| # lazy_connect might have been set to False by another thread while waiting the lock | ||
| # In this case, do nothing. | ||
| if self.lazy_connect: | ||
| log.debug(format_log_context("Lazy connect for connection", connection=self.name)) | ||
| self.lazy_connect = False | ||
| self.setup() | ||
|
|
||
|
|
||
| def register_connection(name, hosts, consistency=None, lazy_connect=False, | ||
| retry_connect=False, cluster_options=None, default=False): | ||
|
|
||
| if name in _connections: | ||
| log.warning("Registering connection '{0}' when it already exists.".format(name)) | ||
|
|
||
| conn = Connection(name, hosts, consistency=consistency,lazy_connect=lazy_connect, | ||
| retry_connect=retry_connect, cluster_options=cluster_options) | ||
|
|
||
| _connections[name] = conn | ||
|
|
||
| if default: | ||
| set_default_connection(name) | ||
|
|
||
| conn.setup() | ||
| return conn | ||
|
|
||
|
|
||
| def unregister_connection(name): | ||
| global cluster, session | ||
|
|
||
| if name not in _connections: | ||
| return | ||
|
|
||
| if DEFAULT_CONNECTION in _connections and _connections[name] == _connections[DEFAULT_CONNECTION]: | ||
| del _connections[DEFAULT_CONNECTION] | ||
| cluster = None | ||
| session = None | ||
| log.warning("Unregistering default connection '{0}'. Use set_default_connection to set a new one.".format(name)) | ||
|
|
||
| log.debug("Connection '{0}' has been removed from the registry.".format(name)) | ||
| del _connections[name] | ||
|
|
||
|
|
||
| def set_default_connection(name): | ||
| global cluster, session | ||
|
|
||
| if name not in _connections: | ||
| raise CQLEngineException("Connection '{0}' doesn't exist.".format(name)) | ||
|
|
||
| log.debug("Connection '{0}' has been set as default.".format(name)) | ||
| _connections[DEFAULT_CONNECTION] = _connections[name] | ||
| cluster = _connections[name].cluster | ||
| session = _connections[name].session | ||
|
|
||
|
|
||
| def get_connection(name=None): | ||
|
|
||
| if not name: | ||
| name = DEFAULT_CONNECTION | ||
|
|
||
| if name not in _connections: | ||
| raise CQLEngineException("Connection name '{0}' doesn't exist in the registry.".format(name)) | ||
|
|
||
| conn = _connections[name] | ||
| conn.handle_lazy_connect() | ||
|
|
||
| return conn | ||
|
|
||
|
|
||
| def default(): | ||
| """ | ||
| Configures the global mapper connection to localhost, using the driver defaults | ||
| Configures the default connection to localhost, using the driver defaults | ||
| (except for row_factory) | ||
| """ | ||
| global cluster, session | ||
|
|
||
| if session: | ||
| log.warning("configuring new connection for cqlengine when one was already set") | ||
|
|
||
| cluster = Cluster() | ||
| session = cluster.connect() | ||
| try: | ||
| conn = get_connection() | ||
| if conn.session: | ||
| log.warning("configuring new default connection for cqlengine when one was already set") | ||
| except: | ||
| pass | ||
|
|
||
| _setup_session(session) | ||
| conn = register_connection('default', hosts=None, default=True) | ||
| conn.setup() | ||
|
|
||
| log.debug("cqlengine connection initialized with default session to localhost") | ||
|
|
||
|
|
||
| def set_session(s): | ||
| """ | ||
| Configures the global mapper connection with a preexisting :class:`cassandra.cluster.Session` | ||
| Configures the default connection with a preexisting :class:`cassandra.cluster.Session` | ||
|
|
||
| Note: the mapper presently requires a Session :attr:`~.row_factory` set to ``dict_factory``. | ||
| This may be relaxed in the future | ||
| """ | ||
| global cluster, session | ||
|
|
||
| if session: | ||
| log.warning("configuring new connection for cqlengine when one was already set") | ||
| conn = get_connection() | ||
|
|
||
| if conn.session: | ||
| log.warning("configuring new default connection for cqlengine when one was already set") | ||
|
|
||
| if s.row_factory is not dict_factory: | ||
| raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.") | ||
| session = s | ||
| cluster = s.cluster | ||
| conn.session = s | ||
| conn.cluster = s.cluster | ||
|
|
||
| # Set default keyspace from given session's keyspace | ||
| if session.keyspace: | ||
| if conn.session.keyspace: | ||
| from cassandra.cqlengine import models | ||
| models.DEFAULT_KEYSPACE = session.keyspace | ||
| models.DEFAULT_KEYSPACE = conn.session.keyspace | ||
|
|
||
| _setup_session(session) | ||
| conn.setup_session() | ||
|
|
||
| log.debug("cqlengine connection initialized with %s", s) | ||
| log.debug("cqlengine default connection initialized with %s", s) | ||
|
|
||
|
|
||
| def setup( | ||
|
|
@@ -108,53 +256,19 @@ def setup( | |
| :param bool retry_connect: True if we should retry to connect even if there was a connection failure initially | ||
| :param \*\*kwargs: Pass-through keyword arguments for :class:`cassandra.cluster.Cluster` | ||
| """ | ||
| global cluster, session, lazy_connect_args | ||
|
|
||
| if 'username' in kwargs or 'password' in kwargs: | ||
| raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider") | ||
|
|
||
| from cassandra.cqlengine import models | ||
| models.DEFAULT_KEYSPACE = default_keyspace | ||
|
|
||
| if lazy_connect: | ||
| kwargs['default_keyspace'] = default_keyspace | ||
| kwargs['consistency'] = consistency | ||
| kwargs['lazy_connect'] = False | ||
| kwargs['retry_connect'] = retry_connect | ||
| lazy_connect_args = (hosts, kwargs) | ||
| return | ||
|
|
||
| cluster = Cluster(hosts, **kwargs) | ||
| try: | ||
| session = cluster.connect() | ||
| log.debug("cqlengine connection initialized with internally created session") | ||
| except NoHostAvailable: | ||
| if retry_connect: | ||
| log.warning("connect failed, setting up for re-attempt on first use") | ||
| kwargs['default_keyspace'] = default_keyspace | ||
| kwargs['consistency'] = consistency | ||
| kwargs['lazy_connect'] = False | ||
| kwargs['retry_connect'] = retry_connect | ||
| lazy_connect_args = (hosts, kwargs) | ||
| raise | ||
| if consistency is not None: | ||
| session.default_consistency_level = consistency | ||
|
|
||
| _setup_session(session) | ||
|
|
||
| register_connection('default', hosts=hosts, consistency=consistency, lazy_connect=lazy_connect, | ||
| retry_connect=retry_connect, cluster_options=kwargs, default=True) | ||
|
|
||
| def _setup_session(session): | ||
| session.row_factory = dict_factory | ||
| enc = session.encoder | ||
| enc.mapping[tuple] = enc.cql_encode_tuple | ||
| _register_known_types(session.cluster) | ||
|
|
||
| def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connection=None): | ||
|
|
||
| def execute(query, params=None, consistency_level=None, timeout=NOT_SET): | ||
| conn = get_connection(connection) | ||
|
|
||
| handle_lazy_connect() | ||
|
|
||
| if not session: | ||
| if not conn.session: | ||
| raise CQLEngineException("It is required to setup() cqlengine before executing queries") | ||
|
|
||
| if isinstance(query, SimpleStatement): | ||
|
|
@@ -165,47 +279,29 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET): | |
| elif isinstance(query, six.string_types): | ||
| query = SimpleStatement(query, consistency_level=consistency_level) | ||
|
|
||
| log.debug(query.query_string) | ||
| log.debug(format_log_context(query.query_string, connection=connection)) | ||
|
|
||
| result = session.execute(query, params, timeout=timeout) | ||
| result = conn.session.execute(query, params, timeout=timeout) | ||
|
|
||
| return result | ||
|
|
||
|
|
||
| def get_session(): | ||
| handle_lazy_connect() | ||
| return session | ||
| def get_session(connection=None): | ||
| conn = get_connection(connection) | ||
| return conn.session | ||
|
|
||
|
|
||
| def get_cluster(): | ||
| handle_lazy_connect() | ||
| if not cluster: | ||
| def get_cluster(connection=None): | ||
| conn = get_connection(connection) | ||
| if not conn.cluster: | ||
| raise CQLEngineException("%s.cluster is not configured. Call one of the setup or default functions first." % __name__) | ||
| return cluster | ||
|
|
||
|
|
||
| def handle_lazy_connect(): | ||
| global lazy_connect_args | ||
|
|
||
| # if lazy_connect_args is None, it means the cluster is setup and ready | ||
| # No need to acquire the lock | ||
| if not lazy_connect_args: | ||
| return | ||
|
|
||
| with lazy_connect_lock: | ||
| # lazy_connect_args might have been set to None by another thread while waiting the lock | ||
| # In this case, do nothing. | ||
| if lazy_connect_args: | ||
| log.debug("lazy connect") | ||
| hosts, kwargs = lazy_connect_args | ||
| setup(hosts, **kwargs) | ||
| lazy_connect_args = None | ||
| return conn.cluster | ||
|
|
||
|
|
||
| def register_udt(keyspace, type_name, klass): | ||
| def register_udt(keyspace, type_name, klass, connection=None): | ||
| udt_by_keyspace[keyspace][type_name] = klass | ||
|
|
||
| global cluster | ||
| cluster = get_cluster(connection) | ||
| if cluster: | ||
| try: | ||
| cluster.register_user_type(keyspace, type_name, klass) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the object oriented approach, but wondering if we should set these module attributes when the default connection is made, so we don't lose attributes that were not explicitly "private".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest do it for 3.x, but remove in 4.0. So cluster will always point to the DEFAULT cluster object.