-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathutil.py
More file actions
197 lines (158 loc) · 7.38 KB
/
util.py
File metadata and controls
197 lines (158 loc) · 7.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# -*- coding: utf-8; -*-
"""Utilities for networking."""
__all__ = ["ReuseAddrThreadingTCPServer",
"ReceiveBuffer",
"bytessource", "streamsource", "socketsource",
"recvall",
"netstringify"]
import socket
import socketserver
import select
from io import BytesIO, IOBase
# https://docs.python.org/3/library/socketserver.html#socketserver.ThreadingTCPServer
# https://docs.python.org/3/library/socketserver.html#socketserver.ThreadingMixIn
# https://docs.python.org/3/library/socketserver.html#socketserver.TCPServer
class ReuseAddrThreadingTCPServer(socketserver.ThreadingTCPServer):
    """Threading TCP server whose listening address can be rebound immediately.

    Identical to `socketserver.ThreadingTCPServer`, except the listening
    socket is flagged with `SO_REUSEADDR` before binding, so a restarted
    server can reclaim the port without waiting out TIME_WAIT.
    """
    def server_bind(self):
        """Custom server_bind ensuring the socket is available for rebind immediately."""
        # Must set the option *before* bind for it to take effect.
        # Technique from https://stackoverflow.com/a/18858817
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
# We could achieve the same result using a `unpythonic.collections.box` to
# hold a `BytesIO`, but a class allows us to encapsulate also the set and
# append operations. So here OOP is really the right solution.
class ReceiveBuffer:
    """A receive buffer for message protocols running on top of stream-based transports.

    Read data from your original stream into this buffer, then take the
    accumulated data out of it here.

    Unlike a bare `BytesIO`, this supports partially clearing the buffer once
    a complete message has been extracted: the received message can be removed
    while any trailing bytes (likely the start of the next message) are kept.

    Defining what constitutes a message is up to the caller; this class only
    provides `append` and `set` operations on the contents.
    """
    def __init__(self, initial_contents=b""):
        """A receive buffer object for use with `decodemsg`."""
        self._buffer = BytesIO()
        self.set(initial_contents)

    # Contents may be large, so the TypeError messages omit them on purpose.
    def append(self, more_contents=b""):
        """Append `more_contents` to the buffer."""
        if not isinstance(more_contents, bytes):
            raise TypeError(f"Expected a bytes object, got {type(more_contents)}")
        self._buffer.write(more_contents)
        return self  # allow chaining for convenience

    def set(self, new_contents=b""):
        """Replace buffer contents with `new_contents`."""
        if not isinstance(new_contents, bytes):
            raise TypeError(f"Expected a bytes object, got {type(new_contents)}")
        # Supply the contents via write() rather than the BytesIO constructor
        # argument, so the stream position ends up at the end; later writes
        # then continue from where the initial contents leave off.
        replacement = BytesIO()
        replacement.write(new_contents)
        self._buffer = replacement
        return self

    def getvalue(self):
        """Return the data currently in the buffer, as a `bytes` object.

        Mostly for internal use by message protocols. An application may also
        need this when switching from messages back to raw data on an existing
        stream transport: once done receiving messages, process the data still
        in this buffer first, before reading more from the original stream.
        """
        return self._buffer.getvalue()
def bytessource(data, chunksize=4096):
    """Generator that reads from a `bytes` object in chunksize-sized chunks.

    Returns a generator instance.

    The generator yields each chunk as a `bytes` object. The last one may be
    smaller than `chunksize`. Stops iteration when data runs out.

    Acts as a message source for `decodemsg`, for receiving data from a
    `bytes` object. See also `streamsource`, `socketsource`.
    """
    # Validate eagerly (fail-fast); only the iteration itself is deferred.
    if not isinstance(data, bytes):
        raise TypeError(f"Expected a `bytes` object, got {type(data)}")
    def chunk_generator():
        for start in range(0, len(data), chunksize):
            yield data[start:start + chunksize]
    return chunk_generator()
def streamsource(stream, chunksize=4096):
    """Generator that reads from an IO stream in (at most) chunksize-sized chunks.

    This can be used with files opened with `open()`, in-memory `BytesIO`
    streams, and such.

    Returns a generator instance.

    The generator yields each chunk as a `bytes` object. Each chunk may be
    smaller than `chunksize`, if fewer than `chunksize` bytes are available in
    the stream at the time when `next()` is called. (Consider `sys.stdin`.)

    Blocks when no data is available, but the stream has not signaled EOF.
    Stops iteration at EOF.

    Acts as a message source for `decodemsg`, for receiving data from a binary
    IO stream. See also `bytessource`, `socketsource`.
    """
    if not isinstance(stream, IOBase):
        raise TypeError(f"Expected a derivative of `IOBase`, got {type(stream)}")
    def stream_chunk_iterator():
        while True:
            # Bug fix: honor the `chunksize` parameter; it was previously
            # ignored in favor of a hard-coded 4096-byte read.
            data = stream.read(chunksize)
            if len(data) == 0:  # EOF
                return
            yield data
    return stream_chunk_iterator()
def socketsource(sock, chunksize=4096):
    """Generator that reads from a socket in (at most) chunksize-sized chunks.

    Returns a generator instance.

    The generator yields each chunk as a `bytes` object. Each chunk may be
    smaller than `chunksize`, if fewer than `chunksize` bytes are available on
    the socket at the time when `next()` is called.

    Blocks when no data is available, but the socket is still connected to the
    remote. Stops iteration when the socket is closed.

    Acts as a message source for `decodemsg`, for receiving data over a socket.
    See also `bytessource`, `streamsource`.
    """
    # Validate eagerly so misuse fails at call time, not at first iteration.
    if not isinstance(sock, socket.SocketType):
        raise TypeError(f"Expected a socket object, got {type(sock)}")
    def socket_chunk_iterator():
        while True:
            # Wait until the socket has data (or the remote end closes it).
            readable, _writable, _errored = select.select([sock], [], [])
            for _ready in readable:
                data = sock.recv(chunksize)
                if not data:  # empty recv means the connection was closed
                    return
                yield data
    return socket_chunk_iterator()
def recvall(n, sock):
    """Receive **exactly** `n` bytes from a socket.

    Missing battery for the stdlib `socket` module (compare `socket.sendall`).

    Returns a `bytes` object containing the bytes read, or `None` if the
    socket is closed by the other end before `n` bytes have been received.

    See:
        http://stupidpythonideas.blogspot.com/2013/05/sockets-are-byte-streams-not-data.html
    """
    pieces = []
    remaining = n
    while remaining:
        # recv may return fewer bytes than requested; keep asking for the rest.
        data = sock.recv(remaining)
        if not data:  # connection closed before we got everything
            return None
        pieces.append(data)
        remaining -= len(data)
    return b"".join(pieces)
def netstringify(data):
    """Return a `bytes` object of `data` (also `bytes`), converted into a netstring."""
    if not isinstance(data, bytes):
        raise TypeError(f"Data must be bytes; got {type(data)}")
    # Netstring layout: <payload length in decimal ASCII> ":" <payload> ","
    header = f"{len(data)}:".encode("utf-8")
    footer = ",".encode("utf-8")
    return header + data + footer