diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 57aa71817e4fba..9c2a15ecabe216 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -111,6 +111,7 @@ const kErroredValue = Symbol('kErroredValue'); const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kDecoderValue = Symbol('kDecoderValue'); const kEncodingValue = Symbol('kEncodingValue'); +const kPipeData = Symbol('kPipeData'); const kEnded = 1 << 9; const kEndEmitted = 1 << 10; @@ -131,6 +132,7 @@ const kFlowing = 1 << 24; const kHasPaused = 1 << 25; const kPaused = 1 << 26; const kDataListening = 1 << 27; +const kHasPipeData = 1 << 28; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -565,7 +567,12 @@ function addChunk(stream, state, chunk, addToFront) { } state[kState] |= kDataEmitted; - stream.emit('data', chunk); + if ((state[kState] & kHasPipeData) !== 0 && + stream._events.data === state[kPipeData]) { + state[kPipeData](chunk); + } else { + stream.emit('data', chunk); + } } else { // Update the buffer info. state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; @@ -791,7 +798,12 @@ Readable.prototype.read = function(n) { if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) { state[kState] |= kDataEmitted; - this.emit('data', ret); + if ((state[kState] & kHasPipeData) !== 0 && + this._events.data === state[kPipeData]) { + state[kPipeData](ret); + } else { + this.emit('data', ret); + } } return ret; @@ -976,6 +988,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.removeListener('end', onend); src.removeListener('end', unpipe); src.removeListener('data', ondata); + if (state[kPipeData] === ondata) + state[kState] &= ~kHasPipeData; cleanedUp = true; @@ -1016,6 +1030,10 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } src.on('data', ondata); + if (src._events.data === ondata) { + state[kPipeData] = ondata; + state[kState] |= kHasPipeData; + } function ondata(chunk) { debug('ondata'); try { diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js index f51009241da56e..a1dc9058b00e72 100644 --- a/test/parallel/test-stream2-basic.js +++ b/test/parallel/test-stream2-basic.js @@ -146,6 +146,75 @@ class TestWriter extends EE { r.pipe(w); } +{ + // Verify data listener ordering around pipe. + function makeWritable(events) { + return new W({ + write(chunk, enc, cb) { + events.push('write'); + cb(); + }, + }); + } + + { + const events = []; + const r = R.from(['x']); + const w = makeWritable(events); + + r.on('data', () => events.push('data')); + r.pipe(w); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['data', 'write']); + })); + } + + { + const events = []; + const r = R.from(['x']); + const w = makeWritable(events); + + r.pipe(w); + r.on('data', () => events.push('data')); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['write', 'data']); + })); + } + + { + const events = []; + const r = R.from(['x']); + const w = makeWritable(events); + + r.pipe(w); + r.prependListener('data', () => events.push('data')); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['data', 'write']); + })); + } + + { + const events = []; + const r = R.from(['a', 'b']); + const w = new W({ + write: common.mustCall((chunk, enc, cb) => { + events.push(`write:${chunk}`); + if (String(chunk) === 'a') { + r.on('data', common.mustCall((chunk) => { + events.push(`data:${chunk}`); + })); + } + cb(); + }, 2), + }); + + r.pipe(w); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['write:a', 'write:b', 'data:b']); + })); + } +} + [1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { // Verify unpipe