Skip to content

Commit 7fe601c

Browse files
committed
Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
raised when the wrapped raw file is non-blocking and the write would block. Previous code assumed that the raw write() would raise BlockingIOError, but RawIOBase.write() is defined to return None when the call would block. Patch by sbt.
2 parents 41dde32 + 58fcf9f commit 7fe601c

5 files changed

Lines changed: 152 additions & 60 deletions

File tree

Lib/_pyio.py

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import abc
77
import codecs
88
import warnings
9+
import errno
910
# Import _thread instead of threading to reduce startup cost
1011
try:
1112
from _thread import allocate_lock as Lock
@@ -717,8 +718,11 @@ def flush(self):
717718

718719
def close(self):
719720
if self.raw is not None and not self.closed:
720-
self.flush()
721-
self.raw.close()
721+
try:
722+
# may raise BlockingIOError or BrokenPipeError etc
723+
self.flush()
724+
finally:
725+
self.raw.close()
722726

723727
def detach(self):
724728
if self.raw is None:
@@ -1077,13 +1081,9 @@ def write(self, b):
10771081
# XXX we can implement some more tricks to try and avoid
10781082
# partial writes
10791083
if len(self._write_buf) > self.buffer_size:
1080-
# We're full, so let's pre-flush the buffer
1081-
try:
1082-
self._flush_unlocked()
1083-
except BlockingIOError as e:
1084-
# We can't accept anything else.
1085-
# XXX Why not just let the exception pass through?
1086-
raise BlockingIOError(e.errno, e.strerror, 0)
1084+
# We're full, so let's pre-flush the buffer. (This may
1085+
# raise BlockingIOError with characters_written == 0.)
1086+
self._flush_unlocked()
10871087
before = len(self._write_buf)
10881088
self._write_buf.extend(b)
10891089
written = len(self._write_buf) - before
@@ -1114,22 +1114,21 @@ def flush(self):
11141114
def _flush_unlocked(self):
11151115
if self.closed:
11161116
raise ValueError("flush of closed file")
1117-
written = 0
1118-
try:
1119-
while self._write_buf:
1120-
try:
1121-
n = self.raw.write(self._write_buf)
1122-
except InterruptedError:
1123-
continue
1124-
if n > len(self._write_buf) or n < 0:
1125-
raise IOError("write() returned incorrect number of bytes")
1126-
del self._write_buf[:n]
1127-
written += n
1128-
except BlockingIOError as e:
1129-
n = e.characters_written
1117+
while self._write_buf:
1118+
try:
1119+
n = self.raw.write(self._write_buf)
1120+
except InterruptedError:
1121+
continue
1122+
except BlockingIOError:
1123+
raise RuntimeError("self.raw should implement RawIOBase: it "
1124+
"should not raise BlockingIOError")
1125+
if n is None:
1126+
raise BlockingIOError(
1127+
errno.EAGAIN,
1128+
"write could not complete without blocking", 0)
1129+
if n > len(self._write_buf) or n < 0:
1130+
raise IOError("write() returned incorrect number of bytes")
11301131
del self._write_buf[:n]
1131-
written += n
1132-
raise BlockingIOError(e.errno, e.strerror, written)
11331132

11341133
def tell(self):
11351134
return _BufferedIOMixin.tell(self) + len(self._write_buf)

Lib/test/test_io.py

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@
4242
import threading
4343
except ImportError:
4444
threading = None
45-
45+
try:
46+
import fcntl
47+
except ImportError:
48+
fcntl = None
4649

4750
def _default_chunk_size():
4851
"""Get the default TextIOWrapper chunk size"""
@@ -242,9 +245,14 @@ def write(self, b):
242245
except ValueError:
243246
pass
244247
else:
245-
self._blocker_char = None
246-
self._write_stack.append(b[:n])
247-
raise self.BlockingIOError(0, "test blocking", n)
248+
if n > 0:
249+
# write data up to the first blocker
250+
self._write_stack.append(b[:n])
251+
return n
252+
else:
253+
# cancel blocker and indicate would block
254+
self._blocker_char = None
255+
return None
248256
self._write_stack.append(b)
249257
return len(b)
250258

@@ -2753,6 +2761,70 @@ def test_pickling(self):
27532761
with self.open(support.TESTFN, **kwargs) as f:
27542762
self.assertRaises(TypeError, pickle.dumps, f, protocol)
27552763

2764+
@unittest.skipUnless(fcntl, 'fcntl required for this test')
2765+
def test_nonblock_pipe_write_bigbuf(self):
2766+
self._test_nonblock_pipe_write(16*1024)
2767+
2768+
@unittest.skipUnless(fcntl, 'fcntl required for this test')
2769+
def test_nonblock_pipe_write_smallbuf(self):
2770+
self._test_nonblock_pipe_write(1024)
2771+
2772+
def _set_non_blocking(self, fd):
2773+
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2774+
self.assertNotEqual(flags, -1)
2775+
res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2776+
self.assertEqual(res, 0)
2777+
2778+
def _test_nonblock_pipe_write(self, bufsize):
2779+
sent = []
2780+
received = []
2781+
r, w = os.pipe()
2782+
self._set_non_blocking(r)
2783+
self._set_non_blocking(w)
2784+
2785+
# To exercise all code paths in the C implementation we need
2786+
# to play with buffer sizes. For instance, if we choose a
2787+
# buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2788+
# then we will never get a partial write of the buffer.
2789+
rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2790+
wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2791+
2792+
with rf, wf:
2793+
for N in 9999, 73, 7574:
2794+
try:
2795+
i = 0
2796+
while True:
2797+
msg = bytes([i % 26 + 97]) * N
2798+
sent.append(msg)
2799+
wf.write(msg)
2800+
i += 1
2801+
2802+
except self.BlockingIOError as e:
2803+
self.assertEqual(e.args[0], errno.EAGAIN)
2804+
self.assertEqual(e.args[2], e.characters_written)
2805+
sent[-1] = sent[-1][:e.characters_written]
2806+
received.append(rf.read())
2807+
msg = b'BLOCKED'
2808+
wf.write(msg)
2809+
sent.append(msg)
2810+
2811+
while True:
2812+
try:
2813+
wf.flush()
2814+
break
2815+
except self.BlockingIOError as e:
2816+
self.assertEqual(e.args[0], errno.EAGAIN)
2817+
self.assertEqual(e.args[2], e.characters_written)
2818+
self.assertEqual(e.characters_written, 0)
2819+
received.append(rf.read())
2820+
2821+
received += iter(rf.read, None)
2822+
2823+
sent, received = b''.join(sent), b''.join(received)
2824+
self.assertTrue(sent == received)
2825+
self.assertTrue(wf.closed)
2826+
self.assertTrue(rf.closed)
2827+
27562828
class CMiscIOTest(MiscIOTest):
27572829
io = io
27582830

Misc/ACKS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,7 @@ Ilya Sandler
860860
Mark Sapiro
861861
Ty Sarna
862862
Ben Sayer
863+
sbt
863864
Andrew Schaaf
864865
Michael Scharf
865866
Andreas Schawo

Misc/NEWS

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,12 @@ Core and Builtins
384384
Library
385385
-------
386386

387+
- Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
388+
raised when the wrapped raw file is non-blocking and the write would block.
389+
Previous code assumed that the raw write() would raise BlockingIOError, but
390+
RawIOBase.write() is defined to return None when the call would block.
391+
Patch by sbt.
392+
387393
- Issue #13358: HTMLParser now calls handle_data only once for each CDATA.
388394

389395
- Issue #4147: minidom's toprettyxml no longer adds whitespace around a text

Modules/_io/bufferedio.c

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ buffered_getstate(buffered *self, PyObject *args)
596596

597597
/* Forward decls */
598598
static PyObject *
599-
_bufferedwriter_flush_unlocked(buffered *, int);
599+
_bufferedwriter_flush_unlocked(buffered *);
600600
static Py_ssize_t
601601
_bufferedreader_fill_buffer(buffered *self);
602602
static void
@@ -618,6 +618,18 @@ _bufferedreader_raw_read(buffered *self, char *start, Py_ssize_t len);
618618
* Helpers
619619
*/
620620

621+
/* Sets the current error to BlockingIOError */
622+
static void
623+
_set_BlockingIOError(char *msg, Py_ssize_t written)
624+
{
625+
PyObject *err;
626+
err = PyObject_CallFunction(PyExc_BlockingIOError, "isn",
627+
errno, msg, written);
628+
if (err)
629+
PyErr_SetObject(PyExc_BlockingIOError, err);
630+
Py_XDECREF(err);
631+
}
632+
621633
/* Returns the address of the `written` member if a BlockingIOError was
622634
raised, NULL otherwise. The error is always re-raised. */
623635
static Py_ssize_t *
@@ -772,7 +784,7 @@ buffered_flush_and_rewind_unlocked(buffered *self)
772784
{
773785
PyObject *res;
774786

775-
res = _bufferedwriter_flush_unlocked(self, 0);
787+
res = _bufferedwriter_flush_unlocked(self);
776788
if (res == NULL)
777789
return NULL;
778790
Py_DECREF(res);
@@ -1188,7 +1200,7 @@ buffered_seek(buffered *self, PyObject *args)
11881200

11891201
/* Fallback: invoke raw seek() method and clear buffer */
11901202
if (self->writable) {
1191-
res = _bufferedwriter_flush_unlocked(self, 0);
1203+
res = _bufferedwriter_flush_unlocked(self);
11921204
if (res == NULL)
11931205
goto end;
11941206
Py_CLEAR(res);
@@ -1807,6 +1819,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
18071819
Py_buffer buf;
18081820
PyObject *memobj, *res;
18091821
Py_ssize_t n;
1822+
int errnum;
18101823
/* NOTE: the buffer needn't be released as its object is NULL. */
18111824
if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1)
18121825
return -1;
@@ -1819,11 +1832,21 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
18191832
raised (see issue #10956).
18201833
*/
18211834
do {
1835+
errno = 0;
18221836
res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
1837+
errnum = errno;
18231838
} while (res == NULL && _trap_eintr());
18241839
Py_DECREF(memobj);
18251840
if (res == NULL)
18261841
return -1;
1842+
if (res == Py_None) {
1843+
/* Non-blocking stream would have blocked. Special return code!
1844+
Being paranoid we reset errno in case it is changed by code
1845+
triggered by a decref. errno is used by _set_BlockingIOError(). */
1846+
Py_DECREF(res);
1847+
errno = errnum;
1848+
return -2;
1849+
}
18271850
n = PyNumber_AsSsize_t(res, PyExc_ValueError);
18281851
Py_DECREF(res);
18291852
if (n < 0 || n > len) {
@@ -1840,7 +1863,7 @@ _bufferedwriter_raw_write(buffered *self, char *start, Py_ssize_t len)
18401863
/* `restore_pos` is 1 if we need to restore the raw stream position at
18411864
the end, 0 otherwise. */
18421865
static PyObject *
1843-
_bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
1866+
_bufferedwriter_flush_unlocked(buffered *self)
18441867
{
18451868
Py_ssize_t written = 0;
18461869
Py_off_t n, rewind;
@@ -1862,14 +1885,11 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
18621885
Py_SAFE_DOWNCAST(self->write_end - self->write_pos,
18631886
Py_off_t, Py_ssize_t));
18641887
if (n == -1) {
1865-
Py_ssize_t *w = _buffered_check_blocking_error();
1866-
if (w == NULL)
1867-
goto error;
1868-
self->write_pos += *w;
1869-
self->raw_pos = self->write_pos;
1870-
written += *w;
1871-
*w = written;
1872-
/* Already re-raised */
1888+
goto error;
1889+
}
1890+
else if (n == -2) {
1891+
_set_BlockingIOError("write could not complete without blocking",
1892+
0);
18731893
goto error;
18741894
}
18751895
self->write_pos += n;
@@ -1882,16 +1902,6 @@ _bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
18821902
goto error;
18831903
}
18841904

1885-
if (restore_pos) {
1886-
Py_off_t forward = rewind - written;
1887-
if (forward != 0) {
1888-
n = _buffered_raw_seek(self, forward, 1);
1889-
if (n < 0) {
1890-
goto error;
1891-
}
1892-
self->raw_pos += forward;
1893-
}
1894-
}
18951905
_bufferedwriter_reset_buf(self);
18961906

18971907
end:
@@ -1944,7 +1954,7 @@ bufferedwriter_write(buffered *self, PyObject *args)
19441954
}
19451955

19461956
/* First write the current buffer */
1947-
res = _bufferedwriter_flush_unlocked(self, 0);
1957+
res = _bufferedwriter_flush_unlocked(self);
19481958
if (res == NULL) {
19491959
Py_ssize_t *w = _buffered_check_blocking_error();
19501960
if (w == NULL)
@@ -1967,14 +1977,19 @@ bufferedwriter_write(buffered *self, PyObject *args)
19671977
PyErr_Clear();
19681978
memcpy(self->buffer + self->write_end, buf.buf, buf.len);
19691979
self->write_end += buf.len;
1980+
self->pos += buf.len;
19701981
written = buf.len;
19711982
goto end;
19721983
}
19731984
/* Buffer as much as possible. */
19741985
memcpy(self->buffer + self->write_end, buf.buf, avail);
19751986
self->write_end += avail;
1976-
/* Already re-raised */
1977-
*w = avail;
1987+
self->pos += avail;
1988+
/* XXX Modifying the existing exception e using the pointer w
1989+
will change e.characters_written but not e.args[2].
1990+
Therefore we just replace with a new error. */
1991+
_set_BlockingIOError("write could not complete without blocking",
1992+
avail);
19781993
goto error;
19791994
}
19801995
Py_CLEAR(res);
@@ -1999,20 +2014,19 @@ bufferedwriter_write(buffered *self, PyObject *args)
19992014
Py_ssize_t n = _bufferedwriter_raw_write(
20002015
self, (char *) buf.buf + written, buf.len - written);
20012016
if (n == -1) {
2002-
Py_ssize_t *w = _buffered_check_blocking_error();
2003-
if (w == NULL)
2004-
goto error;
2005-
written += *w;
2006-
remaining -= *w;
2017+
goto error;
2018+
} else if (n == -2) {
2019+
/* Write failed because raw file is non-blocking */
20072020
if (remaining > self->buffer_size) {
20082021
/* Can't buffer everything, still buffer as much as possible */
20092022
memcpy(self->buffer,
20102023
(char *) buf.buf + written, self->buffer_size);
20112024
self->raw_pos = 0;
20122025
ADJUST_POSITION(self, self->buffer_size);
20132026
self->write_end = self->buffer_size;
2014-
*w = written + self->buffer_size;
2015-
/* Already re-raised */
2027+
written += self->buffer_size;
2028+
_set_BlockingIOError("write could not complete without "
2029+
"blocking", written);
20162030
goto error;
20172031
}
20182032
PyErr_Clear();

0 commit comments

Comments
 (0)