@@ -22,6 +22,7 @@ const {
2222 ERR_STREAM_DESTROYED ,
2323 ERR_STREAM_PREMATURE_CLOSE ,
2424 } ,
25+ AbortError,
2526} = require ( 'internal/errors' ) ;
2627
2728const { validateCallback } = require ( 'internal/validators' ) ;
@@ -34,6 +35,7 @@ const {
3435 isStream,
3536} = require ( 'internal/streams/utils' ) ;
3637const assert = require ( 'internal/assert' ) ;
38+ const { AbortController } = require ( 'internal/abort_controller' ) ;
3739
3840let PassThrough ;
3941let Readable ;
@@ -176,10 +178,24 @@ function pipeline(...streams) {
176178 streams = streams [ 0 ] ;
177179 }
178180
181+ return pipelineImpl ( streams , callback ) ;
182+ }
183+
184+ function pipelineImpl ( streams , callback , opts ) {
179185 if ( streams . length < 2 ) {
180186 throw new ERR_MISSING_ARGS ( 'streams' ) ;
181187 }
182188
189+ const ac = new AbortController ( ) ;
190+ const signal = ac . signal ;
191+ const outerSignal = opts ?. signal ;
192+
193+ function abort ( ) {
194+ finishImpl ( new AbortError ( ) ) ;
195+ }
196+
197+ outerSignal ?. addEventListener ( 'abort' , abort ) ;
198+
183199 let error ;
184200 let value ;
185201 const destroys = [ ] ;
@@ -188,7 +204,10 @@ function pipeline(...streams) {
188204
189205 function finish ( err ) {
190206 const final = -- finishCount === 0 ;
207+ finishImpl ( err , final ) ;
208+ }
191209
210+ function finishImpl ( err , final ) {
192211 if ( err && ( ! error || error . code === 'ERR_STREAM_PREMATURE_CLOSE' ) ) {
193212 error = err ;
194213 }
@@ -201,6 +220,9 @@ function pipeline(...streams) {
201220 destroys . shift ( ) ( error ) ;
202221 }
203222
223+ outerSignal ?. removeEventListener ( 'abort' , abort ) ;
224+ ac . abort ( ) ;
225+
204226 if ( final ) {
205227 callback ( error , value ) ;
206228 }
@@ -219,7 +241,7 @@ function pipeline(...streams) {
219241
220242 if ( i === 0 ) {
221243 if ( typeof stream === 'function' ) {
222- ret = stream ( ) ;
244+ ret = stream ( signal ) ;
223245 if ( ! isIterable ( ret ) ) {
224246 throw new ERR_INVALID_RETURN_VALUE (
225247 'Iterable, AsyncIterable or Stream' , 'source' , ret ) ;
@@ -233,7 +255,7 @@ function pipeline(...streams) {
233255 }
234256 } else if ( typeof stream === 'function' ) {
235257 ret = makeAsyncIterable ( ret ) ;
236- ret = stream ( ret ) ;
258+ ret = stream ( ret , signal ) ;
237259
238260 if ( reading ) {
239261 if ( ! isIterable ( ret , true ) ) {
@@ -309,4 +331,4 @@ function pipeline(...streams) {
309331 return ret ;
310332}
311333
312- module . exports = pipeline ;
334+ module . exports = { pipelineImpl , pipeline } ;
0 commit comments