@@ -27,7 +27,7 @@ const {
2727const kEmpty = Symbol ( 'kEmpty' ) ;
2828const kEof = Symbol ( 'kEof' ) ;
2929
30- async function * map ( fn , options ) {
30+ function map ( fn , options ) {
3131 if ( typeof fn !== 'function' ) {
3232 throw new ERR_INVALID_ARG_TYPE (
3333 'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -44,118 +44,120 @@ async function * map(fn, options) {
4444
4545 validateInteger ( concurrency , 'concurrency' , 1 ) ;
4646
47- const ac = new AbortController ( ) ;
48- const stream = this ;
49- const queue = [ ] ;
50- const signal = ac . signal ;
51- const signalOpt = { signal } ;
52-
53- const abort = ( ) => ac . abort ( ) ;
54- if ( options ?. signal ?. aborted ) {
55- abort ( ) ;
56- }
57-
58- options ?. signal ?. addEventListener ( 'abort' , abort ) ;
59-
60- let next ;
61- let resume ;
62- let done = false ;
63-
64- function onDone ( ) {
65- done = true ;
66- }
47+ return async function * map ( ) {
48+ const ac = new AbortController ( ) ;
49+ const stream = this ;
50+ const queue = [ ] ;
51+ const signal = ac . signal ;
52+ const signalOpt = { signal } ;
6753
68- async function pump ( ) {
69- try {
70- for await ( let val of stream ) {
71- if ( done ) {
72- return ;
73- }
54+ const abort = ( ) => ac . abort ( ) ;
55+ if ( options ?. signal ?. aborted ) {
56+ abort ( ) ;
57+ }
7458
75- if ( signal . aborted ) {
76- throw new AbortError ( ) ;
77- }
59+ options ?. signal ?. addEventListener ( 'abort' , abort ) ;
7860
79- try {
80- val = fn ( val , signalOpt ) ;
81- } catch ( err ) {
82- val = PromiseReject ( err ) ;
83- }
61+ let next ;
62+ let resume ;
63+ let done = false ;
8464
85- if ( val === kEmpty ) {
86- continue ;
87- }
65+ function onDone ( ) {
66+ done = true ;
67+ }
8868
89- if ( typeof val ?. catch === 'function' ) {
90- val . catch ( onDone ) ;
69+ async function pump ( ) {
70+ try {
71+ for await ( let val of stream ) {
72+ if ( done ) {
73+ return ;
74+ }
75+
76+ if ( signal . aborted ) {
77+ throw new AbortError ( ) ;
78+ }
79+
80+ try {
81+ val = fn ( val , signalOpt ) ;
82+ } catch ( err ) {
83+ val = PromiseReject ( err ) ;
84+ }
85+
86+ if ( val === kEmpty ) {
87+ continue ;
88+ }
89+
90+ if ( typeof val ?. catch === 'function' ) {
91+ val . catch ( onDone ) ;
92+ }
93+
94+ queue . push ( val ) ;
95+ if ( next ) {
96+ next ( ) ;
97+ next = null ;
98+ }
99+
100+ if ( ! done && queue . length && queue . length >= concurrency ) {
101+ await new Promise ( ( resolve ) => {
102+ resume = resolve ;
103+ } ) ;
104+ }
91105 }
92-
106+ queue . push ( kEof ) ;
107+ } catch ( err ) {
108+ const val = PromiseReject ( err ) ;
109+ PromisePrototypeCatch ( val , onDone ) ;
93110 queue . push ( val ) ;
111+ } finally {
112+ done = true ;
94113 if ( next ) {
95114 next ( ) ;
96115 next = null ;
97116 }
98-
99- if ( ! done && queue . length && queue . length >= concurrency ) {
100- await new Promise ( ( resolve ) => {
101- resume = resolve ;
102- } ) ;
103- }
104- }
105- queue . push ( kEof ) ;
106- } catch ( err ) {
107- const val = PromiseReject ( err ) ;
108- PromisePrototypeCatch ( val , onDone ) ;
109- queue . push ( val ) ;
110- } finally {
111- done = true ;
112- if ( next ) {
113- next ( ) ;
114- next = null ;
117+ options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
115118 }
116- options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
117119 }
118- }
119-
120- pump ( ) ;
121-
122- try {
123- while ( true ) {
124- while ( queue . length > 0 ) {
125- const val = await queue [ 0 ] ;
126-
127- if ( val === kEof ) {
128- return ;
129- }
130120
131- if ( signal . aborted ) {
132- throw new AbortError ( ) ;
133- }
121+ pump ( ) ;
134122
135- if ( val !== kEmpty ) {
136- yield val ;
123+ try {
124+ while ( true ) {
125+ while ( queue . length > 0 ) {
126+ const val = await queue [ 0 ] ;
127+
128+ if ( val === kEof ) {
129+ return ;
130+ }
131+
132+ if ( signal . aborted ) {
133+ throw new AbortError ( ) ;
134+ }
135+
136+ if ( val !== kEmpty ) {
137+ yield val ;
138+ }
139+
140+ queue . shift ( ) ;
141+ if ( resume ) {
142+ resume ( ) ;
143+ resume = null ;
144+ }
137145 }
138146
139- queue . shift ( ) ;
140- if ( resume ) {
141- resume ( ) ;
142- resume = null ;
143- }
147+ await new Promise ( ( resolve ) => {
148+ next = resolve ;
149+ } ) ;
144150 }
151+ } finally {
152+ ac . abort ( ) ;
145153
146- await new Promise ( ( resolve ) => {
147- next = resolve ;
148- } ) ;
149- }
150- } finally {
151- ac . abort ( ) ;
152-
153- done = true ;
154- if ( resume ) {
155- resume ( ) ;
156- resume = null ;
154+ done = true ;
155+ if ( resume ) {
156+ resume ( ) ;
157+ resume = null ;
158+ }
157159 }
158- }
160+ } . call ( this ) ;
159161}
160162
161163async function * asIndexedPairs ( options ) {
@@ -215,7 +217,7 @@ async function forEach(fn, options) {
215217 for await ( const unused of this . map ( forEachFn , options ) ) ;
216218}
217219
218- async function * filter ( fn , options ) {
220+ function filter ( fn , options ) {
219221 if ( typeof fn !== 'function' ) {
220222 throw new ERR_INVALID_ARG_TYPE (
221223 'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -226,7 +228,7 @@ async function * filter(fn, options) {
226228 }
227229 return kEmpty ;
228230 }
229- yield * this . map ( filterFn , options ) ;
231+ return this . map ( filterFn , options ) ;
230232}
231233
232234async function toArray ( options ) {
@@ -243,10 +245,13 @@ async function toArray(options) {
243245 return result ;
244246}
245247
246- async function * flatMap ( fn , options ) {
247- for await ( const val of this . map ( fn , options ) ) {
248- yield * val ;
249- }
248+ function flatMap ( fn , options ) {
249+ const values = this . map ( fn , options ) ;
250+ return async function * flatMap ( ) {
251+ for await ( const val of values ) {
252+ yield * val ;
253+ }
254+ } . call ( this ) ;
250255}
251256
252257function toIntegerOrInfinity ( number ) {
0 commit comments