Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x)
Fix the uncommon situation when a readable stream is piped twice into
the same destination stream, and then unpiped once.

Previously, the `unpipe` event handlers weren’t able to tell whether
they were corresponding to the “right” conceptual pipe that was being
removed; this fixes this by adding a counter to the `unpipe` event
handler and only removing a single piping destination at most.

Fixes: #12718
  • Loading branch information
addaleax committed May 2, 2017
commit 86b9243dea950c8586f9c91387bb81bfdef204bf
14 changes: 9 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
src.once('end', endFn);

dest.on('unpipe', onunpipe);
function onunpipe(readable) {
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
cleanup();
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
cleanup();
}
}
}

Expand Down Expand Up @@ -647,6 +650,7 @@ function pipeOnDrain(src) {

Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var unpipeInfo = { hasUnpiped: false };

// if we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
Expand All @@ -666,7 +670,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
dest.emit('unpipe', this, unpipeInfo);
return this;
}

Expand All @@ -681,7 +685,7 @@ Readable.prototype.unpipe = function(dest) {
state.flowing = false;

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
dests[i].emit('unpipe', this, unpipeInfo);
return this;
}

Expand All @@ -695,7 +699,7 @@ Readable.prototype.unpipe = function(dest) {
if (state.pipesCount === 1)
state.pipes = state.pipes[0];

dest.emit('unpipe', this);
dest.emit('unpipe', this, unpipeInfo);

return this;
};
Expand Down
78 changes: 78 additions & 0 deletions test/parallel/test-stream-pipe-same-destination-twice.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'use strict';
const common = require('../common');

// Regression test for https://github.com/nodejs/node/issues/12718.
// Tests that piping a source stream twice to the same destination stream
// works, and that a subsequent unpipe() call only removes the pipe *once*.
const assert = require('assert');
const { PassThrough, Writable } = require('stream');

{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(`${chunk}`, 'foobar');
cb();
})
});

passThrough.pipe(dest);
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data.length, 1);
assert.strictEqual(passThrough._readableState.pipesCount, 1);
assert.strictEqual(passThrough._readableState.pipes, dest);

passThrough.write('foobar');
passThrough.pipe(dest);
}

{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(`${chunk}`, 'foobar');
cb();
}, 2)
});

passThrough.pipe(dest);
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.write('foobar');
}

{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustNotCall()
});

passThrough.pipe(dest);
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);
passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data, undefined);
assert.strictEqual(passThrough._readableState.pipesCount, 0);

passThrough.write('foobar');
}