Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: propagate errors from src streams in async iterator
Fixes: #28194
  • Loading branch information
marcosc90 committed Dec 9, 2019
commit 82b60df7e1c5dc02d1fdc90c965a7352ea3b5ff9
21 changes: 20 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ function ReadableState(options, stream, isDuplex) {
this.buffer = new BufferList();
this.length = 0;
this.pipes = [];
this.pipeSources = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
Expand Down Expand Up @@ -698,6 +699,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
}
}

if (dest._readableState)
dest._readableState.pipeSources.push(this);

state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

Expand Down Expand Up @@ -859,6 +863,16 @@ function pipeOnDrain(src, dest) {
};
}

function unpipeSources(src, dest) {
const destState = dest._readableState;
if (!destState)
return;

const pipeSourcesIndex = destState.pipeSources.indexOf(src);
if (pipeSourcesIndex !== -1)
destState.pipeSources.splice(pipeSourcesIndex, 1);
}


Readable.prototype.unpipe = function(dest) {
const state = this._readableState;
Expand All @@ -874,11 +888,14 @@ Readable.prototype.unpipe = function(dest) {
state.pipes = [];
state.flowing = false;

for (var i = 0; i < dests.length; i++)
for (let i = 0; i < dests.length; i++) {
unpipeSources(this, dests[i]);
dests[i].emit('unpipe', this, { hasUnpiped: false });
}
return this;
}


// Try to find the right one.
const index = state.pipes.indexOf(dest);
if (index === -1)
Expand All @@ -888,6 +905,8 @@ Readable.prototype.unpipe = function(dest) {
if (state.pipes.length === 0)
state.flowing = false;

unpipeSources(this, dest);

dest.emit('unpipe', this, unpipeInfo);

return this;
Expand Down
14 changes: 14 additions & 0 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ function wrapForNext(lastPromise, iter) {
};
}

function handlePipelineError(sources, current) {

if (!sources) return;

for (const stream of sources) {
const listener = (err) => current.emit('error', err);
stream.on('error', listener);
stream.on('unpipe', () => stream.off('error', listener));
handlePipelineError(stream._readableState.pipeSources, current);
}
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

Expand Down Expand Up @@ -169,6 +181,8 @@ const createReadableStreamAsyncIterator = (stream) => {
});
iterator[kLastPromise] = null;

handlePipelineError(stream._readableState.pipeSources, stream);

finished(stream, { writable: false }, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
const reject = iterator[kLastReject];
Expand Down