From 3c2166accb4305044e017bf3b4d672e07e983af4 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Tue, 26 May 2026 23:55:53 -0700 Subject: [PATCH] stream: use data listener for compose forwarding Forward Node.js tail stream data through a pipe-style data listener instead of manually draining readable events. This keeps compose closer to the stream pipe hot path while preserving backpressure with pause and resume. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- benchmark/streams/compose.js | 63 +++++++++++++++++++++++++++++++-- lib/internal/streams/compose.js | 28 ++++----------- 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/benchmark/streams/compose.js b/benchmark/streams/compose.js index b98596ffbd1411..283ad8b7e30b32 100644 --- a/benchmark/streams/compose.js +++ b/benchmark/streams/compose.js @@ -9,16 +9,35 @@ const { } = require('node:stream'); const bench = common.createBenchmark(main, { - n: [1e3], + type: ['creation', 'throughput'], + n: [1, 1e3], + streams: [100], + chunks: [1e4], +}, { + combinationFilter({ type, n }) { + return type === 'creation' ? n === 1e3 : n === 1; + }, + test: { + n: [1, 1e3], + type: ['creation', 'throughput'], + }, }); -function main({ n }) { +function main({ type, n, streams, chunks }) { + switch (type) { + case 'creation': + return benchCreation(n, streams); + case 'throughput': + return benchThroughput(n, streams, chunks); + } +} + +function benchCreation(n, numberOfPassThroughs) { const cachedPassThroughs = []; const cachedReadables = []; const cachedWritables = []; for (let i = 0; i < n; i++) { - const numberOfPassThroughs = 100; const passThroughs = []; for (let i = 0; i < numberOfPassThroughs; i++) { @@ -40,3 +59,41 @@ function main({ n }) { } bench.end(n); } + +function benchThroughput(n, numberOfPassThroughs, chunks) { + const chunk = Buffer.alloc(1024); + + let i = 0; + bench.start(); + + function run() { + if (i++ === n) { + bench.end(n * chunks); + return; + } + + const passThroughs = []; + for (let i = 0; i < numberOfPassThroughs; i++) { + passThroughs.push(new PassThrough()); + } + + let remaining = chunks; + const composed = compose(...passThroughs); + composed.on('data', () => {}); + composed.on('end', run); + + write(); + + function write() { + while (remaining-- > 0) { + if (!composed.write(chunk)) { + composed.once('drain', write); + return; + } + } + composed.end(); + } + } + + run(); +} diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index d664a430b8d75a..7baa7974ffdf05 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -82,7 +82,6 @@ module.exports = function compose(...streams) { let ondrain; let onfinish; - let onreadable; let onclose; let d; @@ -184,31 +183,19 @@ module.exports = function compose(...streams) { if (readable) { if (isNodeStream(tail)) { - tail.on('readable', function() { - if (onreadable) { - const cb = onreadable; - onreadable = null; - cb(); + d._read = function() { + tail.resume(); + }; + + tail.on('data', function(chunk) { + if (!d.push(chunk)) { + tail.pause(); } }); tail.on('end', function() { d.push(null); }); - - d._read = function() { - while (true) { - const buf = tail.read(); - if (buf === null) { - onreadable = d._read; - return; - } - - if (!d.push(buf)) { - return; - } - } - }; } else if (isWebStream(tail)) { const readable = isTransformStream(tail) ? tail.readable : tail; const reader = readable.getReader(); @@ -238,7 +225,6 @@ module.exports = function compose(...streams) { err = new AbortError(); } - onreadable = null; ondrain = null; onfinish = null;