Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream: add stream.compose
Refs: #32020

PR-URL: #39029
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Backport-PR-URL: #39563
  • Loading branch information
ronag committed Aug 23, 2021
commit 7663895a7b4b5548bca08ab613f842abc662402a
89 changes: 89 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1918,6 +1918,95 @@ run().catch(console.error);
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.

### `stream.compose(...streams)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - `stream.compose` is experimental.

* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next, using `stream.pipeline`. If any of the streams error then all
are destroyed, including the outer `Duplex` stream.

Because `stream.compose` returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.

If passed a `Function` it must be a factory method taking a `source`
`Iterable`.

```mjs
import { compose, Transform } from 'stream';

const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
}
});

async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}

console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
either `null` or `undefined`.

```mjs
import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
Expand Down
208 changes: 208 additions & 0 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');
const {
isNodeStream,
} = require('internal/streams/utils');
const {
AbortError,
codes: {
ERR_INVALID_ARG_VALUE,
ERR_MISSING_ARGS,
},
} = require('internal/errors');

function isReadable(stream) {
const r = isReadableNodeStream(stream);
if (r === null || typeof stream.readable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.readable && !isReadableFinished(stream);
}

function isWritable(stream) {
const r = isWritableNodeStream(stream);
if (r === null || typeof stream.writable !== 'boolean') return null;
if (isDestroyed(stream)) return false;
return r && stream.writable && !isWritableEnded(stream);
}

// This is needed for pre node 17.
class ComposeDuplex extends Duplex {
constructor(options) {
super(options);

// https://github.com/nodejs/node/pull/34385

if (options?.readable === false) {
this._readableState.readable = false;
this._readableState.ended = true;
this._readableState.endEmitted = true;
}

if (options?.writable === false) {
this._writableState.writable = false;
this._writableState.ending = true;
this._writableState.ended = true;
this._writableState.finished = true;
}
}
}

module.exports = function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams');
}

if (streams.length === 1) {
return Duplex.from(streams[0]);
}

const orgStreams = [...streams];

if (typeof streams[0] === 'function') {
streams[0] = Duplex.from(streams[0]);
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1;
streams[idx] = Duplex.from(streams[idx]);
}

for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable'
);
}
if (n > 0 && !isWritable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be writable'
);
}
}

let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;

function onfinished(err) {
const cb = onclose;
onclose = null;

if (cb) {
cb(err);
} else if (err) {
d.destroy(err);
} else if (!readable && !writable) {
d.destroy();
}
}

const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new ComposeDuplex({
// TODO (ronag): highWaterMark?
writableObjectMode: !!head?.writableObjectMode,
readableObjectMode: !!tail?.writableObjectMode,
writable,
readable,
});

if (writable) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
head.end();
onfinish = callback;
};

head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});

tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});

tail.on('end', function() {
d.push(null);
});

d._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
return;
}

if (!d.push(buf)) {
return;
}
}
};
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};

return d;
};
3 changes: 0 additions & 3 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,6 @@ function pipeline(...streams) {
}
}

// TODO(ronag): Consider returning a Duplex proxy if the first argument
// is a writable. Would improve composability.
// See, https://github.com/nodejs/node/issues/32020
return ret;
}

Expand Down
4 changes: 4 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const compose = require('internal/streams/compose');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

Expand All @@ -46,6 +48,8 @@ Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;
Stream.compose = compose;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const expectedModules = new Set([
'NativeModule internal/stream_base_commons',
'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
'NativeModule internal/streams/end-of-stream',
Expand Down
Loading