@@ -13,6 +13,9 @@ const {
1313 ArrayPrototypePush,
1414 ArrayPrototypeSlice,
1515 PromisePrototypeThen,
16+ PromiseWithResolvers,
17+ SafePromiseRace,
18+ SafePromisePrototypeFinally,
1619 SymbolAsyncIterator,
1720 SymbolIterator,
1821 TypedArrayPrototypeGetByteLength,
@@ -607,6 +610,77 @@ async function* applyValidatedStatefulAsyncTransform(source, transform, options)
607610 options . signal ?. throwIfAborted ( ) ;
608611}
609612
613+ /**
614+ * Read one item from an async iterator, rejecting early if the signal aborts.
615+ * @param {AsyncIterator } iterator - The iterator to read from.
616+ * @param {AbortSignal|undefined } signal - Optional abort signal.
617+ * @returns {Promise<IteratorResult<Uint8Array[]>>|IteratorResult<Uint8Array[]> }
618+ */
619+ function abortableNext ( iterator , signal ) {
620+ if ( signal === undefined ) {
621+ return iterator . next ( ) ;
622+ }
623+
624+ signal . throwIfAborted ( ) ;
625+
626+ const next = iterator . next ( ) ;
627+ const { promise, reject } = PromiseWithResolvers ( ) ;
628+ const onAbort = ( ) => reject ( signal . reason ) ;
629+ signal . addEventListener ( 'abort' , onAbort , { __proto__ : null , once : true } ) ;
630+ if ( signal . aborted ) {
631+ onAbort ( ) ;
632+ }
633+
634+ return SafePromisePrototypeFinally ( SafePromiseRace ( [ next , promise ] ) , ( ) => {
635+ signal . removeEventListener ( 'abort' , onAbort ) ;
636+ } ) ;
637+ }
638+
639+ /**
640+ * Wrap an async source so each pending read is abort-aware.
641+ * @param {AsyncIterable<Uint8Array[]> } source - The source to read from.
642+ * @param {AbortSignal|undefined } signal - Optional abort signal.
643+ * @returns {AsyncIterable<Uint8Array[]> }
644+ */
645+ function yieldAbortable ( source , signal ) {
646+ if ( signal === undefined ) {
647+ return source ;
648+ }
649+
650+ return {
651+ __proto__ : null ,
652+ async * [ SymbolAsyncIterator ] ( ) {
653+ const iterator = source [ SymbolAsyncIterator ] ( ) ;
654+ let completed = false ;
655+ let aborted = false ;
656+
657+ try {
658+ while ( true ) {
659+ const { done, value } = await abortableNext ( iterator , signal ) ;
660+ if ( done ) {
661+ completed = true ;
662+ return ;
663+ }
664+ signal . throwIfAborted ( ) ;
665+ yield value ;
666+ }
667+ } catch ( error ) {
668+ aborted = signal . aborted ;
669+ throw error ;
670+ } finally {
671+ if ( ! completed && typeof iterator . return === 'function' ) {
672+ const result = iterator . return ( ) ;
673+ if ( aborted ) {
674+ PromisePrototypeThen ( result , undefined , ( ) => { } ) ;
675+ } else {
676+ await result ;
677+ }
678+ }
679+ }
680+ } ,
681+ } ;
682+ }
683+
610684/**
611685 * Create an async pipeline from source through transforms.
612686 * @yields {Uint8Array[]}
@@ -615,17 +689,14 @@ async function* createAsyncPipeline(source, transforms, signal) {
615689 // Check for abort
616690 signal ?. throwIfAborted ( ) ;
617691
618- const normalized = source ;
619-
620692 // Fast path: no transforms, just yield normalized source directly
621693 if ( transforms . length === 0 ) {
622- for await ( const batch of normalized ) {
623- signal ?. throwIfAborted ( ) ;
624- yield batch ;
625- }
694+ yield * yieldAbortable ( source , signal ) ;
626695 return ;
627696 }
628697
698+ const normalized = yieldAbortable ( source , signal ) ;
699+
629700 // Create internal controller for transform cancellation.
630701 // Note: if signal was already aborted, we threw above - no need to check here.
631702 const controller = new AbortController ( ) ;
0 commit comments