Skip to content

Commit cb6672d

Browse files
committed
stream: add reduce
1 parent 7752eed commit cb6672d

File tree

3 files changed

+206
-3
lines changed

3 files changed

+206
-3
lines changed

doc/api/stream.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2142,6 +2142,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
21422142
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
21432143
```
21442144

2145+
### `readable.reduce(fn[, initial[, options])`
2146+
2147+
<!-- YAML
2148+
added: REPLACEME
2149+
-->
2150+
2151+
> Stability: 1 - Experimental
2152+
2153+
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
2154+
in the stream.
2155+
* `previous` {any} the value obtained from the last call to `fn` or the
2156+
`initial` value if specified or the first chunk of the stream otherwise.
2157+
* `data` {any} a chunk of data from the stream.
2158+
* `options` {Object}
2159+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2160+
abort the `fn` call early.
2161+
* `initial` {any} the initial value to use in the reduction.
2162+
* `options` {Object}
2163+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2164+
aborted.
2165+
* Returns: {Promise} a promise for the final value of the reduction.
2166+
2167+
This method calls `fn` on each chunk of the stream in order, passing it the
2168+
result from the calculation on the previous element. It returns a promise for
2169+
the final value of the reduction.
2170+
2171+
The reducer function iterates the stream element-by-element which means that
2172+
there is no `concurrency` parameter or parallism. To perform a `reduce`
2173+
concurrently, it can be chained to the [`readable.map`][] method.
2174+
2175+
If no `initial` value is supplied the first chunk of the stream is used as the
2176+
initial value. If the stream is empty, the promise is rejected with a
2177+
`TypeError` with the `ERR_INVALID_ARGS` code property.
2178+
2179+
```mjs
2180+
import { Readable } from 'stream';
2181+
2182+
const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
2183+
return previous + data;
2184+
});
2185+
console.log(ten); // 10
2186+
```
2187+
21452188
### Duplex and transform streams
21462189

21472190
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const {
66
codes: {
77
ERR_INVALID_ARG_TYPE,
88
ERR_OUT_OF_RANGE,
9+
ERR_MISSING_ARGS,
910
},
1011
AbortError,
1112
} = require('internal/errors');
@@ -196,8 +197,8 @@ async function every(fn, options) {
196197
'fn', ['Function', 'AsyncFunction'], fn);
197198
}
198199
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
199-
return !(await some.call(this, async (x) => {
200-
return !(await fn(x));
200+
return !(await some.call(this, async (...args) => {
201+
return !(await fn(...args));
201202
}, options));
202203
}
203204

@@ -228,11 +229,57 @@ async function * filter(fn, options) {
228229
yield* this.map(filterFn, options);
229230
}
230231

232+
// Specific to provide better error to reduce since the argument is only
233+
// missing if the stream has no items in it - but the code is still appropriate
234+
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
235+
constructor() {
236+
super('reduce');
237+
this.message = 'Reduce of an empty stream requires an initial value';
238+
}
239+
}
240+
241+
async function reduce(reducer, initialValue, options) {
242+
if (typeof reducer !== 'function') {
243+
throw new ERR_INVALID_ARG_TYPE(
244+
'reducer', ['Function', 'AsyncFunction'], reducer);
245+
}
246+
let hasInitialValue = arguments.length > 1;
247+
if (options?.signal?.aborted) {
248+
const err = new AbortError(undefined, { cause: options.signal.reason });
249+
this.once('error', () => {}); // The error is already propagated
250+
this.destroy(err);
251+
throw err;
252+
}
253+
const ac = new AbortController();
254+
const signal = ac.signal;
255+
let gotAnyItemFromStream = false;
256+
try {
257+
for await (const value of this) {
258+
gotAnyItemFromStream = true;
259+
if (options?.signal?.aborted) {
260+
throw new AbortError();
261+
}
262+
if (!hasInitialValue) {
263+
initialValue = value;
264+
hasInitialValue = true;
265+
} else {
266+
initialValue = await reducer(initialValue, value, { signal });
267+
}
268+
}
269+
if (!gotAnyItemFromStream && !hasInitialValue) {
270+
throw new ReduceAwareErrMissingArgs();
271+
}
272+
} finally {
273+
ac.abort();
274+
}
275+
return initialValue;
276+
}
277+
231278
async function toArray(options) {
232279
const result = [];
233280
for await (const val of this) {
234281
if (options?.signal?.aborted) {
235-
throw new AbortError({ cause: options.signal.reason });
282+
throw new AbortError(undefined, { cause: options.signal.reason });
236283
}
237284
ArrayPrototypePush(result, val);
238285
}
@@ -307,6 +354,7 @@ module.exports.streamReturningOperators = {
307354
module.exports.promiseReturningOperators = {
308355
every,
309356
forEach,
357+
reduce,
310358
toArray,
311359
some,
312360
};
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
function sum(p, c) {
10+
return p + c;
11+
}
12+
13+
{
14+
// Does the same thing as `(await stream.toArray()).reduce(...)`
15+
(async () => {
16+
const tests = [
17+
[[], sum, 0],
18+
[[1], sum, 0],
19+
[[1, 2, 3, 4, 5], sum, 0],
20+
[[...Array(100).keys()], sum, 0],
21+
[['a', 'b', 'c'], sum, ''],
22+
[[1, 2], sum],
23+
[[1, 2, 3], (x, y) => y],
24+
];
25+
for (const [values, fn, initial] of tests) {
26+
const streamReduce = await Readable.from(values)
27+
.reduce(fn, initial);
28+
const arrayReduce = values.reduce(fn, initial);
29+
assert.deepStrictEqual(streamReduce, arrayReduce);
30+
}
31+
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
32+
// asynchronous reducer
33+
for (const [values, fn, initial] of tests) {
34+
const streamReduce = await Readable.from(values)
35+
.map(async (x) => x)
36+
.reduce(fn, initial);
37+
const arrayReduce = values.reduce(fn, initial);
38+
assert.deepStrictEqual(streamReduce, arrayReduce);
39+
}
40+
})().then(common.mustCall());
41+
}
42+
{
43+
// Works with an async reducer, with or without initial value
44+
(async () => {
45+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
46+
assert.strictEqual(six, 6);
47+
})().then(common.mustCall());
48+
(async () => {
49+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
50+
assert.strictEqual(six, 6);
51+
})().then(common.mustCall());
52+
}
53+
{
54+
// Works lazily
55+
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
56+
.map(common.mustCall((x) => {
57+
return x;
58+
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
59+
.reduce(async (p, c) => {
60+
if (p === 1) {
61+
throw new Error('boom');
62+
}
63+
return c;
64+
}, 0)
65+
, /boom/).then(common.mustCall());
66+
}
67+
68+
{
69+
// Support for AbortSignal
70+
const ac = new AbortController();
71+
assert.rejects(async () => {
72+
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
73+
if (c === 3) {
74+
await new Promise(() => {}); // Explicitly do not pass signal here
75+
}
76+
return Promise.resolve();
77+
}, 0, { signal: ac.signal });
78+
}, {
79+
name: 'AbortError',
80+
}).then(common.mustCall());
81+
ac.abort();
82+
}
83+
84+
85+
{
86+
// Support for AbortSignal - pre aborted
87+
const stream = Readable.from([1, 2, 3]);
88+
assert.rejects(async () => {
89+
await stream.reduce(async (p, c) => {
90+
if (c === 3) {
91+
await new Promise(() => {}); // Explicitly do not pass signal here
92+
}
93+
return Promise.resolve();
94+
}, 0, { signal: AbortSignal.abort() });
95+
}, {
96+
name: 'AbortError',
97+
}).then(common.mustCall(() => {
98+
assert.strictEqual(stream.destroyed, true);
99+
}));
100+
}
101+
102+
{
103+
// Error cases
104+
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
105+
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
106+
}
107+
108+
{
109+
// Test result is a Promise
110+
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
111+
assert.ok(result instanceof Promise);
112+
}

0 commit comments

Comments
 (0)