Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: always invoke callback before emitting error
Ensure the callback is always invoked before emitting
the error in both sync and async case.
  • Loading branch information
ronag committed Sep 20, 2019
commit 9f4a28fa6a54f3b124c975d575feffb7ff7769d0
3 changes: 2 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,8 @@ The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` *may or may not* be called with the error as its
first argument. To reliably detect write errors, add a listener for the
`'error'` event.
`'error'` event. If `callback` is called with an error, it will be called
before the `'error'` event is emitted.

The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`.
Expand Down
48 changes: 33 additions & 15 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const {
ERR_UNKNOWN_ENCODING
} = require('internal/errors').codes;

const { errorOrDestroy } = destroyImpl;
const { errorOrDestroy, errorMaybe } = destroyImpl;

Object.setPrototypeOf(Writable.prototype, Stream.prototype);
Object.setPrototypeOf(Writable, Stream);
Expand Down Expand Up @@ -155,6 +155,11 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!options.autoDestroy;

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;

// Count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -394,7 +399,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;

if (state.writing || state.corked) {
if (state.writing || state.corked || state.errored) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
Expand All @@ -413,7 +418,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb);
}

return ret;
// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
Copy link
Copy Markdown
Member Author

@ronag ronag Aug 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See updated tests where "incorrectly" a synchronous error was assumed to break such loops.

}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand All @@ -431,18 +438,19 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
}

function onwriteError(stream, state, sync, er, cb) {
--state.pendingcb;

if (sync) {
// Defer the callback if we are being called synchronously
// to avoid piling up things on the stack
process.nextTick(cb, er);
process.nextTick(onwriteErrorNT, stream, state, er, cb);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
onwriteErrorNT(stream, state, er, cb);
}
errorOrDestroy(stream, er);
}

function onwriteErrorNT(stream, state, er, cb) {
--state.pendingcb;

cb(er);
// This can emit error, but error must always follow cb.
errorMaybe(stream, er);
}

function onwrite(stream, er) {
Expand All @@ -458,9 +466,19 @@ function onwrite(stream, er) {
state.length -= state.writelen;
state.writelen = 0;

if (er)
onwriteError(stream, state, sync, er, cb);
else {
if (er) {
state.errored = true;
Copy link
Copy Markdown
Member Author

@ronag ronag Aug 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot use errorEmitted since it must be set after cb and cb must be invoked asynchronously. Thus without this we have no way to synchronously tell if a stream has errored.

Copy link
Copy Markdown
Member Author

@ronag ronag Aug 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot use destroyed since it will not be set on !autoDestroy.

if (state.autoDestroy) {
stream.destroy(er, (err) => {
// TODO(ronag): Minor optimization opportunities:
// - We might no longer be sync here.
// - Closure allocation can probably be optimized away.
onwriteError(stream, state, sync, err, cb);
});
} else {
onwriteError(stream, state, sync, er, cb);
}
} else {
// Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed;

Expand Down
11 changes: 10 additions & 1 deletion lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
Copy link
Copy Markdown
Member Author

@ronag ronag Aug 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When passing explicit cb we have to emit 'error' etc. in cb. However, unless cb is invoked first it's not possible to schedule an 'error' before 'close'.

if (emitClose) {
process.nextTick(emitCloseNT, this);
}
cb(err);
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
Expand Down Expand Up @@ -91,6 +93,7 @@ function undestroy() {

if (w) {
w.destroyed = false;
w.errored = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand All @@ -116,9 +119,15 @@ function errorOrDestroy(stream, err) {
stream.emit('error', err);
}

function errorMaybe(stream, err) {
if (needError(stream, err)) {
stream.emit('error', err);
}
}

module.exports = {
destroy,
undestroy,
errorMaybe,
errorOrDestroy
};
5 changes: 4 additions & 1 deletion test/parallel/test-http2-reset-flood.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
h2header.writeIntBE(1, 0, 3); // Length: 1
h2header.writeIntBE(i, 5, 4); // Stream ID
// 0x88 = :status: 200
conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
process.nextTick(writeRequests);
break;
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumed a synchronous error to break the loop. Which is no longer the case.

}
}

Expand Down
58 changes: 58 additions & 0 deletions test/parallel/test-stream-writable-write-cb-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
'use strict';
const common = require('../common');
const { Writable } = require('stream');
const assert = require('assert');

// Ensure callback is always invoked before
// error is emitted. Regardless if error was
// sync or async.

{
let callbackCalled = false;
// Sync Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
cb(new Error());
})
});
writable.on('error', common.mustCall(() => {
assert.strictEqual(callbackCalled, true);
}));
writable.write('hi', common.mustCall(() => {
callbackCalled = true;
}));
}

{
let callbackCalled = false;
// Async Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
process.nextTick(cb, new Error());
})
});
writable.on('error', common.mustCall(() => {
assert.strictEqual(callbackCalled, true);
}));
writable.write('hi', common.mustCall(() => {
callbackCalled = true;
}));
}

{
// Sync Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
cb(new Error());
})
});

writable.on('error', common.mustCall());

let cnt = 0;
// Ensure we don't live lock on sync error
while (writable.write('a'))
cnt++;

assert.strictEqual(cnt, 0);
}
2 changes: 1 addition & 1 deletion test/parallel/test-stream-writable-write-writev-finish.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ const stream = require('stream');

const ws = new stream.Writable();

ws.on('finish', common.mustNotCall());
ws.on('finish', common.mustCall());
Copy link
Copy Markdown
Member Author

@ronag ronag Aug 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if this is weird (there is a separate issue for 'finish' after 'error')... all the other tests assume exactly this. Only reason this was different was because of the sync/async difference. Was there a purpose here?

ws.on('error', common.mustCall());

ws._write = (chunk, encoding, done) => {
Expand Down
6 changes: 5 additions & 1 deletion test/parallel/test-wrap-js-stream-exceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ const socket = new JSStreamWrap(new Duplex({
})
}));

assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
socket.end('foo');
socket.on('error', common.expectsError({
type: Error,
message: 'write EPROTO'
}));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumed a synchronous error which is no longer the case.

14 changes: 6 additions & 8 deletions test/parallel/test-zlib-write-after-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ const zlib = require('zlib');
zlib.gzip('hello', common.mustCall(function(err, out) {
const unzip = zlib.createGunzip();
unzip.close(common.mustCall());
common.expectsError(
() => unzip.write(out),
{
code: 'ERR_STREAM_DESTROYED',
type: Error,
message: 'Cannot call write after a stream was destroyed'
}
);

unzip.write(out);
unzip.on('error', common.expectsError({
code: 'ERR_STREAM_DESTROYED',
type: Error
}));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test assumed a synchronous error which is no longer the case.

}));