-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_msg.py
More file actions
113 lines (100 loc) · 4.86 KB
/
test_msg.py
File metadata and controls
113 lines (100 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8; -*-
from ...syntax import macros, test, warn # noqa: F401
from ...test.fixtures import session, testset
from io import BytesIO, SEEK_SET
from .fixtures import nettest
from ..msg import encodemsg, MessageDecoder
from ..util import bytessource, streamsource, socketsource
def runtests():
    """Run the tests for the message protocol (`encodemsg` / `MessageDecoder`).

    The "sans-IO" group feeds the decoder from in-memory sources
    (`bytessource`, `streamsource`). The "with TCP sockets" group runs
    server/client pairs through the `nettest` fixture and decodes from a
    `socketsource`.
    """
    def stream_of(*chunks):
        """Return a `BytesIO` holding `chunks` back to back, rewound to the start."""
        stream = BytesIO()
        for chunk in chunks:
            stream.write(chunk)
        stream.seek(0, SEEK_SET)
        return stream

    with testset("sans-IO"):
        with testset("basic usage"):
            # Encode a message...
            payload = b"hello world"
            message = encodemsg(payload)
            # ...and decode it again.
            decoder = MessageDecoder(bytessource(message))
            test[decoder.decode() == b"hello world"]
            # The first decode should have consumed the message.
            test[decoder.decode() is None]

        # Decoding yields one whole message, and nothing but that message.
        # (`message` is the one encoded in "basic usage" above.)
        with testset("decode robustness"):
            decoder = MessageDecoder(streamsource(stream_of(message, b"junk junk junk")))
            test[decoder.decode() == b"hello world"]
            test[decoder.decode() is None]

        # - Messages are received in order.
        # - Leftover bytes that a previous decode already read into the receive
        #   buffer are consumed *from the buffer* by the next decode. Hence it
        #   does not matter if the transport does not honor message boundaries
        #   (which is the whole point of having this protocol).
        # - Consequently, if you want to stop receiving messages on a source and
        #   go back to reading it as a raw stream, you must manually prepend the
        #   final contents of the receive buffer (`decoder.get_buffered_data()`)
        #   to whatever you later receive from that source; that data is already
        #   in the receive buffer, so the source no longer has it.
        # - So a dedicated channel for messages is recommended, e.g. a TCP
        #   connection on which all communication is done with messages. Then
        #   the receive buffer needs no special care.
        with testset("message ordering"):
            decoder = MessageDecoder(streamsource(stream_of(encodemsg(b"hello world"),
                                                            encodemsg(b"hello again"))))
            test[decoder.decode() == b"hello world"]
            test[decoder.decode() == b"hello again"]
            test[decoder.decode() is None]

        # Each decode synchronizes to the start of a message, so junk between
        # messages is harmless (it is discarded).
        with testset("stream synchronization"):
            decoder = MessageDecoder(streamsource(stream_of(encodemsg(b"hello world"),
                                                            b"junk junk junk",
                                                            encodemsg(b"hello again"))))
            test[decoder.decode() == b"hello world"]
            test[decoder.decode() == b"hello again"]
            test[decoder.decode() is None]

        # Junk containing sync bytes (0xFF) does not confuse or hang the decoder.
        with testset("junk containing sync bytes"):
            decoder = MessageDecoder(streamsource(stream_of(encodemsg(b"hello world"),
                                                            b"\xff" * 10,
                                                            encodemsg(b"hello again"))))
            test[decoder.decode() == b"hello world"]
            test[decoder.decode() == b"hello again"]
            test[decoder.decode() is None]

    with testset("with TCP sockets"):
        def send_one(sock):
            """Client side: send a single message."""
            sock.sendall(encodemsg(b"hello world"))

        def send_two(sock):
            """Client side: send two messages, one after the other."""
            sock.sendall(encodemsg(b"hello world"))
            sock.sendall(encodemsg(b"hello again"))

        def receive_one(sock):
            """Server side: decode and return one message."""
            return MessageDecoder(socketsource(sock)).decode()

        def receive_two(sock):
            """Server side: decode two messages, return them as a list."""
            decoder = MessageDecoder(socketsource(sock))
            return [decoder.decode(), decoder.decode()]

        # One message sent, one decoded.
        test[nettest(receive_one, send_one) == b"hello world"]
        # Two messages sent, but only the first is decoded.
        test[nettest(receive_one, send_two) == b"hello world"]
        # Two messages sent, both decoded, in order.
        test[nettest(receive_two, send_two) == [b"hello world", b"hello again"]]
# Script entry point: when this module is executed directly, run all the
# tests above inside a test session named after this file.
if __name__ == '__main__':  # pragma: no cover
    with session(__file__):
        runtests()