Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 188 additions & 92 deletions cassandra/cqlengine/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple, defaultdict
from collections import defaultdict
import logging
import six
import threading

from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist
from cassandra.query import SimpleStatement, Statement, dict_factory
from cassandra.query import SimpleStatement, dict_factory

from cassandra.cqlengine import CQLEngineException
from cassandra.cqlengine.statements import BaseCQLStatement
Expand All @@ -28,67 +28,215 @@

NOT_SET = _NOT_SET # required for passing timeout to Session.execute

Host = namedtuple('Host', ['name', 'port'])

cluster = None
session = None
lazy_connect_args = None
lazy_connect_lock = threading.RLock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the object oriented approach, but wondering if we should set these module attributes when the default connection is made, so we don't lose attributes that were not explicitly "private".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest do it for 3.x, but remove in 4.0. So cluster will always point to the DEFAULT cluster object.

# connections registry
DEFAULT_CONNECTION = object()
_connections = {}

# Because type models may be registered before a connection is present,
# and because sessions may be replaced, we must register UDTs here, in order
# to have them registered when a new session is established.
udt_by_keyspace = defaultdict(dict)


def format_log_context(msg, connection=None, keyspace=None):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be connection_name so I don't get confused about passing connection objects in?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on our discussion, is it OK to keep this as is, since the Connection objects are kindly private in cqlengine.connection ?

"""Format log message to add keyspace and connection context"""
connection_info = connection or 'DEFAULT_CONNECTION'

if keyspace:
msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg)
else:
msg = '[Connection: {0}] {1}'.format(connection_info, msg)
return msg


class UndefinedKeyspaceException(CQLEngineException):
pass


class Connection(object):
"""CQLEngine Connection"""

name = None
hosts = None

consistency = None
retry_connect = False
lazy_connect = False
lazy_connect_lock = None
cluster_options = None

cluster = None
session = None

def __init__(self, name, hosts, consistency=None,
lazy_connect=False, retry_connect=False, cluster_options=None):
self.hosts = hosts
self.name = name
self.consistency = consistency
self.lazy_connect = lazy_connect
self.retry_connect = retry_connect
self.cluster_options = cluster_options if cluster_options else {}
self.lazy_connect_lock = threading.RLock()

def setup(self):
"""Setup the connection"""
global cluster, session

if 'username' in self.cluster_options or 'password' in self.cluster_options:
raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider")

if self.lazy_connect:
return

self.cluster = Cluster(self.hosts, **self.cluster_options)
try:
self.session = self.cluster.connect()
log.debug(format_log_context("connection initialized with internally created session", connection=self.name))
except NoHostAvailable:
if self.retry_connect:
log.warning(format_log_context("connect failed, setting up for re-attempt on first use", connection=self.name))
self.lazy_connect = True
raise

if self.consistency is not None:
self.session.default_consistency_level = self.consistency

if DEFAULT_CONNECTION in _connections and _connections[DEFAULT_CONNECTION] == self:
cluster = _connections[DEFAULT_CONNECTION].cluster
session = _connections[DEFAULT_CONNECTION].session

self.setup_session()

def setup_session(self):
self.session.row_factory = dict_factory
enc = self.session.encoder
enc.mapping[tuple] = enc.cql_encode_tuple
_register_known_types(self.session.cluster)

def handle_lazy_connect(self):

# if lazy_connect is False, it means the cluster is setup and ready
# No need to acquire the lock
if not self.lazy_connect:
return

with self.lazy_connect_lock:
# lazy_connect might have been set to False by another thread while waiting the lock
# In this case, do nothing.
if self.lazy_connect:
log.debug(format_log_context("Lazy connect for connection", connection=self.name))
self.lazy_connect = False
self.setup()


def register_connection(name, hosts, consistency=None, lazy_connect=False,
retry_connect=False, cluster_options=None, default=False):

if name in _connections:
log.warning("Registering connection '{0}' when it already exists.".format(name))

conn = Connection(name, hosts, consistency=consistency,lazy_connect=lazy_connect,
retry_connect=retry_connect, cluster_options=cluster_options)

_connections[name] = conn

if default:
set_default_connection(name)

conn.setup()
return conn


def unregister_connection(name):
global cluster, session

if name not in _connections:
return

if DEFAULT_CONNECTION in _connections and _connections[name] == _connections[DEFAULT_CONNECTION]:
del _connections[DEFAULT_CONNECTION]
cluster = None
session = None
log.warning("Unregistering default connection '{0}'. Use set_default_connection to set a new one.".format(name))

log.debug("Connection '{0}' has been removed from the registry.".format(name))
del _connections[name]


def set_default_connection(name):
global cluster, session

if name not in _connections:
raise CQLEngineException("Connection '{0}' doesn't exist.".format(name))

log.debug("Connection '{0}' has been set as default.".format(name))
_connections[DEFAULT_CONNECTION] = _connections[name]
cluster = _connections[name].cluster
session = _connections[name].session


def get_connection(name=None):

if not name:
name = DEFAULT_CONNECTION

if name not in _connections:
raise CQLEngineException("Connection name '{0}' doesn't exist in the registry.".format(name))

conn = _connections[name]
conn.handle_lazy_connect()

return conn


def default():
"""
Configures the global mapper connection to localhost, using the driver defaults
Configures the default connection to localhost, using the driver defaults
(except for row_factory)
"""
global cluster, session

if session:
log.warning("configuring new connection for cqlengine when one was already set")

cluster = Cluster()
session = cluster.connect()
try:
conn = get_connection()
if conn.session:
log.warning("configuring new default connection for cqlengine when one was already set")
except:
pass

_setup_session(session)
conn = register_connection('default', hosts=None, default=True)
conn.setup()

log.debug("cqlengine connection initialized with default session to localhost")


def set_session(s):
"""
Configures the global mapper connection with a preexisting :class:`cassandra.cluster.Session`
Configures the default connection with a preexisting :class:`cassandra.cluster.Session`

Note: the mapper presently requires a Session :attr:`~.row_factory` set to ``dict_factory``.
This may be relaxed in the future
"""
global cluster, session

if session:
log.warning("configuring new connection for cqlengine when one was already set")
conn = get_connection()

if conn.session:
log.warning("configuring new default connection for cqlengine when one was already set")

if s.row_factory is not dict_factory:
raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.")
session = s
cluster = s.cluster
conn.session = s
conn.cluster = s.cluster

# Set default keyspace from given session's keyspace
if session.keyspace:
if conn.session.keyspace:
from cassandra.cqlengine import models
models.DEFAULT_KEYSPACE = session.keyspace
models.DEFAULT_KEYSPACE = conn.session.keyspace

_setup_session(session)
conn.setup_session()

log.debug("cqlengine connection initialized with %s", s)
log.debug("cqlengine default connection initialized with %s", s)


def setup(
Expand All @@ -108,53 +256,19 @@ def setup(
:param bool retry_connect: True if we should retry to connect even if there was a connection failure initially
:param \*\*kwargs: Pass-through keyword arguments for :class:`cassandra.cluster.Cluster`
"""
global cluster, session, lazy_connect_args

if 'username' in kwargs or 'password' in kwargs:
raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider")

from cassandra.cqlengine import models
models.DEFAULT_KEYSPACE = default_keyspace

if lazy_connect:
kwargs['default_keyspace'] = default_keyspace
kwargs['consistency'] = consistency
kwargs['lazy_connect'] = False
kwargs['retry_connect'] = retry_connect
lazy_connect_args = (hosts, kwargs)
return

cluster = Cluster(hosts, **kwargs)
try:
session = cluster.connect()
log.debug("cqlengine connection initialized with internally created session")
except NoHostAvailable:
if retry_connect:
log.warning("connect failed, setting up for re-attempt on first use")
kwargs['default_keyspace'] = default_keyspace
kwargs['consistency'] = consistency
kwargs['lazy_connect'] = False
kwargs['retry_connect'] = retry_connect
lazy_connect_args = (hosts, kwargs)
raise
if consistency is not None:
session.default_consistency_level = consistency

_setup_session(session)

register_connection('default', hosts=hosts, consistency=consistency, lazy_connect=lazy_connect,
retry_connect=retry_connect, cluster_options=kwargs, default=True)

def _setup_session(session):
session.row_factory = dict_factory
enc = session.encoder
enc.mapping[tuple] = enc.cql_encode_tuple
_register_known_types(session.cluster)

def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connection=None):

def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
conn = get_connection(connection)

handle_lazy_connect()

if not session:
if not conn.session:
raise CQLEngineException("It is required to setup() cqlengine before executing queries")

if isinstance(query, SimpleStatement):
Expand All @@ -165,47 +279,29 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
elif isinstance(query, six.string_types):
query = SimpleStatement(query, consistency_level=consistency_level)

log.debug(query.query_string)
log.debug(format_log_context(query.query_string, connection=connection))

result = session.execute(query, params, timeout=timeout)
result = conn.session.execute(query, params, timeout=timeout)

return result


def get_session():
handle_lazy_connect()
return session
def get_session(connection=None):
conn = get_connection(connection)
return conn.session


def get_cluster():
handle_lazy_connect()
if not cluster:
def get_cluster(connection=None):
conn = get_connection(connection)
if not conn.cluster:
raise CQLEngineException("%s.cluster is not configured. Call one of the setup or default functions first." % __name__)
return cluster


def handle_lazy_connect():
global lazy_connect_args

# if lazy_connect_args is None, it means the cluster is setup and ready
# No need to acquire the lock
if not lazy_connect_args:
return

with lazy_connect_lock:
# lazy_connect_args might have been set to None by another thread while waiting the lock
# In this case, do nothing.
if lazy_connect_args:
log.debug("lazy connect")
hosts, kwargs = lazy_connect_args
setup(hosts, **kwargs)
lazy_connect_args = None
return conn.cluster


def register_udt(keyspace, type_name, klass):
def register_udt(keyspace, type_name, klass, connection=None):
udt_by_keyspace[keyspace][type_name] = klass

global cluster
cluster = get_cluster(connection)
if cluster:
try:
cluster.register_user_type(keyspace, type_name, klass)
Expand Down
Loading