@@ -1870,16 +1870,14 @@ const { pipeline } = require('stream/promises');
18701870
18711871async function run () {
18721872 const ac = new AbortController ();
1873- const options = {
1874- signal: ac .signal ,
1875- };
1873+ const signal = ac .signal ;
18761874
18771875 setTimeout (() => ac .abort (), 1 );
18781876 await pipeline (
18791877 fs .createReadStream (' archive.tar' ),
18801878 zlib .createGzip (),
18811879 fs .createWriteStream (' archive.tar.gz' ),
1882- options ,
1880+ { signal } ,
18831881 );
18841882}
18851883
@@ -1895,10 +1893,10 @@ const fs = require('fs');
18951893async function run () {
18961894 await pipeline (
18971895 fs .createReadStream (' lowercase.txt' ),
1898- async function * (source ) {
1896+ async function * (source , signal ) {
18991897 source .setEncoding (' utf8' ); // Work with strings rather than `Buffer`s.
19001898 for await (const chunk of source ) {
1901- yield chunk . toUpperCase ( );
1899+ yield await processChunk (chunk, { signal } );
19021900 }
19031901 },
19041902 fs .createWriteStream (' uppercase.txt' )
@@ -1909,6 +1907,28 @@ async function run() {
19091907run ().catch (console .error );
19101908```
19111909
1910+ Remember to handle the ` signal ` argument passed into the async generator.
1911+ Especially in the case where the async generator is the source for the
1912+ pipeline (i.e. first argument) or the pipeline will never complete.
1913+
1914+ ``` js
1915+ const { pipeline } = require (' stream/promises' );
1916+ const fs = require (' fs' );
1917+
1918+ async function run () {
1919+ await pipeline (
1920+ async function * (signal ) {
1921+ await someLongRunningfn ({ signal });
1922+ yield ' asd' ;
1923+ },
1924+ fs .createWriteStream (' uppercase.txt' )
1925+ );
1926+ console .log (' Pipeline succeeded.' );
1927+ }
1928+
1929+ run ().catch (console .error );
1930+ ```
1931+
19121932` stream.pipeline() ` will call ` stream.destroy(err) ` on all streams except:
19131933* ` Readable ` streams which have emitted ` 'end' ` or ` 'close' ` .
19141934* ` Writable ` streams which have emitted ` 'finish' ` or ` 'close' ` .
@@ -3407,13 +3427,20 @@ the `Readable.from()` utility method:
34073427``` js
34083428const { Readable } = require (' stream' );
34093429
3430+ const ac = new AbortController ();
3431+ const signal = ac .signal ;
3432+
34103433async function * generate () {
34113434 yield ' a' ;
3435+ await someLongRunningFn ({ signal });
34123436 yield ' b' ;
34133437 yield ' c' ;
34143438}
34153439
34163440const readable = Readable .from (generate ());
3441+ readable .on (' close' , () => {
3442+ ac .abort ();
3443+ });
34173444
34183445readable .on (' data' , (chunk ) => {
34193446 console .log (chunk);
@@ -3433,21 +3460,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');
34333460
34343461const writable = fs .createWriteStream (' ./file' );
34353462
3463+ const ac = new AbortController ();
3464+ const signal = ac .signal ;
3465+
3466+ const iterator = createIterator ({ signal });
3467+
34363468// Callback Pattern
34373469pipeline (iterator, writable, (err , value ) => {
34383470 if (err) {
34393471 console .error (err);
34403472 } else {
34413473 console .log (value, ' value returned' );
34423474 }
3475+ }).on (' close' , () => {
3476+ ac .abort ();
34433477});
34443478
34453479// Promise Pattern
34463480pipelinePromise (iterator, writable)
34473481 .then ((value ) => {
34483482 console .log (value, ' value returned' );
34493483 })
3450- .catch (console .error );
3484+ .catch ((err ) => {
3485+ console .error (err);
3486+ ac .abort ();
3487+ });
34513488```
34523489
34533490<!-- type=misc-->
0 commit comments