Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions benchmark/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,35 @@ const {
} = require('node:stream');

const bench = common.createBenchmark(main, {
n: [1e3],
type: ['creation', 'throughput'],
n: [1, 1e3],
streams: [100],
chunks: [1e4],
}, {
combinationFilter({ type, n }) {
return type === 'creation' ? n === 1e3 : n === 1;
},
test: {
n: [1, 1e3],
type: ['creation', 'throughput'],
},
});

function main({ n }) {
function main({ type, n, streams, chunks }) {
switch (type) {
case 'creation':
return benchCreation(n, streams);
case 'throughput':
return benchThroughput(n, streams, chunks);
}
}

function benchCreation(n, numberOfPassThroughs) {
const cachedPassThroughs = [];
const cachedReadables = [];
const cachedWritables = [];

for (let i = 0; i < n; i++) {
const numberOfPassThroughs = 100;
const passThroughs = [];

for (let i = 0; i < numberOfPassThroughs; i++) {
Expand All @@ -40,3 +59,41 @@ function main({ n }) {
}
bench.end(n);
}

function benchThroughput(n, numberOfPassThroughs, chunks) {
const chunk = Buffer.alloc(1024);

let i = 0;
bench.start();

function run() {
if (i++ === n) {
bench.end(n * chunks);
return;
}

const passThroughs = [];
for (let i = 0; i < numberOfPassThroughs; i++) {
passThroughs.push(new PassThrough());
}

let remaining = chunks;
const composed = compose(...passThroughs);
composed.on('data', () => {});
composed.on('end', run);

write();

function write() {
while (remaining-- > 0) {
if (!composed.write(chunk)) {
composed.once('drain', write);
return;
}
}
composed.end();
}
}

run();
}
28 changes: 7 additions & 21 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ module.exports = function compose(...streams) {

let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;

Expand Down Expand Up @@ -184,31 +183,19 @@ module.exports = function compose(...streams) {

if (readable) {
if (isNodeStream(tail)) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
d._read = function() {
tail.resume();
};

tail.on('data', function(chunk) {
if (!d.push(chunk)) {
tail.pause();
}
});

tail.on('end', function() {
d.push(null);
});

d._read = function() {
while (true) {
const buf = tail.read();
if (buf === null) {
onreadable = d._read;
return;
}

if (!d.push(buf)) {
return;
}
}
};
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
const reader = readable.getReader();
Expand Down Expand Up @@ -238,7 +225,6 @@ module.exports = function compose(...streams) {
err = new AbortError();
}

onreadable = null;
ondrain = null;
onfinish = null;

Expand Down
Loading