Skip to content

Commit 714ef32

Browse files
committed
Migrated casandra23 into cassandra library. Ported more secions.
1 parent 62e97f6 commit 714ef32

15 files changed

Lines changed: 163 additions & 549 deletions

cassandra/cluster.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
3939
RESULT_KIND_SCHEMA_CHANGE)
4040
from cassandra.metadata import Metadata
41-
from cassandra.metrics import Metrics
41+
# from cassandra.metrics import Metrics
4242
from cassandra.policies import (RoundRobinPolicy, SimpleConvictionPolicy,
4343
ExponentialReconnectionPolicy, HostDistance,
4444
RetryPolicy)
@@ -376,6 +376,7 @@ def __init__(self,
376376
self._lock = RLock()
377377

378378
if self.metrics_enabled:
379+
from cassandra.metrics import Metrics
379380
self.metrics = Metrics(weakref.proxy(self))
380381

381382
self.control_connection = ControlConnection(

cassandra/connection.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
from six.moves.queue import Queue
77

88
from cassandra import ConsistencyLevel, AuthenticationFailed, OperationTimedOut
9-
from cassandra.marshal import int8_unpack, int32_pack
9+
from cassandra.marshal import int8_unpack, int32_pack, header_unpack
1010
from cassandra.decoder import (ReadyMessage, AuthenticateMessage, OptionsMessage,
1111
StartupMessage, ErrorMessage, CredentialsMessage,
1212
QueryMessage, ResultMessage, decode_response,
1313
InvalidRequestException, SupportedMessage)
14+
import six
1415

1516

1617
log = logging.getLogger(__name__)
@@ -102,7 +103,8 @@ def wrapper(self, *args, **kwargs):
102103
return f(self, *args, **kwargs)
103104
except Exception as exc:
104105
self.defunct(exc)
105-
106+
# return f(self, *args, **kwargs)
107+
# TODO: Clean up the above test code.
106108
return wrapper
107109

108110

@@ -170,7 +172,7 @@ def register_watchers(self, type_callback_dict):
170172

171173
@defunct_on_error
172174
def process_msg(self, msg, body_len):
173-
version, flags, stream_id, opcode = (int8_unpack(f) for f in msg[:4])
175+
version, flags, stream_id, opcode = header_unpack(msg[:4])
174176
if stream_id < 0:
175177
callback = None
176178
else:
@@ -195,7 +197,7 @@ def process_msg(self, msg, body_len):
195197
if body_len > 0:
196198
body = msg[8:]
197199
elif body_len == 0:
198-
body = ""
200+
body = six.binary_type()
199201
else:
200202
raise ProtocolError("Got negative body length: %r" % body_len)
201203

cassandra/cqltypes.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
import warnings
2424

2525
import six
26-
from six.moves import cStringIO as StringIO
27-
from six.moves import xrange
26+
from six.moves import range
2827

2928
from cassandra.marshal import (int8_pack, int8_unpack, uint16_pack, uint16_unpack,
3029
int32_pack, int32_unpack, int64_pack, int64_unpack,
@@ -34,12 +33,10 @@
3433

3534
apache_cassandra_type_prefix = 'org.apache.cassandra.db.marshal.'
3635

37-
## Python 3 support #########
3836
if six.PY3:
3937
_number_types = frozenset((int, float))
4038
else:
4139
_number_types = frozenset((int, long, float))
42-
#############################
4340

4441
try:
4542
from blist import sortedset
@@ -78,7 +75,7 @@ class CassandraTypeType(type):
7875
def __new__(metacls, name, bases, dct):
7976
dct.setdefault('cassname', name)
8077
cls = type.__new__(metacls, name, bases, dct)
81-
if not name.startswith('_'):
78+
if name != 'NewBase' and not name.startswith('_'):
8279
_casstypes[name] = cls
8380
return cls
8481

@@ -167,8 +164,7 @@ def __str__(self):
167164
EMPTY = EmptyValue()
168165

169166

170-
class _CassandraType(object):
171-
__metaclass__ = CassandraTypeType
167+
class _CassandraType(six.with_metaclass(CassandraTypeType, object)):
172168
subtypes = ()
173169
num_subtypes = 0
174170
empty_binary_ok = False
@@ -189,9 +185,8 @@ class _CassandraType(object):
189185
def __init__(self, val):
190186
self.val = self.validate(val)
191187

192-
def __str__(self):
188+
def __repr__(self):
193189
return '<%s( %r )>' % (self.cql_parameterized_type(), self.val)
194-
__repr__ = __str__
195190

196191
@staticmethod
197192
def validate(val):
@@ -211,7 +206,7 @@ def from_binary(cls, byts):
211206
"""
212207
if byts is None:
213208
return None
214-
elif byts == '' and not cls.empty_binary_ok:
209+
elif len(byts) == 0 and not cls.empty_binary_ok:
215210
return EMPTY if cls.support_empty_values else None
216211
return cls.deserialize(byts)
217212

@@ -222,7 +217,7 @@ def to_binary(cls, val):
222217
more information. This method differs in that if None is passed in,
223218
the result is the empty string.
224219
"""
225-
return '' if val is None else cls.serialize(val)
220+
return six.binary_type() if val is None else cls.serialize(val)
226221

227222
@staticmethod
228223
def deserialize(byts):
@@ -618,7 +613,7 @@ def deserialize_safe(cls, byts):
618613
numelements = uint16_unpack(byts[:2])
619614
p = 2
620615
result = []
621-
for n in xrange(numelements):
616+
for _ in range(numelements):
622617
itemlen = uint16_unpack(byts[p:p + 2])
623618
p += 2
624619
item = byts[p:p + itemlen]
@@ -632,7 +627,7 @@ def serialize_safe(cls, items):
632627
raise TypeError("Received a string for a type that expects a sequence")
633628

634629
subtype, = cls.subtypes
635-
buf = StringIO()
630+
buf = six.BytesIO()
636631
buf.write(uint16_pack(len(items)))
637632
for item in items:
638633
itembytes = subtype.to_binary(item)
@@ -668,7 +663,7 @@ def deserialize_safe(cls, byts):
668663
numelements = uint16_unpack(byts[:2])
669664
p = 2
670665
themap = OrderedDict()
671-
for n in xrange(numelements):
666+
for _ in range(numelements):
672667
key_len = uint16_unpack(byts[p:p + 2])
673668
p += 2
674669
keybytes = byts[p:p + key_len]
@@ -685,7 +680,7 @@ def deserialize_safe(cls, byts):
685680
@classmethod
686681
def serialize_safe(cls, themap):
687682
subkeytype, subvaltype = cls.subtypes
688-
buf = StringIO()
683+
buf = six.BytesIO()
689684
buf.write(uint16_pack(len(themap)))
690685
try:
691686
items = themap.iteritems()

cassandra/decoder.py

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
import socket
33
from uuid import UUID
44

5-
# from six.moves import cStringIO as StringIO
6-
from six.moves import xrange
7-
from six import BytesIO
5+
import six
6+
from six.moves import range
87

98
from cassandra import (Unavailable, WriteTimeout, ReadTimeout,
109
AlreadyExists, InvalidRequest, Unauthorized)
1110
from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack,
12-
int8_pack, int8_unpack)
11+
int8_pack, int8_unpack, header_pack)
1312
from cassandra.cqltypes import (AsciiType, BytesType, BooleanType,
1413
CounterColumnType, DateType, DecimalType,
1514
DoubleType, FloatType, Int32Type,
@@ -34,61 +33,69 @@ class InternalError(Exception):
3433
HEADER_DIRECTION_TO_CLIENT = 0x80
3534
HEADER_DIRECTION_MASK = 0x80
3635

36+
COMPRESSED_FLAG = 0x01
37+
TRACING_FLAG = 0x02
3738

3839
_message_types_by_name = {}
3940
_message_types_by_opcode = {}
4041

4142

42-
class _register_msg_type(type):
43+
class _RegisterMessageType(type):
4344
def __init__(cls, name, bases, dct):
44-
if not name.startswith('_'):
45+
if name not in ('NewBase', '_MessageType'):
4546
_message_types_by_name[cls.name] = cls
4647
_message_types_by_opcode[cls.opcode] = cls
4748

4849

49-
class _MessageType(object):
50-
__metaclass__ = _register_msg_type
50+
class _MessageType(six.with_metaclass(_RegisterMessageType, object)):
5151

5252
tracing = False
5353

5454
def to_binary(self, stream_id, protocol_version, compression=None):
55-
body = BytesIO()
55+
body = six.BytesIO()
5656
self.send_body(body, protocol_version)
5757
body = body.getvalue()
58-
version = protocol_version | HEADER_DIRECTION_FROM_CLIENT
58+
5959
flags = 0
60-
if compression is not None and len(body) > 0:
60+
if compression and len(body) > 0:
6161
body = compression(body)
62-
flags |= 0x01
62+
flags |= COMPRESSED_FLAG
6363
if self.tracing:
64-
flags |= 0x02
65-
msglen = int32_pack(len(body))
66-
msg_parts = [int8_pack(i) for i in (version, flags, stream_id, self.opcode)] + [msglen, body]
67-
return six.binary_type().join(msg_parts)
64+
flags |= TRACING_FLAG
6865

69-
def __str__(self):
70-
paramstrs = ['%s=%r' % (pname, getattr(self, pname)) for pname in _get_params(self)]
71-
return '<%s(%s)>' % (self.__class__.__name__, ', '.join(paramstrs))
72-
__repr__ = __str__
66+
msg = six.BytesIO()
67+
write_header(
68+
msg,
69+
protocol_version | HEADER_DIRECTION_FROM_CLIENT,
70+
flags, stream_id, self.opcode, len(body)
71+
)
72+
msg.write(body)
73+
74+
return msg.getvalue()
75+
76+
def __repr__(self):
77+
return '<%s(%s)>' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in _get_params(self)))
7378

7479

7580
def _get_params(message_obj):
7681
base_attrs = dir(_MessageType)
77-
return [a for a in dir(message_obj)
78-
if a not in base_attrs and not a.startswith('_') and not callable(getattr(message_obj, a))]
82+
return (
83+
(n, a) for n, a in message_obj.__dict__.items()
84+
if n not in base_attrs and not n.startswith('_') and not callable(a)
85+
)
7986

8087

8188
def decode_response(stream_id, flags, opcode, body, decompressor=None):
82-
if flags & 0x01:
89+
if flags & COMPRESSED_FLAG:
8390
if decompressor is None:
84-
raise Exception("No decompressor available for compressed frame!")
91+
raise Exception("No de-compressor available for compressed frame!")
8592
body = decompressor(body)
86-
flags ^= 0x01
93+
flags ^= COMPRESSED_FLAG
8794

88-
body = BytesIO(body)
89-
if flags & 0x02:
95+
body = six.BytesIO(body)
96+
if flags & TRACING_FLAG:
9097
trace_id = UUID(bytes=body.read(16))
91-
flags ^= 0x02
98+
flags ^= TRACING_FLAG
9299
else:
93100
trace_id = None
94101

@@ -142,14 +149,13 @@ def to_exception(self):
142149
return self
143150

144151

145-
class ErrorMessageSubclass(_register_msg_type):
152+
class ErrorMessageSubclass(_RegisterMessageType):
146153
def __init__(cls, name, bases, dct):
147-
if cls.error_code is not None:
154+
if name not in ('NewBase', ) and cls.error_code:
148155
error_classes[cls.error_code] = cls
149156

150157

151-
class ErrorMessageSub(ErrorMessage):
152-
__metaclass__ = ErrorMessageSubclass
158+
class ErrorMessageSub(six.with_metaclass(ErrorMessageSubclass, ErrorMessage)):
153159
error_code = None
154160

155161

@@ -450,7 +456,7 @@ def recv_body(cls, f):
450456
def recv_results_rows(cls, f):
451457
column_metadata = cls.recv_results_metadata(f)
452458
rowcount = read_int(f)
453-
rows = [cls.recv_row(f, len(column_metadata)) for x in xrange(rowcount)]
459+
rows = [cls.recv_row(f, len(column_metadata)) for _ in range(rowcount)]
454460
colnames = [c[2] for c in column_metadata]
455461
coltypes = [c[3] for c in column_metadata]
456462
return (colnames, [tuple(ctype.from_binary(val) for ctype, val in zip(coltypes, row))
@@ -471,7 +477,7 @@ def recv_results_metadata(cls, f):
471477
ksname = read_string(f)
472478
cfname = read_string(f)
473479
column_metadata = []
474-
for x in xrange(colcount):
480+
for _ in range(colcount):
475481
if glob_tblspec:
476482
colksname = ksname
477483
colcfname = cfname
@@ -513,7 +519,7 @@ def read_type(cls, f):
513519

514520
@staticmethod
515521
def recv_row(f, colcount):
516-
return [read_value(f) for x in xrange(colcount)]
522+
return [read_value(f) for _ in range(colcount)]
517523

518524

519525
class PrepareMessage(_MessageType):
@@ -635,6 +641,14 @@ def recv_schema_change(cls, f):
635641
return dict(change_type=change_type, keyspace=keyspace, table=table)
636642

637643

644+
def write_header(f, version, flags, stream_id, opcode, length):
645+
"""
646+
Write a CQL protocol frame header.
647+
"""
648+
f.write(header_pack(version, flags, stream_id, opcode))
649+
write_int(f, length)
650+
651+
638652
def read_byte(f):
639653
return int8_unpack(f.read(1))
640654

@@ -701,7 +715,7 @@ def write_longstring(f, s):
701715

702716
def read_stringlist(f):
703717
numstrs = read_short(f)
704-
return [read_string(f) for x in xrange(numstrs)]
718+
return [read_string(f) for _ in range(numstrs)]
705719

706720

707721
def write_stringlist(f, stringlist):
@@ -713,7 +727,7 @@ def write_stringlist(f, stringlist):
713727
def read_stringmap(f):
714728
numpairs = read_short(f)
715729
strmap = {}
716-
for x in xrange(numpairs):
730+
for _ in range(numpairs):
717731
k = read_string(f)
718732
strmap[k] = read_string(f)
719733
return strmap
@@ -729,7 +743,7 @@ def write_stringmap(f, strmap):
729743
def read_stringmultimap(f):
730744
numkeys = read_short(f)
731745
strmmap = {}
732-
for x in xrange(numkeys):
746+
for _ in range(numkeys):
733747
k = read_string(f)
734748
strmmap[k] = read_stringlist(f)
735749
return strmmap

cassandra/encoder.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
from cassandra.util import OrderedDict
1010

11-
if six.PY3:
12-
unicode = str
13-
long = int
11+
# if six.PY3:
12+
# unicode = str
13+
# long = int
1414

1515

1616
def cql_quote(term):

cassandra/io/asyncorereactor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,10 @@ def defunct(self, exc):
183183
return
184184
self.is_defunct = True
185185

186-
trace = traceback.format_exc(exc)
186+
trace = traceback.format_exc() #exc)
187187
if trace != "None":
188188
log.debug("Defuncting connection (%s) to %s: %s\n%s",
189-
id(self), self.host, exc, traceback.format_exc(exc))
189+
id(self), self.host, exc, traceback.format_exc())
190190
else:
191191
log.debug("Defuncting connection (%s) to %s: %s",
192192
id(self), self.host, exc)

0 commit comments

Comments
 (0)