Skip to content
This repository was archived by the owner on Apr 27, 2021. It is now read-only.

Commit ae568b4

Browse files
committed
Add User Defined Types to mapper.
First pass; requires tests, further documentation
1 parent ef72d2f commit ae568b4

10 files changed

Lines changed: 343 additions & 18 deletions

File tree

cassandra/cqlengine/columns.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ def get_column_def(self):
251251
static = "static" if self.static else ""
252252
return '{} {} {}'.format(self.cql, self.db_type, static)
253253

254+
# TODO: make columns use cqltypes under the hood
# until then, this bridges the gap in using types along with cassandra.metadata for CQL generation
def cql_parameterized_type(self):
    """ returns the CQL type string held in this column's ``db_type`` attribute """
    cql_type = self.db_type
    return cql_type
258+
254259
def set_column_name(self, name):
255260
"""
256261
Sets the column name during document class construction
@@ -279,6 +284,10 @@ def _val_is_null(self, val):
279284
""" determines if the given value equates to a null value for the given column type """
280285
return val is None
281286

287+
@property
def sub_columns(self):
    """ the column definitions nested inside this column; this implementation reports none """
    return list()
290+
282291

283292
class Blob(Column):
284293
"""
@@ -671,6 +680,10 @@ def validate(self, value):
671680
def _val_is_null(self, val):
672681
return not val
673682

683+
@property
def sub_columns(self):
    """ the column definitions nested inside this column: only ``value_col`` """
    value_column = self.value_col
    return [value_column]
686+
674687

675688
class BaseContainerQuoter(ValueQuoter):
676689

@@ -841,6 +854,37 @@ def to_database(self, value):
841854
return value
842855
return self.Quoter({self.key_col.to_database(k): self.value_col.to_database(v) for k, v in value.items()})
843856

857+
@property
def sub_columns(self):
    """ the column definitions nested inside this column: ``key_col`` followed by ``value_col`` """
    nested = [self.key_col]
    nested.append(self.value_col)
    return nested
860+
861+
862+
class UserDefinedType(Column):
    """
    Column holding a value of a User Defined Type

    http://www.datastax.com/documentation/cql/3.1/cql/cql_using/cqlUseUDT.html
    """

    def __init__(self, user_type, **kwargs):
        """
        :param type user_type: the user type model class stored in this column;
            it must provide ``type_name()`` and a ``_fields`` mapping
        :param kwargs: passed through unchanged to the base column constructor
        """
        udt_name = user_type.type_name()
        self.user_type = user_type
        # the column's CQL type is the frozen form of the user type
        self.db_type = "frozen<%s>" % udt_name
        super(UserDefinedType, self).__init__(**kwargs)

    @property
    def sub_columns(self):
        """ the field definitions of the wrapped user type, as a new list """
        return [field for field in self.user_type._fields.values()]
880+
881+
882+
def resolve_udts(col_def, out_list):
    """
    Collects every user type reachable from ``col_def`` into ``out_list``.

    The sub-columns are visited first, so the types a column depends on are
    appended before the column's own type. Nothing is de-duplicated here.

    :param col_def: column definition exposing ``sub_columns``
    :param list out_list: list the user type classes are appended to, in place
    """
    for nested in col_def.sub_columns:
        resolve_udts(nested, out_list)

    if not isinstance(col_def, UserDefinedType):
        return
    out_list.append(col_def.user_type)
887+
844888

845889
class _PartitionKeysToken(Column):
846890
"""

cassandra/cqlengine/connection.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import six
1818

1919
from cassandra import ConsistencyLevel
20-
from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable
20+
from cassandra.cluster import Cluster, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist
2121
from cassandra.query import SimpleStatement, Statement, dict_factory
2222

2323
from cassandra.cqlengine import CQLEngineException
@@ -36,6 +36,12 @@
3636
default_consistency_level = ConsistencyLevel.ONE
3737

3838

39+
# Because type models may be registered before a connection is present,
40+
# and because sessions may be replaced, we must register UDTs here, in order
41+
# to have them registered when a new session is established.
42+
udt_by_keyspace = {}
43+
44+
3945
class UndefinedKeyspaceException(CQLEngineException):
4046
pass
4147

@@ -54,6 +60,8 @@ def default():
5460
session = cluster.connect()
5561
session.row_factory = dict_factory
5662

63+
_register_known_types(cluster)
64+
5765
log.debug("cqlengine connection initialized with default session to localhost")
5866

5967

@@ -74,6 +82,8 @@ def set_session(s):
7482
session = s
7583
cluster = s.cluster
7684

85+
_register_known_types(cluster)
86+
7787
log.debug("cqlengine connection initialized with %s", s)
7888

7989

@@ -129,6 +139,8 @@ def setup(
129139
raise
130140
session.row_factory = dict_factory
131141

142+
_register_known_types(cluster)
143+
132144

133145
def execute(query, params=None, consistency_level=None, timeout=NOT_SET):
134146

@@ -178,3 +190,23 @@ def handle_lazy_connect():
178190
hosts, kwargs = lazy_connect_args
179191
lazy_connect_args = None
180192
setup(hosts, **kwargs)
193+
194+
195+
def register_udt(keyspace, type_name, klass):
    """
    Records ``klass`` as the class mapped to user type ``type_name`` in
    ``keyspace`` and, if a cluster is already set, registers it there too.

    The mapping is always stored in ``udt_by_keyspace`` so that it can be
    registered again whenever a new cluster/session is established.

    :param keyspace: name of the keyspace the type belongs to
    :param type_name: name of the user type
    :param klass: class to map the user type to
    """
    udt_by_keyspace.setdefault(keyspace, {})[type_name] = klass

    # reading the module-level cluster needs no ``global`` declaration
    if cluster:
        try:
            cluster.register_user_type(keyspace, type_name, klass)
        except UserTypeDoesNotExist:
            # Same policy as _register_known_types: a type that is not defined
            # in the database yet must not make registration fail.
            pass  # new types are covered in management sync functions
204+
205+
206+
def _register_known_types(cluster):
    """
    Registers every mapping recorded in ``udt_by_keyspace`` with ``cluster``.

    Types the cluster reports as non-existent are skipped.

    :param cluster: cluster object providing ``register_user_type``
    """
    known = (
        (ks_name, type_name, klass)
        for ks_name, name_type_map in udt_by_keyspace.items()
        for type_name, klass in name_type_map.items()
    )
    for ks_name, type_name, klass in known:
        try:
            cluster.register_user_type(ks_name, type_name, klass)
        except UserTypeDoesNotExist:
            continue  # new types are covered in management sync functions

cassandra/cqlengine/management.py

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import six
2020
import warnings
2121

22-
from cassandra.metadata import KeyspaceMetadata
22+
from cassandra import metadata
2323
from cassandra.cqlengine import CQLEngineException, SizeTieredCompactionStrategy, LeveledCompactionStrategy
24+
from cassandra.cqlengine import columns
2425
from cassandra.cqlengine.connection import execute, get_cluster
2526
from cassandra.cqlengine.models import Model
2627
from cassandra.cqlengine.named import NamedTable
28+
from cassandra.cqlengine.usertype import UserType
2729

2830
CQLENG_ALLOW_SCHEMA_MANAGEMENT = 'CQLENG_ALLOW_SCHEMA_MANAGEMENT'
2931

@@ -132,7 +134,7 @@ def _create_keyspace(name, durable_writes, strategy_class, strategy_options):
132134

133135
if name not in cluster.metadata.keyspaces:
134136
log.info("Creating keyspace %s ", name)
135-
ks_meta = KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)
137+
ks_meta = metadata.KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options)
136138
execute(ks_meta.as_cql_query())
137139
else:
138140
log.info("Not creating keyspace %s because it already exists", name)
@@ -168,6 +170,8 @@ def sync_table(model):
168170
"""
169171
Inspects the model and creates / updates the corresponding table and columns.
170172
173+
Any User Defined Types used in the table are implicitly synchronized.
174+
171175
This function can only add fields that are not part of the primary key.
172176
173177
Note that the attributes removed from the model are not deleted on the database.
@@ -198,6 +202,13 @@ def sync_table(model):
198202
keyspace = cluster.metadata.keyspaces[ks_name]
199203
tables = keyspace.tables
200204

205+
syncd_types = set()
206+
for col in model._columns.values():
207+
udts = []
208+
columns.resolve_udts(col, udts)
209+
for udt in [u for u in udts if u not in syncd_types]:
210+
_sync_type(ks_name, udt, syncd_types)
211+
201212
# check for an existing column family
202213
if raw_cf_name not in tables:
203214
log.debug("sync_table creating new table %s", cf_name)
@@ -216,6 +227,7 @@ def sync_table(model):
216227
fields = get_fields(model)
217228
field_names = [x.name for x in fields]
218229
model_fields = set()
230+
# # TODO: does this work with db_name??
219231
for name, col in model._columns.items():
220232
if col.primary_key or col.partition_key:
221233
continue # we can't mess with the PK
@@ -248,6 +260,77 @@ def sync_table(model):
248260
execute(qs)
249261

250262

263+
def sync_type(ks_name, type_model):
    """
    Inspects the type_model and creates / updates the corresponding type in keyspace ``ks_name``.

    Attributes removed from the type_model are not deleted on the database (this operation is not supported).
    They are simply not visible through the type_model any more.

    Nothing is done when ``_allow_schema_modification()`` returns a false value.

    **This function should be used with caution, especially in production environments.
    Take care to execute schema modifications in a single context (i.e. not concurrently with other clients).**

    :param ks_name: name of the keyspace holding the type
    :param type_model: class derived from ``UserType``
    :raises CQLEngineException: if ``type_model`` is not a ``UserType`` subclass
    """
    if _allow_schema_modification():
        if issubclass(type_model, UserType):
            _sync_type(ks_name, type_model)
        else:
            raise CQLEngineException("Types must be derived from base UserType.")
282+
283+
284+
def _sync_type(ks_name, type_model, omit_subtypes=None):
    """
    Creates or extends the database type for ``type_model`` in keyspace
    ``ks_name``, after first doing the same for every user type its fields
    depend on.

    :param ks_name: name of the keyspace holding the type
    :param type_model: user type model exposing ``type_name()``, ``_fields``
        and ``register_for_keyspace()``
    :param set omit_subtypes: user types that are already synchronized and
        must be skipped. The set is updated in place with ``type_model`` and
        every type synchronized on its behalf, so one set can be shared across
        several calls.
    """
    # An explicit None test is required: ``omit_subtypes or set()`` would throw
    # away an *empty* set handed in by the caller, and the caller would never
    # see which types were synchronized.
    syncd_sub_types = set() if omit_subtypes is None else omit_subtypes
    syncd_sub_types.add(type_model)

    for field in type_model._fields.values():
        udts = []
        columns.resolve_udts(field, udts)
        for udt in udts:
            # checked per iteration so a type listed twice is synced only once
            if udt in syncd_sub_types:
                continue
            _sync_type(ks_name, udt, syncd_sub_types)
            syncd_sub_types.add(udt)

    type_name = type_model.type_name()
    type_name_qualified = "%s.%s" % (ks_name, type_name)

    cluster = get_cluster()

    keyspace = cluster.metadata.keyspaces[ks_name]
    defined_types = keyspace.user_types

    if type_name not in defined_types:
        log.debug("sync_type creating new type %s", type_name_qualified)
        cql = get_create_type(type_model, ks_name)
        execute(cql)
        type_model.register_for_keyspace(ks_name)
        return

    # field names as they were defined before any ALTER issued below
    defined_fields = set(defined_types[type_name].field_names)
    model_fields = set()
    fields_added = False
    for field in type_model._fields.values():
        model_fields.add(field.db_field_name)
        if field.db_field_name not in defined_fields:
            execute("ALTER TYPE {} ADD {}".format(type_name_qualified, field.get_column_def()))
            fields_added = True

    # Only fields present in the database but absent from the model belong
    # here; a symmetric difference would also list the fields just added.
    db_fields_not_in_model = defined_fields - model_fields

    if not fields_added and not db_fields_not_in_model:
        log.info("Type %s did not require synchronization", type_name_qualified)
        return

    if db_fields_not_in_model:
        log.info("Type %s has fields not referenced by model: %s", type_name_qualified, db_fields_not_in_model)

    type_model.register_for_keyspace(ks_name)
324+
325+
326+
def get_create_type(type_model, keyspace):
    """
    Builds the CQL used to create the database type for ``type_model``.

    The model's fields are handed to ``metadata.UserType`` both as field
    names (``db_field_name``) and as field types (the column objects
    themselves), and the result of its ``as_cql_query()`` is returned.

    :param type_model: user type model exposing ``type_name()`` and ``_fields``
    :param keyspace: name of the keyspace the type is created in
    :return: the query produced by ``metadata.UserType.as_cql_query()``
    """
    # Materialize once: names and types then come from the same snapshot, and
    # the metadata object holds real lists rather than a one-shot generator.
    fields = list(type_model._fields.values())
    type_meta = metadata.UserType(keyspace,
                                  type_model.type_name(),
                                  [f.db_field_name for f in fields],
                                  fields)
    return type_meta.as_cql_query()
332+
333+
251334
def get_create_table(model):
252335
cf_name = model.column_family_name()
253336
qs = ['CREATE TABLE {}'.format(cf_name)]
@@ -439,11 +522,12 @@ def drop_table(model):
439522
raw_cf_name = model.column_family_name(include_keyspace=False)
440523

441524
try:
442-
table = meta.keyspaces[ks_name].tables[raw_cf_name]
525+
meta.keyspaces[ks_name].tables[raw_cf_name]
443526
execute('drop table {};'.format(model.column_family_name(include_keyspace=True)))
444527
except KeyError:
445528
pass
446529

530+
447531
def _allow_schema_modification():
448532
if not os.getenv(CQLENG_ALLOW_SCHEMA_MANAGEMENT):
449533
msg = CQLENG_ALLOW_SCHEMA_MANAGEMENT + " environment variable is not set. Future versions of this package will require this variable to enable management functions."

cassandra/cqlengine/models.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
from cassandra.cqlengine import CQLEngineException, ValidationError
2121
from cassandra.cqlengine import columns
22-
from cassandra.cqlengine.query import ModelQuerySet, DMLQuery, AbstractQueryableColumn, NOT_SET
22+
from cassandra.cqlengine import connection
23+
from cassandra.cqlengine import query
2324
from cassandra.cqlengine.query import DoesNotExist as _DoesNotExist
2425
from cassandra.cqlengine.query import MultipleObjectsReturned as _MultipleObjectsReturned
2526
from cassandra.util import OrderedDict
@@ -211,7 +212,7 @@ def __call__(self, *args, **kwargs):
211212
raise NotImplementedError
212213

213214

214-
class ColumnQueryEvaluator(AbstractQueryableColumn):
215+
class ColumnQueryEvaluator(query.AbstractQueryableColumn):
215216
"""
216217
Wraps a column and allows it to be used in comparator
217218
expressions, returning query operators
@@ -330,8 +331,8 @@ class MultipleObjectsReturned(_MultipleObjectsReturned):
330331

331332
# end compaction
332333
# the queryset class used for this class
333-
__queryset__ = ModelQuerySet
334-
__dmlquery__ = DMLQuery
334+
__queryset__ = query.ModelQuerySet
335+
__dmlquery__ = query.DMLQuery
335336

336337
__consistency__ = None # can be set per query
337338

@@ -371,7 +372,7 @@ def __init__(self, **values):
371372
# that update should be used when persisting changes
372373
self._is_persisted = False
373374
self._batch = None
374-
self._timeout = NOT_SET
375+
self._timeout = connection.NOT_SET
375376

376377
def __repr__(self):
377378
"""
@@ -741,7 +742,7 @@ def _class_batch(cls, batch):
741742
return cls.objects.batch(batch)
742743

743744
def _inst_batch(self, batch):
744-
assert self._timeout is NOT_SET, 'Setting both timeout and batch is not supported'
745+
assert self._timeout is connection.NOT_SET, 'Setting both timeout and batch is not supported'
745746
self._batch = batch
746747
return self
747748

@@ -919,6 +920,13 @@ def _get_polymorphic_base(bases):
919920
# create the class and add a QuerySet to it
920921
klass = super(ModelMetaClass, cls).__new__(cls, name, bases, attrs)
921922

923+
udts = []
924+
for col in column_dict.values():
925+
columns.resolve_udts(col, udts)
926+
927+
for user_type in set(udts):
928+
user_type.register_for_keyspace(klass._get_keyspace())
929+
922930
return klass
923931

924932

0 commit comments

Comments
 (0)