Skip to content

Commit 8b315ef

Browse files
jimmodpgeorge
authored andcommitted
tests/extmod: Add deflate.DeflateIO tests.
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
1 parent 3533924 commit 8b315ef

6 files changed

Lines changed: 557 additions & 0 deletions

File tree

tests/extmod/deflate_compress.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
try:
2+
# Check if deflate is available.
3+
import deflate
4+
import io
5+
except ImportError:
6+
print("SKIP")
7+
raise SystemExit
8+
9+
# Check if compression is enabled.
10+
if not hasattr(deflate.DeflateIO, "write"):
11+
print("SKIP")
12+
raise SystemExit
13+
14+
# Simple compression & decompression.
15+
b = io.BytesIO()
16+
g = deflate.DeflateIO(b, deflate.RAW)
17+
data = b"micropython"
18+
N = 10
19+
for i in range(N):
20+
g.write(data)
21+
g.close()
22+
result_raw = b.getvalue()
23+
print(len(result_raw) < len(data) * N)
24+
b = io.BytesIO(result_raw)
25+
g = deflate.DeflateIO(b, deflate.RAW)
26+
print(g.read())
27+
28+
# Same, but using a context manager.
29+
b = io.BytesIO()
30+
with deflate.DeflateIO(b, deflate.RAW) as g:
31+
for i in range(N):
32+
g.write(data)
33+
result_raw = b.getvalue()
34+
print(len(result_raw) < len(data) * N)
35+
b = io.BytesIO(result_raw)
36+
with deflate.DeflateIO(b, deflate.RAW) as g:
37+
print(g.read())
38+
39+
# Writing to a closed underlying stream.
40+
b = io.BytesIO()
41+
g = deflate.DeflateIO(b, deflate.RAW)
42+
g.write(b"micropython")
43+
b.close()
44+
try:
45+
g.write(b"micropython")
46+
except ValueError:
47+
print("ValueError")
48+
49+
# Writing to a closed DeflateIO.
50+
b = io.BytesIO()
51+
g = deflate.DeflateIO(b, deflate.RAW)
52+
g.write(b"micropython")
53+
g.close()
54+
try:
55+
g.write(b"micropython")
56+
except OSError:
57+
print("OSError")
58+
59+
60+
def decompress(data, *args):
61+
buf = io.BytesIO(data)
62+
with deflate.DeflateIO(buf, *args) as g:
63+
return g.read()
64+
65+
66+
def compress(data, *args):
67+
b = io.BytesIO()
68+
with deflate.DeflateIO(b, *args) as g:
69+
g.write(data)
70+
return b.getvalue()
71+
72+
73+
def compress_error(data, *args):
74+
try:
75+
compress(data, *args)
76+
except OSError:
77+
print("OSError")
78+
except ValueError:
79+
print("ValueError")
80+
81+
82+
# More test patterns.
83+
PATTERNS_RAW = (
84+
(b"0", b"3\x00\x00"),
85+
(b"a", b"K\x04\x00"),
86+
(b"0" * 100, b"3\xa0\x03\x00\x00"),
87+
(
88+
bytes(range(64)),
89+
b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00",
90+
),
91+
)
92+
for unpacked, packed in PATTERNS_RAW:
93+
print(compress(unpacked) == packed)
94+
print(compress(unpacked, deflate.RAW) == packed)
95+
96+
# Verify header and checksum format.
97+
unpacked = b"hello"
98+
packed = b"\xcbH\xcd\xc9\xc9\x07\x00"
99+
100+
101+
def check_header(n, a, b):
102+
if a == b:
103+
print(n)
104+
else:
105+
print(n, a, b)
106+
107+
108+
check_header("RAW", compress(unpacked, deflate.RAW), packed)
109+
check_header(
110+
"ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15"
111+
)
112+
check_header(
113+
"ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15"
114+
)
115+
check_header(
116+
"GZIP",
117+
compress(unpacked, deflate.GZIP, 9),
118+
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00",
119+
)
120+
121+
# Valid wbits values.
122+
compress_error(unpacked, deflate.RAW, -1)
123+
print(len(compress(unpacked, deflate.RAW, 0)))
124+
compress_error(unpacked, deflate.RAW, 1)
125+
compress_error(unpacked, deflate.RAW, 4)
126+
for i in range(5, 16):
127+
print(len(compress(unpacked, deflate.RAW, i)))
128+
compress_error(unpacked, deflate.RAW, 16)
129+
130+
# Invalid values for format.
131+
compress_error(unpacked, -1)
132+
compress_error(unpacked, 5)
133+
134+
# Fill buf with a predictable pseudorandom sequence.
135+
buf = bytearray(1024)
136+
lfsr = 1 << 15 | 1
137+
for i in range(len(buf)):
138+
bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1
139+
lfsr = (lfsr >> 1) | (bit << 15)
140+
buf[i] = lfsr & 0xFF
141+
142+
# Verify that compression improves as the window size increases.
143+
prev_len = len(buf)
144+
for wbits in range(5, 10):
145+
result = compress(buf, deflate.RAW, wbits)
146+
next_len = len(result)
147+
print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf)
148+
prev_len = next_len
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
True
2+
b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
3+
True
4+
b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
5+
ValueError
6+
OSError
7+
True
8+
True
9+
True
10+
True
11+
True
12+
True
13+
True
14+
True
15+
RAW
16+
ZLIB(9)
17+
ZLIB(15)
18+
GZIP
19+
ValueError
20+
7
21+
ValueError
22+
ValueError
23+
7
24+
7
25+
7
26+
7
27+
7
28+
7
29+
7
30+
7
31+
7
32+
7
33+
7
34+
ValueError
35+
ValueError
36+
ValueError
37+
False
38+
True
39+
True
40+
True
41+
True

tests/extmod/deflate_decompress.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
try:
2+
# Check if deflate is available.
3+
import deflate
4+
import io
5+
except ImportError:
6+
print("SKIP")
7+
raise SystemExit
8+
9+
# zlib.compress(b'micropython hello world hello world micropython', wbits=-9)
10+
data_raw = b'\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00'
11+
# zlib.compress(b'micropython hello world hello world micropython', wbits=9)
12+
data_zlib = b'\x18\x95\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00\xbc\xfa\x12\x91'
13+
# zlib.compress(b'micropython hello world hello world micropython', wbits=25)
14+
data_gzip = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00"\xeb\xc4\x98/\x00\x00\x00'
15+
16+
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 5)
17+
data_wbits_5 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\x95\xcfH\xcd\xc9\xc9\x07\x00"
18+
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 6)
19+
data_wbits_6 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xd5\x9f\x91\x9a\x93\x93\x0f\x00"
20+
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 8)
21+
data_wbits_8 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xf5\x7fFjNN>\x00"
22+
# compress(b'hello' + bytearray(2000) + b'hello', format=deflate.RAW, 10)
23+
data_wbits_10 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00"
24+
25+
26+
def decompress(data, *args):
27+
buf = io.BytesIO(data)
28+
with deflate.DeflateIO(buf, *args) as g:
29+
return g.read()
30+
31+
32+
def decompress_error(data, *args):
33+
try:
34+
decompress(data, *args)
35+
except OSError:
36+
print("OSError")
37+
except EOFError:
38+
print("EOFError")
39+
except ValueError:
40+
print("ValueError")
41+
42+
43+
# Basic handling of format and detection.
44+
print(decompress(data_raw, deflate.RAW))
45+
print(decompress(data_zlib, deflate.ZLIB))
46+
print(decompress(data_gzip, deflate.GZIP))
47+
print(decompress(data_zlib)) # detect zlib/gzip.
48+
print(decompress(data_gzip)) # detect zlib/gzip.
49+
50+
decompress_error(data_raw) # cannot detect zlib/gzip from raw stream
51+
decompress_error(data_raw, deflate.ZLIB)
52+
decompress_error(data_raw, deflate.GZIP)
53+
decompress_error(data_zlib, deflate.RAW)
54+
decompress_error(data_zlib, deflate.GZIP)
55+
decompress_error(data_gzip, deflate.RAW)
56+
decompress_error(data_gzip, deflate.ZLIB)
57+
58+
# Invalid data stream.
59+
decompress_error(b"abcef", deflate.RAW)
60+
61+
# Invalid block type. final-block, block-type=3.
62+
decompress_error(b"\x07", deflate.RAW)
63+
64+
# Truncated stream.
65+
decompress_error(data_raw[:10], deflate.RAW)
66+
67+
# Partial reads.
68+
buf = io.BytesIO(data_zlib)
69+
with deflate.DeflateIO(buf) as g:
70+
print(buf.seek(0, 1)) # verify stream is not read until first read of the DeflateIO stream.
71+
print(g.read(1))
72+
print(buf.seek(0, 1)) # verify that only the minimal amount is read from the source
73+
print(g.read(1))
74+
print(buf.seek(0, 1))
75+
print(g.read(2))
76+
print(buf.seek(0, 1))
77+
print(g.read())
78+
print(buf.seek(0, 1))
79+
print(g.read(1))
80+
print(buf.seek(0, 1))
81+
print(g.read())
82+
83+
# Invalid zlib checksum (+ length for gzip). Note: only checksum errors are
84+
# currently detected, see the end of uzlib_uncompress_chksum().
85+
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00")
86+
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00")
87+
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00", deflate.ZLIB)
88+
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00", deflate.GZIP)
89+
90+
# Reading from a closed underlying stream.
91+
b = io.BytesIO(data_raw)
92+
g = deflate.DeflateIO(b, deflate.RAW)
93+
g.read(4)
94+
b.close()
95+
try:
96+
g.read(4)
97+
except ValueError:
98+
print("ValueError")
99+
100+
# Reading from a closed DeflateIO.
101+
b = io.BytesIO(data_raw)
102+
g = deflate.DeflateIO(b, deflate.RAW)
103+
g.read(4)
104+
g.close()
105+
try:
106+
g.read(4)
107+
except OSError:
108+
print("OSError")
109+
110+
# Gzip header with extra flags (FCOMMENT FNAME FEXTRA FHCRC) enabled.
111+
data_gzip_header_extra = b"\x1f\x8b\x08\x1e}\x9a\x9bd\x02\x00\x00\x00\x00\x00\x00\xff\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcf\x03\x00\xf2KF>\x0b\x00\x00\x00"
112+
print(decompress(data_gzip_header_extra))
113+
114+
# Test patterns.
115+
PATTERNS_ZLIB = [
116+
# Packed results produced by CPy's zlib.compress()
117+
(b"0", b"x\x9c3\x00\x00\x001\x001"),
118+
(b"a", b"x\x9cK\x04\x00\x00b\x00b"),
119+
(b"0" * 100, b"x\x9c30\xa0=\x00\x00\xb3q\x12\xc1"),
120+
(
121+
bytes(range(64)),
122+
b"x\x9cc`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00\xaa\xe0\x07\xe1",
123+
),
124+
(b"hello", b"x\x01\x01\x05\x00\xfa\xffhello\x06,\x02\x15"), # compression level 0
125+
# adaptive/dynamic huffman tree
126+
(
127+
b"13371813150|13764518736|12345678901",
128+
b"x\x9c\x05\xc1\x81\x01\x000\x04\x04\xb1\x95\\\x1f\xcfn\x86o\x82d\x06Qq\xc8\x9d\xc5X}<e\xb5g\x83\x0f\x89X\x07\xab",
129+
),
130+
# dynamic Huffman tree with "case 17" (repeat code for 3-10 times)
131+
(
132+
b">I}\x00\x951D>I}\x00\x951D>I}\x00\x951D>I}\x00\x951D",
133+
b"x\x9c\x05\xc11\x01\x00\x00\x00\x010\x95\x14py\x84\x12C_\x9bR\x8cV\x8a\xd1J1Z)F\x1fw`\x089",
134+
),
135+
]
136+
for unpacked, packed in PATTERNS_ZLIB:
137+
print(decompress(packed) == unpacked)
138+
print(decompress(packed, deflate.ZLIB) == unpacked)
139+
140+
# Older version's of CPython's zlib module still included the checksum and length (as if it were a zlib/gzip stream).
141+
# Make sure there're no problem decompressing this.
142+
data_raw_with_footer = data_raw + b"\x00\x00\x00\x00\x00\x00\x00\x00"
143+
print(decompress(data_raw_with_footer, deflate.RAW))
144+
145+
# Valid wbits values.
146+
decompress_error(data_wbits_5, deflate.RAW, -1)
147+
print(len(decompress(data_wbits_5, deflate.RAW, 0)))
148+
decompress_error(data_wbits_5, deflate.RAW, 1)
149+
decompress_error(data_wbits_5, deflate.RAW, 4)
150+
for i in range(5, 16):
151+
print(len(decompress(data_wbits_5, deflate.RAW, i)))
152+
decompress_error(data_wbits_5, deflate.RAW, 16)
153+
154+
# Invalid values for format.
155+
decompress_error(data_raw, -1)
156+
decompress_error(data_raw, 5)
157+
158+
# Data that requires a higher wbits value.
159+
decompress_error(data_wbits_6, deflate.RAW, 5)
160+
print(len(decompress(data_wbits_6, deflate.RAW, 6)))
161+
print(len(decompress(data_wbits_6, deflate.RAW, 7)))
162+
decompress_error(data_wbits_8, deflate.RAW, 7)
163+
print(len(decompress(data_wbits_8, deflate.RAW, 8)))
164+
print(len(decompress(data_wbits_8, deflate.RAW, 9)))
165+
decompress_error(data_wbits_10, deflate.RAW)
166+
decompress_error(data_wbits_10, deflate.RAW, 9)
167+
print(len(decompress(data_wbits_10, deflate.RAW, 10)))
168+
169+
# zlib header sets the size, so works with wbits unset or wbits >= 10.
170+
data_wbits_10_zlib = b"(\x91\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00[\xbc\x04)"
171+
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB)))
172+
decompress_error(data_wbits_10_zlib, deflate.ZLIB, 9)
173+
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB, 10)))
174+
print(len(decompress(data_wbits_10_zlib)))

0 commit comments

Comments
 (0)