Skip to content

Commit 5a8110f

Browse files
committed
fixup
1 parent cf96032 commit 5a8110f

5 files changed

Lines changed: 15 additions & 13 deletions

File tree

lib/internal/streams/compose.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ const {
88
isReadable,
99
isWritable,
1010
} = require('internal/streams/utils');
11-
const duplexify = require('internal/streams/duplexify');
1211
const {
1312
AbortError,
1413
codes: {
@@ -23,18 +22,18 @@ module.exports = function compose(...streams) {
2322
}
2423

2524
if (streams.length === 1) {
26-
return duplexify(streams[0], 'streams[0]');
25+
return Duplex.from(streams[0]);
2726
}
2827

2928
const orgStreams = [...streams];
3029

3130
if (typeof streams[0] === 'function') {
32-
streams[0] = duplexify(streams[0], 'streams[0]');
31+
streams[0] = Duplex.from(streams[0]);
3332
}
3433

3534
if (typeof streams[streams.length - 1] === 'function') {
3635
const idx = streams.length - 1;
37-
streams[idx] = duplexify(streams[idx], `streams[${idx}]`);
36+
streams[idx] = Duplex.from(streams[idx]);
3837
}
3938

4039
for (let n = 0; n < streams.length; ++n) {
@@ -87,7 +86,7 @@ module.exports = function compose(...streams) {
8786
// Implement Writable/Readable/Duplex traits.
8887
// See, https://github.com/nodejs/node/pull/33515.
8988
d = new Duplex({
90-
highWaterMark: 1,
89+
// TODO (ronag): highWaterMark?
9190
writableObjectMode: !!head?.writableObjectMode,
9291
readableObjectMode: !!tail?.writableObjectMode,
9392
writable,

lib/internal/streams/duplexify.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ function _duplexify(pair) {
197197
onfinished(err);
198198
});
199199

200+
// TODO(ronag): Avoid double buffering.
201+
// Implement Writable/Readable/Duplex traits.
202+
// See, https://github.com/nodejs/node/pull/33515.
200203
d = new Duplex({
201204
// TODO (ronag): highWaterMark?
202205
readableObjectMode: !!r?.readableObjectMode,

lib/internal/streams/pipeline.js

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ const {
1010
} = primordials;
1111

1212
const eos = require('internal/streams/end-of-stream');
13-
const duplexify = require('internal/streams/duplexify');
14-
1513
const { once } = require('internal/util');
1614
const destroyImpl = require('internal/streams/destroy');
15+
const Duplex = require('internal/streams/duplex');
1716
const {
1817
aggregateTwoErrors,
1918
codes: {
@@ -220,7 +219,7 @@ function pipeline(...streams) {
220219
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
221220
ret = stream;
222221
} else {
223-
ret = duplexify(stream, 'source');
222+
ret = Duplex.from(stream);
224223
}
225224
} else if (typeof stream === 'function') {
226225
ret = makeAsyncIterable(ret);
@@ -288,8 +287,7 @@ function pipeline(...streams) {
288287
}
289288
ret = stream;
290289
} else {
291-
const name = reading ? `transform[${i - 1}]` : 'destination';
292-
ret = duplexify(stream, name);
290+
ret = Duplex.from(stream);
293291
}
294292
}
295293

lib/stream.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ const compose = require('internal/streams/compose');
3434
const { destroyer } = require('internal/streams/destroy');
3535
const eos = require('internal/streams/end-of-stream');
3636
const internalBuffer = require('internal/buffer');
37-
const duplexify = require('internal/streams/duplexify');
37+
38+
let duplexify;
3839

3940
const promises = require('stream/promises');
4041

@@ -43,6 +44,9 @@ Stream.Readable = require('internal/streams/readable');
4344
Stream.Writable = require('internal/streams/writable');
4445
Stream.Duplex = require('internal/streams/duplex');
4546
Stream.Duplex.from = function from(body) {
47+
if (!duplexify) {
48+
duplexify = require('internal/streams/duplexify');
49+
}
4650
return duplexify(body, 'body');
4751
};
4852
Stream.Transform = require('internal/streams/transform');

test/parallel/test-bootstrap-modules.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,8 @@ const expectedModules = new Set([
101101
'NativeModule internal/stream_base_commons',
102102
'NativeModule internal/streams/add-abort-signal',
103103
'NativeModule internal/streams/buffer_list',
104-
'NativeModule internal/streams/compose',
105104
'NativeModule internal/streams/destroy',
106105
'NativeModule internal/streams/duplex',
107-
'NativeModule internal/streams/duplexify',
108106
'NativeModule internal/streams/end-of-stream',
109107
'NativeModule internal/streams/from',
110108
'NativeModule internal/streams/legacy',

0 commit comments

Comments
 (0)