-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_msg.py
More file actions
149 lines (133 loc) · 6.6 KB
/
test_msg.py
File metadata and controls
149 lines (133 loc) · 6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# -*- coding: utf-8; -*-
from ...syntax import macros, test, warn # noqa: F401
from ...test.fixtures import session, testset
from io import BytesIO, SEEK_SET
from .fixtures import nettest
from ..msg import encodemsg, decodemsg, MessageDecoder
from ..util import ReceiveBuffer, bytessource, streamsource, socketsource
def runtests():
with testset("sans-IO"):
with testset("basic usage"):
# Encode a message:
rawdata = b"hello world"
message = encodemsg(rawdata)
# Decode a message:
decoder = MessageDecoder(bytessource(message))
test[decoder.decode() == b"hello world"]
test[decoder.decode() is None] # The message should have been consumed by the first decode.
# Decoding a message gets a whole message and only that message.
with testset("decode robustness"):
bio = BytesIO()
bio.write(message)
bio.write(b"junk junk junk")
bio.seek(0, SEEK_SET)
decoder = MessageDecoder(streamsource(bio))
test[decoder.decode() == b"hello world"]
test[decoder.decode() is None]
# - Messages are received in order.
# - Any leftover bytes already read into the receive buffer by the previous decode
# are consumed *from the buffer* by the next decode. This guarantees it doesn't
# matter if the transport does not honor message boundaries (which is indeed the
# whole point of having this protocol).
# - Note this means that should you wish to stop receiving messages on a particular
# source, and resume reading a raw stream from it instead, you must manually prepend
# the final contents of the receive buffer (`decoder.get_buffered_data()`) to whatever
# data you later receive from that source (since that data has already been placed
# into the receive buffer, so it is no longer available at the source).
# - So it's recommended to have a dedicated channel to communicate using messages,
# e.g. a dedicated TCP connection on which all communication is done with messages.
# This way you don't need to care about the receive buffer.
with testset("message ordering"):
bio = BytesIO()
bio.write(encodemsg(b"hello world"))
bio.write(encodemsg(b"hello again"))
bio.seek(0, SEEK_SET)
decoder = MessageDecoder(streamsource(bio))
test[decoder.decode() == b"hello world"]
test[decoder.decode() == b"hello again"]
test[decoder.decode() is None]
# Synchronization to message start is performed upon decode.
# It doesn't matter if there is junk between messages (the junk is discarded).
with testset("stream synchronization"):
bio = BytesIO()
bio.write(encodemsg(b"hello world"))
bio.write(b"junk junk junk")
bio.write(encodemsg(b"hello again"))
bio.seek(0, SEEK_SET)
decoder = MessageDecoder(streamsource(bio))
test[decoder.decode() == b"hello world"]
test[decoder.decode() == b"hello again"]
test[decoder.decode() is None]
# Junk containing sync bytes (0xFF) does not confuse or hang the decoder.
with testset("junk containing sync bytes"):
bio = BytesIO()
bio.write(encodemsg(b"hello world"))
bio.write(b"\xff" * 10)
bio.write(encodemsg(b"hello again"))
bio.seek(0, SEEK_SET)
decoder = MessageDecoder(streamsource(bio))
test[decoder.decode() == b"hello world"]
test[decoder.decode() == b"hello again"]
test[decoder.decode() is None]
with testset("decodemsg (free function form)"):
# `MessageDecoder` wraps `decodemsg` and manages the `ReceiveBuffer`
# internally; the tests above exercise both via the class-based path.
# `decodemsg` itself is also in `msg.__all__` as a public free-function
# entry point, so it deserves a direct test that doesn't route through
# the class.
with testset("basic roundtrip"):
buf = ReceiveBuffer()
source = bytessource(encodemsg(b"hello world"))
test[decodemsg(buf, source) == b"hello world"]
# Subsequent call: source is exhausted, returns None.
test[decodemsg(buf, source) is None]
with testset("multiple messages with stream synchronization"):
# Junk between the two messages must be discarded; both messages
# must decode in order. Exercises the same invariants as the
# `MessageDecoder` tests above, via the free-function API.
bio = BytesIO()
bio.write(encodemsg(b"first"))
bio.write(b"junk junk junk")
bio.write(encodemsg(b"second"))
bio.seek(0, SEEK_SET)
buf = ReceiveBuffer()
source = streamsource(bio)
test[decodemsg(buf, source) == b"first"]
test[decodemsg(buf, source) == b"second"]
test[decodemsg(buf, source) is None]
with testset("binary-safe payload"):
# Messages may contain arbitrary bytes, including the sync-byte
# value (0xFF) inside the payload.
payload = bytes(range(256))
buf = ReceiveBuffer()
source = bytessource(encodemsg(payload))
test[decodemsg(buf, source) == payload]
with testset("with TCP sockets"):
def server1(sock):
decoder = MessageDecoder(socketsource(sock))
data = decoder.decode()
return data
def client1(sock):
sock.sendall(encodemsg(b"hello world"))
test[nettest(server1, client1) == b"hello world"]
def server2(sock):
decoder = MessageDecoder(socketsource(sock))
data = decoder.decode()
return data
def client2(sock):
sock.sendall(encodemsg(b"hello world"))
sock.sendall(encodemsg(b"hello again"))
test[nettest(server2, client2) == b"hello world"]
def server3(sock):
decoder = MessageDecoder(socketsource(sock))
data = []
data.append(decoder.decode())
data.append(decoder.decode())
return data
def client3(sock):
sock.sendall(encodemsg(b"hello world"))
sock.sendall(encodemsg(b"hello again"))
test[nettest(server3, client3) == [b"hello world", b"hello again"]]
if __name__ == '__main__': # pragma: no cover
with session(__file__):
runtests()