Skip to content

Commit 12eea24

Browse files
committed
perf: add Cython LZ4 wrapper with direct C linkage for CQL v4 compression
Implement cython_lz4.pyx that calls LZ4_compress_default() and LZ4_decompress_safe() directly via Cython's cdef extern, bypassing the Python lz4 module's object allocation overhead in the hot compress/decompress path.

Key design decisions:
- Direct C linkage (cdef extern from "lz4.h") eliminates all intermediate Python object allocations for byte-order conversion
- Single-copy compress: LZ4 writes into a temporary buffer (alloca when LZ4_compressBound() is at most 128 KiB, malloc otherwise), and the result is then copied once into an exact-size bytes object behind the 4-byte header
- Wire-compatible with CQL binary protocol v4 format: [4 bytes big-endian uncompressed length][raw LZ4 compressed data]
- Safety guards: LZ4_MAX_INPUT_SIZE check (prevents Py_ssize_t→int truncation), INT32_MAX compressed payload check, 256 MiB decompressed size cap, result size verification
- bytes not None parameter typing rejects None/bytearray/memoryview
- PyPy-safe: this is a Cython module (CPython only); PyPy users automatically fall back to the pure-Python lz4 wrappers via the import chain in connection.py

Integration:
- connection.py: Cython import with fallback; also enables LZ4 without the Python lz4 package when the Cython extension is built
- setup.py: separate Extension with libraries=['lz4'], excluded from the .pyx glob (which lacks the -llz4 link flag)

Benchmark results (taskset -c 0, CPython 3.14):

Payload  Operation   Python (ns)  Cython (ns)  Speedup
1KB      compress            596          360    1.66x
1KB      decompress          313          136    2.30x
8KB      compress           1192          722    1.65x
8KB      decompress         1102          825    1.34x
64KB     compress           8179         3976    2.06x
64KB     decompress         6539         4890    1.34x
1 parent 117abb2 commit 12eea24

5 files changed

Lines changed: 574 additions & 2 deletions

File tree

benchmarks/bench_lz4.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# Copyright 2026 ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Microbenchmark comparing the Python lz4 wrappers (connection.py) against
17+
the Cython direct-C-linkage wrappers (cython_lz4.pyx) for the CQL binary
18+
protocol v4 LZ4 compression path.
19+
20+
Usage (pin to one core for stable numbers):
21+
22+
taskset -c 0 python benchmarks/bench_lz4.py
23+
24+
Payload sizes tested: 1 KB, 8 KB, 64 KB.
25+
"""
26+
27+
import os
28+
import struct
29+
import timeit
30+
31+
# ---------------------------------------------------------------------------
32+
# Python wrappers (duplicated from connection.py to avoid import side-effects)
33+
# ---------------------------------------------------------------------------
34+
try:
35+
import lz4.block as lz4_block
36+
HAS_PYTHON_LZ4 = True
37+
except ImportError:
38+
HAS_PYTHON_LZ4 = False
39+
print("WARNING: lz4 Python package not available, skipping Python benchmarks")
40+
41+
int32_pack = struct.Struct('>i').pack
42+
43+
44+
def py_lz4_compress(byts):
    """Compress *byts* into the CQL frame layout using the Python lz4 package.

    The first four bytes of ``lz4_block.compress()`` output are dropped and
    replaced by the input length packed with ``int32_pack`` (big-endian).
    """
    be_header = int32_pack(len(byts))
    lz4_payload = lz4_block.compress(byts)[4:]
    return be_header + lz4_payload
46+
47+
48+
def py_lz4_decompress(byts):
    """Decompress a CQL-layout LZ4 frame using the Python lz4 package.

    The four header bytes are reversed before the remainder of the frame is
    appended, and the result is handed to ``lz4_block.decompress()``.
    """
    reversed_header = byts[3::-1]
    lz4_payload = byts[4:]
    return lz4_block.decompress(reversed_header + lz4_payload)
50+
51+
52+
# ---------------------------------------------------------------------------
53+
# Cython wrappers
54+
# ---------------------------------------------------------------------------
55+
try:
56+
from cassandra.cython_lz4 import lz4_compress as cy_lz4_compress
57+
from cassandra.cython_lz4 import lz4_decompress as cy_lz4_decompress
58+
HAS_CYTHON = True
59+
except ImportError:
60+
HAS_CYTHON = False
61+
print("WARNING: cassandra.cython_lz4 not available, skipping Cython benchmarks")
62+
63+
64+
# ---------------------------------------------------------------------------
65+
# Benchmark helpers
66+
# ---------------------------------------------------------------------------
67+
SIZES = {
68+
"1KB": 1024,
69+
"8KB": 8 * 1024,
70+
"64KB": 64 * 1024,
71+
}
72+
73+
# Number of inner-loop iterations per timeit.repeat() call.
74+
INNER = 10_000
75+
# Number of repetitions (we report the minimum).
76+
REPEAT = 5
77+
78+
79+
def make_payload(size):
    """Build a compressible test payload of exactly *size* bytes.

    A 16-byte unit (the literal ``row_value_`` followed by 6 random bytes) is
    repeated enough times to cover *size* and the result is truncated.
    """
    # One random suffix per payload: the repetition keeps the data highly
    # compressible while the suffix changes between calls.
    unit = b"row_value_" + os.urandom(6)
    repeated = unit * (size // 16 + 1)
    return repeated[:size]
84+
85+
86+
def bench(label, func, arg, inner=INNER, repeat=REPEAT):
    """Time ``func(arg)`` and return the best per-call duration in nanoseconds.

    ``func(arg)`` is executed *inner* times per batch for *repeat* batches;
    the fastest batch is divided by *inner* and converted to nanoseconds.
    *label* is accepted for call-site readability and is not otherwise used.
    """
    timer = timeit.Timer(lambda: func(arg))
    fastest_batch = min(timer.repeat(repeat=repeat, number=inner))
    return fastest_batch / inner * 1e9
92+
93+
94+
def main():
    """Benchmark every available LZ4 wrapper and print a results table.

    For each entry in ``SIZES`` a payload is generated, then compression and
    decompression are timed for the Python wrappers and/or the Cython
    wrappers, depending on which imports succeeded.  When both are available
    a speedup column is printed and each implementation is checked against
    the other's output.

    Raises:
        AssertionError: if either implementation fails to round-trip the
            other's compressed output.
    """
    if not HAS_PYTHON_LZ4 and not HAS_CYTHON:
        print("ERROR: Neither lz4 Python package nor cassandra.cython_lz4 available.")
        return

    have_both = HAS_PYTHON_LZ4 and HAS_CYTHON

    headers = f"{'Payload':<8} {'Operation':<12} "
    if HAS_PYTHON_LZ4:
        headers += f"{'Python (ns)':>12} "
    if HAS_CYTHON:
        headers += f"{'Cython (ns)':>12} "
    if have_both:
        headers += f"{'Speedup':>8}"
    print(headers)
    print("-" * len(headers))

    for size_label, size in SIZES.items():
        payload = make_payload(size)

        # -- compress --
        py_ns = cy_ns = None
        py_compressed = cy_compressed = None
        if HAS_PYTHON_LZ4:
            py_ns = bench("py_compress", py_lz4_compress, payload)
            py_compressed = py_lz4_compress(payload)
        if HAS_CYTHON:
            cy_ns = bench("cy_compress", cy_lz4_compress, payload)
            cy_compressed = cy_lz4_compress(payload)
        print(_format_row(size_label, "compress", py_ns, cy_ns))

        # Verify cross-compatibility in both directions.  Explicit raises are
        # used instead of ``assert`` so the check still runs under
        # ``python -O``.
        if have_both:
            if cy_lz4_decompress(py_compressed) != payload:
                raise AssertionError("cross-compat failed (py->cy)")
            if py_lz4_decompress(cy_compressed) != payload:
                raise AssertionError("cross-compat failed (cy->py)")

        # -- decompress --
        py_ns = cy_ns = None
        if HAS_PYTHON_LZ4:
            py_ns = bench("py_decompress", py_lz4_decompress, py_compressed)
        if HAS_CYTHON:
            # Use Cython-compressed data for the Cython decompress benchmark
            cy_ns = bench("cy_decompress", cy_lz4_decompress, cy_compressed)
        print(_format_row(size_label, "decompress", py_ns, cy_ns))

        # Blank line between payload-size groups.
        print()


def _format_row(size_label, operation, py_ns, cy_ns):
    """Format one table row; a timing of ``None`` means that column is absent."""
    row = f"{size_label:<8} {operation:<12} "
    if py_ns is not None:
        row += f"{py_ns:>12.1f} "
    if cy_ns is not None:
        row += f"{cy_ns:>12.1f} "
        if py_ns is not None:
            speedup = py_ns / cy_ns if cy_ns > 0 else float('inf')
            row += f"{speedup:>7.2f}x"
    return row
150+
151+
152+
if __name__ == "__main__":
153+
main()

cassandra/connection.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@
6666
try:
6767
import lz4
6868
except ImportError:
69-
log.debug("lz4 package could not be imported. LZ4 Compression will not be available")
70-
pass
69+
lz4 = None
7170
else:
7271
# The compress and decompress functions we need were moved from the lz4 to
7372
# the lz4.block namespace, so we try both here.
@@ -102,6 +101,19 @@ def lz4_decompress(byts):
102101
locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
103102
segment_codec_lz4 = SegmentCodec(lz4_compress, lz4_decompress)
104103

104+
# Prefer the Cython wrappers that call liblz4 directly (no Python object
105+
# allocation overhead for the byte-order conversion). This also enables
106+
# LZ4 support when the Cython extension is available but the Python lz4
107+
# package is not installed.
108+
try:
109+
from cassandra.cython_lz4 import lz4_compress, lz4_decompress
110+
locally_supported_compressions['lz4'] = (lz4_compress, lz4_decompress)
111+
segment_codec_lz4 = SegmentCodec(lz4_compress, lz4_decompress)
112+
except ImportError:
113+
if lz4 is None:
114+
log.debug("Neither the lz4 package nor the cython_lz4 extension could "
115+
"be imported. LZ4 Compression will not be available")
116+
105117
try:
106118
import snappy
107119
except ImportError:

cassandra/cython_lz4.pyx

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
# Copyright 2026 ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Cython-optimized LZ4 compression/decompression wrappers that call the LZ4 C
17+
library directly, bypassing the Python lz4 module's overhead.
18+
19+
These functions produce output that is wire-compatible with the CQL binary
20+
protocol v4 LZ4 compression format:
21+
22+
[4 bytes big-endian uncompressed length] [LZ4 compressed data]
23+
24+
The Cassandra/ScyllaDB protocol requires big-endian byte order for the
25+
uncompressed length prefix, while the Python lz4 library uses little-endian.
26+
The pure-Python wrappers in connection.py work around this mismatch by
27+
byte-swapping and slicing Python bytes objects, which allocates intermediate
28+
objects on every call. By calling LZ4_compress_default() and
29+
LZ4_decompress_safe() through Cython's C interface we avoid all intermediate
30+
Python object allocations and perform the byte-order conversion with simple
31+
C pointer operations.
32+
"""
33+
34+
from cpython.bytes cimport (PyBytes_AS_STRING, PyBytes_GET_SIZE,
35+
PyBytes_FromStringAndSize)
36+
from libc.stdint cimport uint8_t, uint32_t, INT32_MAX
37+
from libc.stdlib cimport malloc, free
38+
from libc.string cimport memcpy
39+
40+
cdef extern from "alloca.h":
41+
void *alloca(size_t size) noexcept nogil
42+
43+
# CQL native protocol v4 frames have a 32-bit body length, so the
44+
# theoretical maximum is ~2 GiB. We use 256 MiB as a practical upper
45+
# bound (matching the server's default frame size limit) to avoid
46+
# accidentally allocating multi-GiB buffers on corrupt headers.
47+
DEF MAX_DECOMPRESSED_LENGTH = 268435456 # 256 MiB
48+
49+
# LZ4_MAX_INPUT_SIZE from lz4.h — the LZ4 C API uses C int (32-bit
50+
# signed) for sizes, so we must reject Python bytes objects that
51+
# exceed this before casting Py_ssize_t down to int.
52+
DEF LZ4_MAX_INPUT_SIZE = 0x7E000000 # 2 113 929 216 bytes
53+
54+
# Maximum LZ4_compressBound value for which we use alloca (stack
55+
# allocation) instead of malloc. 128 KiB is well within the default
56+
# 8 MiB thread stack size and covers CQL frames up to ~127 KiB
57+
# uncompressed — the vast majority of real traffic. Larger frames
58+
# fall back to heap allocation.
59+
DEF STACK_ALLOC_THRESHOLD = 131072 # 128 KiB
60+
61+
62+
cdef extern from "lz4.h":
63+
int LZ4_compress_default(const char *src, char *dst,
64+
int srcSize, int dstCapacity) nogil
65+
int LZ4_decompress_safe(const char *src, char *dst,
66+
int compressedSize, int dstCapacity) nogil
67+
int LZ4_compressBound(int inputSize) nogil
68+
69+
70+
cdef inline void _write_be32(char *dst, uint32_t value) noexcept nogil:
    """Store *value* at *dst* as four bytes, most significant byte first."""
    cdef uint8_t *out = <uint8_t *>dst
    out[0] = <uint8_t>(value >> 24)
    out[1] = <uint8_t>(value >> 16)
    out[2] = <uint8_t>(value >> 8)
    out[3] = <uint8_t>value
76+
77+
78+
cdef inline uint32_t _read_be32(const char *src) noexcept nogil:
    """Load four bytes from *src*, most significant byte first, as a uint32."""
    cdef const uint8_t *raw = <const uint8_t *>src
    cdef uint32_t value = raw[0]
    value = (value << 8) | raw[1]
    value = (value << 8) | raw[2]
    value = (value << 8) | raw[3]
    return value
84+
85+
86+
def lz4_compress(bytes data not None):
    """Compress *data* using LZ4 for the CQL binary protocol.

    Returns a bytes object containing a 4-byte big-endian uncompressed-length
    header followed by the raw LZ4-compressed payload.

    Raises ``OverflowError`` if *data* is larger than ``LZ4_MAX_INPUT_SIZE``.
    Raises ``MemoryError`` if the temporary compression buffer or the result
    object cannot be allocated.  Raises ``RuntimeError`` if
    ``LZ4_compressBound()`` returns a non-positive value or
    ``LZ4_compress_default()`` reports failure.
    """
    cdef Py_ssize_t src_len = PyBytes_GET_SIZE(data)

    # The LZ4 C API takes sizes as C int, so reject oversized input before
    # narrowing Py_ssize_t to int.
    if src_len > LZ4_MAX_INPUT_SIZE:
        raise OverflowError(
            "Input size %d exceeds LZ4_MAX_INPUT_SIZE (%d)" %
            (src_len, LZ4_MAX_INPUT_SIZE))

    cdef const char *src = PyBytes_AS_STRING(data)
    cdef int src_size = <int>src_len

    cdef int bound = LZ4_compressBound(src_size)
    if bound <= 0:
        raise RuntimeError(
            "LZ4_compressBound() returned non-positive value for input "
            "size %d; input may exceed LZ4_MAX_INPUT_SIZE" % src_size)

    # Compress into a temporary buffer to learn the exact output size, then
    # copy into an exact-size Python bytes object.  Bounds up to
    # STACK_ALLOC_THRESHOLD use alloca; larger ones use malloc.
    # NOTE(review): a stack allocation of up to 128 KiB assumes the calling
    # thread has a generous stack and that <alloca.h> exists on every
    # supported platform -- confirm for threads created with small stacks.
    cdef char *tmp
    cdef bint heap_allocated = bound > STACK_ALLOC_THRESHOLD
    if heap_allocated:
        tmp = <char *>malloc(bound)
        if tmp == NULL:
            raise MemoryError(
                "Failed to allocate %d-byte LZ4 compression buffer" % bound)
    else:
        tmp = <char *>alloca(bound)

    cdef int compressed_size
    cdef bytes result
    cdef char *out_ptr
    try:
        with nogil:
            compressed_size = LZ4_compress_default(src, tmp, src_size, bound)
        if compressed_size <= 0:
            raise RuntimeError(
                "LZ4_compress_default() failed for input size %d" % src_size)

        # Build the final bytes: [4-byte BE header][compressed data].
        # This allocation can raise; the finally clause below guarantees the
        # heap buffer is released on that path too.
        result = PyBytes_FromStringAndSize(NULL, 4 + <Py_ssize_t>compressed_size)
        out_ptr = PyBytes_AS_STRING(result)
        _write_be32(out_ptr, <uint32_t>src_size)
        memcpy(out_ptr + 4, tmp, <size_t>compressed_size)
    finally:
        # Only malloc'ed buffers need releasing; alloca memory is reclaimed
        # when the function returns.
        if heap_allocated:
            free(tmp)

    return result
145+
146+
147+
def lz4_decompress(bytes data not None):
    """Decompress a CQL-protocol LZ4 frame.

    Expects *data* to start with a 4-byte big-endian uncompressed-length header
    followed by raw LZ4-compressed payload (the format produced by
    :func:`lz4_compress` and by the Cassandra/ScyllaDB server).

    Returns ``b""`` without inspecting the payload when the header declares
    an uncompressed length of zero.

    Raises ``ValueError`` if *data* is shorter than 4 bytes, if the declared
    uncompressed size exceeds ``MAX_DECOMPRESSED_LENGTH``, or if the
    compressed payload is longer than ``INT32_MAX`` bytes.  Raises
    ``RuntimeError`` if ``LZ4_decompress_safe()`` reports an error or produces
    a different number of bytes than the header declared.
    """
    cdef const char *src = PyBytes_AS_STRING(data)
    cdef Py_ssize_t src_len = PyBytes_GET_SIZE(data)

    # The header must be fully present before it can be read.
    if src_len < 4:
        raise ValueError(
            "LZ4-compressed frame too short: need at least 4 bytes for the "
            "length header, got %d" % src_len)

    cdef uint32_t uncompressed_size = _read_be32(src)

    # A declared length of zero short-circuits: nothing is decompressed.
    if uncompressed_size == 0:
        return b""

    # Cap the output allocation so a corrupt header cannot request a huge
    # buffer; this also keeps the later cast to int in range.
    if uncompressed_size > MAX_DECOMPRESSED_LENGTH:
        raise ValueError(
            "Declared uncompressed size %d exceeds safety limit of %d bytes; "
            "frame header may be corrupt" % (uncompressed_size,
                                              MAX_DECOMPRESSED_LENGTH))

    # LZ4 takes the compressed size as C int; reject anything that would not
    # fit before narrowing.
    cdef Py_ssize_t compressed_len = src_len - 4
    if compressed_len > <Py_ssize_t>INT32_MAX:
        raise ValueError(
            "Compressed payload size %d exceeds maximum supported size" %
            compressed_len)
    cdef int compressed_size = <int>compressed_len
    # Allocate the output bytes object up front and let LZ4 write into it.
    cdef bytes out = PyBytes_FromStringAndSize(NULL, <Py_ssize_t>uncompressed_size)
    cdef char *out_ptr = PyBytes_AS_STRING(out)

    cdef int result
    with nogil:
        result = LZ4_decompress_safe(src + 4, out_ptr,
                                     compressed_size,
                                     <int>uncompressed_size)
    if result < 0:
        raise RuntimeError(
            "LZ4_decompress_safe() failed with error code %d; "
            "compressed payload may be malformed" % result)

    # A short result means the header overstated the length; refuse to return
    # a buffer whose tail was never written.
    if <uint32_t>result != uncompressed_size:
        raise RuntimeError(
            "LZ4_decompress_safe() produced %d bytes but header declared %d" %
            (result, uncompressed_size))

    return out

0 commit comments

Comments
 (0)