From 687f3a33b24f3267d5ae818c8c70d2c093bcd22b Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Mon, 22 Aug 2016 14:04:37 -0400 Subject: [PATCH 01/18] Initial connection registry implementation --- cassandra/cqlengine/connection.py | 220 +++++++++++------- .../cqlengine/connections/test_connection.py | 3 +- 2 files changed, 135 insertions(+), 88 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 8edcc76d54..289a44372b 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple, defaultdict +from collections import defaultdict import logging import six import threading from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist -from cassandra.query import SimpleStatement, Statement, dict_factory +from cassandra.query import SimpleStatement, dict_factory from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.statements import BaseCQLStatement @@ -28,13 +28,8 @@ NOT_SET = _NOT_SET # required for passing timeout to Session.execute -Host = namedtuple('Host', ['name', 'port']) - -cluster = None -session = None -lazy_connect_args = None -lazy_connect_lock = threading.RLock() - +# connections registry +_connections = {} # Because type models may be registered before a connection is present, # and because sessions may be replaced, we must register UDTs here, in order @@ -46,20 +41,123 @@ class UndefinedKeyspaceException(CQLEngineException): pass +class Connection(object): + """CQLEngine Connection""" + + name = None + hosts = None + + consistency = None + retry_connect = False + lazy_connect = False + lazy_connect_lock = None + cluster_options = None + + cluster = None + session = None + + def __init__(self, name, hosts, consistency=None, + lazy_connect=False, retry_connect=False, cluster_options=None): + self.hosts = hosts + self.name = name + self.consistency = consistency + self.lazy_connect = lazy_connect + self.retry_connect = retry_connect + self.cluster_options = cluster_options if cluster_options else {} + self.lazy_connect_lock = threading.RLock() + + def setup(self): + """Setup the connection""" + + if 'username' in self.cluster_options or 'password' in self.cluster_options: + raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider") + + if self.lazy_connect: + return + + self.cluster = Cluster(self.hosts, **self.cluster_options) + try: + self.session = self.cluster.connect() + log.debug("cqlengine connection '{0}' initialized with internally created session".format(self.name)) + except NoHostAvailable: + if self.retry_connect: + log.warning("connect failed for '{0}', setting up for re-attempt on first use".format(self.name)) + self.lazy_connect = True + raise + + if self.consistency is not None: + self.session.default_consistency_level = self.consistency + + self.setup_session() + + def setup_session(self): + self.session.row_factory = dict_factory + enc = self.session.encoder + enc.mapping[tuple] = enc.cql_encode_tuple + _register_known_types(self.session.cluster) + + def handle_lazy_connect(self): + + # if lazy_connect is False, it means the cluster is setup and ready + # No need to acquire the lock + if not self.lazy_connect: + return + + with self.lazy_connect_lock: + # lazy_connect might have been set to False by another thread while waiting the lock + # In this case, do nothing. + if self.lazy_connect: + log.debug("Lazy connect for connection '{0}'".format(self.name)) + self.lazy_connect = False + self.setup() + + +def register_connection(name, hosts, consistency=None, lazy_connect=False, + retry_connect=False, cluster_options=None, default=False): + + if name in _connections: + log.warning("Registering connection '{0}' when it already exists.".format(name)) + + conn = Connection(name, hosts, consistency=consistency,lazy_connect=lazy_connect, + retry_connect=retry_connect, cluster_options=cluster_options) + + _connections[name] = conn + + if default: + _connections['_default_'] = conn + + return conn + + +def get_connection(name=None): + + if not name: + name = '_default_' + + if name not in _connections: + raise ValueError("Connection name '{0}' doesn't exist in the registry.".format(name)) + + conn = _connections[name] + conn.handle_lazy_connect() + + return conn + + def default(): """ Configures the global mapper connection to localhost, using the driver defaults (except for row_factory) """ - global cluster, session - if session: - log.warning("configuring new connection for cqlengine when one was already set") - - cluster = Cluster() - session = cluster.connect() + try: + conn = get_connection() + if conn.session: + log.warning("configuring new connection for cqlengine when one was already set") + except: + pass - _setup_session(session) + conn = register_connection('default', hosts=None, default=True) + conn.setup() log.debug("cqlengine connection initialized with default session to localhost") @@ -71,22 +169,23 @@ def set_session(s): Note: the mapper presently requires a Session :attr:`~.row_factory` set to ``dict_factory``. This may be relaxed in the future """ - global cluster, session - if session: + conn = get_connection() + + if conn.session: log.warning("configuring new connection for cqlengine when one was already set") if s.row_factory is not dict_factory: raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.") - session = s - cluster = s.cluster + conn.session = s + conn.cluster = s.cluster # Set default keyspace from given session's keyspace - if session.keyspace: + if conn.session.keyspace: from cassandra.cqlengine import models - models.DEFAULT_KEYSPACE = session.keyspace + models.DEFAULT_KEYSPACE = conn.session.keyspace - _setup_session(session) + conn.setup_session() log.debug("cqlengine connection initialized with %s", s) @@ -108,53 +207,20 @@ def setup( :param bool retry_connect: True if we should retry to connect even if there was a connection failure initially :param \*\*kwargs: Pass-through keyword arguments for :class:`cassandra.cluster.Cluster` """ - global cluster, session, lazy_connect_args - - if 'username' in kwargs or 'password' in kwargs: - raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider") from cassandra.cqlengine import models models.DEFAULT_KEYSPACE = default_keyspace - if lazy_connect: - kwargs['default_keyspace'] = default_keyspace - kwargs['consistency'] = consistency - kwargs['lazy_connect'] = False - kwargs['retry_connect'] = retry_connect - lazy_connect_args = (hosts, kwargs) - return - - cluster = Cluster(hosts, **kwargs) - try: - session = cluster.connect() - log.debug("cqlengine connection initialized with internally created session") - except NoHostAvailable: - if retry_connect: - log.warning("connect failed, setting up for re-attempt on first use") - kwargs['default_keyspace'] = default_keyspace - kwargs['consistency'] = consistency - kwargs['lazy_connect'] = False - kwargs['retry_connect'] = retry_connect - lazy_connect_args = (hosts, kwargs) - raise - if consistency is not None: - session.default_consistency_level = consistency - - _setup_session(session) - - -def _setup_session(session): - session.row_factory = dict_factory - enc = session.encoder - enc.mapping[tuple] = enc.cql_encode_tuple - _register_known_types(session.cluster) + conn = register_connection('default', hosts=hosts, consistency=consistency, lazy_connect=lazy_connect, + retry_connect=retry_connect, cluster_options=kwargs, default=True) + conn.setup() def execute(query, params=None, consistency_level=None, timeout=NOT_SET): - handle_lazy_connect() + conn = get_connection() - if not session: + if not conn.session: raise CQLEngineException("It is required to setup() cqlengine before executing queries") if isinstance(query, SimpleStatement): @@ -167,45 +233,27 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET): log.debug(query.query_string) - result = session.execute(query, params, timeout=timeout) + result = conn.session.execute(query, params, timeout=timeout) return result def get_session(): - handle_lazy_connect() - return session + conn = get_connection() + return conn.session def get_cluster(): - handle_lazy_connect() - if not cluster: + conn = get_connection() + if not conn.cluster: raise CQLEngineException("%s.cluster is not configured. Call one of the setup or default functions first." % __name__) - return cluster - - -def handle_lazy_connect(): - global lazy_connect_args - - # if lazy_connect_args is None, it means the cluster is setup and ready - # No need to acquire the lock - if not lazy_connect_args: - return - - with lazy_connect_lock: - # lazy_connect_args might have been set to None by another thread while waiting the lock - # In this case, do nothing. - if lazy_connect_args: - log.debug("lazy connect") - hosts, kwargs = lazy_connect_args - setup(hosts, **kwargs) - lazy_connect_args = None + return conn.cluster def register_udt(keyspace, type_name, klass): udt_by_keyspace[keyspace][type_name] = klass - global cluster + cluster = get_cluster() if cluster: try: cluster.register_user_type(keyspace, type_name, klass) diff --git a/tests/integration/cqlengine/connections/test_connection.py b/tests/integration/cqlengine/connections/test_connection.py index 80771d3697..659893bd0f 100644 --- a/tests/integration/cqlengine/connections/test_connection.py +++ b/tests/integration/cqlengine/connections/test_connection.py @@ -40,7 +40,7 @@ class ConnectionTest(BaseCassEngTestCase): @classmethod def setUpClass(cls): - cls.original_cluster = connection.cluster + cls.original_cluster = connection.get_cluster() cls.keyspace1 = 'ctest1' cls.keyspace2 = 'ctest2' super(ConnectionTest, cls).setUpClass() @@ -95,4 +95,3 @@ def test_connection_session_switch(self): connection.set_session(self.session2) self.assertEqual(1, TestConnectModel.objects.count()) self.assertEqual(TestConnectModel.objects.first(), TCM2) - From f4a82ee073d481379fca4e8dbc7dc2109e7124ed Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Mon, 22 Aug 2016 15:51:01 -0400 Subject: [PATCH 02/18] Add QuerySet using() method --- cassandra/cqlengine/query.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index f04a4a9a3a..3fc45fcb38 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -965,6 +965,18 @@ def timeout(self, timeout): clone._timeout = timeout return clone + def using(self, keyspace=None): + """ + Change the context on-the-fly of the Model class (connection, keyspace) + """ + + clone = copy.deepcopy(self) + if keyspace: + new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace}) + clone.model = new_type + + return clone + class ResultObject(dict): """ From 0b1d012524b1a7377b2dc44a81655ed6bbbf9609 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Tue, 23 Aug 2016 15:42:42 -0400 Subject: [PATCH 03/18] Add connection selection support to mangement commands --- cassandra/cqlengine/connection.py | 29 ++--- cassandra/cqlengine/management.py | 201 +++++++++++++++++++----------- cassandra/cqlengine/query.py | 4 +- cassandra/cqlengine/usertype.py | 6 +- 4 files changed, 146 insertions(+), 94 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 289a44372b..1a8922a862 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -29,6 +29,7 @@ NOT_SET = _NOT_SET # required for passing timeout to Session.execute # connections registry +DEFAULT_CONNECTION = '_default_' _connections = {} # Because type models may be registered before a connection is present, @@ -124,18 +125,19 @@ def register_connection(name, hosts, consistency=None, lazy_connect=False, _connections[name] = conn if default: - _connections['_default_'] = conn + _connections[DEFAULT_CONNECTION] = conn + conn.setup() return conn def get_connection(name=None): if not name: - name = '_default_' + name = DEFAULT_CONNECTION if name not in _connections: - raise ValueError("Connection name '{0}' doesn't exist in the registry.".format(name)) + raise CQLEngineException("Connection name '{0}' doesn't exist in the registry.".format(name)) conn = _connections[name] conn.handle_lazy_connect() @@ -211,14 +213,13 @@ def setup( from cassandra.cqlengine import models models.DEFAULT_KEYSPACE = default_keyspace - conn = register_connection('default', hosts=hosts, consistency=consistency, lazy_connect=lazy_connect, - retry_connect=retry_connect, cluster_options=kwargs, default=True) - conn.setup() + register_connection('default', hosts=hosts, consistency=consistency, lazy_connect=lazy_connect, + retry_connect=retry_connect, cluster_options=kwargs, default=True) -def execute(query, params=None, consistency_level=None, timeout=NOT_SET): +def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connection=None): - conn = get_connection() + conn = get_connection(connection) if not conn.session: raise CQLEngineException("It is required to setup() cqlengine before executing queries") @@ -238,22 +239,22 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET): return result -def get_session(): - conn = get_connection() +def get_session(connection=None): + conn = get_connection(connection) return conn.session -def get_cluster(): - conn = get_connection() +def get_cluster(connection=None): + conn = get_connection(connection) if not conn.cluster: raise CQLEngineException("%s.cluster is not configured. Call one of the setup or default functions first." % __name__) return conn.cluster -def register_udt(keyspace, type_name, klass): +def register_udt(keyspace, type_name, klass, connection=None): udt_by_keyspace[keyspace][type_name] = klass - cluster = get_cluster() + cluster = get_cluster(connection) if cluster: try: cluster.register_user_type(keyspace, type_name, klass) diff --git a/cassandra/cqlengine/management.py b/cassandra/cqlengine/management.py index cc2a34599f..4a8ad5a32b 100644 --- a/cassandra/cqlengine/management.py +++ b/cassandra/cqlengine/management.py @@ -18,11 +18,12 @@ import os import six import warnings +from itertools import product from cassandra import metadata from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine import columns, query -from cassandra.cqlengine.connection import execute, get_cluster +from cassandra.cqlengine.connection import execute, get_cluster, DEFAULT_CONNECTION from cassandra.cqlengine.models import Model from cassandra.cqlengine.named import NamedTable from cassandra.cqlengine.usertype import UserType @@ -37,7 +38,34 @@ schema_columnfamilies = NamedTable('system', 'schema_columnfamilies') -def create_keyspace_simple(name, replication_factor, durable_writes=True): +def get_context(keyspaces, connections): + """Return the execution context""" + + if keyspaces: + if not isinstance(keyspaces, (list, tuple)): + raise ValueError('keyspaces must be a list or a tuple.') + + if connections: + if not isinstance(connections, (list, tuple)): + raise ValueError('connections must be a list or a tuple.') + + keyspaces = keyspaces if keyspaces else [None] + connections = connections if connections else [None] + + return product(connections, keyspaces) + + +def log_msg(msg, connection=None, keyspace=None): + """Format log message to add keyspace and connection context""" + connection_info = connection if connection else DEFAULT_CONNECTION + if keyspace: + msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg) + else: + msg = '[Connection: {0}] {1}'.format(connection_info, msg) + return msg + + +def create_keyspace_simple(name, replication_factor, durable_writes=True, connections=None): """ Creates a keyspace with SimpleStrategy for replica placement @@ -51,12 +79,13 @@ def create_keyspace_simple(name, replication_factor, durable_writes=True): :param str name: name of keyspace to create :param int replication_factor: keyspace replication factor, used with :attr:`~.SimpleStrategy` :param bool durable_writes: Write log is bypassed if set to False + :param str connections: List of connection names """ _create_keyspace(name, durable_writes, 'SimpleStrategy', - {'replication_factor': replication_factor}) + {'replication_factor': replication_factor}, connections=connections) -def create_keyspace_network_topology(name, dc_replication_map, durable_writes=True): +def create_keyspace_network_topology(name, dc_replication_map, durable_writes=True, connections=None): """ Creates a keyspace with NetworkTopologyStrategy for replica placement @@ -70,25 +99,37 @@ def create_keyspace_network_topology(name, dc_replication_map, durable_writes=Tr :param str name: name of keyspace to create :param dict dc_replication_map: map of dc_names: replication_factor :param bool durable_writes: Write log is bypassed if set to False + :param str connections: List of connection names """ - _create_keyspace(name, durable_writes, 'NetworkTopologyStrategy', dc_replication_map) + _create_keyspace(name, durable_writes, 'NetworkTopologyStrategy', dc_replication_map, connections=connections) -def _create_keyspace(name, durable_writes, strategy_class, strategy_options): +def _create_keyspace(name, durable_writes, strategy_class, strategy_options, connections=None): if not _allow_schema_modification(): return - cluster = get_cluster() + if connections: + if not isinstance(connections, (list, tuple)): + raise ValueError('Connections must be a list or a tuple.') + + def __create_keyspace(name, durable_writes, strategy_class, strategy_options, connection=None): + cluster = get_cluster(connection) + + if name not in cluster.metadata.keyspaces: + log.info(log_msg("Creating keyspace %s", connection=connection), name) + ks_meta = metadata.KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) + execute(ks_meta.as_cql_query(), connection=connection) + else: + log.info(log_msg("Not creating keyspace %s because it already exists", connection=connection), name) - if name not in cluster.metadata.keyspaces: - log.info("Creating keyspace %s ", name) - ks_meta = metadata.KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) - execute(ks_meta.as_cql_query()) + if connections: + for connection in connections: + __create_keyspace(name, durable_writes, strategy_class, strategy_options, connection=connection) else: - log.info("Not creating keyspace %s because it already exists", name) + __create_keyspace(name, durable_writes, strategy_class, strategy_options) -def drop_keyspace(name): +def drop_keyspace(name, connections=None): """ Drops a keyspace, if it exists. @@ -98,14 +139,25 @@ def drop_keyspace(name): Take care to execute schema modifications in a single context (i.e. not concurrently with other clients).** :param str name: name of keyspace to drop + :param str connections: List of connection names """ if not _allow_schema_modification(): return - cluster = get_cluster() - if name in cluster.metadata.keyspaces: - execute("DROP KEYSPACE {0}".format(metadata.protect_name(name))) + if connections: + if not isinstance(connections, (list, tuple)): + raise ValueError('Connections must be a list or a tuple.') + def _drop_keyspace(name, connection=None): + cluster = get_cluster(connection) + if name in cluster.metadata.keyspaces: + execute("DROP KEYSPACE {0}".format(metadata.protect_name(name)), connection=connection) + + if connections: + for connection in connections: + _drop_keyspace(name, connection) + else: + _drop_keyspace(name) def _get_index_name_by_column(table, column_name): """ @@ -119,7 +171,7 @@ def _get_index_name_by_column(table, column_name): return index_metadata.name -def sync_table(model, keyspaces=None): +def sync_table(model, keyspaces=None, connections=None): """ Inspects the model and creates / updates the corresponding table and columns. @@ -138,19 +190,13 @@ def sync_table(model, keyspaces=None): *There are plans to guard schema-modifying functions with an environment-driven conditional.* """ - if keyspaces: - if not isinstance(keyspaces, (list, tuple)): - raise ValueError('keyspaces must be a list or a tuple.') - - for keyspace in keyspaces: - with query.ContextQuery(model, keyspace=keyspace) as m: - _sync_table(m) - else: - _sync_table(model) - + context = get_context(keyspaces, connections) + for connection, keyspace in context: + with query.ContextQuery(model, keyspace=keyspace) as m: + _sync_table(m, connection=connection) -def _sync_table(model): +def _sync_table(model, connection=None): if not _allow_schema_modification(): return @@ -165,12 +211,13 @@ def _sync_table(model): ks_name = model._get_keyspace() - cluster = get_cluster() + cluster = get_cluster(connection) try: keyspace = cluster.metadata.keyspaces[ks_name] except KeyError: - raise CQLEngineException("Keyspace '{0}' for model {1} does not exist.".format(ks_name, model)) + msg = log_msg("Keyspace '{0}' for model {1} does not exist.", connection=connection) + raise CQLEngineException(msg.format(ks_name, model)) tables = keyspace.tables @@ -179,21 +226,21 @@ def _sync_table(model): udts = [] columns.resolve_udts(col, udts) for udt in [u for u in udts if u not in syncd_types]: - _sync_type(ks_name, udt, syncd_types) + _sync_type(ks_name, udt, syncd_types, connection=connection) if raw_cf_name not in tables: - log.debug("sync_table creating new table %s", cf_name) + log.debug(log_msg("sync_table creating new table %s", keyspace=ks_name, connection=connection), cf_name) qs = _get_create_table(model) try: - execute(qs) + execute(qs, connection=connection) except CQLEngineException as ex: # 1.2 doesn't return cf names, so we have to examine the exception # and ignore if it says the column family already exists if "Cannot add already existing column family" not in unicode(ex): raise else: - log.debug("sync_table checking existing table %s", cf_name) + log.debug(log_msg("sync_table checking existing table %s", keyspace=ks_name, connection=connection), cf_name) table_meta = tables[raw_cf_name] _validate_pk(model, table_meta) @@ -207,24 +254,27 @@ def _sync_table(model): if db_name in table_columns: col_meta = table_columns[db_name] if col_meta.cql_type != col.db_type: - msg = 'Existing table {0} has column "{1}" with a type ({2}) differing from the model type ({3}).' \ - ' Model should be updated.'.format(cf_name, db_name, col_meta.cql_type, col.db_type) + msg = log_msg('Existing table {0} has column "{1}" with a type ({2}) differing from the model type ({3}).' + ' Model should be updated.', keyspace=ks_name, connection=connection) + msg = msg.format(cf_name, db_name, col_meta.cql_type, col.db_type) warnings.warn(msg) log.warning(msg) continue if col.primary_key or col.primary_key: - raise CQLEngineException("Cannot add primary key '{0}' (with db_field '{1}') to existing table {2}".format(model_name, db_name, cf_name)) + msg = log_msg("Cannot add primary key '{0}' (with db_field '{1}') to existing table {2}", keyspace=ks_name, connection=connection) + raise CQLEngineException(msg.format(model_name, db_name, cf_name)) query = "ALTER TABLE {0} add {1}".format(cf_name, col.get_column_def()) - execute(query) + execute(query, connection=connection) db_fields_not_in_model = model_fields.symmetric_difference(table_columns) if db_fields_not_in_model: - log.info("Table {0} has fields not referenced by model: {1}".format(cf_name, db_fields_not_in_model)) + msg = log_msg("Table {0} has fields not referenced by model: {1}", keyspace=ks_name, connection=connection) + log.info(msg.format(cf_name, db_fields_not_in_model)) - _update_options(model) + _update_options(model, connection=connection) table = cluster.metadata.keyspaces[ks_name].tables[raw_cf_name] @@ -240,7 +290,7 @@ def _sync_table(model): qs += ['ON {0}'.format(cf_name)] qs += ['("{0}")'.format(column.db_field_name)] qs = ' '.join(qs) - execute(qs) + execute(qs, connection=connection) def _validate_pk(model, table_meta): @@ -259,7 +309,7 @@ def _pk_string(partition, clustering): _pk_string(meta_partition, meta_clustering))) -def sync_type(ks_name, type_model): +def sync_type(ks_name, type_model, connection=None): """ Inspects the type_model and creates / updates the corresponding type. @@ -277,33 +327,33 @@ def sync_type(ks_name, type_model): if not issubclass(type_model, UserType): raise CQLEngineException("Types must be derived from base UserType.") - _sync_type(ks_name, type_model) + _sync_type(ks_name, type_model, connection=connection) -def _sync_type(ks_name, type_model, omit_subtypes=None): +def _sync_type(ks_name, type_model, omit_subtypes=None, connection=None): syncd_sub_types = omit_subtypes or set() for field in type_model._fields.values(): udts = [] columns.resolve_udts(field, udts) for udt in [u for u in udts if u not in syncd_sub_types]: - _sync_type(ks_name, udt, syncd_sub_types) + _sync_type(ks_name, udt, syncd_sub_types, connection=connection) syncd_sub_types.add(udt) type_name = type_model.type_name() type_name_qualified = "%s.%s" % (ks_name, type_name) - cluster = get_cluster() + cluster = get_cluster(connection) keyspace = cluster.metadata.keyspaces[ks_name] defined_types = keyspace.user_types if type_name not in defined_types: - log.debug("sync_type creating new type %s", type_name_qualified) + log.debug(log_msg("sync_type creating new type %s", keyspace=ks_name, connection=connection), type_name_qualified) cql = get_create_type(type_model, ks_name) - execute(cql) + execute(cql, connection=connection) cluster.refresh_user_type_metadata(ks_name, type_name) - type_model.register_for_keyspace(ks_name) + type_model.register_for_keyspace(ks_name, connection=connection) else: type_meta = defined_types[type_name] defined_fields = type_meta.field_names @@ -311,24 +361,26 @@ def _sync_type(ks_name, type_model, omit_subtypes=None): for field in type_model._fields.values(): model_fields.add(field.db_field_name) if field.db_field_name not in defined_fields: - execute("ALTER TYPE {0} ADD {1}".format(type_name_qualified, field.get_column_def())) + execute("ALTER TYPE {0} ADD {1}".format(type_name_qualified, field.get_column_def()), connection=connection) else: field_type = type_meta.field_types[defined_fields.index(field.db_field_name)] if field_type != field.db_type: - msg = 'Existing user type {0} has field "{1}" with a type ({2}) differing from the model user type ({3}).' \ - ' UserType should be updated.'.format(type_name_qualified, field.db_field_name, field_type, field.db_type) + msg = log_msg('Existing user type {0} has field "{1}" with a type ({2}) differing from the model user type ({3}).' + ' UserType should be updated.', keyspace=ks_name, connection=connection) + msg = msg.format(type_name_qualified, field.db_field_name, field_type, field.db_type) warnings.warn(msg) log.warning(msg) - type_model.register_for_keyspace(ks_name) + type_model.register_for_keyspace(ks_name, connection=connection) if len(defined_fields) == len(model_fields): - log.info("Type %s did not require synchronization", type_name_qualified) + log.info(log_msg("Type %s did not require synchronization", keyspace=ks_name, connection=connection), type_name_qualified) return db_fields_not_in_model = model_fields.symmetric_difference(defined_fields) if db_fields_not_in_model: - log.info("Type %s has fields not referenced by model: %s", type_name_qualified, db_fields_not_in_model) + msg = log_msg("Type %s has fields not referenced by model: %s", keyspace=ks_name, connection=connection) + log.info(msg, type_name_qualified, db_fields_not_in_model) def get_create_type(type_model, keyspace): @@ -377,9 +429,9 @@ def add_column(col): return ' '.join(query_strings) -def _get_table_metadata(model): +def _get_table_metadata(model, connection=None): # returns the table as provided by the native driver for a given model - cluster = get_cluster() + cluster = get_cluster(connection) ks = model._get_keyspace() table = model._raw_column_family_name() table = cluster.metadata.keyspaces[ks].tables[table] @@ -401,19 +453,22 @@ def _options_map_from_strings(option_strings): return options -def _update_options(model): +def _update_options(model, connection=None): """Updates the table options for the given model if necessary. :param model: The model to update. + :param connection: Name of the connection to use :return: `True`, if the options were modified in Cassandra, `False` otherwise. :rtype: bool """ - log.debug("Checking %s for option differences", model) + ks_name = model._get_keyspace() + msg = log_msg("Checking %s for option differences", keyspace=ks_name, connection=connection) + log.debug(msg, model) model_options = model.__options__ or {} - table_meta = _get_table_metadata(model) + table_meta = _get_table_metadata(model, connection=connection) # go to CQL string first to normalize meta from different versions existing_option_strings = set(table_meta._make_option_strings(table_meta.options)) existing_options = _options_map_from_strings(existing_option_strings) @@ -425,7 +480,8 @@ def _update_options(model): try: existing_value = existing_options[name] except KeyError: - raise KeyError("Invalid table option: '%s'; known options: %s" % (name, existing_options.keys())) + msg = log_msg("Invalid table option: '%s'; known options: %s", keyspace=ks_name, connection=connection) + raise KeyError(msg % (name, existing_options.keys())) if isinstance(existing_value, six.string_types): if value != existing_value: update_options[name] = value @@ -441,13 +497,13 @@ def _update_options(model): if update_options: options = ' AND '.join(metadata.TableMetadataV3._make_option_strings(update_options)) query = "ALTER TABLE {0} WITH {1}".format(model.column_family_name(), options) - execute(query) + execute(query, connection=connection) return True return False -def drop_table(model, keyspaces=None): +def drop_table(model, keyspaces=None, connections=None): """ Drops the table indicated by the model, if it exists. @@ -459,29 +515,24 @@ def drop_table(model, keyspaces=None): *There are plans to guard schema-modifying functions with an environment-driven conditional.* """ - if keyspaces: - if not isinstance(keyspaces, (list, tuple)): - raise ValueError('keyspaces must be a list or a tuple.') - - for keyspace in keyspaces: - with query.ContextQuery(model, keyspace=keyspace) as m: - _drop_table(m) - else: - _drop_table(model) + context = get_context(keyspaces, connections) + for connection, keyspace in context: + with query.ContextQuery(model, keyspace=keyspace) as m: + _drop_table(m, connection=connection) -def _drop_table(model): +def _drop_table(model, connection=None): if not _allow_schema_modification(): return # don't try to delete non existant tables - meta = get_cluster().metadata + meta = get_cluster(connection).metadata ks_name = model._get_keyspace() raw_cf_name = model._raw_column_family_name() try: meta.keyspaces[ks_name].tables[raw_cf_name] - execute('DROP TABLE {0};'.format(model.column_family_name())) + execute('DROP TABLE {0};'.format(model.column_family_name()), connection=connection) except KeyError: pass diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 3fc45fcb38..0eb95a6900 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -290,7 +290,7 @@ def __init__(self, model, keyspace=None): raise CQLEngineException("Models must be derived from base Model.") ks = keyspace if keyspace else model.__keyspace__ - new_type = type(model.__name__, (model,), {'__keyspace__': ks}) + new_type = type(model.__name__, (model,), {'__keyspace__': ks, '__abstract__': model.__abstract__}) self.model = new_type @@ -972,7 +972,7 @@ def using(self, keyspace=None): clone = copy.deepcopy(self) if keyspace: - new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace}) + new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace, '__abstract__': self.model.__abstract__}) clone.model = new_type return clone diff --git a/cassandra/cqlengine/usertype.py b/cassandra/cqlengine/usertype.py index 1d62e6beaf..adf3f5e95b 100644 --- a/cassandra/cqlengine/usertype.py +++ b/cassandra/cqlengine/usertype.py @@ -4,7 +4,7 @@ from cassandra.util import OrderedDict from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine import columns -from cassandra.cqlengine import connection +from cassandra.cqlengine import connection as conn from cassandra.cqlengine import models @@ -112,8 +112,8 @@ def items(self): return [(k, self[k]) for k in self] @classmethod - def register_for_keyspace(cls, keyspace): - connection.register_udt(keyspace, cls.type_name(), cls) + def register_for_keyspace(cls, keyspace, connection=None): + conn.register_udt(keyspace, cls.type_name(), cls, connection=connection) @classmethod def type_name(cls): From e93fbcc5de3e7bffe301bad8b16ce7ee4ad5e685 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 25 Aug 2016 07:24:39 -0400 Subject: [PATCH 04/18] add connection support via using() to QuerySet and Model --- cassandra/cqlengine/connection.py | 24 +++++++---- cassandra/cqlengine/management.py | 40 +++++++------------ cassandra/cqlengine/models.py | 40 +++++++++++++++++++ cassandra/cqlengine/named.py | 11 ++++- cassandra/cqlengine/query.py | 39 +++++++++++------- .../cqlengine/query/test_queryoperators.py | 1 - 6 files changed, 106 insertions(+), 49 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 1a8922a862..a59f250c90 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -38,6 +38,16 @@ udt_by_keyspace = defaultdict(dict) +def format_log_context(msg, connection=None, keyspace=None): + """Format log message to add keyspace and connection context""" + connection_info = connection if connection else DEFAULT_CONNECTION + if keyspace: + msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg) + else: + msg = '[Connection: {0}] {1}'.format(connection_info, msg) + return msg + + class UndefinedKeyspaceException(CQLEngineException): pass @@ -79,10 +89,10 @@ def setup(self): self.cluster = Cluster(self.hosts, **self.cluster_options) try: self.session = self.cluster.connect() - log.debug("cqlengine connection '{0}' initialized with internally created session".format(self.name)) + log.debug(format_log_context("connection initialized with internally created session", connection=self.name)) except NoHostAvailable: if self.retry_connect: - log.warning("connect failed for '{0}', setting up for re-attempt on first use".format(self.name)) + log.warning(format_log_context("connect failed, setting up for re-attempt on first use", connection=self.name)) self.lazy_connect = True raise @@ -108,7 +118,7 @@ def handle_lazy_connect(self): # lazy_connect might have been set to False by another thread while waiting the lock # In this case, do nothing. if self.lazy_connect: - log.debug("Lazy connect for connection '{0}'".format(self.name)) + log.debug(format_log_context("Lazy connect for connection", connection=self.name)) self.lazy_connect = False self.setup() @@ -154,7 +164,7 @@ def default(): try: conn = get_connection() if conn.session: - log.warning("configuring new connection for cqlengine when one was already set") + log.warning("configuring new default connection for cqlengine when one was already set") except: pass @@ -175,7 +185,7 @@ def set_session(s): conn = get_connection() if conn.session: - log.warning("configuring new connection for cqlengine when one was already set") + log.warning("configuring new default connection for cqlengine when one was already set") if s.row_factory is not dict_factory: raise CQLEngineException("Failed to initialize: 'Session.row_factory' must be 'dict_factory'.") @@ -189,7 +199,7 @@ def set_session(s): conn.setup_session() - log.debug("cqlengine connection initialized with %s", s) + log.debug("cqlengine default connection initialized with %s", s) def setup( @@ -232,7 +242,7 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connect elif isinstance(query, six.string_types): query = SimpleStatement(query, consistency_level=consistency_level) - log.debug(query.query_string) + log.debug(format_log_context(query.query_string, connection=connection)) result = conn.session.execute(query, params, timeout=timeout) diff --git a/cassandra/cqlengine/management.py b/cassandra/cqlengine/management.py index 4a8ad5a32b..b53c15b8cb 100644 --- a/cassandra/cqlengine/management.py +++ b/cassandra/cqlengine/management.py @@ -23,7 +23,7 @@ from cassandra import metadata from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine import columns, query -from cassandra.cqlengine.connection import execute, get_cluster, DEFAULT_CONNECTION +from cassandra.cqlengine.connection import execute, get_cluster, format_log_context from cassandra.cqlengine.models import Model from cassandra.cqlengine.named import NamedTable from cassandra.cqlengine.usertype import UserType @@ -55,16 +55,6 @@ def get_context(keyspaces, connections): return product(connections, keyspaces) -def log_msg(msg, connection=None, keyspace=None): - """Format log message to add keyspace and connection context""" - connection_info = connection if connection else DEFAULT_CONNECTION - if keyspace: - msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg) - else: - msg = '[Connection: {0}] {1}'.format(connection_info, msg) - return msg - - def create_keyspace_simple(name, replication_factor, durable_writes=True, connections=None): """ Creates a keyspace with SimpleStrategy for replica placement @@ -116,11 +106,11 @@ def __create_keyspace(name, durable_writes, strategy_class, strategy_options, co cluster = get_cluster(connection) if name not in cluster.metadata.keyspaces: - log.info(log_msg("Creating keyspace %s", connection=connection), name) + log.info(format_log_context("Creating keyspace %s", connection=connection), name) ks_meta = metadata.KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) execute(ks_meta.as_cql_query(), connection=connection) else: - log.info(log_msg("Not creating keyspace %s because it already exists", connection=connection), name) + log.info(format_log_context("Not creating keyspace %s because it already exists", connection=connection), name) if connections: for connection in connections: @@ -216,7 +206,7 @@ def _sync_table(model, connection=None): try: keyspace = cluster.metadata.keyspaces[ks_name] except KeyError: - msg = log_msg("Keyspace '{0}' for model {1} does not exist.", connection=connection) + msg = format_log_context("Keyspace '{0}' for model {1} does not exist.", connection=connection) raise CQLEngineException(msg.format(ks_name, model)) tables = keyspace.tables @@ -229,7 +219,7 @@ def _sync_table(model, connection=None): _sync_type(ks_name, udt, syncd_types, connection=connection) if raw_cf_name not in tables: - log.debug(log_msg("sync_table creating new table %s", keyspace=ks_name, connection=connection), cf_name) + log.debug(format_log_context("sync_table creating new table %s", keyspace=ks_name, connection=connection), cf_name) qs = _get_create_table(model) try: @@ -240,7 +230,7 @@ def _sync_table(model, connection=None): if "Cannot add already existing column family" not in unicode(ex): raise else: - log.debug(log_msg("sync_table checking existing table %s", keyspace=ks_name, connection=connection), cf_name) + log.debug(format_log_context("sync_table checking existing table %s", keyspace=ks_name, connection=connection), cf_name) table_meta = tables[raw_cf_name] _validate_pk(model, table_meta) @@ -254,7 +244,7 @@ def _sync_table(model, connection=None): if db_name in table_columns: col_meta = table_columns[db_name] if col_meta.cql_type != col.db_type: - msg = log_msg('Existing table {0} has column "{1}" with a type ({2}) differing from the model type ({3}).' + msg = format_log_context('Existing table {0} has column "{1}" with a type ({2}) differing from the model type ({3}).' ' Model should be updated.', keyspace=ks_name, connection=connection) msg = msg.format(cf_name, db_name, col_meta.cql_type, col.db_type) warnings.warn(msg) @@ -263,7 +253,7 @@ def _sync_table(model, connection=None): continue if col.primary_key or col.primary_key: - msg = log_msg("Cannot add primary key '{0}' (with db_field '{1}') to existing table {2}", keyspace=ks_name, connection=connection) + msg = format_log_context("Cannot add primary key '{0}' (with db_field '{1}') to existing table {2}", keyspace=ks_name, connection=connection) raise CQLEngineException(msg.format(model_name, db_name, cf_name)) query = "ALTER TABLE {0} add {1}".format(cf_name, col.get_column_def()) @@ -271,7 +261,7 @@ def _sync_table(model, connection=None): db_fields_not_in_model = model_fields.symmetric_difference(table_columns) if db_fields_not_in_model: - msg = log_msg("Table {0} has fields not referenced by model: {1}", keyspace=ks_name, connection=connection) + msg = format_log_context("Table {0} has fields not referenced by model: {1}", keyspace=ks_name, connection=connection) log.info(msg.format(cf_name, db_fields_not_in_model)) _update_options(model, connection=connection) @@ -349,7 +339,7 @@ def _sync_type(ks_name, type_model, omit_subtypes=None, connection=None): defined_types = keyspace.user_types if type_name not in defined_types: - log.debug(log_msg("sync_type creating new type %s", keyspace=ks_name, connection=connection), type_name_qualified) + log.debug(format_log_context("sync_type creating new type %s", keyspace=ks_name, connection=connection), type_name_qualified) cql = get_create_type(type_model, ks_name) execute(cql, connection=connection) cluster.refresh_user_type_metadata(ks_name, type_name) @@ -365,7 +355,7 @@ def _sync_type(ks_name, type_model, omit_subtypes=None, connection=None): else: field_type = type_meta.field_types[defined_fields.index(field.db_field_name)] if field_type != field.db_type: - msg = log_msg('Existing user type {0} has field "{1}" with a type ({2}) differing from the model user type ({3}).' + msg = format_log_context('Existing user type {0} has field "{1}" with a type ({2}) differing from the model user type ({3}).' ' UserType should be updated.', keyspace=ks_name, connection=connection) msg = msg.format(type_name_qualified, field.db_field_name, field_type, field.db_type) warnings.warn(msg) @@ -374,12 +364,12 @@ def _sync_type(ks_name, type_model, omit_subtypes=None, connection=None): type_model.register_for_keyspace(ks_name, connection=connection) if len(defined_fields) == len(model_fields): - log.info(log_msg("Type %s did not require synchronization", keyspace=ks_name, connection=connection), type_name_qualified) + log.info(format_log_context("Type %s did not require synchronization", keyspace=ks_name, connection=connection), type_name_qualified) return db_fields_not_in_model = model_fields.symmetric_difference(defined_fields) if db_fields_not_in_model: - msg = log_msg("Type %s has fields not referenced by model: %s", keyspace=ks_name, connection=connection) + msg = format_log_context("Type %s has fields not referenced by model: %s", keyspace=ks_name, connection=connection) log.info(msg, type_name_qualified, db_fields_not_in_model) @@ -464,7 +454,7 @@ def _update_options(model, connection=None): :rtype: bool """ ks_name = model._get_keyspace() - msg = log_msg("Checking %s for option differences", keyspace=ks_name, connection=connection) + msg = format_log_context("Checking %s for option differences", keyspace=ks_name, connection=connection) log.debug(msg, model) model_options = model.__options__ or {} @@ -480,7 +470,7 @@ def _update_options(model, connection=None): try: existing_value = existing_options[name] except KeyError: - msg = log_msg("Invalid table option: '%s'; known options: %s", keyspace=ks_name, connection=connection) + msg = format_log_context("Invalid table option: '%s'; known options: %s", keyspace=ks_name, connection=connection) raise KeyError(msg % (name, existing_options.keys())) if isinstance(existing_value, six.string_types): if value != existing_value: diff --git a/cassandra/cqlengine/models.py b/cassandra/cqlengine/models.py index 41dfc77770..a7caa7131c 100644 --- a/cassandra/cqlengine/models.py +++ b/cassandra/cqlengine/models.py @@ -231,6 +231,25 @@ def __call__(self, *args, **kwargs): raise NotImplementedError +class UsingDescriptor(object): + """ + return a query set descriptor with a connection context specified + """ + def __get__(self, instance, model): + if instance: + # instance method + def using_setter(connection=None): + if connection: + instance._connection = connection + return instance + return using_setter + + return model.objects.using + + def __call__(self, *args, **kwargs): + raise NotImplementedError + + class ColumnQueryEvaluator(query.AbstractQueryableColumn): """ Wraps a column and allows it to be used in comparator @@ -323,6 +342,8 @@ class MultipleObjectsReturned(_MultipleObjectsReturned): if_exists = IfExistsDescriptor() + using = UsingDescriptor() + # _len is lazily created by __len__ __table_name__ = None @@ -331,6 +352,8 @@ class MultipleObjectsReturned(_MultipleObjectsReturned): __keyspace__ = None + __connection__ = None + __discriminator_value__ = None __options__ = None @@ -351,6 +374,8 @@ class MultipleObjectsReturned(_MultipleObjectsReturned): _table_name = None # used internally to cache a derived table name + _connection = None + def __init__(self, **values): self._ttl = None self._timestamp = None @@ -358,6 +383,7 @@ def __init__(self, **values): self._batch = None self._timeout = connection.NOT_SET self._is_persisted = False + self._connection = None self._values = {} for name, column in self._columns.items(): @@ -774,6 +800,15 @@ def _inst_batch(self, batch): batch = hybrid_classmethod(_class_batch, _inst_batch) + @classmethod + def _class_get_connection(cls): + return cls.__connection__ + + def _inst_get_connection(self): + return self._connection or self.__connection__ + + _get_connection = hybrid_classmethod(_class_get_connection, _inst_get_connection) + class ModelMetaClass(type): @@ -1002,6 +1037,11 @@ class Model(BaseModel): Sets the name of the keyspace used by this model. """ + __connection__ = None + """ + Sets the name of the default connection used by this model. + """ + __options__ = None """ *Optional* Table options applied with this model diff --git a/cassandra/cqlengine/named.py b/cassandra/cqlengine/named.py index 07b4c50b61..14d14c402e 100644 --- a/cassandra/cqlengine/named.py +++ b/cassandra/cqlengine/named.py @@ -17,6 +17,7 @@ from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.columns import Column from cassandra.cqlengine.connection import get_cluster +from cassandra.cqlengine.models import UsingDescriptor, BaseModel from cassandra.cqlengine.query import AbstractQueryableColumn, SimpleQuerySet from cassandra.cqlengine.query import DoesNotExist as _DoesNotExist from cassandra.cqlengine.query import MultipleObjectsReturned as _MultipleObjectsReturned @@ -86,6 +87,13 @@ class NamedTable(object): _partition_key_index = None + __connection__ = None + _connection = None + + using = UsingDescriptor() + + _get_connection = BaseModel._get_connection + class DoesNotExist(_DoesNotExist): pass @@ -95,6 +103,7 @@ class MultipleObjectsReturned(_MultipleObjectsReturned): def __init__(self, keyspace, name): self.keyspace = keyspace self.name = name + self._connection = None @property def _partition_keys(self): @@ -104,7 +113,7 @@ def _partition_keys(self): def _get_partition_keys(self): try: - table_meta = get_cluster().metadata.keyspaces[self.keyspace].tables[self.name] + table_meta = get_cluster(self._get_connection()).metadata.keyspaces[self.keyspace].tables[self.name] self.__partition_keys = OrderedDict((pk.name, Column(primary_key=True, partition_key=True, db_field=pk.name)) for pk in table_meta.partition_key) except Exception as e: raise CQLEngineException("Failed inspecting partition keys for {0}." diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 0eb95a6900..8752e40cc7 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -21,7 +21,7 @@ from cassandra.query import SimpleStatement from cassandra.cqlengine import columns, CQLEngineException, ValidationError, UnicodeMixin -from cassandra.cqlengine import connection +from cassandra.cqlengine import connection as conn from cassandra.cqlengine.functions import Token, BaseQueryFunction, QueryValue from cassandra.cqlengine.operators import (InOperator, EqualsOperator, GreaterThanOperator, GreaterThanOrEqualOperator, LessThanOperator, @@ -144,7 +144,7 @@ class BatchQuery(object): _consistency = None def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False, - timeout=connection.NOT_SET): + timeout=conn.NOT_SET): """ :param batch_type: (optional) One of batch type values available through BatchType enum :type batch_type: str or None @@ -244,7 +244,7 @@ def execute(self): query_list.append('APPLY BATCH;') - tmp = connection.execute('\n'.join(query_list), parameters, self._consistency, self._timeout) + tmp = conn.execute('\n'.join(query_list), parameters, self._consistency, self._timeout) check_applied(tmp) self.queries = [] @@ -289,10 +289,12 @@ def __init__(self, model, keyspace=None): if not issubclass(model, models.Model): raise CQLEngineException("Models must be derived from base Model.") - ks = keyspace if keyspace else model.__keyspace__ - new_type = type(model.__name__, (model,), {'__keyspace__': ks, '__abstract__': model.__abstract__}) + self.model = model - self.model = new_type + if keyspace: + ks = keyspace + new_type = type(model.__name__, (model,), {'__keyspace__': ks}) + self.model = new_type def __enter__(self): return self.model @@ -345,9 +347,10 @@ def __init__(self, model): self._consistency = None self._timestamp = None self._if_not_exists = False - self._timeout = connection.NOT_SET + self._timeout = conn.NOT_SET self._if_exists = False self._fetch_size = None + self._connection = None @property def column_family_name(self): @@ -357,7 +360,7 @@ def _execute(self, statement): if self._batch: return self._batch.add_query(statement) else: - result = _execute_statement(self.model, statement, self._consistency, self._timeout) + result = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=self._connection) if self._if_not_exists or self._if_exists or self._conditional: check_applied(result) return result @@ -928,6 +931,7 @@ def create(self, **kwargs): .if_not_exists(self._if_not_exists) \ .timestamp(self._timestamp) \ .if_exists(self._if_exists) \ + .using(connection=self._connection) \ .save() def delete(self): @@ -965,16 +969,19 @@ def timeout(self, timeout): clone._timeout = timeout return clone - def using(self, keyspace=None): + def using(self, keyspace=None, connection=None): """ Change the context on-the-fly of the Model class (connection, keyspace) """ clone = copy.deepcopy(self) if keyspace: - new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace, '__abstract__': self.model.__abstract__}) + new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace}) clone.model = new_type + if connection: + clone._connection = connection + return clone @@ -1261,7 +1268,7 @@ class DMLQuery(object): _if_exists = False def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, timestamp=None, - if_not_exists=False, conditional=None, timeout=connection.NOT_SET, if_exists=False): + if_not_exists=False, conditional=None, timeout=conn.NOT_SET, if_exists=False): self.model = model self.column_family_name = self.model.column_family_name() self.instance = instance @@ -1278,7 +1285,8 @@ def _execute(self, statement): if self._batch: return self._batch.add_query(statement) else: - results = _execute_statement(self.model, statement, self._consistency, self._timeout) + connection = self.instance._get_connection() if self.instance else self.model._get_connection() + results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection) if self._if_not_exists or self._if_exists or self._conditional: check_applied(results) return results @@ -1419,13 +1427,14 @@ def delete(self): self._execute(ds) -def _execute_statement(model, statement, consistency_level, timeout): +def _execute_statement(model, statement, consistency_level, timeout, connection=None): params = statement.get_context() s = SimpleStatement(str(statement), consistency_level=consistency_level, fetch_size=statement.fetch_size) if model._partition_key_index: key_values = statement.partition_key_values(model._partition_key_index) if not any(v is None for v in key_values): - parts = model._routing_key_from_values(key_values, connection.get_cluster().protocol_version) + parts = model._routing_key_from_values(key_values, conn.get_cluster(connection).protocol_version) s.routing_key = parts s.keyspace = model._get_keyspace() - return connection.execute(s, params, timeout=timeout) + connection = connection if connection else model._get_connection() + return conn.execute(s, params, timeout=timeout, connection=connection) diff --git a/tests/integration/cqlengine/query/test_queryoperators.py b/tests/integration/cqlengine/query/test_queryoperators.py index 055e8f3db2..5741b5a2d7 100644 --- a/tests/integration/cqlengine/query/test_queryoperators.py +++ b/tests/integration/cqlengine/query/test_queryoperators.py @@ -156,4 +156,3 @@ def test_named_table_pk_token_function(self): self.assertTrue(len(first_page) is 1) next_page = list(query.filter(pk__token__gt=functions.Token(last.key))) self.assertTrue(len(next_page) is 1) - From 894b9be1b07f5bf75e9bcc1b783263acda36debe Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 25 Aug 2016 10:27:15 -0400 Subject: [PATCH 05/18] Add connection support to BatchQuery --- cassandra/cqlengine/query.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 8752e40cc7..2fe3bb9b2e 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -143,8 +143,12 @@ class BatchQuery(object): _consistency = None + _connection = None + _connection_explicit = False + + def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on_exception=False, - timeout=conn.NOT_SET): + timeout=conn.NOT_SET, connection=None): """ :param batch_type: (optional) One of batch type values available through BatchType enum :type batch_type: str or None @@ -161,6 +165,7 @@ def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on :param timeout: (optional) Timeout for the entire batch (in seconds), if not specified fallback to default session timeout :type timeout: float or None + :param str connection: Connection name to use for the batch execution """ self.queries = [] self.batch_type = batch_type @@ -173,6 +178,9 @@ def __init__(self, batch_type=None, timestamp=None, consistency=None, execute_on self._callbacks = [] self._executed = False self._context_entered = False + self._connection = connection + if connection: + self._connection_explicit = True def add_query(self, query): if not isinstance(query, BaseCQLStatement): @@ -244,7 +252,7 @@ def execute(self): query_list.append('APPLY BATCH;') - tmp = conn.execute('\n'.join(query_list), parameters, self._consistency, self._timeout) + tmp = conn.execute('\n'.join(query_list), parameters, self._consistency, self._timeout, connection=self._connection) check_applied(tmp) self.queries = [] @@ -544,6 +552,9 @@ def batch(self, batch_obj): Note: running a select query with a batch object will raise an exception """ + if self._connection: + raise CQLEngineException("Cannot specify the connection on model in batch mode.") + if batch_obj is not None and not isinstance(batch_obj, BatchQuery): raise CQLEngineException('batch_obj must be a BatchQuery instance or None') clone = copy.deepcopy(self) @@ -974,6 +985,9 @@ def using(self, keyspace=None, connection=None): Change the context on-the-fly of the Model class (connection, keyspace) """ + if connection and self._batch: + raise CQLEngineException("Cannot specify a connection on model in batch mode.") + clone = copy.deepcopy(self) if keyspace: new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace}) @@ -1282,10 +1296,17 @@ def __init__(self, model, instance=None, batch=None, ttl=None, consistency=None, self._timeout = timeout def _execute(self, statement): + connection = self.instance._get_connection() if self.instance else self.model._get_connection() if self._batch: + if self._batch._connection: + if not self._batch._connection_explicit and connection and \ + connection != self._batch._connection: + raise CQLEngineException('BatchQuery queries must be executed on the same connection') + else: + # set the BatchQuery connection from the model + self._batch._connection = connection return self._batch.add_query(statement) else: - connection = self.instance._get_connection() if self.instance else self.model._get_connection() results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection) if self._if_not_exists or self._if_exists or self._conditional: check_applied(results) From a0f86f1cdcd6ec445888ebe588e6db61062ce998 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 25 Aug 2016 15:03:30 -0400 Subject: [PATCH 06/18] Fix class model copy --- cassandra/cqlengine/models.py | 8 ++++++++ cassandra/cqlengine/query.py | 8 ++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cassandra/cqlengine/models.py b/cassandra/cqlengine/models.py index a7caa7131c..ef2ecea5b4 100644 --- a/cassandra/cqlengine/models.py +++ b/cassandra/cqlengine/models.py @@ -29,6 +29,14 @@ log = logging.getLogger(__name__) +def _copy_model_class(model, attrs): + new_type = type(model.__name__, (model,), attrs) + new_type.__abstract__ = model.__abstract__ + new_type.__discriminator_value__ = model.__discriminator_value__ + new_type.__default_ttl__ = model.__default_ttl__ + return new_type + + class ModelException(CQLEngineException): pass diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 2fe3bb9b2e..48c8b89b88 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -300,9 +300,9 @@ def __init__(self, model, keyspace=None): self.model = model if keyspace: + from cassandra.cqlengine.models import _copy_model_class ks = keyspace - new_type = type(model.__name__, (model,), {'__keyspace__': ks}) - self.model = new_type + self.model = _copy_model_class(model, {'__keyspace__': ks}) def __enter__(self): return self.model @@ -990,8 +990,8 @@ def using(self, keyspace=None, connection=None): clone = copy.deepcopy(self) if keyspace: - new_type = type(self.model.__name__, (self.model,), {'__keyspace__': keyspace}) - clone.model = new_type + from cassandra.cqlengine.models import _copy_model_class + clone.model = type(self.model, {'__keyspace__': keyspace}) if connection: clone._connection = connection From da10c634611f725e24bbf99f8ff56b4152f74cee Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 25 Aug 2016 16:22:57 -0400 Subject: [PATCH 07/18] connection support in ContextQuery --- cassandra/cqlengine/models.py | 2 +- cassandra/cqlengine/query.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cassandra/cqlengine/models.py b/cassandra/cqlengine/models.py index ef2ecea5b4..306b07bdcb 100644 --- a/cassandra/cqlengine/models.py +++ b/cassandra/cqlengine/models.py @@ -810,7 +810,7 @@ def _inst_batch(self, batch): @classmethod def _class_get_connection(cls): - return cls.__connection__ + return cls._connection or cls.__connection__ def _inst_get_connection(self): return self._connection or self.__connection__ diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 48c8b89b88..a7dbf3be3b 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -287,7 +287,7 @@ class ContextQuery(object): """ - def __init__(self, model, keyspace=None): + def __init__(self, model, keyspace=None, connection=None): """ :param model: A model. This should be a class type, not an instance. :param keyspace: (optional) A keyspace name @@ -304,10 +304,14 @@ def __init__(self, model, keyspace=None): ks = keyspace self.model = _copy_model_class(model, {'__keyspace__': ks}) + if connection: + self.model._connection = connection + def __enter__(self): return self.model def __exit__(self, exc_type, exc_val, exc_tb): + self.model._connection = None return @@ -991,7 +995,7 @@ def using(self, keyspace=None, connection=None): clone = copy.deepcopy(self) if keyspace: from cassandra.cqlengine.models import _copy_model_class - clone.model = type(self.model, {'__keyspace__': keyspace}) + clone.model = _copy_model_class(self.model, {'__keyspace__': keyspace}) if connection: clone._connection = connection From e5d27eb5c3eddc1cf4e0dff1171f95dd2eae097c Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 25 Aug 2016 17:37:11 -0400 Subject: [PATCH 08/18] Add multiple models support to ContextQuery --- cassandra/cqlengine/query.py | 49 +++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index a7dbf3be3b..38abe7c72a 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -285,33 +285,54 @@ class ContextQuery(object): with ContextQuery(Automobile, keyspace='test4') as A: print len(A.objects.all()) # 0 result + # Multiple models + with ContextQuery(Automobile, Automobile2, connection='cluster2') as (A, A2): + print len(A.objects.all()) + print len(A2.objects.all()) + """ - def __init__(self, model, keyspace=None, connection=None): + def __init__(self, *args, **kwargs): """ - :param model: A model. This should be a class type, not an instance. - :param keyspace: (optional) A keyspace name + :param *args: One or more models. A model should be a class type, not an instance. + :param **kwargs: (optional) Context parameters: can be keyspace or connection """ from cassandra.cqlengine import models - if not issubclass(model, models.Model): - raise CQLEngineException("Models must be derived from base Model.") + self.models = [] - self.model = model + if len(args) < 1: + raise CQLEngineException("No model provided.") - if keyspace: - from cassandra.cqlengine.models import _copy_model_class - ks = keyspace - self.model = _copy_model_class(model, {'__keyspace__': ks}) + keyspace = kwargs.pop('keyspace', None) + connection = kwargs.pop('connection', None) - if connection: - self.model._connection = connection + if kwargs: + raise CQLEngineException("Unknown keyword argument(s): {0}".format( + ','.join(kwargs.keys()))) + + for model in args: + if not issubclass(model, models.Model): + raise CQLEngineException("Models must be derived from base Model.") + + m = copy.deepcopy(model) if not keyspace else None + + if keyspace: + from cassandra.cqlengine.models import _copy_model_class + ks = keyspace + m = _copy_model_class(model, {'__keyspace__': ks}) + + if connection: + m._connection = connection + + self.models.append(m) def __enter__(self): - return self.model + if len(self.models) > 1: + return tuple(self.models) + return self.models[0] def __exit__(self, exc_type, exc_val, exc_tb): - self.model._connection = None return From d8739374c4718fbb7abb9aef94535f5390e02b36 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Thu, 25 Aug 2016 20:01:03 -0400 Subject: [PATCH 09/18] Ensure the Class is properly cloned in the ContextQuery --- cassandra/cqlengine/models.py | 4 ++-- cassandra/cqlengine/query.py | 13 +++++-------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/cassandra/cqlengine/models.py b/cassandra/cqlengine/models.py index 306b07bdcb..286d8c1f62 100644 --- a/cassandra/cqlengine/models.py +++ b/cassandra/cqlengine/models.py @@ -29,7 +29,7 @@ log = logging.getLogger(__name__) -def _copy_model_class(model, attrs): +def _clone_model_class(model, attrs): new_type = type(model.__name__, (model,), attrs) new_type.__abstract__ = model.__abstract__ new_type.__discriminator_value__ = model.__discriminator_value__ @@ -810,7 +810,7 @@ def _inst_batch(self, batch): @classmethod def _class_get_connection(cls): - return cls._connection or cls.__connection__ + return cls.__connection__ def _inst_get_connection(self): return self._connection or self.__connection__ diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 38abe7c72a..8b968c6f68 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -315,15 +315,12 @@ def __init__(self, *args, **kwargs): if not issubclass(model, models.Model): raise CQLEngineException("Models must be derived from base Model.") - m = copy.deepcopy(model) if not keyspace else None + m = models._clone_model_class(model, {}) if keyspace: - from cassandra.cqlengine.models import _copy_model_class - ks = keyspace - m = _copy_model_class(model, {'__keyspace__': ks}) - + m.__keyspace__ = keyspace if connection: - m._connection = connection + m.__connection__ = connection self.models.append(m) @@ -1015,8 +1012,8 @@ def using(self, keyspace=None, connection=None): clone = copy.deepcopy(self) if keyspace: - from cassandra.cqlengine.models import _copy_model_class - clone.model = _copy_model_class(self.model, {'__keyspace__': keyspace}) + from cassandra.cqlengine.models import _clone_model_class + clone.model = _clone_model_class(self.model, {'__keyspace__': keyspace}) if connection: clone._connection = connection From 5dbd4357077954e9cc8c17ba9ac9a8fc2c53190f Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Fri, 26 Aug 2016 07:07:33 -0400 Subject: [PATCH 10/18] Get the connection from the model if not specified in sync_table and drop_table --- cassandra/cqlengine/management.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cassandra/cqlengine/management.py b/cassandra/cqlengine/management.py index b53c15b8cb..bb85a2c689 100644 --- a/cassandra/cqlengine/management.py +++ b/cassandra/cqlengine/management.py @@ -39,7 +39,7 @@ def get_context(keyspaces, connections): - """Return the execution context""" + """Return all the execution contexts""" if keyspaces: if not isinstance(keyspaces, (list, tuple)): @@ -69,7 +69,7 @@ def create_keyspace_simple(name, replication_factor, durable_writes=True, connec :param str name: name of keyspace to create :param int replication_factor: keyspace replication factor, used with :attr:`~.SimpleStrategy` :param bool durable_writes: Write log is bypassed if set to False - :param str connections: List of connection names + :param list connections: List of connection names """ _create_keyspace(name, durable_writes, 'SimpleStrategy', {'replication_factor': replication_factor}, connections=connections) @@ -89,7 +89,7 @@ def create_keyspace_network_topology(name, dc_replication_map, durable_writes=Tr :param str name: name of keyspace to create :param dict dc_replication_map: map of dc_names: replication_factor :param bool durable_writes: Write log is bypassed if set to False - :param str connections: List of connection names + :param list connections: List of connection names """ _create_keyspace(name, durable_writes, 'NetworkTopologyStrategy', dc_replication_map, connections=connections) @@ -129,7 +129,7 @@ def drop_keyspace(name, connections=None): Take care to execute schema modifications in a single context (i.e. not concurrently with other clients).** :param str name: name of keyspace to drop - :param str connections: List of connection names + :param list connections: List of connection names """ if not _allow_schema_modification(): return @@ -182,6 +182,7 @@ def sync_table(model, keyspaces=None, connections=None): context = get_context(keyspaces, connections) for connection, keyspace in context: + connection = connection if connection else model._get_connection() with query.ContextQuery(model, keyspace=keyspace) as m: _sync_table(m, connection=connection) @@ -507,6 +508,7 @@ def drop_table(model, keyspaces=None, connections=None): context = get_context(keyspaces, connections) for connection, keyspace in context: + connection = connection if connection else model._get_connection() with query.ContextQuery(model, keyspace=keyspace) as m: _drop_table(m, connection=connection) From 5de8b576557b8711700667ddc952512cc84ca7f7 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Fri, 26 Aug 2016 07:21:26 -0400 Subject: [PATCH 11/18] Add unregister_connection and set_default_connection --- cassandra/cqlengine/connection.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index a59f250c90..c88c26f45c 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -141,6 +141,28 @@ def register_connection(name, hosts, consistency=None, lazy_connect=False, return conn +def unregister_connection(name): + + if name not in _connections: + return + + if _connections[name] == _connections[DEFAULT_CONNECTION]: + del _connections[DEFAULT_CONNECTION] + log.warning("Unregistering default connection '{0}'. Use set_default_connection to set a new one.".format(name)) + + log.debug("Connection '{0}' has been removed from the registry.".format(name)) + del _connections[name] + + +def set_default_connection(name): + + if name not in _connections: + raise CQLEngineException("Connection '{0}' doesn't exist.".format(name)) + + log.debug("Connection '{0}' has been set as default.".format(name)) + _connections[DEFAULT_CONNECTION] = _connections[name] + + def get_connection(name=None): if not name: From 71d822bb35eb6eb741522186e7e317ab241e7869 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Fri, 26 Aug 2016 07:29:07 -0400 Subject: [PATCH 12/18] minor changes --- cassandra/cqlengine/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index c88c26f45c..3d3cd89ca9 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -179,7 +179,7 @@ def get_connection(name=None): def default(): """ - Configures the global mapper connection to localhost, using the driver defaults + Configures the default connection to localhost, using the driver defaults (except for row_factory) """ @@ -198,7 +198,7 @@ def default(): def set_session(s): """ - Configures the global mapper connection with a preexisting :class:`cassandra.cluster.Session` + Configures the default connection with a preexisting :class:`cassandra.cluster.Session` Note: the mapper presently requires a Session :attr:`~.row_factory` set to ``dict_factory``. This may be relaxed in the future From 9152b4a1d45596fa1d01cd6de90bffb7ab565e1b Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Fri, 26 Aug 2016 08:13:17 -0400 Subject: [PATCH 13/18] minor fixes --- cassandra/cqlengine/management.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cassandra/cqlengine/management.py b/cassandra/cqlengine/management.py index bb85a2c689..4fdba1c4d2 100644 --- a/cassandra/cqlengine/management.py +++ b/cassandra/cqlengine/management.py @@ -182,7 +182,6 @@ def sync_table(model, keyspaces=None, connections=None): context = get_context(keyspaces, connections) for connection, keyspace in context: - connection = connection if connection else model._get_connection() with query.ContextQuery(model, keyspace=keyspace) as m: _sync_table(m, connection=connection) @@ -201,6 +200,7 @@ def _sync_table(model, connection=None): raw_cf_name = model._raw_column_family_name() ks_name = model._get_keyspace() + connection = connection if connection else model._get_connection() cluster = get_cluster(connection) @@ -508,14 +508,16 @@ def drop_table(model, keyspaces=None, connections=None): context = get_context(keyspaces, connections) for connection, keyspace in context: - connection = connection if connection else model._get_connection() with query.ContextQuery(model, keyspace=keyspace) as m: _drop_table(m, connection=connection) + def _drop_table(model, connection=None): if not _allow_schema_modification(): return + connection = connection if connection else model._get_connection() + # don't try to delete non existant tables meta = get_cluster(connection).metadata From d1d5ef4ec6a3da80ef2494157a9c156ade4053c2 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Wed, 31 Aug 2016 07:37:07 -0400 Subject: [PATCH 14/18] Add integration tests --- cassandra/cqlengine/connection.py | 2 +- cassandra/cqlengine/models.py | 11 +- cassandra/cqlengine/query.py | 13 +- .../integration/cqlengine/test_connections.py | 389 ++++++++++++++++++ .../cqlengine/test_context_query.py | 48 +++ 5 files changed, 454 insertions(+), 9 deletions(-) create mode 100644 tests/integration/cqlengine/test_connections.py diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 3d3cd89ca9..3070e67e56 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -146,7 +146,7 @@ def unregister_connection(name): if name not in _connections: return - if _connections[name] == _connections[DEFAULT_CONNECTION]: + if DEFAULT_CONNECTION in _connections and _connections[name] == _connections[DEFAULT_CONNECTION]: del _connections[DEFAULT_CONNECTION] log.warning("Unregistering default connection '{0}'. Use set_default_connection to set a new one.".format(name)) diff --git a/cassandra/cqlengine/models.py b/cassandra/cqlengine/models.py index 286d8c1f62..1b1987396f 100644 --- a/cassandra/cqlengine/models.py +++ b/cassandra/cqlengine/models.py @@ -31,9 +31,12 @@ def _clone_model_class(model, attrs): new_type = type(model.__name__, (model,), attrs) - new_type.__abstract__ = model.__abstract__ - new_type.__discriminator_value__ = model.__discriminator_value__ - new_type.__default_ttl__ = model.__default_ttl__ + try: + new_type.__abstract__ = model.__abstract__ + new_type.__discriminator_value__ = model.__discriminator_value__ + new_type.__default_ttl__ = model.__default_ttl__ + except AttributeError: + pass return new_type @@ -803,6 +806,8 @@ def _class_batch(cls, batch): def _inst_batch(self, batch): assert self._timeout is connection.NOT_SET, 'Setting both timeout and batch is not supported' + if self._connection: + raise CQLEngineException("Cannot specify a connection on model in batch mode.") self._batch = batch return self diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 8b968c6f68..f47b025f08 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -302,18 +302,20 @@ def __init__(self, *args, **kwargs): self.models = [] if len(args) < 1: - raise CQLEngineException("No model provided.") + raise ValueError("No model provided.") keyspace = kwargs.pop('keyspace', None) connection = kwargs.pop('connection', None) if kwargs: - raise CQLEngineException("Unknown keyword argument(s): {0}".format( + raise ValueError("Unknown keyword argument(s): {0}".format( ','.join(kwargs.keys()))) for model in args: - if not issubclass(model, models.Model): - raise CQLEngineException("Models must be derived from base Model.") + try: + issubclass(model, models.Model) + except TypeError: + raise ValueError("Models must be derived from base Model.") m = models._clone_model_class(model, {}) @@ -390,7 +392,8 @@ def _execute(self, statement): if self._batch: return self._batch.add_query(statement) else: - result = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=self._connection) + connection = self._connection if self._connection else self.model._get_connection() + result = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection) if self._if_not_exists or self._if_exists or self._conditional: check_applied(result) return result diff --git a/tests/integration/cqlengine/test_connections.py b/tests/integration/cqlengine/test_connections.py new file mode 100644 index 0000000000..f2f88b0faa --- /dev/null +++ b/tests/integration/cqlengine/test_connections.py @@ -0,0 +1,389 @@ +# Copyright 2013-2016 DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cassandra import InvalidRequest +from cassandra.cluster import NoHostAvailable +from cassandra.cqlengine import columns, CQLEngineException +from cassandra.cqlengine import connection as conn +from cassandra.cqlengine.management import drop_keyspace, sync_table, drop_table, create_keyspace_simple +from cassandra.cqlengine.models import Model +from cassandra.cqlengine.query import ContextQuery, BatchQuery +from tests.integration.cqlengine import setup_connection, DEFAULT_KEYSPACE +from tests.integration.cqlengine.base import BaseCassEngTestCase + + +class TestModel(Model): + + __keyspace__ = 'ks1' + + partition = columns.Integer(primary_key=True) + cluster = columns.Integer(primary_key=True) + count = columns.Integer() + text = columns.Text() + + +class AnotherTestModel(Model): + + __keyspace__ = 'ks1' + + partition = columns.Integer(primary_key=True) + cluster = columns.Integer(primary_key=True) + count = columns.Integer() + text = columns.Text() + + +class ContextQueryConnectionTests(BaseCassEngTestCase): + + @classmethod + def setUpClass(cls): + super(ContextQueryConnectionTests, cls).setUpClass() + create_keyspace_simple('ks1', 1) + + conn.unregister_connection('default') + conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) + conn.register_connection('cluster', ['127.0.0.1']) + + with ContextQuery(TestModel, connection='cluster') as tm: + sync_table(tm) + + @classmethod + def tearDownClass(cls): + super(ContextQueryConnectionTests, cls).tearDownClass() + + with ContextQuery(TestModel, connection='cluster') as tm: + drop_table(tm) + drop_keyspace('ks1', connections=['cluster']) + + + # reset the default connection + conn.unregister_connection('fake_cluster') + conn.unregister_connection('cluster') + setup_connection(DEFAULT_KEYSPACE) + + + def setUp(self): + super(BaseCassEngTestCase, self).setUp() + + def test_context_connection_priority(self): + + # Set the default connection on the Model + TestModel.__connection__ = 'cluster' + with ContextQuery(TestModel) as tm: + tm.objects.create(partition=1, cluster=1) + + # ContextQuery connection should have priority over default one + with ContextQuery(TestModel, connection='fake_cluster') as tm: + with self.assertRaises(NoHostAvailable): + tm.objects.create(partition=1, cluster=1) + + # Explicit connection should have priority over ContextQuery one + with ContextQuery(TestModel, connection='fake_cluster') as tm: + tm.objects.using(connection='cluster').create(partition=1, cluster=1) + + # Reset the default conn of the model + TestModel.__connection__ = None + + # No model connection and an invalid default connection + with ContextQuery(TestModel) as tm: + with self.assertRaises(NoHostAvailable): + tm.objects.create(partition=1, cluster=1) + + def test_context_connection_with_keyspace(self): + + # ks2 doesn't exist + with ContextQuery(TestModel, connection='cluster', keyspace='ks2') as tm: + with self.assertRaises(InvalidRequest): + tm.objects.create(partition=1, cluster=1) + + +class ManagementConnectionTests(BaseCassEngTestCase): + + keyspaces = ['ks1', 'ks2'] + conns = ['cluster'] + + @classmethod + def setUpClass(cls): + super(ManagementConnectionTests, cls).setUpClass() + conn.unregister_connection('default') + conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) + conn.register_connection('cluster', ['127.0.0.1']) + + + @classmethod + def tearDownClass(cls): + super(ManagementConnectionTests, cls).tearDownClass() + + # reset the default connection + conn.unregister_connection('fake_cluster') + conn.unregister_connection('cluster') + setup_connection(DEFAULT_KEYSPACE) + + def setUp(self): + super(BaseCassEngTestCase, self).setUp() + + def test_create_drop_keyspace(self): + + # No connection (default is fake) + with self.assertRaises(NoHostAvailable): + create_keyspace_simple(self.keyspaces[0], 1) + + # Explicit connections + for ks in self.keyspaces: + create_keyspace_simple(ks, 1, connections=self.conns) + + for ks in self.keyspaces: + drop_keyspace(ks, connections=self.conns) + + def test_create_drop_table(self): + + for ks in self.keyspaces: + create_keyspace_simple(ks, 1, connections=self.conns) + + # No connection (default is fake) + with self.assertRaises(NoHostAvailable): + sync_table(TestModel) + + # Explicit connections + sync_table(TestModel, connections=self.conns) + + # Explicit drop + drop_table(TestModel, connections=self.conns) + + # Model connection + TestModel.__connection__ = 'cluster' + sync_table(TestModel) + TestModel.__connection__ = None + + # No connection (default is fake) + with self.assertRaises(NoHostAvailable): + drop_table(TestModel) + + # Model connection + TestModel.__connection__ = 'cluster' + drop_table(TestModel) + TestModel.__connection__ = None + + # Model connection + for ks in self.keyspaces: + drop_keyspace(ks, connections=self.conns) + + +class BatchQueryConnectionTests(BaseCassEngTestCase): + + conns = ['cluster'] + + @classmethod + def setUpClass(cls): + super(BatchQueryConnectionTests, cls).setUpClass() + + create_keyspace_simple('ks1', 1) + sync_table(TestModel) + sync_table(AnotherTestModel) + + conn.unregister_connection('default') + conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) + conn.register_connection('cluster', ['127.0.0.1']) + + + @classmethod + def tearDownClass(cls): + super(BatchQueryConnectionTests, cls).tearDownClass() + + # reset the default connection + conn.unregister_connection('fake_cluster') + conn.unregister_connection('cluster') + setup_connection(DEFAULT_KEYSPACE) + + drop_keyspace('ks1') + + def setUp(self): + super(BaseCassEngTestCase, self).setUp() + + def test_basic_batch_query(self): + """Test BatchQuery requests""" + + # No connection with a QuerySet (default is a fake one) + with self.assertRaises(NoHostAvailable): + with BatchQuery() as b: + TestModel.objects.batch(b).create(partition=1, cluster=1) + + # Explicit connection with a QuerySet + with BatchQuery(connection='cluster') as b: + TestModel.objects.batch(b).create(partition=1, cluster=1) + + # Get an object from the BD + with ContextQuery(TestModel, connection='cluster') as tm: + obj = tm.objects.get(partition=1, cluster=1) + obj.__connection__ = None + + # No connection with a model (default is a fake one) + with self.assertRaises(NoHostAvailable): + with BatchQuery() as b: + obj.count = 2 + obj.batch(b).save() + + # Explicit connection with a model + with BatchQuery(connection='cluster') as b: + obj.count = 2 + obj.batch(b).save() + + def test_batch_query_different_connection(self): + """Test BatchQuery with Models that have a different connection""" + + # Testing on a model class + TestModel.__connection__ = 'cluster' + AnotherTestModel.__connection__ = 'cluster2' + + with self.assertRaises(CQLEngineException): + with BatchQuery() as b: + TestModel.objects.batch(b).create(partition=1, cluster=1) + AnotherTestModel.objects.batch(b).create(partition=1, cluster=1) + + TestModel.__connection__ = None + AnotherTestModel.__connection__ = None + + with BatchQuery(connection='cluster') as b: + TestModel.objects.batch(b).create(partition=1, cluster=1) + AnotherTestModel.objects.batch(b).create(partition=1, cluster=1) + + # Testing on a model instance + with ContextQuery(TestModel, AnotherTestModel, connection='cluster') as (tm, atm): + obj1 = tm.objects.get(partition=1, cluster=1) + obj2 = atm.objects.get(partition=1, cluster=1) + + obj1.__connection__ = 'cluster' + obj2.__connection__ = 'cluster2' + + obj1.count = 4 + obj2.count = 4 + + with self.assertRaises(CQLEngineException): + with BatchQuery() as b: + obj1.batch(b).save() + obj2.batch(b).save() + + def test_batch_query_connection_override(self): + """Test that we cannot override a BatchQuery connection per model""" + + with self.assertRaises(CQLEngineException): + with BatchQuery(connection='cluster') as b: + TestModel.batch(b).using(connection='test').save() + + with self.assertRaises(CQLEngineException): + with BatchQuery(connection='cluster') as b: + TestModel.using(connection='test').batch(b).save() + + with ContextQuery(TestModel, AnotherTestModel, connection='cluster') as (tm, atm): + obj1 = tm.objects.get(partition=1, cluster=1) + obj1.__connection__ = None + + with self.assertRaises(CQLEngineException): + with BatchQuery(connection='cluster') as b: + obj1.using(connection='test').batch(b).save() + + with self.assertRaises(CQLEngineException): + with BatchQuery(connection='cluster') as b: + obj1.batch(b).using(connection='test').save() + + +class UsingDescriptorTests(BaseCassEngTestCase): + + conns = ['cluster'] + keyspaces = ['ks1', 'ks2'] + + @classmethod + def setUpClass(cls): + super(UsingDescriptorTests, cls).setUpClass() + + conn.unregister_connection('default') + conn.register_connection('fake_cluster', ['127.0.0.100'], lazy_connect=True, retry_connect=True, default=True) + conn.register_connection('cluster', ['127.0.0.1']) + + + @classmethod + def tearDownClass(cls): + super(UsingDescriptorTests, cls).tearDownClass() + + # reset the default connection + conn.unregister_connection('fake_cluster') + conn.unregister_connection('cluster') + setup_connection(DEFAULT_KEYSPACE) + + for ks in cls.keyspaces: + drop_keyspace(ks) + + def setUp(self): + super(BaseCassEngTestCase, self).setUp() + + def _reset_data(self): + + for ks in self.keyspaces: + drop_keyspace(ks, connections=self.conns) + + for ks in self.keyspaces: + create_keyspace_simple(ks, 1, connections=self.conns) + sync_table(TestModel, keyspaces=self.keyspaces, connections=self.conns) + + def test_keyspace(self): + + self._reset_data() + + with ContextQuery(TestModel, connection='cluster') as tm: + + # keyspace Model class + tm.objects.using(keyspace='ks2').create(partition=1, cluster=1) + tm.objects.using(keyspace='ks2').create(partition=2, cluster=2) + + with self.assertRaises(TestModel.DoesNotExist): + tm.objects.get(partition=1, cluster=1) # default keyspace ks1 + obj1 = tm.objects.using(keyspace='ks2').get(partition=1, cluster=1) + + obj1.count = 2 + obj1.save() + + with self.assertRaises(NoHostAvailable): + TestModel.objects.using(keyspace='ks2').get(partition=1, cluster=1) + + obj2 = TestModel.objects.using(connection='cluster', keyspace='ks2').get(partition=1, cluster=1) + self.assertEqual(obj2.count, 2) + + # Update test + TestModel.objects(partition=2, cluster=2).using(connection='cluster', keyspace='ks2').update(count=5) + obj3 = TestModel.objects.using(connection='cluster', keyspace='ks2').get(partition=2, cluster=2) + self.assertEqual(obj3.count, 5) + + TestModel.objects(partition=2, cluster=2).using(connection='cluster', keyspace='ks2').delete() + with self.assertRaises(TestModel.DoesNotExist): + TestModel.objects.using(connection='cluster', keyspace='ks2').get(partition=2, cluster=2) + + def test_connection(self): + + self._reset_data() + + # Model class + with self.assertRaises(NoHostAvailable): + TestModel.objects.create(partition=1, cluster=1) + + TestModel.objects.using(connection='cluster').create(partition=1, cluster=1) + TestModel.objects(partition=1, cluster=1).using(connection='cluster').update(count=2) + obj1 = TestModel.objects.using(connection='cluster').get(partition=1, cluster=1) + self.assertEqual(obj1.count, 2) + + obj1.using(connection='cluster').update(count=5) + obj1 = TestModel.objects.using(connection='cluster').get(partition=1, cluster=1) + self.assertEqual(obj1.count, 5) + + obj1.using(connection='cluster').delete() + with self.assertRaises(TestModel.DoesNotExist): + TestModel.objects.using(connection='cluster').get(partition=1, cluster=1) diff --git a/tests/integration/cqlengine/test_context_query.py b/tests/integration/cqlengine/test_context_query.py index b3941319e9..0a29688d96 100644 --- a/tests/integration/cqlengine/test_context_query.py +++ b/tests/integration/cqlengine/test_context_query.py @@ -46,6 +46,7 @@ def tearDownClass(cls): for ks in cls.KEYSPACES: drop_keyspace(ks) + def setUp(self): super(ContextQueryTests, self).setUp() for ks in self.KEYSPACES: @@ -125,3 +126,50 @@ def test_context_keyspace(self): self.assertEqual(42, tm.objects.get(partition=1).count) + def test_context_multiple_models(self): + """ + Tests the use of multiple models with the context manager + + @since 3.7 + @jira_ticket PYTHON-613 + @expected_result all models are properly updated with the context + + @test_category query + """ + + with ContextQuery(TestModel, TestModel, keyspace='ks4') as (tm1, tm2): + + self.assertNotEqual(tm1, tm2) + self.assertEqual(tm1.__keyspace__, 'ks4') + self.assertEqual(tm2.__keyspace__, 'ks4') + + def test_context_invalid_parameters(self): + """ + Tests that invalid parameters are raised by the context manager + + @since 3.7 + @jira_ticket PYTHON-613 + @expected_result a ValueError is raised when passing invalid parameters + + @test_category query + """ + + with self.assertRaises(ValueError): + with ContextQuery(keyspace='ks2'): + pass + + with self.assertRaises(ValueError): + with ContextQuery(42) as tm: + pass + + with self.assertRaises(ValueError): + with ContextQuery(TestModel, 42): + pass + + with self.assertRaises(ValueError): + with ContextQuery(TestModel, unknown_param=42): + pass + + with self.assertRaises(ValueError): + with ContextQuery(TestModel, keyspace='ks2', unknown_param=42): + pass \ No newline at end of file From 07aea035bf9b00364c28b47b239047141a2ca580 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Wed, 31 Aug 2016 09:45:59 -0400 Subject: [PATCH 15/18] preserve connection.cluster/session since they are not explicitly private --- cassandra/cqlengine/connection.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 3070e67e56..943467b8c8 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -28,6 +28,9 @@ NOT_SET = _NOT_SET # required for passing timeout to Session.execute +cluster = None +session = None + # connections registry DEFAULT_CONNECTION = '_default_' _connections = {} @@ -79,6 +82,7 @@ def __init__(self, name, hosts, consistency=None, def setup(self): """Setup the connection""" + global cluster, session if 'username' in self.cluster_options or 'password' in self.cluster_options: raise CQLEngineException("Username & Password are now handled by using the native driver's auth_provider") @@ -99,6 +103,10 @@ def setup(self): if self.consistency is not None: self.session.default_consistency_level = self.consistency + if DEFAULT_CONNECTION in _connections and _connections[DEFAULT_CONNECTION] == self: + cluster = _connections[DEFAULT_CONNECTION].cluster + session = _connections[DEFAULT_CONNECTION].session + self.setup_session() def setup_session(self): @@ -135,19 +143,22 @@ def register_connection(name, hosts, consistency=None, lazy_connect=False, _connections[name] = conn if default: - _connections[DEFAULT_CONNECTION] = conn + set_default_connection(name) conn.setup() return conn def unregister_connection(name): + global cluster, session if name not in _connections: return if DEFAULT_CONNECTION in _connections and _connections[name] == _connections[DEFAULT_CONNECTION]: del _connections[DEFAULT_CONNECTION] + cluster = None + session = None log.warning("Unregistering default connection '{0}'. Use set_default_connection to set a new one.".format(name)) log.debug("Connection '{0}' has been removed from the registry.".format(name)) @@ -155,12 +166,15 @@ def unregister_connection(name): def set_default_connection(name): + global cluster, session if name not in _connections: raise CQLEngineException("Connection '{0}' doesn't exist.".format(name)) log.debug("Connection '{0}' has been set as default.".format(name)) _connections[DEFAULT_CONNECTION] = _connections[name] + cluster = _connections[name].cluster + session = _connections[name].session def get_connection(name=None): From 83e8d66435041b679990035b6e77590a7ea0c13f Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Wed, 31 Aug 2016 09:52:42 -0400 Subject: [PATCH 16/18] make DEFAULT_CONNECTION key an object --- cassandra/cqlengine/connection.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 943467b8c8..9b11dc3abd 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -32,7 +32,7 @@ session = None # connections registry -DEFAULT_CONNECTION = '_default_' +DEFAULT_CONNECTION = object() _connections = {} # Because type models may be registered before a connection is present, @@ -43,7 +43,9 @@ def format_log_context(msg, connection=None, keyspace=None): """Format log message to add keyspace and connection context""" - connection_info = connection if connection else DEFAULT_CONNECTION + connection_info = connection + if not connection_info: + connection_info = 'DEFAULT_CONNECTION' if keyspace: msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg) else: From d648bfe397066e6a867cb701b3a5645036d287d8 Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Wed, 31 Aug 2016 10:12:36 -0400 Subject: [PATCH 17/18] minor fixes --- cassandra/cqlengine/management.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/cassandra/cqlengine/management.py b/cassandra/cqlengine/management.py index 4fdba1c4d2..b5ca80d3a7 100644 --- a/cassandra/cqlengine/management.py +++ b/cassandra/cqlengine/management.py @@ -38,7 +38,7 @@ schema_columnfamilies = NamedTable('system', 'schema_columnfamilies') -def get_context(keyspaces, connections): +def _get_context(keyspaces, connections): """Return all the execution contexts""" if keyspaces: @@ -165,7 +165,11 @@ def sync_table(model, keyspaces=None, connections=None): """ Inspects the model and creates / updates the corresponding table and columns. - If `keyspaces` is specified, the table will be synched for all specified keyspaces. Note that the `Model.__keyspace__` is ignored in that case. + If `keyspaces` is specified, the table will be synched for all specified keyspaces. + Note that the `Model.__keyspace__` is ignored in that case. + + If `connections` is specified, the table will be synched for all specified connections. Note that the `Model.__connection__` is ignored in that case. + If not specified, it will try to get the connection from the Model. Any User Defined Types used in the table are implicitly synchronized. @@ -180,7 +184,7 @@ def sync_table(model, keyspaces=None, connections=None): *There are plans to guard schema-modifying functions with an environment-driven conditional.* """ - context = get_context(keyspaces, connections) + context = _get_context(keyspaces, connections) for connection, keyspace in context: with query.ContextQuery(model, keyspace=keyspace) as m: _sync_table(m, connection=connection) @@ -500,13 +504,17 @@ def drop_table(model, keyspaces=None, connections=None): If `keyspaces` is specified, the table will be dropped for all specified keyspaces. Note that the `Model.__keyspace__` is ignored in that case. + If `connections` is specified, the table will be synched for all specified connections. Note that the `Model.__connection__` is ignored in that case. + If not specified, it will try to get the connection from the Model. + + **This function should be used with caution, especially in production environments. Take care to execute schema modifications in a single context (i.e. not concurrently with other clients).** *There are plans to guard schema-modifying functions with an environment-driven conditional.* """ - context = get_context(keyspaces, connections) + context = _get_context(keyspaces, connections) for connection, keyspace in context: with query.ContextQuery(model, keyspace=keyspace) as m: _drop_table(m, connection=connection) From eb9cb9ea7091c9465814dd1cf24a985513138bcd Mon Sep 17 00:00:00 2001 From: Alan Boudreault Date: Wed, 31 Aug 2016 10:59:56 -0400 Subject: [PATCH 18/18] better syntax --- cassandra/cqlengine/connection.py | 5 ++--- cassandra/cqlengine/management.py | 4 ++-- cassandra/cqlengine/query.py | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index 9b11dc3abd..cd015abebb 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -43,9 +43,8 @@ def format_log_context(msg, connection=None, keyspace=None): """Format log message to add keyspace and connection context""" - connection_info = connection - if not connection_info: - connection_info = 'DEFAULT_CONNECTION' + connection_info = connection or 'DEFAULT_CONNECTION' + if keyspace: msg = '[Connection: {0}, Keyspace: {1}] {2}'.format(connection_info, keyspace, msg) else: diff --git a/cassandra/cqlengine/management.py b/cassandra/cqlengine/management.py index b5ca80d3a7..fe313214bb 100644 --- a/cassandra/cqlengine/management.py +++ b/cassandra/cqlengine/management.py @@ -204,7 +204,7 @@ def _sync_table(model, connection=None): raw_cf_name = model._raw_column_family_name() ks_name = model._get_keyspace() - connection = connection if connection else model._get_connection() + connection = connection or model._get_connection() cluster = get_cluster(connection) @@ -524,7 +524,7 @@ def _drop_table(model, connection=None): if not _allow_schema_modification(): return - connection = connection if connection else model._get_connection() + connection = connection or model._get_connection() # don't try to delete non existant tables meta = get_cluster(connection).metadata diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index f47b025f08..cf682c7a48 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -392,7 +392,7 @@ def _execute(self, statement): if self._batch: return self._batch.add_query(statement) else: - connection = self._connection if self._connection else self.model._get_connection() + connection = self._connection or self.model._get_connection() result = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection) if self._if_not_exists or self._if_exists or self._conditional: check_applied(result) @@ -1482,5 +1482,5 @@ def _execute_statement(model, statement, consistency_level, timeout, connection= parts = model._routing_key_from_values(key_values, conn.get_cluster(connection).protocol_version) s.routing_key = parts s.keyspace = model._get_keyspace() - connection = connection if connection else model._get_connection() + connection = connection or model._get_connection() return conn.execute(s, params, timeout=timeout, connection=connection)