diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 346585c..6ab9e0a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -10,8 +10,9 @@ jobs: fail-fast: false matrix: node-version: + - 24 + - 22 - 20 - - 18 steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 diff --git a/gc-test.js b/gc-test.js new file mode 100644 index 0000000..be7d094 --- /dev/null +++ b/gc-test.js @@ -0,0 +1,64 @@ +import process from 'node:process'; +import pThrottle from './index.js'; + +const mb = bytes => `${(bytes / 1024 / 1024).toFixed(2)} MB`; + +const heap = () => process.memoryUsage().heapUsed; + +const delay = ms => new Promise(resolve => { + setTimeout(resolve, ms); +}); + +async function runGcTest() { + console.log(`Start: ${mb(heap())}`); + + const controller = new AbortController(); + const batches = 100; // Total throttled created = batches * perBatch + const perBatch = 1000; + const callsPerThrottled = 2; // Keep small to avoid large queues + const limit = 50; + const interval = 10; + + for (let batch = 0; batch < batches; batch++) { + const throttle = pThrottle({ + limit, interval, strict: false, signal: controller.signal, + }); + const throttledFns = Array.from({length: perBatch}, () => throttle(() => {})); + + const promises = []; + for (const function_ of throttledFns) { + for (let k = 0; k < callsPerThrottled; k++) { + promises.push(function_()); + } + } + + // eslint-disable-next-line no-await-in-loop + await Promise.all(promises); + + if ((batch + 1) % 10 === 0) { + if (global.gc) { + global.gc(); + } + + console.log(`After batch ${batch + 1}: ${mb(heap())}`); + // Let the event loop breathe a bit between batches + // eslint-disable-next-line no-await-in-loop + await delay(1); + } + } + + if (global.gc) { + console.log(`Before GC: ${mb(heap())}`); + global.gc(); + console.log(`After GC: ${mb(heap())}`); + } else { + console.log('GC not exposed. Run with: node --expose-gc x.js'); + } +} + +try { + await runGcTest(); +} catch (error) { + console.error(error); + throw error; +} diff --git a/index.d.ts b/index.d.ts index a5f1a95..0ccbce9 100644 --- a/index.d.ts +++ b/index.d.ts @@ -2,7 +2,7 @@ type AnyFunction = (...arguments_: readonly any[]) => unknown; export type ThrottledFunction = F & { /** - Whether future function calls should be throttled or count towards throttling thresholds. + Whether future function calls should be throttled and count towards throttling thresholds. @default true */ @@ -26,7 +26,7 @@ export type Options = { readonly interval: number; /** - Use a strict, more resource intensive, throttling algorithm. The default algorithm uses a windowed approach that will work correctly in most cases, limiting the total number of calls at the specified limit per interval window. The strict algorithm throttles each call individually, ensuring the limit is not exceeded for any interval. + Use a strict, more resource-intensive, throttling algorithm. The default algorithm uses a windowed approach that will work correctly in most cases, limiting the total number of calls at the specified limit per interval window. The strict algorithm throttles each call individually, ensuring the limit is not exceeded for any interval. @default false */ @@ -53,7 +53,7 @@ export type Options = { await throttled(); await throttled(); - controller.abort('aborted') + controller.abort('aborted'); await throttled(); //=> Executing... //=> Executing... @@ -65,7 +65,7 @@ export type Options = { /** Get notified when function calls are delayed due to exceeding the `limit` of allowed calls within the given `interval`. - Can be useful for monitoring the throttling efficiency. + The delayed call arguments are passed to the `onDelay` callback. Can be useful for monitoring the throttling efficiency. @example ``` @@ -79,8 +79,8 @@ export type Options = { }, }); - const throttled = throttle(() => { - console.log('Executing...'); + const throttled = throttle((a, b) => { + console.log(`Executing with ${a} ${b}...`); }); await throttled(1, 2); @@ -96,7 +96,9 @@ export type Options = { }; /** -Throttle promise-returning/async/normal functions. +Throttle promise-returning & async functions. + +Also works with normal functions. It rate-limits function calls without discarding them, making it ideal for external API interactions where avoiding call loss is crucial. diff --git a/index.js b/index.js index e54af84..106f4c4 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,21 @@ -const registry = new FinalizationRegistry(({signal, aborted}) => { - signal?.removeEventListener('abort', aborted); +const states = new WeakMap(); +const signalThrottleds = new WeakMap(); // AbortSignal -> {throttleds: Set, listener: Function} + +const finalizationRegistry = new FinalizationRegistry(({signalWeakRef, weakReference}) => { + const signal = signalWeakRef.deref(); + if (!signal) { + return; // Signal already GC'd + } + + const registration = signalThrottleds.get(signal); + if (registration) { + registration.throttleds.delete(weakReference); + if (registration.throttleds.size === 0) { + // Remove the abort listener when no throttleds remain + signal.removeEventListener('abort', registration.listener); + signalThrottleds.delete(signal); + } + } }); export default function pThrottle({limit, interval, strict, signal, onDelay}) { @@ -11,52 +27,61 @@ export default function pThrottle({limit, interval, strict, signal, onDelay}) { throw new TypeError('Expected `interval` to be a finite number'); } - const queue = new Map(); + if (limit < 0) { + throw new TypeError('Expected `limit` to be >= 0'); + } - let currentTick = 0; - let activeCount = 0; + if (interval < 0) { + throw new TypeError('Expected `interval` to be >= 0'); + } + + const state = { + queue: new Map(), + strictTicks: [], + // Track windowed algorithm state so it can be reset on abort + currentTick: 0, + activeCount: 0, + }; function windowedDelay() { const now = Date.now(); - if ((now - currentTick) > interval) { - activeCount = 1; - currentTick = now; + if ((now - state.currentTick) > interval) { + state.activeCount = 1; + state.currentTick = now; return 0; } - if (activeCount < limit) { - activeCount++; + if (state.activeCount < limit) { + state.activeCount++; } else { - currentTick += interval; - activeCount = 1; + state.currentTick += interval; + state.activeCount = 1; } - return currentTick - now; + return state.currentTick - now; } - const strictTicks = []; - function strictDelay() { const now = Date.now(); // Clear the queue if there's a significant delay since the last execution - if (strictTicks.length > 0 && now - strictTicks.at(-1) > interval) { - strictTicks.length = 0; + if (state.strictTicks.length > 0 && now - state.strictTicks.at(-1) > interval) { + state.strictTicks.length = 0; } // If the queue is not full, add the current time and execute immediately - if (strictTicks.length < limit) { - strictTicks.push(now); + if (state.strictTicks.length < limit) { + state.strictTicks.push(now); return 0; } // Calculate the next execution time based on the first item in the queue - const nextExecutionTime = strictTicks[0] + interval; + const nextExecutionTime = state.strictTicks[0] + interval; // Shift the queue and add the new execution time - strictTicks.shift(); - strictTicks.push(nextExecutionTime); + state.strictTicks.shift(); + state.strictTicks.push(nextExecutionTime); // Calculate the delay for the current execution return Math.max(0, nextExecutionTime - now); @@ -73,44 +98,87 @@ export default function pThrottle({limit, interval, strict, signal, onDelay}) { let timeoutId; return new Promise((resolve, reject) => { const execute = () => { - resolve(function_.apply(this, arguments_)); - queue.delete(timeoutId); + try { + resolve(function_.apply(this, arguments_)); + } catch (error) { + reject(error); + } + + state.queue.delete(timeoutId); }; const delay = getDelay(); if (delay > 0) { timeoutId = setTimeout(execute, delay); - queue.set(timeoutId, reject); - onDelay?.(...arguments_); + state.queue.set(timeoutId, reject); + try { + onDelay?.(...arguments_); + } catch {} } else { execute(); } }); }; - const aborted = () => { - for (const timeout of queue.keys()) { - clearTimeout(timeout); - queue.get(timeout)(signal.reason); - } + signal?.throwIfAborted(); - queue.clear(); - strictTicks.splice(0, strictTicks.length); - }; + if (signal) { + let registration = signalThrottleds.get(signal); + if (!registration) { + registration = { + throttleds: new Set(), + listener: null, + }; - registry.register(throttled, {signal, aborted}); + registration.listener = () => { + for (const weakReference of registration.throttleds) { + const function_ = weakReference.deref(); + if (!function_) { + continue; + } + + const functionState = states.get(function_); + if (!functionState) { + continue; + } + + for (const timeout of functionState.queue.keys()) { + clearTimeout(timeout); + functionState.queue.get(timeout)(signal.reason); + } + + functionState.queue.clear(); + functionState.strictTicks.length = 0; + // Reset windowed state so subsequent calls are not artificially delayed + functionState.currentTick = 0; + functionState.activeCount = 0; + } + + signalThrottleds.delete(signal); + }; - signal?.throwIfAborted(); - signal?.addEventListener('abort', aborted, {once: true}); + signalThrottleds.set(signal, registration); + signal.addEventListener('abort', registration.listener, {once: true}); + } + + const weakReference = new WeakRef(throttled); + registration.throttleds.add(weakReference); + finalizationRegistry.register(throttled, { + signalWeakRef: new WeakRef(signal), + weakReference, + }); + } throttled.isEnabled = true; Object.defineProperty(throttled, 'queueSize', { get() { - return queue.size; + return state.queue.size; }, }); + states.set(throttled, state); + return throttled; }; } diff --git a/package.json b/package.json index 04922ae..b7e9e8e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "p-throttle", - "version": "7.0.0", + "version": "8.0.0", "description": "Throttle promise-returning & async functions", "license": "MIT", "repository": "sindresorhus/p-throttle", @@ -17,10 +17,11 @@ }, "sideEffects": false, "engines": { - "node": ">=18" + "node": ">=20" }, "scripts": { - "test": "xo && ava && tsd" + "test": "xo && ava && tsd", + "test-gc": "node --expose-gc gc-test" }, "files": [ "index.js", @@ -50,6 +51,7 @@ "ava": "^6.2.0", "delay": "^6.0.0", "in-range": "^3.0.0", + "pretty-bytes": "^7.0.1", "time-span": "^5.1.0", "tsd": "^0.31.2", "xo": "^0.59.3" diff --git a/readme.md b/readme.md index b1adfbf..9e70651 100644 --- a/readme.md +++ b/readme.md @@ -2,7 +2,7 @@ > Throttle promise-returning & async functions -It also works with normal functions. +Also works with normal functions. It rate-limits function calls without discarding them, making it ideal for external API interactions where avoiding call loss is crucial. @@ -14,7 +14,7 @@ npm install p-throttle ## Usage -Here, the throttled function is only called twice a second: +This calls the function at most twice per second: ```js import pThrottle from 'p-throttle'; @@ -73,9 +73,9 @@ The timespan for `limit` in milliseconds. Type: `boolean`\ Default: `false` -Use a strict, more resource intensive, throttling algorithm. The default algorithm uses a windowed approach that will work correctly in most cases, limiting the total number of calls at the specified limit per interval window. The strict algorithm throttles each call individually, ensuring the limit is not exceeded for any interval. +Use a strict, more resource-intensive, throttling algorithm. The default algorithm uses a windowed approach that will work correctly in most cases, limiting the total number of calls at the specified limit per interval window. The strict algorithm throttles each call individually, ensuring the limit is not exceeded for any interval. -#### signal +##### signal Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) @@ -98,7 +98,7 @@ const throttled = throttle(() => { await throttled(); await throttled(); -controller.abort('aborted') +controller.abort('aborted'); await throttled(); //=> Executing... //=> Executing... diff --git a/test.js b/test.js index 80133e3..654f3ca 100644 --- a/test.js +++ b/test.js @@ -1,3 +1,4 @@ +import {setMaxListeners} from 'node:events'; import test from 'ava'; import inRange from 'in-range'; import timeSpan from 'time-span'; @@ -44,7 +45,6 @@ test('queue size', async t => { t.is(throttled.queueSize, delayedExecutions); await Promise.all(promises); - t.is(throttled.queueSize, 0); }); @@ -258,6 +258,37 @@ test('disable and re-enable functionality', async t => { t.true(timeReEnabled - start >= 1000); }); +test('isEnabled=false does not cancel already queued items', async t => { + const interval = 100; + const throttled = pThrottle({limit: 1, interval})(() => Date.now()); + const first = await throttled(); + const queued = throttled(); // Will be queued + throttled.isEnabled = false; // Disable after queueing + const resolved = await queued; // Should still resolve + const delta = resolved - first; + t.true(inRange(delta, {start: interval - 10, end: interval + 50})); +}); + +test('double abort is safe', async t => { + const controller = new AbortController(); + const throttled = pThrottle({limit: 1, interval: 100, signal: controller.signal})(() => Date.now()); + await throttled(); + const p1 = throttled(); + const p2 = throttled(); + controller.abort('R'); + controller.abort('R'); // Second abort should be a no-op + const [r1, r2] = await Promise.allSettled([p1, p2]); + if (r1.status === 'rejected') { + t.is(r1.reason, 'R'); + } + + if (r2.status === 'rejected') { + t.is(r2.reason, 'R'); + } + + t.is(throttled.queueSize, 0); +}); + test('stability under high load', async t => { const limit = 5; const interval = 100; @@ -280,6 +311,35 @@ test('handles zero interval', async t => { t.true(end - start < 50); // Small buffer to account for execution time }); +test('handles zero interval in strict mode with no delays and no onDelay', async t => { + const seen = []; + const throttled = pThrottle({ + limit: 3, + interval: 0, + strict: true, + onDelay: (...arguments_) => seen.push(arguments_), + })(() => Date.now()); + const start = Date.now(); + const results = await Promise.all([throttled(), throttled(), throttled(), throttled(), throttled()]); + const end = Date.now(); + for (const time of results) { + t.true(time - start < 50); + } + + t.true(end - start < 50); + t.deepEqual(seen, []); +}); + +test('invalid options throw', t => { + t.throws(() => { + pThrottle({limit: -1, interval: 100})(() => {}); + }, {message: 'Expected `limit` to be >= 0'}); + + t.throws(() => { + pThrottle({limit: 1, interval: -1})(() => {}); + }, {message: 'Expected `interval` to be >= 0'}); +}); + test('handles simultaneous calls', async t => { const limit = 5; const interval = 100; @@ -293,21 +353,66 @@ test('handles simultaneous calls', async t => { }); test('clears queue after abort', async t => { - const limit = 2; + const limit = 1; const interval = 100; const controller = new AbortController(); const throttled = pThrottle({limit, interval, signal: controller.signal})(() => Date.now()); - try { - await throttled(); - await throttled(); - } catch {} + await throttled(); // Immediate + const queued = throttled(); // Queued due to limit - controller.abort(); + controller.abort('aborted'); + const [result] = await Promise.allSettled([queued]); + t.is(result.status, 'rejected'); + t.is(result.reason, 'aborted'); t.is(throttled.queueSize, 0); }); +test('abort resets windowed counters so next call is not delayed', async t => { + const limit = 1; + const interval = 100; + const controller = new AbortController(); + setMaxListeners(0, controller.signal); + const throttled = pThrottle({limit, interval, signal: controller.signal})(() => Date.now()); + + await throttled(); // Uses capacity, sets window + const queued = throttled(); // Queued into next window + + controller.abort('stop'); + const [r] = await Promise.allSettled([queued]); + t.is(r.status, 'rejected'); + t.is(r.reason, 'stop'); + + const start = Date.now(); + const time = await throttled(); + t.true(time - start < 50); +}); + +test('abort resets strict counters so next call is not delayed', async t => { + const limit = 1; + const interval = 100; + const controller = new AbortController(); + const throttled = pThrottle({ + limit, + interval, + strict: true, + signal: controller.signal, + })(() => Date.now()); + + await throttled(); // Uses capacity + const queued = throttled(); // Queued by strict algorithm + + controller.abort('stop'); + const [r] = await Promise.allSettled([queued]); + t.is(r.status, 'rejected'); + t.is(r.reason, 'stop'); + + const start = Date.now(); + const time = await throttled(); + t.true(time - start < 50); +}); + test('allows immediate execution with high limit', async t => { const limit = 10; const interval = 100; @@ -376,6 +481,23 @@ test('handles extremely short intervals', async t => { t.pass(); // If it gets here without error, the test passes }); +test('windowed with interval 0 executes all immediately without onDelay', async t => { + const seen = []; + const throttled = pThrottle({ + limit: 2, + interval: 0, + onDelay: (...arguments_) => seen.push(arguments_), + })(() => Date.now()); + + const start = Date.now(); + const results = await Promise.all([throttled(), throttled(), throttled(), throttled(), throttled()]); + for (const time of results) { + t.true(time - start < 50); + } + + t.deepEqual(seen, []); +}); + test('executes immediately for limit greater than calls', async t => { const limit = 10; const interval = 100; @@ -439,3 +561,377 @@ test('onDelay', async t => { await Promise.all(promises); }); + +test('onDelay receives arguments for blocked calls (limit 0)', async t => { + const seen = []; + const controller = new AbortController(); + const throttled = pThrottle({ + limit: 0, + interval: 100, + signal: controller.signal, + onDelay: (a, b) => seen.push([a, b]), + })((a, b) => [a, b]); + + const p1 = throttled('x', 1); + const p2 = throttled('y', 2); + + // Still pending before abort. onDelay must have been called at least once with the latest args + await delay(20); + // At least one onDelay should have fired with the latest arguments + t.deepEqual(seen[0], ['y', 2]); + + controller.abort('Z'); + const [r1, r2] = await Promise.allSettled([p1, p2]); + if (r1.status === 'rejected') { + t.is(r1.reason, 'Z'); + } + + if (r2.status === 'rejected') { + t.is(r2.reason, 'Z'); + } +}); + +test('onDelay exceptions do not affect execution', async t => { + const limit = 1; + const interval = 50; + const seen = []; + const onDelay = value => { + seen.push(value); + throw new Error('listener failed'); + }; + + const throttled = pThrottle({limit, interval, onDelay})(x => x); + + const a = await throttled('a'); + const bPromise = throttled('b'); // Will be delayed, onDelay throws + + t.is(a, 'a'); + const b = await bPromise; + t.is(b, 'b'); + t.deepEqual(seen, ['b']); +}); + +test('onDelay fires for delayed calls even if later aborted', async t => { + const limit = 1; + const interval = 100; + const controller = new AbortController(); + setMaxListeners(0, controller.signal); + + const seen = []; + const onDelay = value => { + seen.push(value); + }; + + const throttled = pThrottle({ + limit, + interval, + signal: controller.signal, + onDelay, + })(value => value); + + const first = throttled('a'); + const second = throttled('b'); // This will be delayed -> onDelay('b') + + controller.abort('stop'); + + const [r1, r2] = await Promise.allSettled([first, second]); + + // Ensure onDelay captured the delayed value 'b' + t.deepEqual(seen, ['b']); + + // Second should be rejected due to abort + if (r2.status === 'rejected') { + t.is(r2.reason, 'stop'); + } else { + t.fail('Expected second call to be rejected'); + } + + // First may resolve or be rejected depending on timing; both acceptable + if (r1.status === 'fulfilled') { + t.is(r1.value, 'a'); + } + + // Queue must be empty + t.is(throttled.queueSize, 0); +}); + +test('very short intervals preserve order and resolve (windowed and strict)', async t => { + for (const strict of [false, true]) { + const limit = 1; + const interval = 1; // Very short + const throttle = pThrottle({limit, interval, strict}); + const throttled = throttle(async x => x); + const count = 25; + const promises = []; + for (let i = 0; i < count; i++) { + promises.push(throttled(i)); + } + + // eslint-disable-next-line no-await-in-loop + const results = await Promise.all(promises); + // Order preserved + t.deepEqual(results, Array.from({length: count}, (_, i) => i)); + } +}); + +test('supports errors in the throttled function', async t => { + const limit = 1; + const interval = 100; + const pause = 1; + const throttle = pThrottle({limit, interval}); + + const syncFunction = () => { + throw new Error('test error'); + }; + + const throttledSync = throttle(syncFunction); + + const asyncFunction = async () => { + await delay(pause); + throw new Error('test error'); + }; + + const throttledAsync = throttle(asyncFunction); + + await throttle(() => {})(); // Create a delay + + await t.throwsAsync(throttledSync, {message: 'test error'}); // Has delay + + await t.throwsAsync(throttledAsync, {message: 'test error'}); // Has delay +}); + +test('shared signal abort clears queues (windowed and strict)', async t => { + for (const strict of [false, true]) { + const controller = new AbortController(); + setMaxListeners(0, controller.signal); + const limit = strict ? 2 : 1; + const interval = 100; + const instances = 24; + const throttledFunctions = []; + const promises = []; + + for (let i = 0; i < instances; i++) { + const throttled = pThrottle({ + limit, + interval, + strict, + signal: controller.signal, + })(() => Date.now()); + throttledFunctions.push(throttled); + promises.push(throttled(), throttled()); + if (strict) { + promises.push(throttled()); + } + } + + controller.abort('boom'); + + // eslint-disable-next-line no-await-in-loop + const results = await Promise.allSettled(promises); + for (const result of results) { + if (result.status === 'rejected') { + t.is(result.reason, 'boom'); + } + } + + for (const throttled of throttledFunctions) { + t.is(throttled.queueSize, 0); + } + } +}); + +test('abort affects only instances using that signal', async t => { + const controller = new AbortController(); + setMaxListeners(0, controller.signal); + const withSignal = pThrottle({ + limit: 1, + interval: 1000, + signal: controller.signal, + })(async () => 'with-signal'); + const withoutSignal = pThrottle({ + limit: 1, + interval: 1000, + })(async () => 'no-signal'); + + const p1 = withSignal(); + const p2 = withSignal(); // Will be queued then aborted + + controller.abort('x'); + + const [r1, r2] = await Promise.allSettled([p1, p2]); + if (r1.status === 'rejected') { + // First may race into queue and get aborted; tolerate either + t.pass(); + } else { + t.is(r1.value, 'with-signal'); + } + + if (r2.status === 'rejected') { + t.is(r2.reason, 'x'); + } else { + // Very unlikely, but allow if both resolved before abort + t.is(r2.value, 'with-signal'); + } + + // Ensure the instance without a signal is unaffected + const value = await withoutSignal(); + t.is(value, 'no-signal'); +}); + +test('signal registration shared across multiple throttled functions', async t => { + const controller = new AbortController(); + const {signal} = controller; + + const throttle = pThrottle({limit: 1, interval: 100, signal}); + const function1 = throttle(() => 'result1'); + const function2 = throttle(() => 'result2'); + + await function1(); // Execute immediately + const promise1 = function1(); // Queued + const promise2 = function2(); // Queued + + controller.abort('shared-registration'); + + const [result1, result2] = await Promise.allSettled([promise1, promise2]); + t.is(result1.status, 'rejected'); + t.is(result1.reason, 'shared-registration'); + t.is(result2.status, 'rejected'); + t.is(result2.reason, 'shared-registration'); +}); + +test('signal registration cleanup after abort', async t => { + const controller = new AbortController(); + const throttle = pThrottle({limit: 1, interval: 100, signal: controller.signal}); + const function_ = throttle(() => 'result'); + + function_(); // Execute immediately + const promise = function_(); // Queue + + controller.abort('cleanup-test'); + + const result = await Promise.allSettled([promise]); + t.is(result[0].status, 'rejected'); + t.is(result[0].reason, 'cleanup-test'); + t.is(function_.queueSize, 0); +}); + +test('signal registration isolation between different signals', async t => { + const controller1 = new AbortController(); + const controller2 = new AbortController(); + + const throttle1 = pThrottle({limit: 1, interval: 200, signal: controller1.signal}); + const throttle2 = pThrottle({limit: 1, interval: 200, signal: controller2.signal}); + + const function1 = throttle1(() => 'result1'); + const function2 = throttle2(() => 'result2'); + + await function1(); // Execute immediately + await function2(); // Execute immediately + const promise1 = function1(); // Queued + const promise2 = function2(); // Queued + + controller1.abort('signal1-abort'); + + const result1 = await Promise.allSettled([promise1]); + t.is(result1[0].status, 'rejected'); + t.is(result1[0].reason, 'signal1-abort'); + t.is(await promise2, 'result2'); +}); + +test('shared signal abort rejects both blocked and windowed throttles', async t => { + const controller = new AbortController(); + + const blocked = pThrottle({ + limit: 0, + interval: 100, + signal: controller.signal, + })(() => 'blocked'); + + const windowed = pThrottle({ + limit: 1, + interval: 100, + signal: controller.signal, + })(() => 'windowed'); + + const p1 = blocked(); + const p2 = windowed(); + const p3 = windowed(); // Queued + + controller.abort('S'); + const [r1, r2, r3] = await Promise.allSettled([p1, p2, p3]); + if (r1.status === 'rejected') { + t.is(r1.reason, 'S'); + } + + if (r2.status === 'rejected') { + t.is(r2.reason, 'S'); + } + + if (r3.status === 'rejected') { + t.is(r3.reason, 'S'); + } +}); + +test('signal registration with complex abort reasons', async t => { + const controller = new AbortController(); + const throttle = pThrottle({limit: 1, interval: 100, signal: controller.signal}); + const function_ = throttle(() => 'result'); + + function_(); // Execute immediately + const promise = function_(); // Queue + + const complexReason = { + code: 'CUSTOM_ABORT', + message: 'Complex abort reason', + timestamp: Date.now(), + nested: {data: [1, 2, 3]}, + }; + + controller.abort(complexReason); + + const result = await Promise.allSettled([promise]); + t.is(result[0].status, 'rejected'); + t.deepEqual(result[0].reason, complexReason); +}); + +test('bypassed calls while disabled do not affect future throttling state', async t => { + const limit = 2; + const interval = 200; + const throttled = pThrottle({limit, interval})(() => Date.now()); + + // Bypass throttling and make many calls quickly + throttled.isEnabled = false; + await Promise.all(Array.from({length: 20}, () => throttled())); + throttled.isEnabled = true; + + // First calls after re-enable should behave as fresh + const start = Date.now(); + const [a, b, c] = await Promise.all([throttled(), throttled(), throttled()]); + + // First two should be within the interval window; third should be after >= interval + t.true(a - start < 50); + t.true(b - start < 50); + t.true(c - start >= interval - 10); +}); + +test('FinalizationRegistry WeakRef behavior with signal registration', async t => { + const controller = new AbortController(); + const throttle = pThrottle({limit: 1, interval: 100, signal: controller.signal}); + + const function1 = throttle(() => 'result1'); + const function2 = throttle(() => 'result2'); + + await function1(); // Execute immediately + await function2(); // Execute immediately + + const promise1 = function1(); // Queue + const promise2 = function2(); // Queue + + controller.abort('weakref-test'); + + const [result1, result2] = await Promise.allSettled([promise1, promise2]); + t.is(result1.status, 'rejected'); + t.is(result1.reason, 'weakref-test'); + t.is(result2.status, 'rejected'); + t.is(result2.reason, 'weakref-test'); +});