22import socket
33from uuid import UUID
44
5- # from six.moves import cStringIO as StringIO
6- from six .moves import xrange
7- from six import BytesIO
5+ import six
6+ from six .moves import range
87
98from cassandra import (Unavailable , WriteTimeout , ReadTimeout ,
109 AlreadyExists , InvalidRequest , Unauthorized )
1110from cassandra .marshal import (int32_pack , int32_unpack , uint16_pack , uint16_unpack ,
12- int8_pack , int8_unpack )
11+ int8_pack , int8_unpack , header_pack )
1312from cassandra .cqltypes import (AsciiType , BytesType , BooleanType ,
1413 CounterColumnType , DateType , DecimalType ,
1514 DoubleType , FloatType , Int32Type ,
@@ -34,61 +33,69 @@ class InternalError(Exception):
3433HEADER_DIRECTION_TO_CLIENT = 0x80
3534HEADER_DIRECTION_MASK = 0x80
3635
36+ COMPRESSED_FLAG = 0x01
37+ TRACING_FLAG = 0x02
3738
3839_message_types_by_name = {}
3940_message_types_by_opcode = {}
4041
4142
42- class _register_msg_type (type ):
43+ class _RegisterMessageType (type ):
4344 def __init__ (cls , name , bases , dct ):
44- if not name . startswith ( '_ ' ):
45+ if name not in ( 'NewBase' , '_MessageType ' ):
4546 _message_types_by_name [cls .name ] = cls
4647 _message_types_by_opcode [cls .opcode ] = cls
4748
4849
49- class _MessageType (object ):
50- __metaclass__ = _register_msg_type
50+ class _MessageType (six .with_metaclass (_RegisterMessageType , object )):
5151
5252 tracing = False
5353
5454 def to_binary (self , stream_id , protocol_version , compression = None ):
55- body = BytesIO ()
55+ body = six . BytesIO ()
5656 self .send_body (body , protocol_version )
5757 body = body .getvalue ()
58- version = protocol_version | HEADER_DIRECTION_FROM_CLIENT
58+
5959 flags = 0
60- if compression is not None and len (body ) > 0 :
60+ if compression and len (body ) > 0 :
6161 body = compression (body )
62- flags |= 0x01
62+ flags |= COMPRESSED_FLAG
6363 if self .tracing :
64- flags |= 0x02
65- msglen = int32_pack (len (body ))
66- msg_parts = [int8_pack (i ) for i in (version , flags , stream_id , self .opcode )] + [msglen , body ]
67- return six .binary_type ().join (msg_parts )
64+ flags |= TRACING_FLAG
6865
69- def __str__ (self ):
70- paramstrs = ['%s=%r' % (pname , getattr (self , pname )) for pname in _get_params (self )]
71- return '<%s(%s)>' % (self .__class__ .__name__ , ', ' .join (paramstrs ))
72- __repr__ = __str__
66+ msg = six .BytesIO ()
67+ write_header (
68+ msg ,
69+ protocol_version | HEADER_DIRECTION_FROM_CLIENT ,
70+ flags , stream_id , self .opcode , len (body )
71+ )
72+ msg .write (body )
73+
74+ return msg .getvalue ()
75+
76+ def __repr__ (self ):
77+ return '<%s(%s)>' % (self .__class__ .__name__ , ', ' .join ('%s=%r' % i for i in _get_params (self )))
7378
7479
7580def _get_params (message_obj ):
7681 base_attrs = dir (_MessageType )
77- return [a for a in dir (message_obj )
78- if a not in base_attrs and not a .startswith ('_' ) and not callable (getattr (message_obj , a ))]
82+ return (
83+ (n , a ) for n , a in message_obj .__dict__ .items ()
84+ if n not in base_attrs and not n .startswith ('_' ) and not callable (a )
85+ )
7986
8087
8188def decode_response (stream_id , flags , opcode , body , decompressor = None ):
82- if flags & 0x01 :
89+ if flags & COMPRESSED_FLAG :
8390 if decompressor is None :
84- raise Exception ("No decompressor available for compressed frame!" )
91+ raise Exception ("No de-compressor available for compressed frame!" )
8592 body = decompressor (body )
86- flags ^= 0x01
93+ flags ^= COMPRESSED_FLAG
8794
88- body = BytesIO (body )
89- if flags & 0x02 :
95+ body = six . BytesIO (body )
96+ if flags & TRACING_FLAG :
9097 trace_id = UUID (bytes = body .read (16 ))
91- flags ^= 0x02
98+ flags ^= TRACING_FLAG
9299 else :
93100 trace_id = None
94101
@@ -142,14 +149,13 @@ def to_exception(self):
142149 return self
143150
144151
145- class ErrorMessageSubclass (_register_msg_type ):
152+ class ErrorMessageSubclass (_RegisterMessageType ):
146153 def __init__ (cls , name , bases , dct ):
147- if cls . error_code is not None :
154+ if name not in ( 'NewBase' , ) and cls . error_code :
148155 error_classes [cls .error_code ] = cls
149156
150157
151- class ErrorMessageSub (ErrorMessage ):
152- __metaclass__ = ErrorMessageSubclass
158+ class ErrorMessageSub (six .with_metaclass (ErrorMessageSubclass , ErrorMessage )):
153159 error_code = None
154160
155161
@@ -450,7 +456,7 @@ def recv_body(cls, f):
450456 def recv_results_rows (cls , f ):
451457 column_metadata = cls .recv_results_metadata (f )
452458 rowcount = read_int (f )
453- rows = [cls .recv_row (f , len (column_metadata )) for x in xrange (rowcount )]
459+ rows = [cls .recv_row (f , len (column_metadata )) for _ in range (rowcount )]
454460 colnames = [c [2 ] for c in column_metadata ]
455461 coltypes = [c [3 ] for c in column_metadata ]
456462 return (colnames , [tuple (ctype .from_binary (val ) for ctype , val in zip (coltypes , row ))
@@ -471,7 +477,7 @@ def recv_results_metadata(cls, f):
471477 ksname = read_string (f )
472478 cfname = read_string (f )
473479 column_metadata = []
474- for x in xrange (colcount ):
480+ for _ in range (colcount ):
475481 if glob_tblspec :
476482 colksname = ksname
477483 colcfname = cfname
@@ -513,7 +519,7 @@ def read_type(cls, f):
513519
514520 @staticmethod
515521 def recv_row (f , colcount ):
516- return [read_value (f ) for x in xrange (colcount )]
522+ return [read_value (f ) for _ in range (colcount )]
517523
518524
519525class PrepareMessage (_MessageType ):
@@ -635,6 +641,14 @@ def recv_schema_change(cls, f):
635641 return dict (change_type = change_type , keyspace = keyspace , table = table )
636642
637643
644+ def write_header (f , version , flags , stream_id , opcode , length ):
645+ """
646+ Write a CQL protocol frame header.
647+ """
648+ f .write (header_pack (version , flags , stream_id , opcode ))
649+ write_int (f , length )
650+
651+
638652def read_byte (f ):
639653 return int8_unpack (f .read (1 ))
640654
@@ -701,7 +715,7 @@ def write_longstring(f, s):
701715
702716def read_stringlist (f ):
703717 numstrs = read_short (f )
704- return [read_string (f ) for x in xrange (numstrs )]
718+ return [read_string (f ) for _ in range (numstrs )]
705719
706720
707721def write_stringlist (f , stringlist ):
@@ -713,7 +727,7 @@ def write_stringlist(f, stringlist):
713727def read_stringmap (f ):
714728 numpairs = read_short (f )
715729 strmap = {}
716- for x in xrange (numpairs ):
730+ for _ in range (numpairs ):
717731 k = read_string (f )
718732 strmap [k ] = read_string (f )
719733 return strmap
@@ -729,7 +743,7 @@ def write_stringmap(f, strmap):
729743def read_stringmultimap (f ):
730744 numkeys = read_short (f )
731745 strmmap = {}
732- for x in xrange (numkeys ):
746+ for _ in range (numkeys ):
733747 k = read_string (f )
734748 strmmap [k ] = read_stringlist (f )
735749 return strmmap
0 commit comments