Skip to content

Commit c6bf953

Browse files
committed
stream: simplify pipeline
PR-URL: nodejs#31316 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
1 parent 2c54459 commit c6bf953

1 file changed

Lines changed: 8 additions & 32 deletions

File tree

lib/internal/streams/pipeline.js

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,6 @@ function isRequest(stream) {
2828
return stream && stream.setHeader && typeof stream.abort === 'function';
2929
}
3030

31-
function destroyStream(stream, err) {
32-
// request.destroy just do .end - .abort is what we want
33-
if (isRequest(stream)) return stream.abort();
34-
if (isRequest(stream.req)) return stream.req.abort();
35-
if (typeof stream.destroy === 'function') return stream.destroy(err);
36-
if (typeof stream.close === 'function') return stream.close();
37-
}
38-
3931
function destroyer(stream, reading, writing, callback) {
4032
callback = once(callback);
4133

@@ -57,7 +49,11 @@ function destroyer(stream, reading, writing, callback) {
5749
if (destroyed) return;
5850
destroyed = true;
5951

60-
destroyStream(stream, err);
52+
// request.destroy just do .end - .abort is what we want
53+
if (isRequest(stream)) return stream.abort();
54+
if (isRequest(stream.req)) return stream.req.abort();
55+
if (typeof stream.destroy === 'function') return stream.destroy(err);
56+
if (typeof stream.close === 'function') return stream.close();
6157

6258
callback(err || new ERR_STREAM_DESTROYED('pipe'));
6359
};
@@ -101,39 +97,19 @@ function makeAsyncIterable(val) {
10197
return val;
10298
} else if (isReadable(val)) {
10399
// Legacy streams are not Iterable.
104-
return _fromReadable(val);
100+
return fromReadable(val);
105101
} else {
106102
throw new ERR_INVALID_ARG_TYPE(
107103
'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
108104
}
109105
}
110106

111-
async function* _fromReadable(val) {
107+
async function* fromReadable(val) {
112108
if (!createReadableStreamAsyncIterator) {
113109
createReadableStreamAsyncIterator =
114110
require('internal/streams/async_iterator');
115111
}
116-
117-
try {
118-
if (typeof val.read !== 'function') {
119-
// createReadableStreamAsyncIterator does not support
120-
// v1 streams. Convert it into a v2 stream.
121-
122-
if (!PassThrough) {
123-
PassThrough = require('_stream_passthrough');
124-
}
125-
126-
const pt = new PassThrough();
127-
val
128-
.on('error', (err) => pt.destroy(err))
129-
.pipe(pt);
130-
yield* createReadableStreamAsyncIterator(pt);
131-
} else {
132-
yield* createReadableStreamAsyncIterator(val);
133-
}
134-
} finally {
135-
destroyStream(val);
136-
}
112+
yield* createReadableStreamAsyncIterator(val);
137113
}
138114

139115
async function pump(iterable, writable, finish) {

0 commit comments

Comments
 (0)