@@ -1751,7 +1751,7 @@ added: REPLACEME
17511751 * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
17521752 abort the ` fn ` call early.
17531753* ` options ` {Object}
1754- * ` concurrency ` {number} the maximal concurrent invocation of ` fn ` to call
1754+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
17551755 on the stream at once. ** Default:** ` 1 ` .
17561756 * ` signal ` {AbortSignal} allows destroying the stream if the signal is
17571757 aborted.
@@ -1795,7 +1795,7 @@ added: REPLACEME
17951795 * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
17961796 abort the ` fn ` call early.
17971797* ` options ` {Object}
1798- * ` concurrency ` {number} the maximal concurrent invocation of ` fn ` to call
1798+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
17991799 on the stream at once. ** Default:** ` 1 ` .
18001800 * ` signal ` {AbortSignal} allows destroying the stream if the signal is
18011801 aborted.
@@ -1830,6 +1830,65 @@ for await (const result of dnsResults) {
18301830}
18311831```
18321832
1833+ ### ` readable.forEach(fn[, options]) `
1834+
1835+ <!-- YAML
1836+ added: REPLACEME
1837+ -->
1838+
1839+ > Stability: 1 - Experimental
1840+
1841+ * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1842+ * ` data ` {any} a chunk of data from the stream.
1843+ * ` options ` {Object}
1844+ * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
1845+ abort the ` fn ` call early.
1846+ * ` options ` {Object}
1847+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
1848+ on the stream at once. ** Default:** ` 1 ` .
1849+ * ` signal ` {AbortSignal} allows destroying the stream if the signal is
1850+ aborted.
1851+ * Returns: {Promise} a promise for when the stream has finished.
1852+
1853+ This method allows iterating a stream. For each item in the stream the
1854+ ` fn ` function will be called. If the ` fn ` function returns a promise - that
1855+ promise will be ` await ` ed.
1856+
1857+ This method is different from ` for await...of ` loops in that it can optionally
1858+ process items concurrently. In addition, a ` forEach ` iteration can only be
1859+ stopped by having passed a ` signal ` option and aborting the related
1860+ ` AbortController ` while ` for await...of ` can be stopped with ` break ` or
1861+ ` return ` . In either case the stream will be destroyed.
1862+
1863+ This method is different from listening to the [ ` 'data' ` ] [ ] event in that it
1864+ uses the [ ` readable ` ] [ ] event in the underlying machinary and can limit the
1865+ number of concurrent ` fn ` calls.
1866+
1867+ ``` mjs
1868+ import { Readable } from ' stream' ;
1869+ import { Resolver } from ' dns/promises' ;
1870+
1871+ // With a synchronous predicate.
1872+ for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1873+ console .log (item); // 3, 4
1874+ }
1875+ // With an asynchronous predicate, making at most 2 queries at a time.
1876+ const resolver = new Resolver ();
1877+ const dnsResults = await Readable .from ([
1878+ ' nodejs.org' ,
1879+ ' openjsf.org' ,
1880+ ' www.linuxfoundation.org' ,
1881+ ]).map (async (domain ) => {
1882+ const { address } = await resolver .resolve4 (domain, { ttl: true });
1883+ return address;
1884+ }, { concurrency: 2 });
1885+ await dnsResults .forEach ((result ) => {
1886+ // Logs result, similar to `for await (const result of dnsResults)`
1887+ console .log (result);
1888+ });
1889+ console .log (' done' ); // Stream has finished
1890+ ```
1891+
18331892### Duplex and transform streams
18341893
18351894#### Class: ` stream.Duplex `
0 commit comments