Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mollifier-global-gate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

Add `MollifierBuffer.evaluateTripGlobal` — a fleet-wide variant of `evaluateTrip` that increments a single shared fixed-window counter regardless of env, so the mollifier can rate-limit the aggregate trigger rate rather than per-env. Reuses the existing trip Lua; keys are hash-tagged for Redis Cluster safety.
12 changes: 12 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,18 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
// Gate mode. "per_env" (default) rate-limits each env independently via the
// TRIGGER_MOLLIFIER_TRIP_* values above. "global" rate-limits the aggregate
// fleet-wide trigger.run rate via a single shared counter (ignoring per-env
// contributions) using the TRIGGER_MOLLIFIER_GLOBAL_* values below — it
// protects the primary DB from the aggregate rate that per-env tripping
// cannot bound. The two parameter sets are kept separate so switching modes
// never silently reuses the other regime's tuning. Global threshold default
// (1000 per 200ms ≈ 5k/s) targets the observed aggregate metastable edge.
TRIGGER_MOLLIFIER_GATE_MODE: z.enum(["per_env", "global"]).default("per_env"),
TRIGGER_MOLLIFIER_GLOBAL_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_GLOBAL_TRIP_THRESHOLD: z.coerce.number().int().positive().default(1000),
TRIGGER_MOLLIFIER_GLOBAL_HOLD_MS: z.coerce.number().int().positive().default(500),
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
Expand Down
23 changes: 17 additions & 6 deletions apps/webapp/app/v3/mollifier/mollifierGate.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export type TripDecision =
| { divert: false }
| {
divert: true;
reason: "per_env_rate";
reason: "per_env_rate" | "global_rate";
count: number;
threshold: number;
windowMs: number;
Expand Down Expand Up @@ -88,11 +88,22 @@ export type GateDependencies = {
// gate observing whichever env values are live at trigger time.
const defaultEvaluator = createRealTripEvaluator({
getBuffer: () => getMollifierBuffer(),
options: () => ({
windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS,
threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD,
holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS,
}),
// Pick the per-env or global rate parameters based on the configured gate
// mode. Kept as separate env vars so the two regimes never share tuning.
options: () =>
env.TRIGGER_MOLLIFIER_GATE_MODE === "global"
? {
mode: "global",
windowMs: env.TRIGGER_MOLLIFIER_GLOBAL_TRIP_WINDOW_MS,
threshold: env.TRIGGER_MOLLIFIER_GLOBAL_TRIP_THRESHOLD,
holdMs: env.TRIGGER_MOLLIFIER_GLOBAL_HOLD_MS,
}
: {
mode: "per_env",
windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS,
threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD,
holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS,
},
});

function logDivertDecision(
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export const mollifierDecisionsCounter = meter.createCounter("mollifier.decision
});

export type DecisionOutcome = "pass_through" | "shadow_log" | "mollify";
export type DecisionReason = "per_env_rate";
export type DecisionReason = "per_env_rate" | "global_rate";

export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason): void {
mollifierDecisionsCounter.add(1, {
Expand Down
12 changes: 10 additions & 2 deletions apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { logger } from "~/services/logger.server";
import type { GateInputs, TripDecision, TripEvaluator } from "./mollifierGate.server";

export type TripEvaluatorOptions = {
// "per_env" (default) rate-limits each env independently. "global" rate-limits
// the aggregate fleet-wide trigger.run rate via a single shared counter and
// ignores per-env contributions — it protects shared infra (the primary DB)
// from the aggregate rate that per-env tripping structurally cannot bound.
mode?: "per_env" | "global";
windowMs: number;
threshold: number;
holdMs: number;
Expand All @@ -21,12 +26,15 @@ export function createRealTripEvaluator(deps: CreateRealTripEvaluatorDeps): Trip
const opts = deps.options();

try {
const { tripped, count } = await buffer.evaluateTrip(inputs.envId, opts);
const { tripped, count } =
opts.mode === "global"
? await buffer.evaluateTripGlobal(opts)
: await buffer.evaluateTrip(inputs.envId, opts);
if (!tripped) return { divert: false };

return {
divert: true,
reason: "per_env_rate",
reason: opts.mode === "global" ? "global_rate" : "per_env_rate",
count,
threshold: opts.threshold,
windowMs: opts.windowMs,
Expand Down
45 changes: 45 additions & 0 deletions apps/webapp/test/mollifierGate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,51 @@ describe("evaluateGate cascade — exhaustive truth table", () => {
});
});

// Global-mode trips surface `reason: "global_rate"` (vs "per_env_rate"). The
// gate is reason-agnostic — it forwards `decision.reason` straight to
// `recordDecision` — but `reason` is the metric label we alert/dashboard on, so
// pin the contract end-to-end for both the shadow-log and mollify outcomes.
const globalTrippedDecision = {
divert: true as const,
reason: "global_rate" as const,
count: 1200,
threshold: 1000,
windowMs: 200,
holdMs: 500,
};

describe("evaluateGate — global_rate reason propagates to the metric", () => {
it("shadow path records shadow_log with reason global_rate", async () => {
const { deps, spies } = makeDeps({
enabled: true,
shadow: true,
flag: false,
decision: globalTrippedDecision,
});

const outcome = await evaluateGate(inputs, deps);

expect(outcome.action).toBe("shadow_log");
expect(spies.recordDecisionCalls).toEqual([{ outcome: "shadow_log", reason: "global_rate" }]);
expect(spies.logShadowCalls).toEqual([{ inputs, decision: globalTrippedDecision }]);
});

it("mollify path records mollify with reason global_rate", async () => {
const { deps, spies } = makeDeps({
enabled: true,
shadow: false,
flag: true,
decision: globalTrippedDecision,
});

const outcome = await evaluateGate(inputs, deps);

expect(outcome.action).toBe("mollify");
expect(spies.recordDecisionCalls).toEqual([{ outcome: "mollify", reason: "global_rate" }]);
expect(spies.logMollifiedCalls).toEqual([{ inputs, decision: globalTrippedDecision }]);
});
});

// Hot-path guard: `triggerTask.server.ts` calls `evaluateGate` on every
// trigger when `TRIGGER_MOLLIFIER_ENABLED=1`. The per-org override path must resolve
// without a Prisma round-trip — otherwise the gate adds a DB query to the
Expand Down
88 changes: 88 additions & 0 deletions apps/webapp/test/mollifierTripEvaluator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,94 @@ describe("createRealTripEvaluator", () => {
},
);

redisTest(
"global mode trips on aggregate load across distinct envs with reason global_rate",
async ({ redisOptions }) => {
const buffer = new MollifierBuffer({ redisOptions });
try {
// threshold=3 → the 4th trigger trips. Crucially every trigger is a
// DIFFERENT env, so per-env tripping would never fire (each env count=1).
const options = { mode: "global", windowMs: 5000, threshold: 3, holdMs: 5000 } as const;
const evaluator = createRealTripEvaluator({
getBuffer: () => buffer,
options: () => options,
});

await evaluator({ ...inputs, envId: "g1" });
await evaluator({ ...inputs, envId: "g2" });
await evaluator({ ...inputs, envId: "g3" });
const decision = await evaluator({ ...inputs, envId: "g4" });

expect(decision.divert).toBe(true);
if (decision.divert) {
expect(decision.reason).toBe("global_rate");
expect(decision.count).toBeGreaterThan(options.threshold);
}
} finally {
await buffer.close();
}
},
);

redisTest(
"per_env mode does NOT trip on the same load spread across distinct envs",
async ({ redisOptions }) => {
const buffer = new MollifierBuffer({ redisOptions });
try {
const options = { mode: "per_env", windowMs: 5000, threshold: 3, holdMs: 5000 } as const;
const evaluator = createRealTripEvaluator({
getBuffer: () => buffer,
options: () => options,
});

// Four triggers, four distinct envs — every per-env counter stays at 1.
for (const envId of ["p1", "p2", "p3", "p4"]) {
const decision = await evaluator({ ...inputs, envId });
expect(decision.divert).toBe(false);
}
} finally {
await buffer.close();
}
},
);

redisTest(
"switching to global mid-flight starts the global counter cold (per-env load does not preload it)",
async ({ redisOptions }) => {
const buffer = new MollifierBuffer({ redisOptions });
try {
let mode: "per_env" | "global" = "per_env";
const evaluator = createRealTripEvaluator({
getBuffer: () => buffer,
options: () => ({ mode, windowMs: 5000, threshold: 2, holdMs: 5000 }),
});

// Per-env load on env "s1": the 3rd call trips its per-env counter.
await evaluator({ ...inputs, envId: "s1" });
await evaluator({ ...inputs, envId: "s1" });
const perEnvTrip = await evaluator({ ...inputs, envId: "s1" });
expect(perEnvTrip.divert).toBe(true);

// Flip to global. If per-env activity had leaked into the global
// counter it would already be over threshold; instead the global
// counter starts at 0, so the first two ticks don't trip and the third
// does — proving cold start + isolation from the per-env counters.
mode = "global";
expect((await evaluator({ ...inputs, envId: "s2" })).divert).toBe(false);
expect((await evaluator({ ...inputs, envId: "s3" })).divert).toBe(false);
const globalTrip = await evaluator({ ...inputs, envId: "s4" });

expect(globalTrip.divert).toBe(true);
if (globalTrip.divert) {
expect(globalTrip.reason).toBe("global_rate");
expect(globalTrip.count).toBe(3);
}
} finally {
await buffer.close();
}
},
);

redisTest("returns divert=false when getBuffer returns null (fail-open)", async () => {
const evaluator = createRealTripEvaluator({
getBuffer: () => null,
Expand Down
Loading
Loading