Skip to content

Commit 89b2368

Browse files
committed
stream: do not swallow errors with async iterators and pipeline
Before this patch, pipeline() could swallow errors by pre-emptively producing a ERR_STREAM_PREMATURE_CLOSE that was not really helpful to the user.
1 parent 7cafd5f commit 89b2368

File tree

2 files changed

+42
-3
lines changed

2 files changed

+42
-3
lines changed

lib/internal/streams/pipeline.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
const {
77
ArrayIsArray,
88
SymbolAsyncIterator,
9-
SymbolIterator
9+
SymbolIterator,
10+
Symbol
1011
} = primordials;
1112

1213
let eos;
@@ -21,6 +22,8 @@ const {
2122
ERR_STREAM_DESTROYED
2223
} = require('internal/errors').codes;
2324

25+
const kSkipPrematureClose = Symbol('kSkipPrematureClose');
26+
2427
let EE;
2528
let PassThrough;
2629
let createReadableStreamAsyncIterator;
@@ -159,9 +162,17 @@ function pipeline(...streams) {
159162
}
160163

161164
function wrap(stream, reading, writing, final) {
162-
destroys.push(destroyer(stream, reading, writing, (err) => {
165+
const fn = destroyer(stream, reading, writing, (err) => {
166+
167+
// We need to skip the premature close error here to avoid swallowing
168+
// any errors in the downstream async iterator.
169+
if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
170+
stream[kSkipPrematureClose]) {
171+
err = undefined;
172+
}
163173
finish(err, final);
164-
}));
174+
});
175+
destroys.push(fn);
165176
}
166177

167178
let ret;
@@ -190,6 +201,7 @@ function pipeline(...streams) {
190201
}
191202
} else if (typeof stream === 'function') {
192203
ret = makeAsyncIterable(ret);
204+
ret[kSkipPrematureClose] = true;
193205
ret = stream(ret);
194206

195207
if (reading) {

test/parallel/test-stream-pipeline.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,3 +938,30 @@ const { promisify } = require('util');
938938
r.push(null);
939939
r.emit('close');
940940
}
941+
942+
{
943+
let res = '';
944+
const rs = new Readable({
945+
read() {
946+
setImmediate(() => {
947+
rs.push('hello');
948+
});
949+
}
950+
});
951+
const ws = new Writable({
952+
write: common.mustNotCall()
953+
});
954+
pipeline(rs, async function*(stream) {
955+
/* eslint no-unused-vars: off */
956+
for await (const chunk of stream) {
957+
throw new Error('kaboom');
958+
}
959+
}, async function *(source) {
960+
for await (const chunk of source) {
961+
res += chunk;
962+
}
963+
}, ws, common.mustCall((err) => {
964+
assert.strictEqual(err.message, 'kaboom');
965+
assert.strictEqual(res, '');
966+
}));
967+
}

0 commit comments

Comments
 (0)