Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain))
// `pipeOnDrain` only removes this destination from the awaiting-drain set
// and resumes the source once nothing else is awaiting a drain, so it is
// safe to call unconditionally and no-ops when this writer wasn't awaiting
// a drain. Guarding on `needDrain` used to skip this for a destination that
// errored synchronously, leaving it stuck in the set and starving other
// destinations piped from the same source (see #53185).
if (ondrain && state.awaitDrainWriters)
ondrain();
}

Expand Down
48 changes: 48 additions & 0 deletions test/parallel/test-stream-pipe-multiple-destinations-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';
const common = require('../common');

// Regression test for https://github.com/nodejs/node/issues/53185
// When a source is piped to multiple destinations and one destination errors
// synchronously in `_write`, the source must keep flowing to the remaining
// healthy destination(s) instead of stalling forever.

const assert = require('assert');
const { PassThrough, Writable, finished } = require('stream');

const source = new PassThrough({ highWaterMark: 16 * 1024 });

// A destination that errors synchronously on its first write.
const failing = new Writable({
highWaterMark: 16 * 1024,
write(chunk, encoding, callback) {
callback(new Error('boom'));
},
});
failing.on('error', common.mustCall());

// A healthy destination that counts everything it receives.
let received = 0;
const healthy = new Writable({
highWaterMark: 16 * 1024,
write(chunk, encoding, callback) {
received += chunk.length;
callback();
},
});

source.pipe(failing);
source.pipe(healthy);

// Two chunks each larger than the highWaterMark so that `write()` returns
// false and the source is paused awaiting a drain.
const chunk = Buffer.alloc(20000, 'a');
source.write(chunk);
source.write(chunk);
source.end();

finished(healthy, common.mustSucceed(() => {
// Without the fix, the errored destination stays in the source's
// awaitDrainWriters set, the source never resumes, and `healthy` only
// receives the first chunk.
assert.strictEqual(received, chunk.length * 2);
}));