diff --git a/.changeset/mollifier-global-gate.md b/.changeset/mollifier-global-gate.md new file mode 100644 index 0000000000..b9eb0ee973 --- /dev/null +++ b/.changeset/mollifier-global-gate.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Add `MollifierBuffer.evaluateTripGlobal` — a fleet-wide variant of `evaluateTrip` that increments a single shared fixed-window counter regardless of env, so the mollifier can rate-limit the aggregate trigger rate rather than per-env. Reuses the existing trip Lua; keys are hash-tagged for Redis Cluster safety. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index abda2fd963..42096bac5d 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1097,6 +1097,18 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200), TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100), TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500), + // Gate mode. "per_env" (default) rate-limits each env independently via the + // TRIGGER_MOLLIFIER_TRIP_* values above. "global" rate-limits the aggregate + // fleet-wide trigger.run rate via a single shared counter (ignoring per-env + // contributions) using the TRIGGER_MOLLIFIER_GLOBAL_* values below — it + // protects the primary DB from the aggregate rate that per-env tripping + // cannot bound. The two parameter sets are kept separate so switching modes + // never silently reuses the other regime's tuning. Global threshold default + // (1000 per 200ms ≈ 5k/s) targets the observed aggregate metastable edge. + TRIGGER_MOLLIFIER_GATE_MODE: z.enum(["per_env", "global"]).default("per_env"), + TRIGGER_MOLLIFIER_GLOBAL_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200), + TRIGGER_MOLLIFIER_GLOBAL_TRIP_THRESHOLD: z.coerce.number().int().positive().default(1000), + TRIGGER_MOLLIFIER_GLOBAL_HOLD_MS: z.coerce.number().int().positive().default(500), TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50), TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), diff --git a/apps/webapp/app/v3/mollifier/mollifierGate.server.ts b/apps/webapp/app/v3/mollifier/mollifierGate.server.ts index 63146b4c32..a676d3a37a 100644 --- a/apps/webapp/app/v3/mollifier/mollifierGate.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierGate.server.ts @@ -23,7 +23,7 @@ export type TripDecision = | { divert: false } | { divert: true; - reason: "per_env_rate"; + reason: "per_env_rate" | "global_rate"; count: number; threshold: number; windowMs: number; @@ -88,11 +88,22 @@ export type GateDependencies = { // gate observing whichever env values are live at trigger time. const defaultEvaluator = createRealTripEvaluator({ getBuffer: () => getMollifierBuffer(), - options: () => ({ - windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS, - threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD, - holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS, - }), + // Pick the per-env or global rate parameters based on the configured gate + // mode. Kept as separate env vars so the two regimes never share tuning. + options: () => + env.TRIGGER_MOLLIFIER_GATE_MODE === "global" + ? { + mode: "global", + windowMs: env.TRIGGER_MOLLIFIER_GLOBAL_TRIP_WINDOW_MS, + threshold: env.TRIGGER_MOLLIFIER_GLOBAL_TRIP_THRESHOLD, + holdMs: env.TRIGGER_MOLLIFIER_GLOBAL_HOLD_MS, + } + : { + mode: "per_env", + windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS, + threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD, + holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS, + }, }); function logDivertDecision( diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index deaa32bb74..de0e0a5f60 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -7,7 +7,7 @@ export const mollifierDecisionsCounter = meter.createCounter("mollifier.decision }); export type DecisionOutcome = "pass_through" | "shadow_log" | "mollify"; -export type DecisionReason = "per_env_rate"; +export type DecisionReason = "per_env_rate" | "global_rate"; export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason): void { mollifierDecisionsCounter.add(1, { diff --git a/apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts b/apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts index 9032467d20..055fb4a6cd 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts @@ -3,6 +3,11 @@ import { logger } from "~/services/logger.server"; import type { GateInputs, TripDecision, TripEvaluator } from "./mollifierGate.server"; export type TripEvaluatorOptions = { + // "per_env" (default) rate-limits each env independently. "global" rate-limits + // the aggregate fleet-wide trigger.run rate via a single shared counter and + // ignores per-env contributions — it protects shared infra (the primary DB) + // from the aggregate rate that per-env tripping structurally cannot bound. + mode?: "per_env" | "global"; windowMs: number; threshold: number; holdMs: number; @@ -21,12 +26,15 @@ export function createRealTripEvaluator(deps: CreateRealTripEvaluatorDeps): Trip const opts = deps.options(); try { - const { tripped, count } = await buffer.evaluateTrip(inputs.envId, opts); + const { tripped, count } = + opts.mode === "global" + ? await buffer.evaluateTripGlobal(opts) + : await buffer.evaluateTrip(inputs.envId, opts); if (!tripped) return { divert: false }; return { divert: true, - reason: "per_env_rate", + reason: opts.mode === "global" ? "global_rate" : "per_env_rate", count, threshold: opts.threshold, windowMs: opts.windowMs, diff --git a/apps/webapp/test/mollifierGate.test.ts b/apps/webapp/test/mollifierGate.test.ts index e40a29b248..4589b119e9 100644 --- a/apps/webapp/test/mollifierGate.test.ts +++ b/apps/webapp/test/mollifierGate.test.ts @@ -182,6 +182,51 @@ describe("evaluateGate cascade — exhaustive truth table", () => { }); }); +// Global-mode trips surface `reason: "global_rate"` (vs "per_env_rate"). The +// gate is reason-agnostic — it forwards `decision.reason` straight to +// `recordDecision` — but `reason` is the metric label we alert/dashboard on, so +// pin the contract end-to-end for both the shadow-log and mollify outcomes. +const globalTrippedDecision = { + divert: true as const, + reason: "global_rate" as const, + count: 1200, + threshold: 1000, + windowMs: 200, + holdMs: 500, +}; + +describe("evaluateGate — global_rate reason propagates to the metric", () => { + it("shadow path records shadow_log with reason global_rate", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: true, + flag: false, + decision: globalTrippedDecision, + }); + + const outcome = await evaluateGate(inputs, deps); + + expect(outcome.action).toBe("shadow_log"); + expect(spies.recordDecisionCalls).toEqual([{ outcome: "shadow_log", reason: "global_rate" }]); + expect(spies.logShadowCalls).toEqual([{ inputs, decision: globalTrippedDecision }]); + }); + + it("mollify path records mollify with reason global_rate", async () => { + const { deps, spies } = makeDeps({ + enabled: true, + shadow: false, + flag: true, + decision: globalTrippedDecision, + }); + + const outcome = await evaluateGate(inputs, deps); + + expect(outcome.action).toBe("mollify"); + expect(spies.recordDecisionCalls).toEqual([{ outcome: "mollify", reason: "global_rate" }]); + expect(spies.logMollifiedCalls).toEqual([{ inputs, decision: globalTrippedDecision }]); + }); +}); + // Hot-path guard: `triggerTask.server.ts` calls `evaluateGate` on every // trigger when `TRIGGER_MOLLIFIER_ENABLED=1`. The per-org override path must resolve // without a Prisma round-trip — otherwise the gate adds a DB query to the diff --git a/apps/webapp/test/mollifierTripEvaluator.test.ts b/apps/webapp/test/mollifierTripEvaluator.test.ts index 14ac0cc55b..da5df00feb 100644 --- a/apps/webapp/test/mollifierTripEvaluator.test.ts +++ b/apps/webapp/test/mollifierTripEvaluator.test.ts @@ -60,6 +60,94 @@ describe("createRealTripEvaluator", () => { }, ); + redisTest( + "global mode trips on aggregate load across distinct envs with reason global_rate", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + // threshold=3 → the 4th trigger trips. Crucially every trigger is a + // DIFFERENT env, so per-env tripping would never fire (each env count=1). + const options = { mode: "global", windowMs: 5000, threshold: 3, holdMs: 5000 } as const; + const evaluator = createRealTripEvaluator({ + getBuffer: () => buffer, + options: () => options, + }); + + await evaluator({ ...inputs, envId: "g1" }); + await evaluator({ ...inputs, envId: "g2" }); + await evaluator({ ...inputs, envId: "g3" }); + const decision = await evaluator({ ...inputs, envId: "g4" }); + + expect(decision.divert).toBe(true); + if (decision.divert) { + expect(decision.reason).toBe("global_rate"); + expect(decision.count).toBeGreaterThan(options.threshold); + } + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "per_env mode does NOT trip on the same load spread across distinct envs", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + const options = { mode: "per_env", windowMs: 5000, threshold: 3, holdMs: 5000 } as const; + const evaluator = createRealTripEvaluator({ + getBuffer: () => buffer, + options: () => options, + }); + + // Four triggers, four distinct envs — every per-env counter stays at 1. + for (const envId of ["p1", "p2", "p3", "p4"]) { + const decision = await evaluator({ ...inputs, envId }); + expect(decision.divert).toBe(false); + } + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "switching to global mid-flight starts the global counter cold (per-env load does not preload it)", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + let mode: "per_env" | "global" = "per_env"; + const evaluator = createRealTripEvaluator({ + getBuffer: () => buffer, + options: () => ({ mode, windowMs: 5000, threshold: 2, holdMs: 5000 }), + }); + + // Per-env load on env "s1": the 3rd call trips its per-env counter. + await evaluator({ ...inputs, envId: "s1" }); + await evaluator({ ...inputs, envId: "s1" }); + const perEnvTrip = await evaluator({ ...inputs, envId: "s1" }); + expect(perEnvTrip.divert).toBe(true); + + // Flip to global. If per-env activity had leaked into the global + // counter it would already be over threshold; instead the global + // counter starts at 0, so the first two ticks don't trip and the third + // does — proving cold start + isolation from the per-env counters. + mode = "global"; + expect((await evaluator({ ...inputs, envId: "s2" })).divert).toBe(false); + expect((await evaluator({ ...inputs, envId: "s3" })).divert).toBe(false); + const globalTrip = await evaluator({ ...inputs, envId: "s4" }); + + expect(globalTrip.divert).toBe(true); + if (globalTrip.divert) { + expect(globalTrip.reason).toBe("global_rate"); + expect(globalTrip.count).toBe(3); + } + } finally { + await buffer.close(); + } + }, + ); + redisTest("returns divert=false when getBuffer returns null (fail-open)", async () => { const evaluator = createRealTripEvaluator({ getBuffer: () => null, diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts index 3a775bbb8f..9625e2f2fe 100644 --- a/packages/redis-worker/src/mollifier/buffer.test.ts +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -126,6 +126,175 @@ describe("MollifierBuffer construction", () => { }); }); +describe("MollifierBuffer.evaluateTripGlobal", () => { + redisTest( + "trips once the global counter exceeds the threshold, ignoring env", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // Long window so the fixed-window counter doesn't expire mid-test. + const opts = { windowMs: 60_000, threshold: 3, holdMs: 5_000 }; + + // Calls 1..threshold stay under the line (count > threshold is false). + for (let i = 1; i <= 3; i++) { + const res = await buffer.evaluateTripGlobal(opts); + expect(res.count).toBe(i); + expect(res.tripped).toBe(false); + } + + // The (threshold + 1)th tick crosses it and trips. + const tripping = await buffer.evaluateTripGlobal(opts); + expect(tripping.count).toBe(4); + expect(tripping.tripped).toBe(true); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "catches aggregate load spread across envs that per-env tripping misses", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + const opts = { windowMs: 60_000, threshold: 5, holdMs: 5_000 }; + const envs = ["env_a", "env_b", "env_c", "env_d", "env_e", "env_f"]; + + // Per-env: each env triggers exactly once -> each per-env counter is 1, + // well under the threshold, so the per-env gate NEVER trips. + for (const envId of envs) { + const perEnv = await buffer.evaluateTrip(envId, opts); + expect(perEnv.tripped).toBe(false); + } + + // Global: the same six triggers accumulate into one counter and trip, + // because the global gate does not consider per-env contributions. + let last = { tripped: false, count: 0 }; + for (const _envId of envs) { + last = await buffer.evaluateTripGlobal(opts); + } + expect(last.count).toBe(envs.length); + expect(last.tripped).toBe(true); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "global window expires and the shared counter resets when no traffic", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // The fixed-window TTL is only (re)armed when count hits 1. Verify the + // GLOBAL key still resets after a full window of silence — i.e. the + // gate is not permanently latched once it has seen traffic. + const fastWindow = { windowMs: 100, threshold: 100, holdMs: 100 }; + await buffer.evaluateTripGlobal(fastWindow); + await buffer.evaluateTripGlobal(fastWindow); + + await new Promise((r) => setTimeout(r, 150)); + const fresh = await buffer.evaluateTripGlobal(fastWindow); + expect(fresh.count).toBe(1); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "global tripped marker outlives the rate counter window (holdMs > windowMs)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // This is the production-default shape (holdMs 500 > windowMs 200): the + // tripped marker must keep the gate diverted across rate-window resets. + const opts = { windowMs: 50, threshold: 2, holdMs: 1000 }; + await buffer.evaluateTripGlobal(opts); + await buffer.evaluateTripGlobal(opts); + const tripped = await buffer.evaluateTripGlobal(opts); + expect(tripped.tripped).toBe(true); + + // Past windowMs (rate counter expires) but well inside holdMs. + await new Promise((r) => setTimeout(r, 120)); + + const after = await buffer.evaluateTripGlobal(opts); + expect(after.tripped).toBe(true); + expect(after.count).toBeLessThanOrEqual(2); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "global INCR is atomic under 100 concurrent calls (no lost increments)", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // The global key is the one counter every webapp replica hammers, so + // atomicity matters more here than per-env. Wide window + huge threshold + // isolate the count assertion from window/trip semantics. + const opts = { windowMs: 5000, threshold: 1_000_000, holdMs: 100 }; + const results = await Promise.all( + Array.from({ length: 100 }, () => buffer.evaluateTripGlobal(opts)), + ); + + const counts = results.map((r) => r.count).sort((a, b) => a - b); + expect(counts).toEqual(Array.from({ length: 100 }, (_, i) => i + 1)); + expect(results.every((r) => !r.tripped)).toBe(true); + } finally { + await buffer.close(); + } + }, + ); +}); + describe("MollifierBuffer.accept", () => { redisTest("accept writes entry, enqueues, and tracks env", { timeout: 20_000 }, async ({ redisContainer }) => { const buffer = new MollifierBuffer({ diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts index 66afe186a8..ef42fa6e07 100644 --- a/packages/redis-worker/src/mollifier/buffer.ts +++ b/packages/redis-worker/src/mollifier/buffer.ts @@ -610,6 +610,32 @@ export class MollifierBuffer { return { count: result[0], tripped: result[1] === 1 }; } + // Fleet-wide variant of `evaluateTrip`. Every trigger increments the SAME + // fixed-window counter regardless of which env it belongs to, so the gate + // bounds the *aggregate* trigger.run rate — the thing that actually saturates + // shared infra (the primary DB), and which per-env tripping structurally + // cannot bound (N envs each just under their per-env threshold sum to N× the + // intended ceiling). Used when the gate runs in "global" mode. Reuses the + // same Lua as the per-env path; only the keys differ. The `{global}` hash tag + // keeps both keys in one slot so the two-key script is Cluster-safe. + async evaluateTripGlobal(options: { + windowMs: number; + threshold: number; + holdMs: number; + }): Promise<{ tripped: boolean; count: number }> { + const rateKey = "mollifier:rate:{global}"; + const trippedKey = "mollifier:tripped:{global}"; + const result = (await this.redis.mollifierEvaluateTrip( + rateKey, + trippedKey, + String(options.windowMs), + String(options.threshold), + String(options.holdMs), + )) as [number, number]; + + return { count: result[0], tripped: result[1] === 1 }; + } + async close(): Promise { await this.redis.quit(); }