diff --git a/.changeset/mollifier-redis-worker-primitives.md b/.changeset/mollifier-redis-worker-primitives.md new file mode 100644 index 00000000000..0bccff83e5c --- /dev/null +++ b/.changeset/mollifier-redis-worker-primitives.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Add MollifierBuffer and MollifierDrainer primitives for burst smoothing (scaffolding only — not active without webapp wiring). diff --git a/.server-changes/mollifier-phase-1-scaffolding.md b/.server-changes/mollifier-phase-1-scaffolding.md new file mode 100644 index 00000000000..1f5b67a3d40 --- /dev/null +++ b/.server-changes/mollifier-phase-1-scaffolding.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Add scaffolding for the trigger mollifier (phase 1). New env vars (all default off), `evaluateGate` (the mollifier gate) wired into the trigger hot path as a no-op, lazy singletons for the dedicated mollifier Redis client and drainer. No behavioural change while `MOLLIFIER_ENABLED=0`. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97cccbc1710..38cc1d84343 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1030,6 +1030,34 @@ const EnvironmentSchema = z COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + MOLLIFIER_ENABLED: z.string().default("0"), + MOLLIFIER_SHADOW_MODE: z.string().default("0"), + MOLLIFIER_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + MOLLIFIER_REDIS_PORT: z.coerce + .number() + .optional() + .transform( + (v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined), + ), + MOLLIFIER_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + MOLLIFIER_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? 
process.env.REDIS_PASSWORD), + MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200), + MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100), + MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500), + MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50), + MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600), + MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), + BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce .number() .int() diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 445e0eb155a..d53751e666b 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -40,6 +40,7 @@ import type { TriggerTaskRequest, TriggerTaskValidator, } from "../types"; +import { evaluateGate } from "~/v3/mollifier/mollifierGate.server"; import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server"; class NoopTriggerRacepointSystem implements TriggerRacepointSystem { @@ -315,6 +316,16 @@ export class RunEngineTriggerTaskService { rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined, }; + const mollifierOutcome = await evaluateGate({ + envId: environment.id, + orgId: environment.organizationId, + }); + if (mollifierOutcome.action === "mollify") { + throw new Error( + "MollifierGate.mollify reached in phase 1 — should be unreachable until phase 3 wiring lands", + ); + } + try { return await this.traceEventConcern.traceRun( triggerRequest, diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 902d752ed0a..73524d76897 100644 --- a/apps/webapp/app/services/worker.server.ts +++ 
b/apps/webapp/app/services/worker.server.ts @@ -26,6 +26,7 @@ import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server"; import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server"; import { RetryAttemptService } from "~/v3/services/retryAttempt.server"; import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server"; +import { getMollifierDrainer } from "~/v3/mollifier/mollifierDrainer.server"; import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server"; import { sendEmail } from "./email.server"; import { logger } from "./logger.server"; @@ -128,6 +129,8 @@ export async function init() { if (env.WORKER_ENABLED === "true") { await workerQueue.initialize(); } + + getMollifierDrainer(); } function getWorkerQueue() { diff --git a/apps/webapp/app/v3/featureFlags.ts b/apps/webapp/app/v3/featureFlags.ts index b40a83c3a35..67033a74f8f 100644 --- a/apps/webapp/app/v3/featureFlags.ts +++ b/apps/webapp/app/v3/featureFlags.ts @@ -8,6 +8,7 @@ export const FEATURE_FLAG = { hasAiAccess: "hasAiAccess", hasComputeAccess: "hasComputeAccess", hasPrivateConnections: "hasPrivateConnections", + mollifierEnabled: "mollifierEnabled", } as const; export const FeatureFlagCatalog = { @@ -18,6 +19,7 @@ export const FeatureFlagCatalog = { [FEATURE_FLAG.hasAiAccess]: z.coerce.boolean(), [FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(), [FEATURE_FLAG.hasPrivateConnections]: z.coerce.boolean(), + [FEATURE_FLAG.mollifierEnabled]: z.coerce.boolean(), }; export type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts b/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts new file mode 100644 index 00000000000..426458f779c --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts @@ -0,0 +1,28 @@ +import { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { env } from "~/env.server"; +import { logger } from 
"~/services/logger.server"; +import { singleton } from "~/utils/singleton"; + +function initializeMollifierBuffer(): MollifierBuffer { + logger.debug("Initializing mollifier buffer", { + host: env.MOLLIFIER_REDIS_HOST, + }); + + return new MollifierBuffer({ + redisOptions: { + keyPrefix: "", + host: env.MOLLIFIER_REDIS_HOST, + port: env.MOLLIFIER_REDIS_PORT, + username: env.MOLLIFIER_REDIS_USERNAME, + password: env.MOLLIFIER_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + entryTtlSeconds: env.MOLLIFIER_ENTRY_TTL_S, + }); +} + +export function getMollifierBuffer(): MollifierBuffer | null { + if (env.MOLLIFIER_ENABLED !== "1") return null; + return singleton("mollifierBuffer", initializeMollifierBuffer); +} diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts new file mode 100644 index 00000000000..7342af5ed28 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts @@ -0,0 +1,36 @@ +import { MollifierDrainer } from "@trigger.dev/redis-worker"; +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +function initializeMollifierDrainer(): MollifierDrainer { + const buffer = getMollifierBuffer(); + if (!buffer) { + // Should be unreachable: getMollifierDrainer() guards on the same env flag as getMollifierBuffer(). 
+    throw new Error("MollifierDrainer initialised without a buffer — env vars inconsistent");
+  }
+
+  logger.debug("Initializing mollifier drainer", {
+    concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
+    maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
+  });
+
+  const drainer = new MollifierDrainer({
+    buffer,
+    handler: async () => {
+      throw new Error("MollifierDrainer phase 1: no handler wired");
+    },
+    concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
+    maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
+    isRetryable: () => false,
+  });
+
+  drainer.start();
+  return drainer;
+}
+
+export function getMollifierDrainer(): MollifierDrainer | null {
+  if (env.MOLLIFIER_ENABLED !== "1") return null;
+  return singleton("mollifierDrainer", initializeMollifierDrainer);
+}
diff --git a/apps/webapp/app/v3/mollifier/mollifierGate.server.ts b/apps/webapp/app/v3/mollifier/mollifierGate.server.ts
new file mode 100644
index 00000000000..56a254e051c
--- /dev/null
+++ b/apps/webapp/app/v3/mollifier/mollifierGate.server.ts
@@ -0,0 +1,95 @@
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.server";
+import { flag } from "~/v3/featureFlags.server";
+import { FEATURE_FLAG } from "~/v3/featureFlags";
+
+/** Outcome of the trip evaluator: leave the trigger alone, or divert it for the stated reason. */
+export type TripDecision =
+  | { divert: false }
+  | { divert: true; reason: "per_env_rate" };
+
+/** What the caller should do with the trigger once the gate has been evaluated. */
+export type GateOutcome =
+  | { action: "pass_through" }
+  | { action: "mollify"; decision: Extract<TripDecision, { divert: true }> }
+  | { action: "shadow_log"; decision: Extract<TripDecision, { divert: true }> };
+
+export type GateInputs = {
+  envId: string;
+  orgId: string;
+};
+
+export type TripEvaluator = (inputs: GateInputs) => Promise<TripDecision>;
+
+/** Injectable collaborators of `evaluateGate`; callers may override any subset. */
+export type GateDependencies = {
+  isMollifierEnabled: () => boolean;
+  isShadowModeOn: () => boolean;
+  /** Receives the gate inputs so an implementation can resolve the flag per org or env. */
+  resolveOrgFlag: (inputs: GateInputs) => Promise<boolean>;
+  evaluator: TripEvaluator;
+  logShadow: (inputs: GateInputs, reason: "per_env_rate") => void;
+};
+
+// Placeholder evaluator: never diverts.
+const stubTripEvaluator: TripEvaluator = async () => ({ divert: false });
+
+export const defaultGateDependencies: GateDependencies = {
+  isMollifierEnabled: () => env.MOLLIFIER_ENABLED === "1",
+  isShadowModeOn: () => env.MOLLIFIER_SHADOW_MODE === "1",
+  // Calls `flag` with only the key and default; the gate inputs are not forwarded.
+  resolveOrgFlag: () =>
+    flag({ key: FEATURE_FLAG.mollifierEnabled, defaultValue: false }),
+  evaluator: stubTripEvaluator,
+  logShadow: (inputs, reason) =>
+    logger.info("mollifier shadow decision", {
+      envId: inputs.envId,
+      orgId: inputs.orgId,
+      reason,
+    }),
+};
+
+/**
+ * Decides whether a trigger passes straight through, is diverted (`mollify`), or
+ * would have been diverted and is only logged (`shadow_log`).
+ *
+ * Order of checks:
+ * 1. `isMollifierEnabled()` is false: `pass_through`; nothing else is consulted.
+ * 2. Org flag off and shadow mode off: `pass_through`; the evaluator is not run.
+ * 3. The evaluator does not divert: `pass_through`.
+ * 4. Org flag on: `mollify` (takes precedence over shadow mode).
+ * 5. Otherwise: `logShadow` is called and `shadow_log` is returned.
+ *
+ * @param inputs Environment and organisation the trigger belongs to.
+ * @param deps Overrides spread over `defaultGateDependencies`.
+ * @returns The action the caller should take.
+ */
+export async function evaluateGate(
+  inputs: GateInputs,
+  deps: Partial<GateDependencies> = {},
+): Promise<GateOutcome> {
+  const d = { ...defaultGateDependencies, ...deps };
+
+  if (!d.isMollifierEnabled()) {
+    return { action: "pass_through" };
+  }
+
+  const orgFlagEnabled = await d.resolveOrgFlag(inputs);
+  const shadowOn = d.isShadowModeOn();
+
+  if (!orgFlagEnabled && !shadowOn) {
+    return { action: "pass_through" };
+  }
+
+  const decision = await d.evaluator(inputs);
+  if (!decision.divert) {
+    return { action: "pass_through" };
+  }
+
+  if (orgFlagEnabled) {
+    return { action: "mollify", decision };
+  }
+
+  d.logShadow(inputs, decision.reason);
+  return { action: "shadow_log", decision };
+}
diff --git a/apps/webapp/app/v3/mollifier/readFallback.server.ts b/apps/webapp/app/v3/mollifier/readFallback.server.ts
new file mode 100644
index 00000000000..34a8b48f970
--- /dev/null
+++ b/apps/webapp/app/v3/mollifier/readFallback.server.ts
@@ -0,0 +1,23 @@
+import { logger } from "~/services/logger.server";
+
+export type ReadFallbackInput = {
+  runId: string;
+  environmentId: string;
+  organizationId: string;
+};
+
+/**
+ * Phase 1 stub of the mollifier read fallback: logs the requested run id at debug
+ * level and always resolves to `null`.
+ *
+ * NOTE(review): the return type argument is written as `null`, the narrowest type
+ * that matches the body; confirm against the run type the real lookup will return.
+ */
+export async function findRunByIdWithMollifierFallback(
+  input: ReadFallbackInput,
+): Promise<null> {
+  logger.debug("mollifier read-fallback called (phase 1 stub)", {
+    runId: input.runId,
+  });
+  return null;
+}
diff --git a/apps/webapp/test/mollifierGate.test.ts b/apps/webapp/test/mollifierGate.test.ts
new file mode 100644
index 00000000000..f69ed399f65
--- /dev/null
+++ b/apps/webapp/test/mollifierGate.test.ts
@@ -0,0 +1,122 @@
+import { describe, expect, it, vi } from "vitest";
+import {
+  evaluateGate,
+  type GateDependencies,
+ type TripDecision, +} from "~/v3/mollifier/mollifierGate.server"; + +type Spies = { + [K in keyof GateDependencies]: ReturnType; +}; + +function makeDeps(overrides: Partial = {}): { + deps: GateDependencies; + spies: Spies; +} { + const defaults: GateDependencies = { + isMollifierEnabled: () => false, + isShadowModeOn: () => false, + resolveOrgFlag: async () => false, + evaluator: async () => ({ divert: false }) as TripDecision, + logShadow: () => {}, + }; + const merged = { ...defaults, ...overrides }; + const spies = { + isMollifierEnabled: vi.fn(merged.isMollifierEnabled), + isShadowModeOn: vi.fn(merged.isShadowModeOn), + resolveOrgFlag: vi.fn(merged.resolveOrgFlag), + evaluator: vi.fn(merged.evaluator), + logShadow: vi.fn(merged.logShadow), + } satisfies Spies; + return { deps: spies, spies }; +} + +describe("evaluateGate", () => { + it("kill switch off: pass_through, evaluator NOT called, flag NOT consulted", async () => { + const { deps, spies } = makeDeps({ isMollifierEnabled: () => false }); + const outcome = await evaluateGate({ envId: "e1", orgId: "o1" }, deps); + + expect(outcome).toEqual({ action: "pass_through" }); + expect(spies.evaluator).not.toHaveBeenCalled(); + expect(spies.resolveOrgFlag).not.toHaveBeenCalled(); + }); + + it("kill switch on, org flag off, shadow off: pass_through, evaluator NOT called", async () => { + const { deps, spies } = makeDeps({ + isMollifierEnabled: () => true, + resolveOrgFlag: async () => false, + isShadowModeOn: () => false, + }); + const outcome = await evaluateGate({ envId: "e1", orgId: "o1" }, deps); + + expect(outcome).toEqual({ action: "pass_through" }); + expect(spies.evaluator).not.toHaveBeenCalled(); + }); + + it("kill switch on, org flag off, shadow on, divert false: evaluator called, pass_through", async () => { + const { deps, spies } = makeDeps({ + isMollifierEnabled: () => true, + resolveOrgFlag: async () => false, + isShadowModeOn: () => true, + evaluator: async () => ({ divert: false }), + }); + const 
outcome = await evaluateGate({ envId: "e1", orgId: "o1" }, deps); + + expect(outcome).toEqual({ action: "pass_through" }); + expect(spies.evaluator).toHaveBeenCalledOnce(); + }); + + it("kill switch on, org flag off, shadow on, divert true: shadow_log (no mollify), logShadow called", async () => { + const { deps, spies } = makeDeps({ + isMollifierEnabled: () => true, + resolveOrgFlag: async () => false, + isShadowModeOn: () => true, + evaluator: async () => ({ divert: true, reason: "per_env_rate" }), + }); + const outcome = await evaluateGate({ envId: "e1", orgId: "o1" }, deps); + + expect(outcome.action).toBe("shadow_log"); + expect(spies.logShadow).toHaveBeenCalledOnce(); + expect(spies.logShadow).toHaveBeenCalledWith( + { envId: "e1", orgId: "o1" }, + "per_env_rate", + ); + }); + + it("kill switch on, org flag on, divert true: mollify, logShadow NOT called", async () => { + const { deps, spies } = makeDeps({ + isMollifierEnabled: () => true, + resolveOrgFlag: async () => true, + evaluator: async () => ({ divert: true, reason: "per_env_rate" }), + }); + const outcome = await evaluateGate({ envId: "e1", orgId: "o1" }, deps); + + expect(outcome.action).toBe("mollify"); + expect(spies.logShadow).not.toHaveBeenCalled(); + }); + + it("kill switch on, org flag on, divert false: pass_through", async () => { + const { deps } = makeDeps({ + isMollifierEnabled: () => true, + resolveOrgFlag: async () => true, + evaluator: async () => ({ divert: false }), + }); + const outcome = await evaluateGate({ envId: "e1", orgId: "o1" }, deps); + + expect(outcome).toEqual({ action: "pass_through" }); + }); + + it("kill switch on, org flag on, shadow on, divert true: mollify (org flag wins over shadow)", async () => { + const { deps, spies } = makeDeps({ + isMollifierEnabled: () => true, + resolveOrgFlag: async () => true, + isShadowModeOn: () => true, + evaluator: async () => ({ divert: true, reason: "per_env_rate" }), + }); + const outcome = await evaluateGate({ envId: "e1", orgId: 
"o1" }, deps); + + expect(outcome.action).toBe("mollify"); + expect(spies.logShadow).not.toHaveBeenCalled(); + }); +}); + diff --git a/apps/webapp/test/setup.ts b/apps/webapp/test/setup.ts new file mode 100644 index 00000000000..607ad78f3a9 --- /dev/null +++ b/apps/webapp/test/setup.ts @@ -0,0 +1,6 @@ +// Load apps/webapp/.env into process.env so env.server's top-level +// EnvironmentSchema.parse(process.env) succeeds in vitest workers. +import { config } from "dotenv"; +import path from "node:path"; + +config({ path: path.resolve(__dirname, "../.env") }); diff --git a/apps/webapp/vitest.config.ts b/apps/webapp/vitest.config.ts index 66f697706a5..6a6b550fc64 100644 --- a/apps/webapp/vitest.config.ts +++ b/apps/webapp/vitest.config.ts @@ -10,6 +10,7 @@ export default defineConfig({ exclude: ["test/**/*.e2e.test.ts", "test/**/*.e2e.full.test.ts"], globals: true, pool: "forks", + setupFiles: ["./test/setup.ts"], // load apps/webapp/.env }, // @ts-ignore plugins: [tsconfigPaths({ projects: ["./tsconfig.json"] })], diff --git a/packages/redis-worker/src/index.ts b/packages/redis-worker/src/index.ts index 1c5147ea48d..e5e3db32f12 100644 --- a/packages/redis-worker/src/index.ts +++ b/packages/redis-worker/src/index.ts @@ -4,3 +4,4 @@ export * from "./utils.js"; // Fair Queue System export * from "./fair-queue/index.js"; +export * from "./mollifier/index.js"; diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts new file mode 100644 index 00000000000..319f1d6499d --- /dev/null +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -0,0 +1,353 @@ +import { describe, expect, it } from "vitest"; +import { BufferEntrySchema, serialiseSnapshot, deserialiseSnapshot } from "./schemas.js"; +import { redisTest } from "@internal/testcontainers"; +import { Logger } from "@trigger.dev/core/logger"; +import { MollifierBuffer } from "./buffer.js"; + +describe("schemas", () => { + it("serialiseSnapshot then 
deserialiseSnapshot is identity for plain objects", () => { + const snapshot = { taskId: "my-task", payload: { foo: 42, bar: "baz" } }; + const round = deserialiseSnapshot(serialiseSnapshot(snapshot)); + expect(round).toEqual(snapshot); + }); + + it("BufferEntrySchema parses a complete entry", () => { + const raw = { + runId: "run_abc", + envId: "env_1", + orgId: "org_1", + payload: serialiseSnapshot({ taskId: "t" }), + status: "QUEUED", + attempts: "0", + createdAt: "2026-05-11T10:00:00.000Z", + }; + const parsed = BufferEntrySchema.parse(raw); + expect(parsed.runId).toBe("run_abc"); + expect(parsed.status).toBe("QUEUED"); + expect(parsed.attempts).toBe(0); + expect(parsed.createdAt).toBeInstanceOf(Date); + }); + + it("BufferEntrySchema parses a FAILED entry with lastError", () => { + const raw = { + runId: "run_abc", + envId: "env_1", + orgId: "org_1", + payload: serialiseSnapshot({}), + status: "FAILED", + attempts: "3", + createdAt: "2026-05-11T10:00:00.000Z", + lastError: JSON.stringify({ code: "P2024", message: "connection lost" }), + }; + const parsed = BufferEntrySchema.parse(raw); + expect(parsed.lastError).toEqual({ code: "P2024", message: "connection lost" }); + }); +}); + +describe("MollifierBuffer construction", () => { + redisTest("constructs and closes cleanly", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + await buffer.close(); + }); +}); + +describe("MollifierBuffer.accept", () => { + redisTest("accept writes entry, enqueues, and tracks env", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new 
Logger("test", "log"), + }); + + try { + await buffer.accept({ + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: serialiseSnapshot({ taskId: "t" }), + }); + + const entry = await buffer.getEntry("run_1"); + expect(entry).not.toBeNull(); + expect(entry!.runId).toBe("run_1"); + expect(entry!.envId).toBe("env_a"); + expect(entry!.orgId).toBe("org_1"); + expect(entry!.status).toBe("QUEUED"); + expect(entry!.attempts).toBe(0); + expect(entry!.createdAt).toBeInstanceOf(Date); + + const envs = await buffer.listEnvs(); + expect(envs).toContain("env_a"); + } finally { + await buffer.close(); + } + }); +}); + +describe("MollifierBuffer.pop", () => { + redisTest("pop returns next QUEUED entry and transitions to DRAINING", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "run_1", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.accept({ runId: "run_2", envId: "env_a", orgId: "org_1", payload: "{}" }); + + const popped = await buffer.pop("env_a"); + expect(popped).not.toBeNull(); + expect(popped!.runId).toBe("run_1"); + expect(popped!.status).toBe("DRAINING"); + + const stored = await buffer.getEntry("run_1"); + expect(stored!.status).toBe("DRAINING"); + } finally { + await buffer.close(); + } + }); + + redisTest("pop returns null when env queue is empty", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + const popped = await buffer.pop("env_nonexistent"); + expect(popped).toBeNull(); + } finally { + await 
buffer.close(); + } + }); + + redisTest("atomic RPOP across two parallel pops on the same env", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "only", envId: "env_a", orgId: "org_1", payload: "{}" }); + + const [a, b] = await Promise.all([buffer.pop("env_a"), buffer.pop("env_a")]); + const winners = [a, b].filter((x) => x !== null); + expect(winners).toHaveLength(1); + expect(winners[0]!.runId).toBe("only"); + } finally { + await buffer.close(); + } + }); +}); + +describe("MollifierBuffer.ack", () => { + redisTest("ack deletes the entry", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "run_x", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + await buffer.ack("run_x"); + + const after = await buffer.getEntry("run_x"); + expect(after).toBeNull(); + } finally { + await buffer.close(); + } + }); +}); + +describe("MollifierBuffer.requeue", () => { + redisTest("requeue increments attempts, restores QUEUED, re-LPUSHes", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "run_r", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + await buffer.requeue("run_r"); + + const 
entry = await buffer.getEntry("run_r"); + expect(entry!.status).toBe("QUEUED"); + expect(entry!.attempts).toBe(1); + + const popped = await buffer.pop("env_a"); + expect(popped!.runId).toBe("run_r"); + } finally { + await buffer.close(); + } + }); +}); + +describe("MollifierBuffer.fail", () => { + redisTest("fail transitions to FAILED and stores lastError", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "run_f", envId: "env_a", orgId: "org_1", payload: "{}" }); + await buffer.pop("env_a"); + await buffer.fail("run_f", { code: "VALIDATION", message: "boom" }); + + const entry = await buffer.getEntry("run_f"); + expect(entry!.status).toBe("FAILED"); + expect(entry!.lastError).toEqual({ code: "VALIDATION", message: "boom" }); + } finally { + await buffer.close(); + } + }); +}); + +describe("MollifierBuffer TTL", () => { + redisTest("entry has TTL applied on accept", { timeout: 20_000 }, async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + try { + await buffer.accept({ runId: "run_t", envId: "env_a", orgId: "org_1", payload: "{}" }); + + const ttl = await buffer.getEntryTtlSeconds("run_t"); + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(600); + } finally { + await buffer.close(); + } + }); +}); + +describe("MollifierBuffer payload encoding", () => { + redisTest( + "pop round-trips payloads with quotes, backslashes, control chars, unicode", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: 
{ + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + entryTtlSeconds: 600, + logger: new Logger("test", "log"), + }); + + const tricky = { + quotes: 'a"b\'c', + backslash: "x\\y\\z", + newlines: "line1\nline2\r\nline3", + tab: "col1\tcol2", + unicode: "héllo 🦀 世界", + lineSep: "before
after
end",
+        nested: { arr: ["a", "b", 1, true, null], n: 3.14 },
+      };
+      const payload = serialiseSnapshot(tricky);
+
+      try {
+        await buffer.accept({ runId: "tricky", envId: "env_a", orgId: "org_1", payload });
+
+        const popped = await buffer.pop("env_a");
+        expect(popped).not.toBeNull();
+        expect(popped!.payload).toBe(payload);
+
+        const decoded = JSON.parse(popped!.payload);
+        expect(decoded).toEqual(tricky);
+      } finally {
+        await buffer.close();
+      }
+    },
+  );
+});
+
+describe("MollifierBuffer.requeue ordering", () => {
+  redisTest(
+    "requeued entry is popped AFTER other queued entries on the same env (FIFO retry)",
+    { timeout: 20_000 },
+    async ({ redisContainer }) => {
+      const buffer = new MollifierBuffer({
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        entryTtlSeconds: 600,
+        logger: new Logger("test", "log"),
+      });
+
+      try {
+        await buffer.accept({ runId: "a", envId: "env_a", orgId: "org_1", payload: "{}" });
+        await buffer.accept({ runId: "b", envId: "env_a", orgId: "org_1", payload: "{}" });
+        await buffer.accept({ runId: "c", envId: "env_a", orgId: "org_1", payload: "{}" });
+
+        const first = await buffer.pop("env_a");
+        expect(first!.runId).toBe("a");
+
+        await buffer.requeue("a");
+
+        const next = await buffer.pop("env_a");
+        expect(next!.runId).toBe("b");
+        const after = await buffer.pop("env_a");
+        expect(after!.runId).toBe("c");
+        const last = await buffer.pop("env_a");
+        expect(last!.runId).toBe("a");
+      } finally {
+        await buffer.close();
+      }
+    },
+  );
+});
diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts
new file mode 100644
index 00000000000..9db7790a1ba
--- /dev/null
+++ b/packages/redis-worker/src/mollifier/buffer.ts
@@ -0,0 +1,295 @@
+import {
+  createRedisClient,
+  type Callback,
+  type Redis,
+  type RedisOptions,
+  type Result,
+} from "@internal/redis";
+import { Logger } from "@trigger.dev/core/logger";
+import { BufferEntry, BufferEntrySchema } from "./schemas.js";
+
+export type MollifierBufferOptions = {
+  redisOptions: RedisOptions;
+  entryTtlSeconds: number;
+  logger?: Logger;
+};
+
+/**
+ * Redis-backed buffer of run entries, queued per environment.
+ *
+ * Keys used:
+ * - `mollifier:entries:<runId>`: hash holding one entry; its TTL is set on accept.
+ * - `mollifier:queue:<envId>`: list of run ids (LPUSH on accept/requeue, RPOP on pop).
+ * - `mollifier:envs`: set of env ids that have accepted an entry.
+ */
+export class MollifierBuffer {
+  private readonly redis: Redis;
+  private readonly entryTtlSeconds: number;
+  private readonly logger: Logger;
+
+  constructor(options: MollifierBufferOptions) {
+    this.entryTtlSeconds = options.entryTtlSeconds;
+    this.logger = options.logger ?? new Logger("MollifierBuffer", "debug");
+
+    this.redis = createRedisClient(
+      {
+        ...options.redisOptions,
+        // Retry delay grows by 50ms per attempt and is capped at 1000ms.
+        retryStrategy(times) {
+          const delay = Math.min(times * 50, 1000);
+          return delay;
+        },
+        maxRetriesPerRequest: 20,
+      },
+      {
+        onError: (error) => {
+          this.logger.error("MollifierBuffer redis client error:", { error });
+        },
+      },
+    );
+    this.#registerCommands();
+  }
+
+  /** Stores the entry as QUEUED with a TTL, enqueues its run id and records the env id. */
+  async accept(input: {
+    runId: string;
+    envId: string;
+    orgId: string;
+    payload: string;
+  }): Promise<void> {
+    const entryKey = `mollifier:entries:${input.runId}`;
+    const queueKey = `mollifier:queue:${input.envId}`;
+    const envsKey = "mollifier:envs";
+    const createdAt = new Date().toISOString();
+    await this.redis.acceptMollifierEntry(
+      entryKey,
+      queueKey,
+      envsKey,
+      input.runId,
+      input.envId,
+      input.orgId,
+      input.payload,
+      createdAt,
+      String(this.entryTtlSeconds),
+    );
+  }
+
+  /**
+   * Pops the oldest run id queued for `envId` and marks its entry DRAINING.
+   *
+   * @returns The entry, or `null` when the queue is empty, when the popped id's entry
+   * no longer exists (the stale id is discarded), or when the stored hash is invalid.
+   */
+  async pop(envId: string): Promise<BufferEntry | null> {
+    const queueKey = `mollifier:queue:${envId}`;
+    const entryPrefix = "mollifier:entries:";
+    const encoded = (await this.redis.popAndMarkDraining(queueKey, entryPrefix)) as
+      | string
+      | null;
+    if (!encoded) return null;
+
+    let raw: unknown;
+    try {
+      raw = JSON.parse(encoded);
+    } catch {
+      this.logger.error("MollifierBuffer.pop: failed to parse script result", { envId });
+      return null;
+    }
+
+    const parsed = BufferEntrySchema.safeParse(raw);
+    if (!parsed.success) {
+      this.logger.error("MollifierBuffer.pop: invalid entry shape", {
+        envId,
+        errors: parsed.error.flatten(),
+      });
+      return null;
+    }
+    return parsed.data;
+  }
+
+  /** Reads the entry for `runId` without changing it; `null` when missing or invalid. */
+  async getEntry(runId: string): Promise<BufferEntry | null> {
+    const raw = await this.redis.hgetall(`mollifier:entries:${runId}`);
+    if (!raw || Object.keys(raw).length === 0) return null;
+
+    const parsed = BufferEntrySchema.safeParse(raw);
+    if (!parsed.success) {
+      this.logger.error("MollifierBuffer.getEntry: invalid entry shape", {
+        runId,
+        errors: parsed.error.flatten(),
+      });
+      return null;
+    }
+    return parsed.data;
+  }
+
+  /** Lists every env id recorded by `accept`. */
+  async listEnvs(): Promise<string[]> {
+    return this.redis.smembers("mollifier:envs");
+  }
+
+  /** Deletes the entry hash for `runId`. */
+  async ack(runId: string): Promise<void> {
+    await this.redis.del(`mollifier:entries:${runId}`);
+  }
+
+  /**
+   * Marks the entry QUEUED again, increments `attempts` and pushes the run id back
+   * onto its env queue. Does nothing when the entry has no `envId` field.
+   */
+  async requeue(runId: string): Promise<void> {
+    await this.redis.requeueMollifierEntry(
+      `mollifier:entries:${runId}`,
+      "mollifier:queue:",
+      runId,
+    );
+  }
+
+  /**
+   * Marks the entry FAILED and stores `error` as JSON in `lastError`.
+   *
+   * Does nothing when the entry hash no longer exists (acked or expired): a plain
+   * HSET would recreate it as a partial hash with no TTL.
+   */
+  async fail(runId: string, error: { code: string; message: string }): Promise<void> {
+    await this.redis.failMollifierEntry(
+      `mollifier:entries:${runId}`,
+      JSON.stringify(error),
+    );
+  }
+
+  /** Remaining TTL of the entry in seconds, as reported by the Redis `TTL` command. */
+  async getEntryTtlSeconds(runId: string): Promise<number> {
+    return this.redis.ttl(`mollifier:entries:${runId}`);
+  }
+
+  /** Closes the Redis connection with `QUIT`. */
+  async close(): Promise<void> {
+    await this.redis.quit();
+  }
+
+  // Registers the Lua scripts behind accept, requeue, fail and pop on this client.
+  #registerCommands() {
+    this.redis.defineCommand("acceptMollifierEntry", {
+      numberOfKeys: 3,
+      lua: `
+        local entryKey = KEYS[1]
+        local queueKey = KEYS[2]
+        local envsKey = KEYS[3]
+        local runId = ARGV[1]
+        local envId = ARGV[2]
+        local orgId = ARGV[3]
+        local payload = ARGV[4]
+        local createdAt = ARGV[5]
+        local ttlSeconds = tonumber(ARGV[6])
+
+        redis.call('HSET', entryKey,
+          'runId', runId,
+          'envId', envId,
+          'orgId', orgId,
+          'payload', payload,
+          'status', 'QUEUED',
+          'attempts', '0',
+          'createdAt', createdAt)
+        redis.call('EXPIRE', entryKey, ttlSeconds)
+        redis.call('LPUSH', queueKey, runId)
+        redis.call('SADD', envsKey, envId)
+        return 1
+      `,
+    });
+
+    this.redis.defineCommand("requeueMollifierEntry", {
+      numberOfKeys: 1,
+      lua: `
+        local entryKey = KEYS[1]
+        local queuePrefix = ARGV[1]
+        local runId = ARGV[2]
+
+        local envId = redis.call('HGET', entryKey, 'envId')
+        if not envId then
+          return 0
+        end
+
+        local currentAttempts = redis.call('HGET', entryKey, 'attempts')
+        local nextAttempts = tonumber(currentAttempts or '0') + 1
+
+        redis.call('HSET', entryKey, 'status', 'QUEUED', 'attempts', tostring(nextAttempts))
+        redis.call('LPUSH', queuePrefix .. envId, runId)
+        return 1
+      `,
+    });
+
+    this.redis.defineCommand("failMollifierEntry", {
+      numberOfKeys: 1,
+      lua: `
+        local entryKey = KEYS[1]
+        local lastError = ARGV[1]
+
+        if redis.call('EXISTS', entryKey) == 0 then
+          return 0
+        end
+
+        redis.call('HSET', entryKey, 'status', 'FAILED', 'lastError', lastError)
+        return 1
+      `,
+    });
+
+    this.redis.defineCommand("popAndMarkDraining", {
+      numberOfKeys: 1,
+      lua: `
+        local queueKey = KEYS[1]
+        local entryPrefix = ARGV[1]
+        local runId = redis.call('RPOP', queueKey)
+        if not runId then
+          return nil
+        end
+        local entryKey = entryPrefix .. runId
+        -- The entry may have expired while its id was still queued; drop the stale
+        -- id instead of letting HSET recreate a partial hash with no TTL.
+        if redis.call('EXISTS', entryKey) == 0 then
+          return nil
+        end
+        redis.call('HSET', entryKey, 'status', 'DRAINING')
+        local raw = redis.call('HGETALL', entryKey)
+        local result = {}
+        for i = 1, #raw, 2 do
+          result[raw[i]] = raw[i + 1]
+        end
+        return cjson.encode(result)
+      `,
+    });
+  }
+}
+
+declare module "@internal/redis" {
+  interface RedisCommander<Context> {
+    acceptMollifierEntry(
+      entryKey: string,
+      queueKey: string,
+      envsKey: string,
+      runId: string,
+      envId: string,
+      orgId: string,
+      payload: string,
+      createdAt: string,
+      ttlSeconds: string,
+      callback?: Callback<number>,
+    ): Result<number, Context>;
+    popAndMarkDraining(
+      queueKey: string,
+      entryPrefix: string,
+      callback?: Callback<string | null>,
+    ): Result<string | null, Context>;
+    requeueMollifierEntry(
+      entryKey: string,
+      queuePrefix: string,
+      runId: string,
+      callback?: Callback<number>,
+    ): Result<number, Context>;
+    failMollifierEntry(
+      entryKey: string,
+      lastError: string,
+      callback?: Callback<number>,
+    ): Result<number, Context>;
+  }
+}
diff --git a/packages/redis-worker/src/mollifier/drainer.test.ts b/packages/redis-worker/src/mollifier/drainer.test.ts
new file mode 100644
index 00000000000..64e38842955
--- /dev/null
+++ b/packages/redis-worker/src/mollifier/drainer.test.ts
@@ -0,0 +1,312 @@
+import { redisTest } from "@internal/testcontainers";
+import { describe, expect, vi } from "vitest";
+import { Logger } from "@trigger.dev/core/logger";
+import { MollifierBuffer } from "./buffer.js";
+import { MollifierDrainer } from "./drainer.js";
+import { serialiseSnapshot } from "./schemas.js";
+
+const
noopOptions = {
+  entryTtlSeconds: 600,
+  logger: new Logger("test", "log"),
+};
+
+/** The subset of the Redis test container that these tests read from. */
+type RedisConnectionSource = {
+  getHost(): string;
+  getPort(): number;
+  getPassword(): string;
+};
+
+/** Builds a MollifierBuffer connected to the per-test Redis container. */
+function makeBuffer(redisContainer: RedisConnectionSource): MollifierBuffer {
+  return new MollifierBuffer({
+    redisOptions: {
+      host: redisContainer.getHost(),
+      port: redisContainer.getPort(),
+      password: redisContainer.getPassword(),
+    },
+    ...noopOptions,
+  });
+}
+
+/** Polls `isDone` every `stepMs` until it returns true or `timeoutMs` has elapsed. */
+async function waitUntil(isDone: () => boolean, timeoutMs: number, stepMs: number) {
+  const deadline = Date.now() + timeoutMs;
+  while (!isDone() && Date.now() < deadline) {
+    await new Promise((r) => setTimeout(r, stepMs));
+  }
+}
+
+describe("MollifierDrainer.runOnce", () => {
+  redisTest("drains one queued entry through the handler and acks", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    const handler = vi.fn(async () => {});
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 5,
+      maxAttempts: 3,
+      isRetryable: () => false,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      await buffer.accept({
+        runId: "run_1",
+        envId: "env_a",
+        orgId: "org_1",
+        payload: serialiseSnapshot({ foo: 1 }),
+      });
+
+      const outcome = await drainer.runOnce();
+      expect(outcome.drained).toBe(1);
+      expect(outcome.failed).toBe(0);
+      expect(handler).toHaveBeenCalledTimes(1);
+      expect(handler).toHaveBeenCalledWith(
+        expect.objectContaining({
+          runId: "run_1",
+          envId: "env_a",
+          orgId: "org_1",
+          payload: { foo: 1 },
+        }),
+      );
+
+      // A successful drain acks the entry, which removes it from Redis.
+      const remaining = await buffer.getEntry("run_1");
+      expect(remaining).toBeNull();
+    } finally {
+      await buffer.close();
+    }
+  });
+
+  redisTest("runOnce with no entries does nothing", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    const handler = vi.fn(async () => {});
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 5,
+      maxAttempts: 3,
+      isRetryable: () => false,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      const outcome = await drainer.runOnce();
+      expect(outcome.drained).toBe(0);
+      expect(outcome.failed).toBe(0);
+      expect(handler).not.toHaveBeenCalled();
+    } finally {
+      await buffer.close();
+    }
+  });
+});
+
+describe("MollifierDrainer error handling", () => {
+  redisTest("retryable error requeues and increments attempts", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    let invocations = 0;
+    const handler = vi.fn(async () => {
+      invocations++;
+      throw new Error("transient");
+    });
+
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 1,
+      maxAttempts: 3,
+      isRetryable: () => true,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      await buffer.accept({ runId: "run_r", envId: "env_a", orgId: "org_1", payload: "{}" });
+
+      // The first two failures are requeued with an incremented attempt count.
+      for (const expectedAttempts of [1, 2]) {
+        await drainer.runOnce();
+        const requeued = await buffer.getEntry("run_r");
+        expect(requeued!.status).toBe("QUEUED");
+        expect(requeued!.attempts).toBe(expectedAttempts);
+      }
+
+      // The third failure reaches maxAttempts and becomes terminal.
+      await drainer.runOnce();
+      const exhausted = await buffer.getEntry("run_r");
+      expect(exhausted!.status).toBe("FAILED");
+      expect(invocations).toBe(3);
+    } finally {
+      await buffer.close();
+    }
+  });
+
+  redisTest("non-retryable error transitions directly to FAILED", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    const handler = vi.fn(async () => {
+      throw new Error("validation failure");
+    });
+
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 1,
+      maxAttempts: 3,
+      isRetryable: () => false,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      await buffer.accept({ runId: "run_nr", envId: "env_a", orgId: "org_1", payload: "{}" });
+
+      await drainer.runOnce();
+
+      const failedEntry = await buffer.getEntry("run_nr");
+      expect(failedEntry!.status).toBe("FAILED");
+      expect(failedEntry!.lastError).toEqual({ code: "Error", message: "validation failure" });
+    } finally {
+      await buffer.close();
+    }
+  });
+
+  redisTest("multi-env round-robin: drains one item per env per runOnce", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    const seen: string[] = [];
+    const handler = vi.fn(async (input: { runId: string }) => {
+      seen.push(input.runId);
+    });
+
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 10,
+      maxAttempts: 3,
+      isRetryable: () => false,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      // Two entries for env_a and one for env_b, accepted in this order.
+      const seeded = [
+        { runId: "a1", envId: "env_a" },
+        { runId: "a2", envId: "env_a" },
+        { runId: "b1", envId: "env_b" },
+      ];
+      for (const { runId, envId } of seeded) {
+        await buffer.accept({ runId, envId, orgId: "org_1", payload: "{}" });
+      }
+
+      const firstPass = await drainer.runOnce();
+      expect(firstPass.drained).toBe(2);
+      expect(new Set(seen)).toEqual(new Set(["a1", "b1"]));
+
+      seen.length = 0;
+      const secondPass = await drainer.runOnce();
+      expect(secondPass.drained).toBe(1);
+      expect(seen).toEqual(["a2"]);
+    } finally {
+      await buffer.close();
+    }
+  });
+});
+
+describe("MollifierDrainer.start/stop", () => {
+  redisTest("start polls and processes, stop halts the loop", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    const seen: string[] = [];
+    const handler = vi.fn(async (input: { runId: string }) => {
+      seen.push(input.runId);
+    });
+
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 5,
+      maxAttempts: 3,
+      isRetryable: () => false,
+      pollIntervalMs: 20,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      for (const runId of ["live_1", "live_2"]) {
+        await buffer.accept({ runId, envId: "env_a", orgId: "org_1", payload: "{}" });
+      }
+
+      drainer.start();
+
+      // Give the polling loop up to five seconds to handle both entries.
+      await waitUntil(() => seen.length >= 2, 5_000, 50);
+
+      await drainer.stop();
+
+      expect(new Set(seen)).toEqual(new Set(["live_1", "live_2"]));
+    } finally {
+      await buffer.close();
+    }
+  });
+
+  redisTest("stop returns after timeoutMs even if a handler is hung", { timeout: 20_000 }, async ({ redisContainer }) => {
+    const buffer = makeBuffer(redisContainer);
+
+    let handlerStarted = false;
+    const handler = vi.fn(async () => {
+      handlerStarted = true;
+      // Never settles: simulates a handler that hangs forever.
+      await new Promise(() => {});
+    });
+
+    const drainer = new MollifierDrainer({
+      buffer,
+      handler,
+      concurrency: 1,
+      maxAttempts: 3,
+      isRetryable: () => false,
+      pollIntervalMs: 20,
+      logger: new Logger("test-drainer", "log"),
+    });
+
+    try {
+      await buffer.accept({ runId: "hung", envId: "env_a", orgId: "org_1", payload: "{}" });
+
+      drainer.start();
+
+      await waitUntil(() => handlerStarted, 2_000, 25);
+      expect(handlerStarted).toBe(true);
+
+      const stopBegan = Date.now();
+      await drainer.stop({ timeoutMs: 500 });
+      const stopTook = Date.now() - stopBegan;
+
+      expect(stopTook).toBeGreaterThanOrEqual(500);
+      expect(stopTook).toBeLessThan(2_000);
+    } finally {
+      await buffer.close();
+    }
+  });
+});
diff --git a/packages/redis-worker/src/mollifier/drainer.ts b/packages/redis-worker/src/mollifier/drainer.ts
new file mode 100644
index
00000000000..e42c1b12570
--- /dev/null
+++ b/packages/redis-worker/src/mollifier/drainer.ts
@@ -0,0 +1,158 @@
+import { Logger } from "@trigger.dev/core/logger";
+import pLimit from "p-limit";
+import { MollifierBuffer } from "./buffer.js";
+import { BufferEntry, deserialiseSnapshot } from "./schemas.js";
+
+/**
+ * Callback invoked once per popped buffer entry. `payload` is the entry's
+ * stored snapshot after `deserialiseSnapshot`; `attempts` is the number of
+ * earlier attempts recorded on the entry.
+ */
+export type MollifierDrainerHandler<TPayload = unknown> = (input: {
+  runId: string;
+  envId: string;
+  orgId: string;
+  payload: TPayload;
+  attempts: number;
+  createdAt: Date;
+}) => Promise<void>;
+
+export type MollifierDrainerOptions<TPayload = unknown> = {
+  buffer: MollifierBuffer;
+  handler: MollifierDrainerHandler<TPayload>;
+  /** Maximum number of entries processed at the same time. */
+  concurrency: number;
+  /** Total handler attempts allowed per entry before it is marked FAILED. */
+  maxAttempts: number;
+  /** Decides whether a thrown error may be retried. */
+  isRetryable: (err: unknown) => boolean;
+  /** Sleep between passes that found nothing to do. Defaults to 100ms. */
+  pollIntervalMs?: number;
+  logger?: Logger;
+};
+
+export type DrainResult = {
+  drained: number;
+  failed: number;
+};
+
+/**
+ * Pulls entries out of a `MollifierBuffer` and feeds them to a handler.
+ *
+ * Each pass (`runOnce`) pops at most one entry per environment returned by
+ * `buffer.listEnvs()`. `start()` repeats passes in the background until
+ * `stop()` is called.
+ */
+export class MollifierDrainer<TPayload = unknown> {
+  private readonly buffer: MollifierBuffer;
+  private readonly handler: MollifierDrainerHandler<TPayload>;
+  private readonly maxAttempts: number;
+  private readonly isRetryable: (err: unknown) => boolean;
+  private readonly pollIntervalMs: number;
+  private readonly logger: Logger;
+  private readonly limit: ReturnType<typeof pLimit>;
+  private envCursor = 0;
+  private isRunning = false;
+  private stopping = false;
+
+  constructor(options: MollifierDrainerOptions<TPayload>) {
+    this.buffer = options.buffer;
+    this.handler = options.handler;
+    this.maxAttempts = options.maxAttempts;
+    this.isRetryable = options.isRetryable;
+    this.pollIntervalMs = options.pollIntervalMs ?? 100;
+    this.logger = options.logger ?? new Logger("MollifierDrainer", "debug");
+    this.limit = pLimit(options.concurrency);
+  }
+
+  /**
+   * Runs a single pass: pops and processes at most one entry per environment.
+   *
+   * @returns how many entries were drained (handled and acked) and how many
+   *   failed (requeued or marked FAILED).
+   * @throws the first buffer error hit by any environment, but only after
+   *   every environment's task has settled, so no work is still in flight
+   *   when the rejection is observed.
+   */
+  async runOnce(): Promise<DrainResult> {
+    const envs = await this.buffer.listEnvs();
+    if (envs.length === 0) return { drained: 0, failed: 0 };
+
+    const ordered = this.rotate(envs);
+    const settled = await Promise.allSettled(
+      ordered.map((envId) => this.limit(() => this.processOneFromEnv(envId))),
+    );
+
+    let drained = 0;
+    let failed = 0;
+    let sawError = false;
+    let firstError: unknown;
+    for (const outcome of settled) {
+      if (outcome.status === "rejected") {
+        if (!sawError) {
+          sawError = true;
+          firstError = outcome.reason;
+        }
+        continue;
+      }
+      if (outcome.value === "drained") drained++;
+      else if (outcome.value === "failed") failed++;
+    }
+
+    if (sawError) throw firstError;
+    return { drained, failed };
+  }
+
+  /** Starts the background polling loop. No-op when it is already running. */
+  start(): void {
+    if (this.isRunning) return;
+    this.isRunning = true;
+    this.stopping = false;
+    void this.loop();
+  }
+
+  /**
+   * Asks the loop to stop and waits (checking every 20ms) until it has exited.
+   *
+   * When `timeoutMs` is given and the loop has not exited by then, a warning
+   * is logged and the call returns while the current pass is still in flight.
+   */
+  async stop(options: { timeoutMs?: number } = {}): Promise<void> {
+    if (!this.isRunning) return;
+    this.stopping = true;
+    const deadline = options.timeoutMs != null ? Date.now() + options.timeoutMs : Infinity;
+    while (this.isRunning) {
+      if (Date.now() >= deadline) {
+        this.logger.warn(
+          "MollifierDrainer.stop: deadline exceeded; returning while loop iteration is in flight",
+          { timeoutMs: options.timeoutMs },
+        );
+        return;
+      }
+      await this.delay(20);
+    }
+  }
+
+  /**
+   * Polling loop behind `start()`. A failed pass (e.g. a Redis error) is
+   * logged and retried after `pollIntervalMs` instead of ending the loop, so
+   * one transient error cannot leave the drainer permanently stopped.
+   */
+  private async loop(): Promise<void> {
+    try {
+      while (!this.stopping) {
+        try {
+          const result = await this.runOnce();
+          if (result.drained === 0 && result.failed === 0) {
+            await this.delay(this.pollIntervalMs);
+          }
+        } catch (err) {
+          this.logger.error("MollifierDrainer loop iteration failed; retrying", { err });
+          // Back off so a persistent failure does not turn into a busy loop.
+          await this.delay(this.pollIntervalMs);
+        }
+      }
+    } finally {
+      this.isRunning = false;
+    }
+  }
+
+  private delay(ms: number): Promise<void> {
+    return new Promise((resolve) => setTimeout(resolve, ms));
+  }
+
+  /**
+   * Returns `envs` rotated to start at the current cursor, then advances the
+   * cursor by one so the next pass starts one position later.
+   */
+  private rotate(envs: string[]): string[] {
+    const start = this.envCursor % envs.length;
+    this.envCursor = (this.envCursor + 1) % Math.max(envs.length, 1);
+    return [...envs.slice(start), ...envs.slice(0, start)];
+  }
+
+  /** Pops one entry for `envId` and processes it; "empty" when none was popped. */
+  private async processOneFromEnv(envId: string): Promise<"drained" | "failed" | "empty"> {
+    const entry = await this.buffer.pop(envId);
+    if (!entry) return "empty";
+    return this.processEntry(entry);
+  }
+
+  /**
+   * Runs the handler for one entry and acks it on success.
+   *
+   * The ack is deliberately outside the `try`: if the handler succeeded but
+   * the ack failed, the entry must not be requeued (the handler would run a
+   * second time) or marked FAILED. The ack error propagates to the caller.
+   */
+  private async processEntry(entry: BufferEntry): Promise<"drained" | "failed"> {
+    try {
+      const payload = deserialiseSnapshot<TPayload>(entry.payload);
+      await this.handler({
+        runId: entry.runId,
+        envId: entry.envId,
+        orgId: entry.orgId,
+        payload,
+        attempts: entry.attempts,
+        createdAt: entry.createdAt,
+      });
+    } catch (err) {
+      return this.handleFailure(entry, err);
+    }
+
+    await this.buffer.ack(entry.runId);
+    return "drained";
+  }
+
+  /** Requeues the entry when the error is retryable and attempts remain; otherwise marks it FAILED. */
+  private async handleFailure(entry: BufferEntry, err: unknown): Promise<"failed"> {
+    const nextAttempts = entry.attempts + 1;
+    if (this.isRetryable(err) && nextAttempts < this.maxAttempts) {
+      await this.buffer.requeue(entry.runId);
+      this.logger.warn("MollifierDrainer: retryable error, requeued", {
+        runId: entry.runId,
+        attempts: nextAttempts,
+      });
+      return "failed";
+    }
+
+    const code = err instanceof Error ? err.name : "Unknown";
+    const message = err instanceof Error ? err.message : String(err);
+    await this.buffer.fail(entry.runId, { code, message });
+    this.logger.error("MollifierDrainer: terminal failure", {
+      runId: entry.runId,
+      code,
+      message,
+    });
+    return "failed";
+  }
+}
diff --git a/packages/redis-worker/src/mollifier/index.ts b/packages/redis-worker/src/mollifier/index.ts
new file mode 100644
index 00000000000..5e6fe202e3d
--- /dev/null
+++ b/packages/redis-worker/src/mollifier/index.ts
@@ -0,0 +1,15 @@
+export { MollifierBuffer, type MollifierBufferOptions } from "./buffer.js";
+export {
+  MollifierDrainer,
+  type MollifierDrainerOptions,
+  type MollifierDrainerHandler,
+  type DrainResult,
+} from "./drainer.js";
+export {
+  BufferEntrySchema,
+  BufferEntryStatus,
+  BufferEntryError,
+  serialiseSnapshot,
+  deserialiseSnapshot,
+  type BufferEntry,
+} from "./schemas.js";
diff --git a/packages/redis-worker/src/mollifier/schemas.ts b/packages/redis-worker/src/mollifier/schemas.ts
new file mode 100644
index 00000000000..f93b0f0a3c3
--- /dev/null
+++ b/packages/redis-worker/src/mollifier/schemas.ts
@@ -0,0 +1,58 @@
+import { z } from "zod";
+
+export const
BufferEntryStatus = z.enum(["QUEUED", "DRAINING", "FAILED"]);
+export type BufferEntryStatus = z.infer<typeof BufferEntryStatus>;
+
+/** Shape of the error stored (JSON-encoded) in an entry's `lastError` field. */
+export const BufferEntryError = z.object({
+  code: z.string(),
+  message: z.string(),
+});
+export type BufferEntryError = z.infer<typeof BufferEntryError>;
+
+/**
+ * Parses a plain decimal string into a non-negative safe integer.
+ *
+ * Only ASCII digits are accepted. `Number()` on its own would also accept
+ * "" and " " (both become 0), "1e3" and "0x10", none of which is a valid
+ * stored counter.
+ */
+const stringToInt = z.string().transform((v, ctx) => {
+  const n = /^\d+$/.test(v) ? Number(v) : Number.NaN;
+  if (!Number.isSafeInteger(n)) {
+    ctx.addIssue({ code: z.ZodIssueCode.custom, message: "expected non-negative integer string" });
+    return z.NEVER;
+  }
+  return n;
+});
+
+/** Parses a date string with `new Date(v)`, rejecting values that give an invalid date. */
+const stringToDate = z.string().transform((v, ctx) => {
+  const d = new Date(v);
+  if (Number.isNaN(d.getTime())) {
+    ctx.addIssue({ code: z.ZodIssueCode.custom, message: "expected ISO date string" });
+    return z.NEVER;
+  }
+  return d;
+});
+
+/** Parses a JSON string and validates the result as a `BufferEntryError`. */
+const stringToError = z.string().transform((v, ctx) => {
+  try {
+    return BufferEntryError.parse(JSON.parse(v));
+  } catch {
+    ctx.addIssue({ code: z.ZodIssueCode.custom, message: "expected JSON-encoded BufferEntryError" });
+    return z.NEVER;
+  }
+});
+
+/**
+ * Schema for a buffer entry as read back from its Redis hash: every field
+ * arrives as a string, and `attempts`, `createdAt` and `lastError` are
+ * converted to number, Date and object respectively.
+ */
+export const BufferEntrySchema = z.object({
+  runId: z.string().min(1),
+  envId: z.string().min(1),
+  orgId: z.string().min(1),
+  payload: z.string(),
+  status: BufferEntryStatus,
+  attempts: stringToInt,
+  createdAt: stringToDate,
+  lastError: stringToError.optional(),
+});
+
+export type BufferEntry = z.infer<typeof BufferEntrySchema>;
+
+/**
+ * Serialises a snapshot to JSON.
+ *
+ * @throws TypeError when the value has no JSON representation.
+ *   `JSON.stringify` returns `undefined` for `undefined`, functions and
+ *   symbols, which would otherwise be returned under a `string` return type.
+ */
+export function serialiseSnapshot(snapshot: unknown): string {
+  const serialised = JSON.stringify(snapshot);
+  if (serialised === undefined) {
+    throw new TypeError("serialiseSnapshot: snapshot is not JSON-serialisable");
+  }
+  return serialised;
+}
+
+/**
+ * Parses a serialised snapshot. The result is cast to `T` without any
+ * validation; callers that need guarantees must validate it themselves.
+ */
+export function deserialiseSnapshot<T = unknown>(serialised: string): T {
+  return JSON.parse(serialised) as T;
+}