Skip to content

Commit 7f7c84b

Browse files
committed
py/stream: Support both "exact size" and "one underlying call" operations.
Both read and write operations support variants where either a) a single call is made to the undelying stream implementation and returned buffer length may be less than requested, or b) calls are repeated until requested amount of data is collected, shorter amount is returned only in case of EOF or error. These operations are available from the level of C support functions to be used by other C modules to implementations of Python methods to be used in user-facing objects. The rationale of these changes is to allow to write concise and robust code to work with *blocking* streams of types prone to short reads, like serial interfaces and sockets. Particular object types may select "exact" vs "once" types of methods depending on their needs. E.g., for sockets, revc() and send() methods continue to be "once", while read() and write() thus converted to "exactly" versions. These changes don't affect non-blocking handling, e.g. trying "exact" method on the non-blocking socket will return as much data as available without blocking. No data available is continued to be signaled as None return value to read() and write(). From the point of view of CPython compatibility, this model is a cross between its io.RawIOBase and io.BufferedIOBase abstract classes. For blocking streams, it works as io.BufferedIOBase model (guaranteeing lack of short reads/writes), while for non-blocking - as io.RawIOBase, returning None in case of lack of data (instead of raising expensive exception, as required by io.BufferedIOBase). Such a cross-behavior should be optimal for MicroPython needs.
1 parent 92a342a commit 7f7c84b

5 files changed

Lines changed: 100 additions & 43 deletions

File tree

extmod/modwebrepl.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ STATIC mp_uint_t _webrepl_read(mp_obj_t self_in, void *buf, mp_uint_t size, int
250250

251251
DEBUG_printf("webrepl: Writing %lu bytes to file\n", buf_sz);
252252
int err;
253-
mp_uint_t res = mp_stream_writeall(self->cur_file, filebuf, buf_sz, &err);
254-
if(res == MP_STREAM_ERROR) {
253+
mp_uint_t res = mp_stream_write_exactly(self->cur_file, filebuf, buf_sz, &err);
254+
if (err != 0 || res != buf_sz) {
255255
assert(0);
256256
}
257257

extmod/modwebsocket.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,16 +240,19 @@ STATIC mp_uint_t websocket_write(mp_obj_t self_in, const void *buf, mp_uint_t si
240240
mp_call_method_n_kw(1, 0, dest);
241241
}
242242

243-
mp_uint_t out_sz = mp_stream_writeall(self->sock, header, hdr_sz, errcode);
244-
if (out_sz != MP_STREAM_ERROR) {
245-
out_sz = mp_stream_writeall(self->sock, buf, size, errcode);
243+
mp_uint_t out_sz = mp_stream_write_exactly(self->sock, header, hdr_sz, errcode);
244+
if (*errcode == 0) {
245+
out_sz = mp_stream_write_exactly(self->sock, buf, size, errcode);
246246
}
247247

248248
if (self->opts & BLOCKING_WRITE) {
249249
dest[2] = mp_const_false;
250250
mp_call_method_n_kw(1, 0, dest);
251251
}
252252

253+
if (*errcode != 0) {
254+
return MP_STREAM_ERROR;
255+
}
253256
return out_sz;
254257
}
255258

py/modio.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,13 @@ STATIC mp_uint_t bufwriter_write(mp_obj_t self_in, const void *buf, mp_uint_t si
7878
memcpy(self->buf + self->len, buf, rem);
7979
buf = (byte*)buf + rem;
8080
size -= rem;
81-
mp_uint_t out_sz = mp_stream_writeall(self->stream, self->buf, self->alloc, errcode);
82-
if (out_sz == MP_STREAM_ERROR) {
81+
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->alloc, errcode);
82+
if (*errcode != 0) {
8383
return MP_STREAM_ERROR;
8484
}
85+
// TODO: try to recover from a case of non-blocking stream, e.g. move
86+
// remaining chunk to the beginning of buffer.
87+
assert(out_sz == self->alloc);
8588
self->len = 0;
8689
}
8790

@@ -93,9 +96,12 @@ STATIC mp_obj_t bufwriter_flush(mp_obj_t self_in) {
9396

9497
if (self->len != 0) {
9598
int err;
96-
mp_uint_t out_sz = mp_stream_writeall(self->stream, self->buf, self->len, &err);
99+
mp_uint_t out_sz = mp_stream_write_exactly(self->stream, self->buf, self->len, &err);
100+
// TODO: try to recover from a case of non-blocking stream, e.g. move
101+
// remaining chunk to the beginning of buffer.
102+
assert(out_sz == self->len);
97103
self->len = 0;
98-
if (out_sz == MP_STREAM_ERROR) {
104+
if (err != 0) {
99105
nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(err)));
100106
}
101107
}

py/stream.c

Lines changed: 72 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,48 @@ STATIC mp_obj_t stream_readall(mp_obj_t self_in);
4949

5050
#define STREAM_CONTENT_TYPE(stream) (((stream)->is_text) ? &mp_type_str : &mp_type_bytes)
5151

52+
// Returns error condition in *errcode, if non-zero, return value is number of bytes written
53+
// before error condition occured. If *errcode == 0, returns total bytes written (which will
54+
// be equal to input size).
55+
mp_uint_t mp_stream_rw(mp_obj_t stream, void *buf_, mp_uint_t size, int *errcode, byte flags) {
56+
byte *buf = buf_;
57+
mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream);
58+
typedef mp_uint_t (*io_func_t)(mp_obj_t obj, void *buf, mp_uint_t size, int *errcode);
59+
io_func_t io_func;
60+
if (flags & MP_STREAM_RW_WRITE) {
61+
io_func = (io_func_t)s->type->stream_p->write;
62+
} else {
63+
io_func = s->type->stream_p->read;
64+
}
65+
66+
*errcode = 0;
67+
mp_uint_t done = 0;
68+
while (size > 0) {
69+
mp_uint_t out_sz = io_func(stream, buf, size, errcode);
70+
// For read, out_sz == 0 means EOF. For write, it's unspecified
71+
// what it means, but we don't make any progress, so returning
72+
// is still the best option.
73+
if (out_sz == 0) {
74+
return done;
75+
}
76+
if (out_sz == MP_STREAM_ERROR) {
77+
// If we read something before getting EAGAIN, don't leak it
78+
if (mp_is_nonblocking_error(*errcode) && done != 0) {
79+
*errcode = 0;
80+
}
81+
return done;
82+
}
83+
if (flags & MP_STREAM_RW_ONCE) {
84+
return out_sz;
85+
}
86+
87+
buf += out_sz;
88+
size -= out_sz;
89+
done += out_sz;
90+
}
91+
return done;
92+
}
93+
5294
const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) {
5395
mp_obj_base_t *o = (mp_obj_base_t*)MP_OBJ_TO_PTR(self_in);
5496
const mp_stream_p_t *stream_p = o->type->stream_p;
@@ -62,7 +104,7 @@ const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) {
62104
return stream_p;
63105
}
64106

65-
STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
107+
STATIC mp_obj_t stream_read_generic(size_t n_args, const mp_obj_t *args, byte flags) {
66108
const mp_stream_p_t *stream_p = mp_get_stream_raise(args[0], MP_STREAM_OP_READ);
67109

68110
// What to do if sz < -1? Python docs don't specify this case.
@@ -94,8 +136,8 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
94136
nlr_raise(mp_obj_new_exception_msg_varg(&mp_type_MemoryError, "out of memory"));
95137
}
96138
int error;
97-
mp_uint_t out_sz = stream_p->read(args[0], p, more_bytes, &error);
98-
if (out_sz == MP_STREAM_ERROR) {
139+
mp_uint_t out_sz = mp_stream_read_exactly(args[0], p, more_bytes, &error);
140+
if (error != 0) {
99141
vstr_cut_tail_bytes(&vstr, more_bytes);
100142
if (mp_is_nonblocking_error(error)) {
101143
// With non-blocking streams, we read as much as we can.
@@ -165,8 +207,8 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
165207
vstr_t vstr;
166208
vstr_init_len(&vstr, sz);
167209
int error;
168-
mp_uint_t out_sz = stream_p->read(args[0], vstr.buf, sz, &error);
169-
if (out_sz == MP_STREAM_ERROR) {
210+
mp_uint_t out_sz = mp_stream_rw(args[0], vstr.buf, sz, &error, flags);
211+
if (error != 0) {
170212
vstr_clear(&vstr);
171213
if (mp_is_nonblocking_error(error)) {
172214
// https://docs.python.org/3.4/library/io.html#io.RawIOBase.read
@@ -182,20 +224,27 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
182224
return mp_obj_new_str_from_vstr(STREAM_CONTENT_TYPE(stream_p), &vstr);
183225
}
184226
}
227+
228+
STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
229+
return stream_read_generic(n_args, args, MP_STREAM_RW_READ);
230+
}
185231
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_stream_read_obj, 1, 2, stream_read);
186232

187-
mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len) {
188-
const mp_stream_p_t *stream_p = mp_get_stream_raise(self_in, MP_STREAM_OP_WRITE);
233+
STATIC mp_obj_t stream_read1(size_t n_args, const mp_obj_t *args) {
234+
return stream_read_generic(n_args, args, MP_STREAM_RW_READ | MP_STREAM_RW_ONCE);
235+
}
236+
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_stream_read1_obj, 1, 2, stream_read1);
237+
238+
mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len, byte flags) {
239+
mp_get_stream_raise(self_in, MP_STREAM_OP_WRITE);
189240

190241
int error;
191-
mp_uint_t out_sz = stream_p->write(self_in, buf, len, &error);
192-
if (out_sz == MP_STREAM_ERROR) {
242+
mp_uint_t out_sz = mp_stream_rw(self_in, (void*)buf, len, &error, flags);
243+
if (error != 0) {
193244
if (mp_is_nonblocking_error(error)) {
194245
// http://docs.python.org/3/library/io.html#io.RawIOBase.write
195246
// "None is returned if the raw stream is set not to block and
196247
// no single byte could be readily written to it."
197-
// This is for consistency with read() behavior, still weird,
198-
// see abobe.
199248
return mp_const_none;
200249
}
201250
nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(error)));
@@ -206,33 +255,25 @@ mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len) {
206255

207256
// XXX hack
208257
void mp_stream_write_adaptor(void *self, const char *buf, size_t len) {
209-
mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len);
210-
}
211-
212-
// Works only with blocking streams
213-
mp_uint_t mp_stream_writeall(mp_obj_t stream, const byte *buf, mp_uint_t size, int *errcode) {
214-
mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream);
215-
mp_uint_t org_size = size;
216-
while (size > 0) {
217-
mp_uint_t out_sz = s->type->stream_p->write(stream, buf, size, errcode);
218-
if (out_sz == MP_STREAM_ERROR) {
219-
return MP_STREAM_ERROR;
220-
}
221-
buf += out_sz;
222-
size -= out_sz;
223-
}
224-
return org_size;
258+
mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len, MP_STREAM_RW_WRITE);
225259
}
226260

227261
STATIC mp_obj_t stream_write_method(mp_obj_t self_in, mp_obj_t arg) {
228262
mp_buffer_info_t bufinfo;
229263
mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ);
230-
return mp_stream_write(self_in, bufinfo.buf, bufinfo.len);
264+
return mp_stream_write(self_in, bufinfo.buf, bufinfo.len, MP_STREAM_RW_WRITE);
231265
}
232266
MP_DEFINE_CONST_FUN_OBJ_2(mp_stream_write_obj, stream_write_method);
233267

268+
STATIC mp_obj_t stream_write1_method(mp_obj_t self_in, mp_obj_t arg) {
269+
mp_buffer_info_t bufinfo;
270+
mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ);
271+
return mp_stream_write(self_in, bufinfo.buf, bufinfo.len, MP_STREAM_RW_WRITE | MP_STREAM_RW_ONCE);
272+
}
273+
MP_DEFINE_CONST_FUN_OBJ_2(mp_stream_write1_obj, stream_write1_method);
274+
234275
STATIC mp_obj_t stream_readinto(size_t n_args, const mp_obj_t *args) {
235-
const mp_stream_p_t *stream_p = mp_get_stream_raise(args[0], MP_STREAM_OP_READ);
276+
mp_get_stream_raise(args[0], MP_STREAM_OP_READ);
236277
mp_buffer_info_t bufinfo;
237278
mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_WRITE);
238279

@@ -248,8 +289,8 @@ STATIC mp_obj_t stream_readinto(size_t n_args, const mp_obj_t *args) {
248289
}
249290

250291
int error;
251-
mp_uint_t out_sz = stream_p->read(args[0], bufinfo.buf, len, &error);
252-
if (out_sz == MP_STREAM_ERROR) {
292+
mp_uint_t out_sz = mp_stream_read_exactly(args[0], bufinfo.buf, len, &error);
293+
if (error != 0) {
253294
if (mp_is_nonblocking_error(error)) {
254295
return mp_const_none;
255296
}

py/stream.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,13 @@ struct mp_stream_seek_t {
4848
};
4949

5050
MP_DECLARE_CONST_FUN_OBJ(mp_stream_read_obj);
51+
MP_DECLARE_CONST_FUN_OBJ(mp_stream_read1_obj);
5152
MP_DECLARE_CONST_FUN_OBJ(mp_stream_readinto_obj);
5253
MP_DECLARE_CONST_FUN_OBJ(mp_stream_readall_obj);
5354
MP_DECLARE_CONST_FUN_OBJ(mp_stream_unbuffered_readline_obj);
5455
MP_DECLARE_CONST_FUN_OBJ(mp_stream_unbuffered_readlines_obj);
5556
MP_DECLARE_CONST_FUN_OBJ(mp_stream_write_obj);
57+
MP_DECLARE_CONST_FUN_OBJ(mp_stream_write1_obj);
5658
MP_DECLARE_CONST_FUN_OBJ(mp_stream_seek_obj);
5759
MP_DECLARE_CONST_FUN_OBJ(mp_stream_tell_obj);
5860
MP_DECLARE_CONST_FUN_OBJ(mp_stream_ioctl_obj);
@@ -67,10 +69,15 @@ const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags);
6769
// Iterator which uses mp_stream_unbuffered_readline_obj
6870
mp_obj_t mp_stream_unbuffered_iter(mp_obj_t self);
6971

70-
mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len);
72+
mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len, byte flags);
7173

72-
// Helper function to write entire buf to *blocking* stream
73-
mp_uint_t mp_stream_writeall(mp_obj_t stream, const byte *buf, mp_uint_t size, int *errcode);
74+
// C-level helper functions
75+
#define MP_STREAM_RW_READ 0
76+
#define MP_STREAM_RW_WRITE 2
77+
#define MP_STREAM_RW_ONCE 1
78+
mp_uint_t mp_stream_rw(mp_obj_t stream, void *buf, mp_uint_t size, int *errcode, byte flags);
79+
#define mp_stream_write_exactly(stream, buf, size, err) mp_stream_rw(stream, (byte*)buf, size, err, MP_STREAM_RW_WRITE)
80+
#define mp_stream_read_exactly(stream, buf, size, err) mp_stream_rw(stream, buf, size, err, MP_STREAM_RW_READ)
7481

7582
#if MICROPY_STREAMS_NON_BLOCK
7683
// TODO: This is POSIX-specific (but then POSIX is the only real thing,

0 commit comments

Comments
 (0)