Skip to content

Commit 7ce8e3a

Browse files
committed
Use Cython-based deserializers whenever available
1 parent 2f4a2d4 commit 7ce8e3a

4 files changed

Lines changed: 102 additions & 12 deletions

File tree

cassandra/cython_deps.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
try:
2+
from cassandra.rowparser import make_recv_results_rows
3+
HAVE_CYTHON = True
4+
except ImportError:
5+
HAVE_CYTHON = False

cassandra/protocol.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
TupleType, lookup_casstype, SimpleDateType,
4141
TimeType, ByteType, ShortType)
4242
from cassandra.policies import WriteType
43+
from cassandra.cython_deps import HAVE_CYTHON
4344
from cassandra import util
4445

4546
log = logging.getLogger(__name__)
@@ -69,10 +70,16 @@ class InternalError(Exception):
6970

7071
_UNSET_VALUE = object()
7172

73+
def register_class(cls):
74+
_message_types_by_opcode[cls.opcode] = cls
75+
76+
def get_registered_classes():
77+
return _message_types_by_opcode.copy()
78+
7279
class _RegisterMessageType(type):
7380
def __init__(cls, name, bases, dct):
7481
if not name.startswith('_'):
75-
_message_types_by_opcode[cls.opcode] = cls
82+
register_class(cls)
7683

7784

7885
@six.add_metaclass(_RegisterMessageType)
@@ -987,6 +994,62 @@ def decode_message(cls, protocol_version, user_type_map, stream_id, flags, opcod
987994
return msg
988995

989996

997+
def cython_protocol_handler(colparser):
998+
"""
999+
Given a column parser to deserialize ResultMessages, return a suitable
1000+
Cython-based protocol handler.
1001+
1002+
There are three Cython-based protocol handlers (least to most performant):
1003+
1004+
1. objparser.ListParser
1005+
this parser decodes result messages into a list of tuples
1006+
1007+
2. objparser.LazyParser
1008+
this parser decodes result messages lazily by returning an iterator
1009+
1010+
3. numpyparser.NumPyParser
1011+
this parser decodes result messages into NumPy arrays
1012+
1013+
The default is to use objparser.ListParser
1014+
"""
1015+
# TODO: It may be cleaner to turn ProtocolHandler and ResultMessage into
1016+
# TODO: instances and use methods instead of class methods
1017+
from cassandra.rowparser import make_recv_results_rows
1018+
1019+
class FastResultMessage(ResultMessage):
1020+
"""
1021+
Cython version of Result Message that has a faster implementation of
1022+
recv_results_row.
1023+
"""
1024+
# type_codes = ResultMessage.type_codes.copy()
1025+
code_to_type = dict((v, k) for k, v in ResultMessage.type_codes.items())
1026+
recv_results_rows = classmethod(make_recv_results_rows(colparser))
1027+
1028+
class CythonProtocolHandler(ProtocolHandler):
1029+
"""
1030+
Use FastResultMessage to decode query result message messages.
1031+
"""
1032+
1033+
my_opcodes = ProtocolHandler.message_types_by_opcode.copy()
1034+
my_opcodes[FastResultMessage.opcode] = FastResultMessage
1035+
message_types_by_opcode = my_opcodes
1036+
1037+
return CythonProtocolHandler
1038+
1039+
1040+
if HAVE_CYTHON:
1041+
from cassandra.objparser import ListParser, LazyParser
1042+
from cassandra.numpyparser import NumpyParser
1043+
1044+
ProtocolHandler = cython_protocol_handler(ListParser())
1045+
LazyProtocolHandler = cython_protocol_handler(LazyParser())
1046+
NumpyProtocolHandler = cython_protocol_handler(NumpyParser())
1047+
else:
1048+
# Use Python-based ProtocolHandler
1049+
LazyProtocolHandler = None
1050+
NumpyProtocolHandler = None
1051+
1052+
9901053
def read_byte(f):
9911054
return int8_unpack(f.read(1))
9921055

cassandra/rowparser.pyx

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# -- cython: profile=True
2+
3+
from cassandra.parsing cimport ParseDesc, ColumnParser
4+
from cassandra.deserializers import make_deserializers
5+
6+
include "ioutils.pyx"
7+
8+
def make_recv_results_rows(ColumnParser colparser):
9+
def recv_results_rows(cls, f, int protocol_version, user_type_map):
10+
"""
11+
Parse protocol data given as a BytesIO f into a set of columns (e.g. list of tuples)
12+
This is used as the recv_results_rows method of (Fast)ResultMessage
13+
"""
14+
paging_state, column_metadata = cls.recv_results_metadata(f, user_type_map)
15+
16+
colnames = [c[2] for c in column_metadata]
17+
coltypes = [c[3] for c in column_metadata]
18+
19+
desc = ParseDesc(colnames, coltypes, make_deserializers(coltypes),
20+
protocol_version)
21+
reader = BytesIOReader(f.read())
22+
parsed_rows = colparser.parse_rows(reader, desc)
23+
24+
return (paging_state, (colnames, parsed_rows))
25+
26+
return recv_results_rows

tests/integration/standard/test_cython_protocol_handlers.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,15 @@
88
import unittest
99

1010
from cassandra.cluster import Cluster
11+
from cassandra.protocol import ProtocolHandler, LazyProtocolHandler, NumpyProtocolHandler
1112
from tests.integration import use_singledc, PROTOCOL_VERSION
1213
from tests.integration.datatype_utils import update_datatypes
1314
from tests.integration.standard.utils import create_table_with_all_types, get_all_primitive_params
14-
from six import next
1515

16-
try:
17-
from cassandra.cython_protocol_handler import make_protocol_handler
18-
except ImportError as e:
16+
from cassandra.cython_deps import HAVE_CYTHON
17+
if not HAVE_CYTHON:
1918
raise unittest.SkipTest("Skipping test, not compiled with Cython enabled")
2019

21-
from cassandra.numpyparser import NumpyParser
22-
from cassandra.objparser import ListParser, LazyParser
23-
2420

2521
def setup_module():
2622
use_singledc()
@@ -47,20 +43,20 @@ def test_cython_parser(self):
4743
"""
4844
Test Cython-based parser that returns a list of tuples
4945
"""
50-
self.cython_parser(ListParser())
46+
self.cython_parser(ProtocolHandler)
5147

5248
def test_cython_lazy_parser(self):
5349
"""
5450
Test Cython-based parser that returns a list of tuples
5551
"""
56-
self.cython_parser(LazyParser())
52+
self.cython_parser(LazyProtocolHandler)
5753

58-
def cython_parser(self, colparser):
54+
def cython_parser(self, protocol_handler):
5955
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
6056
session = cluster.connect(keyspace="testspace")
6157

6258
# use our custom protocol handler
63-
session.client_protocol_handler = make_protocol_handler(colparser)
59+
session.client_protocol_handler = protocol_handler
6460
# session.row_factory = tuple_factory
6561

6662
# verify data

0 commit comments

Comments
 (0)