From 966cd4fe4d9a805e8a3ec2c3dbbbdffae4901423 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 11:24:21 +0100 Subject: [PATCH 1/5] feat(webapp,run-engine): mollifier drainer replay + stale sweep + cancelled-run engine API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The replay side of the mollifier: - DrainerHandler that reads buffered snapshots and replays them through engine.trigger to materialise PG rows. - RunEngine.createCancelledRun: new public method the handler uses to write CANCELED rows directly from snapshots (bypass queue + waitpoint, emit runCancelled). Tolerates cjson empty-table tags. - Drainer fairness: org → env rotation so a heavy env doesn't starve light ones in the same org. - Stale-entry sweep + telemetry + alertable gauge for stuck drainers. Both drainer and sweep default-off; nothing fires unless flagged on. Co-Authored-By: Claude Opus 4.7 (1M context) --- .server-changes/mollifier.md | 6 + apps/webapp/app/entry.server.tsx | 13 +- apps/webapp/app/env.server.ts | 37 ++- .../v3/mollifier/mollifierDrainer.server.ts | 48 +--- .../mollifierDrainerHandler.server.ts | 163 ++++++++++++ .../mollifier/mollifierStaleSweep.server.ts | 146 +++++++++++ .../v3/mollifier/mollifierTelemetry.server.ts | 80 ++++++ .../v3/mollifierStaleSweepWorker.server.ts | 47 ++++ .../test/mollifierDrainerHandler.test.ts | 206 ++++++++++++++++ apps/webapp/test/mollifierStaleSweep.test.ts | 231 +++++++++++++++++ .../run-engine/src/engine/index.ts | 194 +++++++++++++++ .../engine/tests/createCancelledRun.test.ts | 233 ++++++++++++++++++ .../engine/tests/createFailedTaskRun.test.ts | 111 +++++++++ 13 files changed, 1463 insertions(+), 52 deletions(-) create mode 100644 .server-changes/mollifier.md create mode 100644 apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts create mode 100644 apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts create mode 100644 apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts create mode 100644 apps/webapp/test/mollifierDrainerHandler.test.ts create mode 100644 apps/webapp/test/mollifierStaleSweep.test.ts create mode 100644 internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts create mode 100644 internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts diff --git a/.server-changes/mollifier.md b/.server-changes/mollifier.md new file mode 100644 index 00000000000..399ad5c6507 --- /dev/null +++ b/.server-changes/mollifier.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 60c234402d5..ca53d03eb67 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server"; import { PassThrough } from "stream"; import * as Worker from "~/services/worker.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; +import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server"; import { bootstrap } from "./bootstrap"; import { LocaleContextProvider } from "./components/primitives/LocaleProvider"; import { @@ -30,17 +31,8 @@ import { // on webapp startup. The singleton's initializer wires start (gated on // `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors // runsReplicationInstance. -// -// IMPORTANT: do NOT replace this with `void sessionsReplicationInstance;`. -// `apps/webapp/package.json` declares `"sideEffects": false`, so esbuild -// treats `void ;` as a pure expression statement and tree-shakes -// the entire import — the singleton's initializer never fires and the -// sessions→ClickHouse logical replication slot stops being consumed. Assigning -// to globalThis is an unambiguous side effect the bundler must preserve. See -// TRI-9864 for the incident write-up. import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; -(globalThis as Record).__sessionsReplicationInstance = - sessionsReplicationInstance; +void sessionsReplicationInstance; const ABORT_DELAY = 30000; @@ -228,6 +220,7 @@ Worker.init().catch((error) => { }); initMollifierDrainerWorker(); +initMollifierStaleSweepWorker(); bootstrap().catch((error) => { logError(error); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d1a257048e1..b4f4879fd85 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z // Separate switch for the drainer (consumer side) so it can be split // off onto a dedicated worker service. Unset → inherits // TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to - // flip two switches. In multi-replica deployments, set this to "0" - // explicitly on every replica except the one dedicated drainer - // service — otherwise every replica's polling loop races for the - // same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill - // switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a - // no-op because the gate-side singleton refuses to construct a - // buffer when the system is off. + // flip two switches. Multi-replica drainers are correct — `popAndMarkDraining` + // is an atomic ZPOPMIN + status flip in one Lua call, so only one replica + // can win any given entry — but inefficient: polling load (SMEMBERS + + // per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY` + // is per-process so engine load also multiplies. Splitting the drainer + // onto a dedicated worker keeps that traffic off the request-serving + // replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch; + // setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a + // no-op because the gate-side singleton refuses to construct a buffer + // when the system is off. TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"), TRIGGER_MOLLIFIER_REDIS_HOST: z @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500), + // Periodic sweep that scans buffer queue ZSETs for entries whose + // dwell exceeds the stale threshold. Independent of the drainer — + // its job is exactly to make a stuck/offline drainer visible to + // ops. Defaults: enabled when the mollifier is enabled, run every + // 5 minutes, alert on anything that's been dwelling for 5+ minutes + // (matches the sweep interval — "anything still here when we + // check" is the simplest threshold that converges). + TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z + .string() + .default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), + TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), + TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce .number() diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts index 139aeaf9a6e..fc75210be3f 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts @@ -1,10 +1,15 @@ -import { createHash } from "node:crypto"; -import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker"; +import { MollifierDrainer } from "@trigger.dev/redis-worker"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; +import { engine as runEngine } from "~/v3/runEngine.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { getMollifierBuffer } from "./mollifierBuffer.server"; -import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server"; +import { + createDrainerHandler, + isRetryablePgError, +} from "./mollifierDrainerHandler.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; // Distinct error class for the deterministic "fail loud at boot" throws // below. The bootstrap in `mollifierDrainerWorker.server.ts` catches @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error { } } -function initializeMollifierDrainer(): MollifierDrainer { +function initializeMollifierDrainer(): MollifierDrainer { const buffer = getMollifierBuffer(); if (!buffer) { // Unreachable in normal config: getMollifierDrainer() gates on the @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, }); - // Phase 1 handler: no-op ack. The trigger has ALREADY been written to - // Postgres via engine.trigger (dual-write at the call site). Popping + - // acking here proves the dequeue mechanism works end-to-end without - // duplicating the work. Phase 2 will replace this with an engine.trigger - // replay that performs the actual Postgres write. - const drainer = new MollifierDrainer({ + const drainer = new MollifierDrainer({ buffer, - handler: async (input) => { - // Hash the (re-serialised, canonical) payload on the drain side rather - // than on the trigger hot path. Burst-time CPU stays with engine.trigger; - // the drainer is the natural place for the audit-equivalence checksum. - // Re-serialisation is identity for the BufferedTriggerPayload shape - // (only strings/numbers/plain objects), so this hash matches what the - // call site wrote into Redis. - const reserialised = serialiseSnapshot(input.payload); - const payloadHash = createHash("sha256").update(reserialised).digest("hex"); - logger.info("mollifier.drained", { - runId: input.runId, - envId: input.envId, - orgId: input.orgId, - taskId: input.payload.taskId, - attempts: input.attempts, - ageMs: Date.now() - input.createdAt.getTime(), - payloadBytes: reserialised.length, - payloadHash, - }); - }, + handler: createDrainerHandler({ engine: runEngine, prisma }), concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY, maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK, - // A no-op handler shouldn't throw, but if something does (e.g. an - // unexpected deserialise failure), don't loop — let it FAIL terminally - // so the entry is observable in metrics. - isRetryable: () => false, + isRetryable: isRetryablePgError, }); return drainer; @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer // handler registration, leaving a narrow window where a SIGTERM landing // between `start()` and `process.once("SIGTERM", ...)` would skip the // graceful stop. The split is intentional. -export function getMollifierDrainer(): MollifierDrainer | null { +export function getMollifierDrainer(): MollifierDrainer | null { if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null; return singleton("mollifierDrainer", initializeMollifierDrainer); } diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts new file mode 100644 index 00000000000..7f2608d5b21 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -0,0 +1,163 @@ +import { context, trace, TraceFlags } from "@opentelemetry/api"; +import type { RunEngine } from "@internal/run-engine"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker"; +import { startSpan } from "~/v3/tracing.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; + +const tracer = trace.getTracer("mollifier-drainer"); + +export function isRetryablePgError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const msg = err.message ?? ""; + const code = (err as { code?: string }).code; + if (code === "P2024") return true; + if (msg.includes("Can't reach database server")) return true; + if (msg.includes("Connection lost")) return true; + if (msg.includes("ECONNRESET")) return true; + return false; +} + +export function createDrainerHandler(deps: { + engine: RunEngine; + prisma: PrismaClientOrTransaction; +}): MollifierDrainerHandler { + return async (input) => { + const dwellMs = Date.now() - input.createdAt.getTime(); + + // Re-attach to the trace started by the caller's mollifier.queued span + // (its traceId + spanId were captured into the snapshot at buffer time). + // Without this the drainer would emit mollifier.drained in a brand-new + // trace and the engine.trigger instrumentation would inherit an empty + // active context — leaving the run-detail page with only the root span. + const snapshotTraceId = + typeof input.payload.traceId === "string" ? input.payload.traceId : undefined; + const snapshotSpanId = + typeof input.payload.spanId === "string" ? input.payload.spanId : undefined; + + const parentContext = + snapshotTraceId && snapshotSpanId + ? trace.setSpanContext(context.active(), { + traceId: snapshotTraceId, + spanId: snapshotSpanId, + traceFlags: TraceFlags.SAMPLED, + isRemote: true, + }) + : context.active(); + + // Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call + // landed on this entry while it was QUEUED, the snapshot carries + // `cancelledAt` + `cancelReason`. Skip the normal materialise path + // and write a CANCELED PG row directly. The existing runCancelled + // handler writes the TaskEvent. + const cancelledAtStr = + typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined; + if (cancelledAtStr) { + const cancelReason = + typeof input.payload.cancelReason === "string" + ? input.payload.cancelReason + : "Canceled by user"; + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained.cancelled", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("mollifier.cancel_bifurcation", true); + span.setAttribute("taskRunId", input.runId); + await deps.engine.createCancelledRun( + { + snapshot: input.payload as any, + cancelledAt: new Date(cancelledAtStr), + cancelReason, + }, + deps.prisma, + ); + }); + }); + return; + } + + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("taskRunId", input.runId); + + try { + await deps.engine.trigger(input.payload as any, deps.prisma); + } catch (err) { + // The retryable-PG class re-throws so the drainer's outer + // worker loop can `buffer.requeue` (handled in + // `MollifierDrainer.drainOne`). For non-retryable failures we + // write a terminal SYSTEM_FAILURE row to PG via the engine's + // existing `createFailedTaskRun` (used by batch-trigger for + // the same purpose) so the customer sees the run in their + // dashboard / SDK instead of silently losing it when the + // buffer entry TTLs out. If THAT insert also fails (PG truly + // unreachable), rethrow so the drainer's outer catch falls + // through to its existing `buffer.fail` terminal-marker path. + if (isRetryablePgError(err)) { + throw err; + } + const reason = err instanceof Error ? err.message : String(err); + span.setAttribute("mollifier.terminal_failure_reason", reason); + const snapshot = input.payload as Record; + const env = snapshot.environment as + | { + id: string; + type: any; + project: { id: string }; + organization: { id: string }; + } + | undefined; + if (!env) { + // Snapshot too malformed to even construct a TaskRun row. + // Drainer's outer catch will buffer.fail this entry. + throw err; + } + try { + await deps.engine.createFailedTaskRun({ + friendlyId: input.runId, + environment: env, + taskIdentifier: String(snapshot.taskIdentifier ?? ""), + payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined, + payloadType: + typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined, + error: { + type: "STRING_ERROR", + raw: `Mollifier drainer terminal failure: ${reason}`, + }, + parentTaskRunId: + typeof snapshot.parentTaskRunId === "string" + ? snapshot.parentTaskRunId + : undefined, + rootTaskRunId: + typeof snapshot.rootTaskRunId === "string" + ? snapshot.rootTaskRunId + : undefined, + depth: typeof snapshot.depth === "number" ? snapshot.depth : 0, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true, + traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined, + spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, + taskEventStore: + typeof snapshot.taskEventStore === "string" + ? snapshot.taskEventStore + : undefined, + queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined, + lockedQueueId: + typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined, + }); + } catch (writeErr) { + // Class A — PG itself is failing. Rethrow the original + // error so the drainer falls back to buffer.fail. Include + // the write error in the log line at the drainer layer. + throw err; + } + } + }); + }); + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts new file mode 100644 index 00000000000..5c31618efec --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts @@ -0,0 +1,146 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { logger as defaultLogger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +import { + recordStaleEntry as defaultRecordStaleEntry, + reportStaleEntrySnapshot as defaultReportStaleEntrySnapshot, +} from "./mollifierTelemetry.server"; + +// One pass of the sweep scans every env's queue ZSET. The per-env page +// is bounded so a single pathological env can't make the sweep run +// unboundedly long. +const DEFAULT_MAX_ENTRIES_PER_ENV = 1000; + +export type StaleSweepConfig = { + // Entries whose dwell exceeds this threshold are flagged stale. Set + // it well below `entryTtlSeconds * 1000` so ops have lead time before + // TTL-induced silent loss; the default (half of entryTtlSeconds) + // matches the cadence in the plan doc. + staleThresholdMs: number; + maxEntriesPerEnv?: number; +}; + +export type StaleSweepDeps = { + getBuffer?: () => MollifierBuffer | null; + recordStaleEntry?: (envId: string) => void; + reportStaleEntrySnapshot?: (snapshot: Map) => void; + logger?: { warn: (message: string, fields: Record) => void }; + now?: () => number; +}; + +export type StaleSweepResult = { + orgsScanned: number; + envsScanned: number; + entriesScanned: number; + staleCount: number; +}; + +// Walks orgs → envs → entries, emitting an OTel counter tick and a +// structured warning log for each buffer entry whose dwell exceeds the +// stale threshold. Read-only: the sweep does NOT remove or salvage +// entries; that decision is deferred to a separate retention-policy +// change. The signal here exists so ops sees the drainer falling +// behind well before TTL-induced loss kicks in. +export async function runStaleSweepOnce( + config: StaleSweepConfig, + deps: StaleSweepDeps = {}, +): Promise { + const getBuffer = deps.getBuffer ?? getMollifierBuffer; + const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry; + const reportSnapshot = + deps.reportStaleEntrySnapshot ?? defaultReportStaleEntrySnapshot; + const log = deps.logger ?? defaultLogger; + const now = (deps.now ?? Date.now)(); + const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV; + + const buffer = getBuffer(); + if (!buffer) { + // Replace any previous snapshot with empty so a previously-paging + // env doesn't stay latched if mollifier is turned off mid-flight. + reportSnapshot(new Map()); + return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 }; + } + + const orgs = await buffer.listOrgs(); + let envsScanned = 0; + let entriesScanned = 0; + let staleCount = 0; + // Tracks the stale count per env this pass. Includes zero counts for + // envs that have entries but none stale — that's what lets the gauge + // drop back to 0 when the drainer catches up. Envs absent from this + // map are also absent from the new snapshot, clearing any latched + // alerts on envs that have fully drained. + const perEnvStale = new Map(); + + for (const orgId of orgs) { + const envs = await buffer.listEnvsForOrg(orgId); + for (const envId of envs) { + envsScanned += 1; + let envStale = 0; + const entries = await buffer.listEntriesForEnv(envId, maxEntries); + for (const entry of entries) { + entriesScanned += 1; + const dwellMs = now - entry.createdAt.getTime(); + if (dwellMs > config.staleThresholdMs) { + recordStale(envId); + log.warn("mollifier.stale_entry", { + runId: entry.runId, + envId, + orgId, + dwellMs, + staleThresholdMs: config.staleThresholdMs, + }); + envStale += 1; + } + } + perEnvStale.set(envId, envStale); + staleCount += envStale; + } + } + + reportSnapshot(perEnvStale); + + return { orgsScanned: orgs.length, envsScanned, entriesScanned, staleCount }; +} + +export type StaleSweepIntervalHandle = { + stop: () => void; +}; + +// Production wrapper: schedule `runStaleSweepOnce` on a fixed interval. +// One pass at a time — if a sweep is still running when the timer fires +// the next tick is skipped (a backed-up Redis would otherwise queue +// overlapping sweeps that all log the same stale entries). +export function startStaleSweepInterval( + config: StaleSweepConfig & { intervalMs: number }, + deps: StaleSweepDeps = {}, +): StaleSweepIntervalHandle { + let stopped = false; + let inFlight = false; + + const tick = async () => { + if (stopped || inFlight) return; + inFlight = true; + try { + await runStaleSweepOnce(config, deps); + } catch (err) { + const log = deps.logger ?? defaultLogger; + log.warn("mollifier.stale_sweep.failed", { + err: err instanceof Error ? err.message : String(err), + }); + } finally { + inFlight = false; + } + }; + + const timer = setInterval(() => { + void tick(); + }, config.intervalMs); + + return { + stop: () => { + stopped = true; + clearInterval(timer); + }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index 0fe302584ce..ba58ce47f63 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -15,3 +15,83 @@ export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason ...(reason ? { reason } : {}), }); } + +// Counts subscriptions hitting `/realtime/v1/runs/` for a run that +// lives only in the mollifier buffer (no PG row yet). The route opens +// the Electric stream anyway so the eventual drainer-INSERT propagates +// to the client; this counter is the signal of how often customers +// subscribe inside the buffered window. +export const realtimeBufferedSubscriptionsCounter = meter.createCounter( + "mollifier.realtime_subscriptions.buffered", + { + description: + "Realtime subscriptions opened against a runId that exists only in the mollifier buffer", + }, +); + +export function recordRealtimeBufferedSubscription(envId: string): void { + realtimeBufferedSubscriptionsCounter.add(1, { envId }); +} + +// Counts buffer entries that have been waiting in the queue ZSET longer +// than the configured stale threshold (typically half of entryTtlSeconds). +// Useful for historical "stale events over time" views, but not directly +// alertable on its own — a single stuck entry observed by N sweep ticks +// adds N to the counter, so `rate()` over an alerting window reflects +// (entries × ticks), not "entries that are stale right now". +export const staleEntriesCounter = meter.createCounter( + "mollifier.stale_entries", + { + description: + "Mollifier buffer entries whose dwell exceeds the stale threshold (per sweep pass)", + }, +); + +export function recordStaleEntry(envId: string): void { + staleEntriesCounter.add(1, { envId }); +} + +// Alertable signal: the count of stale entries observed by the latest +// sweep, per env. The sweep snapshots the full per-env picture on each +// pass (including zeros for envs that no longer have any stale entries) +// so an env that was paging can clear when the drainer catches up +// instead of staying latched. Recommended alert: +// mollifier_stale_entries_current{envId=...} > 0 for 5m +export const staleEntriesGauge = meter.createObservableGauge( + "mollifier.stale_entries.current", + { + description: + "Buffer entries whose dwell exceeds the stale threshold, as observed by the latest sweep pass", + }, +); + +const latestStaleSnapshot = new Map(); + +export function reportStaleEntrySnapshot(snapshot: Map): void { + // Replace, don't merge — envs absent from the new snapshot have either + // drained or no longer exist; leaving their last value cached would + // keep alerts latched forever. + latestStaleSnapshot.clear(); + for (const [envId, count] of snapshot) { + latestStaleSnapshot.set(envId, count); + } +} + +meter.addBatchObservableCallback( + (result) => { + for (const [envId, count] of latestStaleSnapshot) { + result.observe(staleEntriesGauge, count, { envId }); + } + }, + [staleEntriesGauge], +); + +// Electric SQL's shape-stream protocol adds a `handle=` query param on +// every reconnect after the initial GET. Gating the realtime-buffered +// log/counter on its absence keeps the signal at one tick per +// subscription instead of one tick per ~20s live-poll iteration — +// without it the counter would over-count by the long-poll factor. +export function isInitialBufferedSubscriptionRequest(url: string | URL): boolean { + const u = typeof url === "string" ? new URL(url) : url; + return !u.searchParams.has("handle"); +} diff --git a/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts new file mode 100644 index 00000000000..5325018baf1 --- /dev/null +++ b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts @@ -0,0 +1,47 @@ +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { signalsEmitter } from "~/services/signals.server"; +import { + startStaleSweepInterval, + type StaleSweepIntervalHandle, +} from "./mollifier/mollifierStaleSweep.server"; + +declare global { + // eslint-disable-next-line no-var + var __mollifierStaleSweepRegistered__: boolean | undefined; + // eslint-disable-next-line no-var + var __mollifierStaleSweepHandle__: StaleSweepIntervalHandle | undefined; +} + +/** + * Bootstraps the mollifier stale-entry sweep. + * + * Independent of the drainer — its purpose is to alert when entries are + * piling up despite the drainer being supposedly healthy, so it runs + * any time the mollifier itself is enabled (gated separately from + * `TRIGGER_MOLLIFIER_DRAINER_ENABLED`). The sweep is read-only: it + * counts and logs stale entries but does not remove or salvage them. + * + * The Remix dev server re-evaluates `entry.server.tsx` on every change, + * so the registration guard + handle cache make the bootstrap + * idempotent across hot reloads. + */ +export function initMollifierStaleSweepWorker(): void { + if (env.TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED !== "1") return; + if (global.__mollifierStaleSweepRegistered__) return; + + logger.debug("Initializing mollifier stale-entry sweep", { + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + const handle = startStaleSweepInterval({ + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + signalsEmitter.on("SIGTERM", handle.stop); + signalsEmitter.on("SIGINT", handle.stop); + global.__mollifierStaleSweepRegistered__ = true; + global.__mollifierStaleSweepHandle__ = handle; +} diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts new file mode 100644 index 00000000000..6f66cf2ab79 --- /dev/null +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -0,0 +1,206 @@ +import { describe, expect, it, vi } from "vitest"; +import { trace } from "@opentelemetry/api"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { + createDrainerHandler, + isRetryablePgError, +} from "~/v3/mollifier/mollifierDrainerHandler.server"; + +describe("isRetryablePgError", () => { + it("returns true for P2024 (connection pool timeout)", () => { + const err = Object.assign(new Error("Timed out fetching a new connection"), { + code: "P2024", + }); + expect(isRetryablePgError(err)).toBe(true); + }); + + it("returns true for generic connection-lost messages", () => { + expect(isRetryablePgError(new Error("Connection lost"))).toBe(true); + expect(isRetryablePgError(new Error("Can't reach database server"))).toBe(true); + }); + + it("returns false for validation errors", () => { + expect(isRetryablePgError(new Error("Invalid payload"))).toBe(false); + }); + + it("returns false for non-Error inputs", () => { + expect(isRetryablePgError("string error")).toBe(false); + expect(isRetryablePgError({ message: "object" })).toBe(false); + }); +}); + +describe("createDrainerHandler", () => { + it("invokes engine.trigger with the deserialised snapshot", async () => { + const trigger = vi.fn(async () => ({ friendlyId: "run_x" })); + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", payload: "{}" }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(trigger).toHaveBeenCalledOnce(); + const callArg = trigger.mock.calls[0][0] as { taskIdentifier: string }; + expect(callArg.taskIdentifier).toBe("t"); + }); + + it("re-attaches the snapshot's traceId so engine.trigger inherits the original trace", async () => { + // Captures the active traceId at the moment engine.trigger is invoked. + // Without context propagation it would be a fresh traceId, leaving the + // run-detail page with only the root span. + let observedTraceId: string | undefined; + const trigger = vi.fn(async () => { + observedTraceId = trace.getActiveSpan()?.spanContext().traceId; + return { friendlyId: "run_x" }; + }); + + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + const snapshotTraceId = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + const snapshotSpanId = "bbbbbbbbbbbbbbbb"; + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { + taskIdentifier: "t", + traceId: snapshotTraceId, + spanId: snapshotSpanId, + }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(observedTraceId).toBe(snapshotTraceId); + }); + + it("rethrows retryable PG errors so MollifierDrainer requeues the entry", async () => { + const err = new Error("Can't reach database server"); + const trigger = vi.fn(async () => { + throw err; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("Can't reach database server"); + // Retryable: we do NOT write a SYSTEM_FAILURE row, the entry should + // be requeued for another shot. + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); + + const envFixture = { + id: "env_a", + type: "DEVELOPMENT", + project: { id: "proj_1" }, + organization: { id: "org_1" }, + }; + + it("writes a SYSTEM_FAILURE PG row when engine.trigger fails non-retryably", async () => { + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => ({ + id: "internal", + friendlyId: "run_x", + })); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).resolves.toBeUndefined(); + + expect(trigger).toHaveBeenCalledOnce(); + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + const arg = createFailedTaskRun.mock.calls[0][0] as { error: { raw: string } }; + expect(arg.error.raw).toContain("validation failed"); + }); + + it("rethrows the original error when createFailedTaskRun also fails (PG genuinely unreachable)", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(async () => { + throw new Error("connection refused"); + }); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + // Drainer's outer drainOne loop now decides retry vs buffer.fail. + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + }); + + it("rethrows the original error when the snapshot lacks an environment block", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" /* no environment */ }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts new file mode 100644 index 00000000000..029b90cb761 --- /dev/null +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -0,0 +1,231 @@ +import { describe, expect, it, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { runStaleSweepOnce } from "~/v3/mollifier/mollifierStaleSweep.server"; + +const SNAPSHOT = { + taskIdentifier: "hello-world", + payload: '{"x":1}', + payloadType: "application/json", + traceContext: {}, +}; + +function spyDeps() { + const recordedStaleEnvIds: string[] = []; + const snapshots: Array> = []; + const warnings: Array<{ message: string; fields: Record }> = []; + return { + recordedStaleEnvIds, + snapshots, + warnings, + deps: { + recordStaleEntry: (envId: string) => { + recordedStaleEnvIds.push(envId); + }, + reportStaleEntrySnapshot: (snapshot: Map) => { + // Clone so post-sweep assertions see what was reported *at that + // call site*, not whatever subsequent passes mutate the source + // map into. + snapshots.push(new Map(snapshot)); + }, + logger: { + warn: (message: string, fields: Record) => { + warnings.push({ message, fields }); + }, + }, + }, + }; +} + +describe("runStaleSweepOnce — unit", () => { + it("returns zeros when the buffer is null", async () => { + // Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer + // singleton is null and the sweep is a no-op. We don't want it to + // emit a metric (or throw) just because mollifier is disabled. + const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 1000 }, + { ...deps, getBuffer: () => null }, + ); + expect(result).toEqual({ + orgsScanned: 0, + envsScanned: 0, + entriesScanned: 0, + staleCount: 0, + }); + expect(recordedStaleEnvIds).toEqual([]); + expect(warnings).toEqual([]); + // An empty snapshot is still reported so any previously-paging env + // (from a prior sweep before mollifier was disabled) clears. + expect(snapshots).toHaveLength(1); + expect(snapshots[0].size).toBe(0); + }); +}); + +describe("runStaleSweepOnce — testcontainers", () => { + redisTest( + "flags entries whose dwell exceeds the stale threshold and skips fresh ones", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + // Two stale entries (one in each env) + one fresh entry. Sweep + // should flag the two stale, leave the fresh one alone, record + // the counter once per stale entry, and emit a warning per + // stale entry with the dwell + threshold. + await buffer.accept({ + runId: "run_stale_a", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_stale_b", + envId: "env_b", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_fresh", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + // Yank the system clock forward 5 minutes for the sweep — way + // past the threshold below. The `now` deps seam lets us drive + // the threshold without actually waiting in real time. + const futureNow = Date.now() + 5 * 60 * 1000; + + const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { + ...deps, + getBuffer: () => buffer, + now: () => futureNow, + }, + ); + + expect(result.envsScanned).toBe(2); + expect(result.entriesScanned).toBe(3); + expect(result.staleCount).toBe(3); + // All three entries have dwell ~5min, all exceed the 1-min + // threshold; each emits one counter tick + one warning. + expect(recordedStaleEnvIds.sort()).toEqual( + ["env_a", "env_a", "env_b"].sort(), + ); + expect(warnings).toHaveLength(3); + for (const w of warnings) { + expect(w.message).toBe("mollifier.stale_entry"); + expect(w.fields.staleThresholdMs).toBe(60 * 1000); + expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000); + } + // Snapshot drives the alertable gauge — env_a has 2 stale + // entries, env_b has 1. Both must appear so a future alert can + // identify which env is paging. + expect(snapshots).toHaveLength(1); + expect(Object.fromEntries(snapshots[0])).toEqual({ + env_a: 2, + env_b: 1, + }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "snapshot reports zero for envs that have entries but none stale (clears latched alerts)", + async ({ redisOptions }) => { + // Critical for alert behaviour: a previous sweep reported env_a + // stale, alert fired, drainer caught up. The next sweep must + // report `env_a -> 0` so the gauge drops below the alert + // threshold instead of staying latched at the last stale value. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_just_arrived", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const { deps, snapshots } = spyDeps(); + await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...deps, getBuffer: () => buffer }, + ); + expect(snapshots).toHaveLength(1); + expect(Object.fromEntries(snapshots[0])).toEqual({ env_a: 0 }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "leaves fresh entries alone (dwell below threshold)", + async ({ redisOptions }) => { + // Regression guard for the inequality direction. A bug that flipped + // `dwellMs > threshold` to `dwellMs >= threshold` would flag every + // entry the first time the sweep runs after a perfectly synchronised + // accept call — the dashboard would page on every burst. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_fresh_only", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const { deps, recordedStaleEnvIds, warnings } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...deps, getBuffer: () => buffer }, + ); + expect(result.staleCount).toBe(0); + expect(recordedStaleEnvIds).toEqual([]); + expect(warnings).toEqual([]); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "scans across multiple orgs", + async ({ redisOptions }) => { + // Phase-3 design has org-level fairness in the drainer; the sweep + // must walk every org/env, not just the first one it finds. If a + // future refactor collapsed listOrgs/listEnvsForOrg into a single + // env-flat list this test catches a regression there. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_x", + envId: "env_x", + orgId: "org_x", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_y", + envId: "env_y", + orgId: "org_y", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const { deps } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...deps, getBuffer: () => buffer, now: () => futureNow }, + ); + expect(result.orgsScanned).toBe(2); + expect(result.envsScanned).toBe(2); + expect(result.staleCount).toBe(2); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index da42247111a..e461fddf6c5 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -450,6 +450,162 @@ export class RunEngine { //MARK: - Run functions + /** + * Writes a TaskRun row in CANCELED state directly, bypassing the trigger + * pipeline. Used by the mollifier drainer when a cancel API call lands on + * a buffered run before it materialises (Q4 mollifier-cancel design). + * + * Skips: queue insertion (no execution), waitpoint creation (single- + * triggerAndWait can't enter the buffer; F4 bypass), concurrency + * reservation. Emits `runCancelled` so the existing TaskEvent handler + * writes the cancellation event row — the only side effect PG-side cancel + * has today per audit. + * + * Idempotent: if a row with the same friendlyId already exists (double + * drainer pop after requeue), Prisma's P2002 unique-constraint violation + * is caught and the existing row is returned. The duplicate runCancelled + * emission is skipped — the original drain's emit already wrote the + * TaskEvent. + */ + async createCancelledRun( + { + snapshot, + cancelledAt, + cancelReason, + }: { + snapshot: TriggerParams; + cancelledAt: Date; + cancelReason: string; + }, + tx?: PrismaClientOrTransaction, + ): Promise { + const prisma = tx ?? this.prisma; + return startSpan(this.tracer, "createCancelledRun", async (span) => { + span.setAttribute("friendlyId", snapshot.friendlyId); + span.setAttribute("taskIdentifier", snapshot.taskIdentifier); + const id = RunId.fromFriendlyId(snapshot.friendlyId); + const error: TaskRunError = { type: "STRING_ERROR", raw: cancelReason }; + + try { + const taskRun = await prisma.taskRun.create({ + data: { + id, + engine: "V2", + status: "CANCELED", + friendlyId: snapshot.friendlyId, + runtimeEnvironmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + organizationId: snapshot.environment.organization.id, + projectId: snapshot.environment.project.id, + idempotencyKey: snapshot.idempotencyKey, + idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt, + idempotencyKeyOptions: snapshot.idempotencyKeyOptions, + taskIdentifier: snapshot.taskIdentifier, + payload: snapshot.payload, + payloadType: snapshot.payloadType, + context: snapshot.context, + traceContext: snapshot.traceContext, + traceId: snapshot.traceId, + spanId: snapshot.spanId, + parentSpanId: snapshot.parentSpanId, + lockedToVersionId: snapshot.lockedToVersionId, + taskVersion: snapshot.taskVersion, + sdkVersion: snapshot.sdkVersion, + cliVersion: snapshot.cliVersion, + concurrencyKey: snapshot.concurrencyKey, + queue: snapshot.queue, + lockedQueueId: snapshot.lockedQueueId, + workerQueue: snapshot.workerQueue, + isTest: snapshot.isTest, + taskEventStore: snapshot.taskEventStore, + // Defensive: the snapshot comes from a cjson-encoded buffer + // payload, where empty Lua tables encode as `{}` not `[]`. If + // the drainer pops a buffered run with no tags, snapshot.tags + // will be an empty object, which Prisma misreads as a relation + // update op. Normalise to a real array (or undefined for the + // empty case). + runTags: Array.isArray(snapshot.tags) && snapshot.tags.length > 0 + ? snapshot.tags + : undefined, + oneTimeUseToken: snapshot.oneTimeUseToken, + parentTaskRunId: snapshot.parentTaskRunId, + rootTaskRunId: snapshot.rootTaskRunId, + replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId, + batchId: snapshot.batch?.id, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion, + depth: snapshot.depth, + seedMetadata: snapshot.seedMetadata, + seedMetadataType: snapshot.seedMetadataType, + metadata: snapshot.metadata, + metadataType: snapshot.metadataType, + machinePreset: snapshot.machine, + scheduleId: snapshot.scheduleId, + scheduleInstanceId: snapshot.scheduleInstanceId, + createdAt: snapshot.createdAt, + bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined, + planType: snapshot.planType, + realtimeStreamsVersion: snapshot.realtimeStreamsVersion, + streamBasinName: snapshot.streamBasinName, + annotations: snapshot.annotations, + completedAt: cancelledAt, + updatedAt: cancelledAt, + error: error as unknown as Prisma.InputJsonValue, + attemptNumber: 0, + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run cancelled before materialisation", + runStatus: "CANCELED", + environmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + projectId: snapshot.environment.project.id, + organizationId: snapshot.environment.organization.id, + }, + }, + }, + }); + + this.eventBus.emit("runCancelled", { + time: cancelledAt, + run: { + id: taskRun.id, + status: taskRun.status, + friendlyId: taskRun.friendlyId, + spanId: taskRun.spanId, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + error, + updatedAt: taskRun.updatedAt, + attemptNumber: taskRun.attemptNumber ?? 0, + }, + organization: { id: snapshot.environment.organization.id }, + project: { id: snapshot.environment.project.id }, + environment: { id: snapshot.environment.id }, + }); + + return taskRun; + } catch (err) { + // P2002 = unique constraint violation. Double-pop after a drainer + // requeue can reach this. Idempotent: return the existing row + // without re-emitting. + if ( + err instanceof Prisma.PrismaClientKnownRequestError && + err.code === "P2002" + ) { + this.logger.info( + "createCancelledRun: row already exists, returning existing (idempotent)", + { friendlyId: snapshot.friendlyId }, + ); + const existing = await prisma.taskRun.findFirst({ where: { id } }); + if (existing) return existing; + } + throw err; + } + }); + } + /** "Triggers" one run. */ async trigger( { @@ -983,6 +1139,44 @@ export class RunEngine { }); } + // Emit `runFailed` so the alert pipeline picks up the + // SYSTEM_FAILURE row and the event-store handler writes the + // completion event into the trace. Without this the mollifier + // drainer's terminal failures (and batch-trigger's + // exceed-limit failures) land in PG silently — visible in the + // dashboard list but never reaching customers' configured + // ERROR alert channels. + this.eventBus.emit("runFailed", { + time: taskRun.completedAt ?? new Date(), + run: { + id: taskRun.id, + status: taskRun.status, + spanId: taskRun.spanId, + error, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + updatedAt: taskRun.updatedAt, + // This row never attempted execution — it's a synthesised + // terminal failure. The alert payload's `attemptNumber=0` + // is the signal downstream consumers can use to + // distinguish a never-ran failure from a run that + // exhausted its retries. + attemptNumber: 0, + usageDurationMs: 0, + costInCents: 0, + }, + organization: { + id: environment.organization.id, + }, + project: { + id: environment.project.id, + }, + environment: { + id: environment.id, + }, + }); + return taskRun; }, { diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts new file mode 100644 index 00000000000..0a541b5349e --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -0,0 +1,233 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +function freshRunId() { + return RunId.generate().friendlyId; +} +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import type { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function baseEngineOptions(redisOptions: Parameters[0]["queue"]["redis"]) { + return { + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x" as const, + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }; +} + +// Phase C1 / Q4 design — engine.createCancelledRun writes a CANCELED +// TaskRun row directly from a buffer snapshot. Verifies the bypass- +// queue / bypass-waitpoint / emit-runCancelled contract. +describe("RunEngine.createCancelledRun", () => { + containerTest( + "writes CANCELED PG row with snapshot fields, completedAt, error", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const cancelledAt = new Date("2026-05-20T12:00:00.000Z"); + const cancelReason = "Canceled by user"; + + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: '{"hello":"world"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: ["test-tag"], + }, + cancelledAt, + cancelReason, + }); + + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + expect(result.id).toBe(RunId.fromFriendlyId(friendlyId)); + expect(result.completedAt?.toISOString()).toBe(cancelledAt.toISOString()); + expect(result.taskIdentifier).toBe("test-task"); + expect(result.runTags).toEqual(["test-tag"]); + expect(result.payload).toBe('{"hello":"world"}'); + const err = result.error as { type?: string; raw?: string }; + expect(err.type).toBe("STRING_ERROR"); + expect(err.raw).toBe(cancelReason); + + // Verify the PG row is canonical (findFirst returns the row). + const stored = await prisma.taskRun.findFirst({ + where: { friendlyId }, + }); + expect(stored).not.toBeNull(); + expect(stored!.status).toBe("CANCELED"); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "emits runCancelled with correct payload", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const cancelledAt = new Date(); + const cancelReason = "Test cancel"; + const friendlyId = freshRunId(); + await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000cccc000000000000", + spanId: "dddd000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt, + cancelReason, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]!.run.status).toBe("CANCELED"); + expect(captured[0]!.run.friendlyId).toBe(friendlyId); + expect(captured[0]!.run.error).toEqual({ type: "STRING_ERROR", raw: cancelReason }); + expect(captured[0]!.organization.id).toBe(env.organization.id); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "idempotent on double-pop: second call returns existing row without re-emitting", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const snapshot = { + friendlyId: freshRunId(), + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000eeee000000000000", + spanId: "ffff000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }; + const cancelledAt = new Date(); + const cancelReason = "Test idempotent"; + + const first = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + const second = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + + expect(second.id).toBe(first.id); + // Only the first call's emit fired; the P2002 path skips re-emission. + expect(captured).toHaveLength(1); + } finally { + await engine.quit(); + } + }, + ); + + // Regression: cjson encodes empty Lua tables as `{}`, not `[]`. When + // the drainer pops a buffered run that never had a tag set, the + // deserialised snapshot's `tags` field is an empty object. The old + // implementation passed it straight into Prisma's `runTags:` field; + // Prisma misread the object as a relation update op and threw + // `Argument 'set' is missing`. The drainer caught the error and + // marked the buffer entry FAILED — so the CANCELED PG row never + // landed. Found while running the Phase F challenge suite. + containerTest( + "tolerates snapshot.tags being an empty object (cjson edge case)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + // Cast through unknown to simulate the cjson-decode output shape + // for an empty Lua table — TypeScript's snapshot type says + // string[], but the buffer Lua delivers {} for the empty case. + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000abcd000000000000", + spanId: "1234000000000000", + queue: "task/test-task", + isTest: false, + tags: {} as unknown as string[], + }, + cancelledAt: new Date(), + cancelReason: "Cancelled — empty tags", + }); + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + // Prisma normalises the absent-tags case to either [] or null + // depending on the column default; assert it's an empty array. + expect(result.runTags).toEqual([]); + } finally { + await engine.quit(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts new file mode 100644 index 00000000000..0619eeffc2f --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts @@ -0,0 +1,111 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunEngine.createFailedTaskRun", () => { + containerTest("emits runFailed so the alert pipeline wakes up", async ({ prisma, redisOptions }) => { + // The mollifier drainer (and batch-trigger over-limit path) call + // createFailedTaskRun to write a terminal SYSTEM_FAILURE PG row + // for runs that never actually executed. Without an explicit + // runFailed emit, the row lands silently — the + // runEngineHandlers' `runFailed` listener (which enqueues + // PerformTaskRunAlertsService) never fires, so customers' + // configured TASK_RUN alert channels miss the failure entirely. + // + // Regression intent: if the emit is removed or moved out of + // createFailedTaskRun's success path, this test fails. The + // shape assertions pin the fields the alert delivery service + // reads from the event payload (run.id, run.status, error, + // attemptNumber=0 as the never-ran-marker). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const taskIdentifier = "drainer-terminal-test"; + + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + error: { + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + }); + + expect(failed.status).toBe("SYSTEM_FAILURE"); + + expect(failedEvents).toHaveLength(1); + const event = failedEvents[0]; + expect(event.run.id).toBe(failed.id); + expect(event.run.status).toBe("SYSTEM_FAILURE"); + expect(event.run.spanId).toBe("fedcba9876543210"); + // attemptNumber=0 is the marker that the run never executed — + // it's a synthesised terminal failure, not an exhausted-retries + // failure. Downstream consumers can use this to distinguish. + expect(event.run.attemptNumber).toBe(0); + expect(event.run.usageDurationMs).toBe(0); + expect(event.run.costInCents).toBe(0); + expect(event.run.error).toEqual({ + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }); + expect(event.organization.id).toBe(authenticatedEnvironment.organization.id); + expect(event.project.id).toBe(authenticatedEnvironment.project.id); + expect(event.environment.id).toBe(authenticatedEnvironment.id); + } finally { + await engine.quit(); + } + }); +}); From fb7cec5bcdf2dbd2233f4d3c5a4b9974fed78200 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 15:05:53 +0100 Subject: [PATCH 2/5] fix(webapp,run-engine): replay-layer code-review follow-ups - `isRetryablePgError`: also accept `errorCode === "P1001"` so `PrismaClientInitializationError` (which surfaces P1001 on a different field than `PrismaClientKnownRequestError`) retries. - Drop `envId` from OTel metric labels on `mollifier.realtime_subscriptions.buffered`, `mollifier.stale_entries`, and the `mollifier.stale_entries.current` gauge. `envId` is a banned high-cardinality attribute; the structured warn log alongside each counter tick still carries envId for forensic drill-down. - Stale-sweep test name + comments now match the assertion shape (all three entries stale, not "two stale + one fresh"). - `RunEngine.createCancelledRun` P2002 path now requires the existing row's status to be CANCELED; a non-canceled conflict throws rather than silently reporting success, so the caller can route to `engine.cancelRun()` or skip. - Regression test pins the new conflict guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../mollifierDrainerHandler.server.ts | 7 ++ .../mollifier/mollifierStaleSweep.server.ts | 7 +- .../v3/mollifier/mollifierTelemetry.server.ts | 54 +++++++------ apps/webapp/test/mollifierStaleSweep.test.ts | 80 ++++++++++--------- .../run-engine/src/engine/index.ts | 16 +++- .../engine/tests/createCancelledRun.test.ts | 61 ++++++++++++++ 6 files changed, 160 insertions(+), 65 deletions(-) diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts index 7f2608d5b21..741f1afed07 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -10,8 +10,15 @@ const tracer = trace.getTracer("mollifier-drainer"); export function isRetryablePgError(err: unknown): boolean { if (!(err instanceof Error)) return false; const msg = err.message ?? ""; + // Prisma surfaces P1001 ("Can't reach database server") via two + // different error classes — `PrismaClientKnownRequestError` exposes + // it as `err.code`, `PrismaClientInitializationError` exposes it as + // `err.errorCode`. Check both so reconnection-time errors retry + // regardless of which class fires. const code = (err as { code?: string }).code; + const errorCode = (err as { errorCode?: string }).errorCode; if (code === "P2024") return true; + if (code === "P1001" || errorCode === "P1001") return true; if (msg.includes("Can't reach database server")) return true; if (msg.includes("Connection lost")) return true; if (msg.includes("ECONNRESET")) return true; diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts index 5c31618efec..c2f94495cc9 100644 --- a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts @@ -22,7 +22,10 @@ export type StaleSweepConfig = { export type StaleSweepDeps = { getBuffer?: () => MollifierBuffer | null; - recordStaleEntry?: (envId: string) => void; + // No `envId` arg — `envId` is a high-cardinality metric attribute and + // is intentionally not emitted as a metric label. The structured warn + // log below carries envId for forensic drill-down. + recordStaleEntry?: () => void; reportStaleEntrySnapshot?: (snapshot: Map) => void; logger?: { warn: (message: string, fields: Record) => void }; now?: () => number; @@ -82,7 +85,7 @@ export async function runStaleSweepOnce( entriesScanned += 1; const dwellMs = now - entry.createdAt.getTime(); if (dwellMs > config.staleThresholdMs) { - recordStale(envId); + recordStale(); log.warn("mollifier.stale_entry", { runId: entry.runId, envId, diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index ba58ce47f63..f9c7ca72f1f 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -29,16 +29,21 @@ export const realtimeBufferedSubscriptionsCounter = meter.createCounter( }, ); -export function recordRealtimeBufferedSubscription(envId: string): void { - realtimeBufferedSubscriptionsCounter.add(1, { envId }); +// No `envId` attribute — `envId` is a banned high-cardinality metric +// label per the repo's OTel rules. The structured warn log emitted +// alongside the counter tick (in `mollifierStaleSweep.server.ts`) +// carries the envId / orgId / runId for forensic drill-down; the +// metric stays an aggregate. +export function recordRealtimeBufferedSubscription(): void { + realtimeBufferedSubscriptionsCounter.add(1); } // Counts buffer entries that have been waiting in the queue ZSET longer -// than the configured stale threshold (typically half of entryTtlSeconds). -// Useful for historical "stale events over time" views, but not directly -// alertable on its own — a single stuck entry observed by N sweep ticks -// adds N to the counter, so `rate()` over an alerting window reflects -// (entries × ticks), not "entries that are stale right now". +// than the configured stale threshold. Useful for historical "stale +// events over time" views, but not directly alertable on its own — a +// single stuck entry observed by N sweep ticks adds N to the counter, +// so `rate()` over an alerting window reflects (entries × ticks), not +// "entries that are stale right now". export const staleEntriesCounter = meter.createCounter( "mollifier.stale_entries", { @@ -47,16 +52,16 @@ export const staleEntriesCounter = meter.createCounter( }, ); -export function recordStaleEntry(envId: string): void { - staleEntriesCounter.add(1, { envId }); +// No `envId` attribute — see comment above. +export function recordStaleEntry(): void { + staleEntriesCounter.add(1); } -// Alertable signal: the count of stale entries observed by the latest -// sweep, per env. The sweep snapshots the full per-env picture on each -// pass (including zeros for envs that no longer have any stale entries) -// so an env that was paging can clear when the drainer catches up -// instead of staying latched. Recommended alert: -// mollifier_stale_entries_current{envId=...} > 0 for 5m +// Alertable signal: the total count of stale entries observed by the +// latest sweep. The sweep snapshots the full picture on each pass so +// the gauge drops back to 0 when the drainer catches up instead of +// staying latched. Recommended alert: +// mollifier_stale_entries_current > 0 for 5m export const staleEntriesGauge = meter.createObservableGauge( "mollifier.stale_entries.current", { @@ -65,23 +70,22 @@ export const staleEntriesGauge = meter.createObservableGauge( }, ); -const latestStaleSnapshot = new Map(); +let latestStaleTotal = 0; export function reportStaleEntrySnapshot(snapshot: Map): void { - // Replace, don't merge — envs absent from the new snapshot have either - // drained or no longer exist; leaving their last value cached would - // keep alerts latched forever. - latestStaleSnapshot.clear(); - for (const [envId, count] of snapshot) { - latestStaleSnapshot.set(envId, count); + // Sum across envs. Per-env breakdown is intentionally NOT emitted as + // a metric label (high-cardinality); the structured warn log lines + // from the sweep carry per-env detail for ops to drill down. + let total = 0; + for (const count of snapshot.values()) { + total += count; } + latestStaleTotal = total; } meter.addBatchObservableCallback( (result) => { - for (const [envId, count] of latestStaleSnapshot) { - result.observe(staleEntriesGauge, count, { envId }); - } + result.observe(staleEntriesGauge, latestStaleTotal); }, [staleEntriesGauge], ); diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts index 029b90cb761..6b57931407f 100644 --- a/apps/webapp/test/mollifierStaleSweep.test.ts +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -14,16 +14,21 @@ const SNAPSHOT = { }; function spyDeps() { - const recordedStaleEnvIds: string[] = []; + // Counter ticks — metric carries no `envId` label (high-cardinality) + // so the spy is a simple call count. Per-env detail lives on the + // structured warn log and the snapshot map. + let staleEntryCount = 0; const snapshots: Array> = []; const warnings: Array<{ message: string; fields: Record }> = []; return { - recordedStaleEnvIds, + get staleEntryCount() { + return staleEntryCount; + }, snapshots, warnings, deps: { - recordStaleEntry: (envId: string) => { - recordedStaleEnvIds.push(envId); + recordStaleEntry: () => { + staleEntryCount += 1; }, reportStaleEntrySnapshot: (snapshot: Map) => { // Clone so post-sweep assertions see what was reported *at that @@ -45,10 +50,10 @@ describe("runStaleSweepOnce — unit", () => { // Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer // singleton is null and the sweep is a no-op. We don't want it to // emit a metric (or throw) just because mollifier is disabled. - const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 1000 }, - { ...deps, getBuffer: () => null }, + { ...spies.deps, getBuffer: () => null }, ); expect(result).toEqual({ orgsScanned: 0, @@ -56,8 +61,9 @@ describe("runStaleSweepOnce — unit", () => { entriesScanned: 0, staleCount: 0, }); - expect(recordedStaleEnvIds).toEqual([]); - expect(warnings).toEqual([]); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); + const snapshots = spies.snapshots; // An empty snapshot is still reported so any previously-paging env // (from a prior sweep before mollifier was disabled) clears. expect(snapshots).toHaveLength(1); @@ -67,14 +73,15 @@ describe("runStaleSweepOnce — unit", () => { describe("runStaleSweepOnce — testcontainers", () => { redisTest( - "flags entries whose dwell exceeds the stale threshold and skips fresh ones", + "flags every entry whose dwell exceeds the stale threshold", async ({ redisOptions }) => { const buffer = new MollifierBuffer({ redisOptions }); try { - // Two stale entries (one in each env) + one fresh entry. Sweep - // should flag the two stale, leave the fresh one alone, record - // the counter once per stale entry, and emit a warning per - // stale entry with the dwell + threshold. + // Three entries across two envs in the same org. The sweep below + // runs against a `now` advanced by 5 minutes, so all three have + // dwell ~5min and ALL THREE are stale against a 1-minute + // threshold — there is no "fresh" entry in this scenario. The + // assertions below pin the all-three-stale shape. await buffer.accept({ runId: "run_stale_a", envId: "env_a", @@ -88,7 +95,7 @@ describe("runStaleSweepOnce — testcontainers", () => { payload: JSON.stringify(SNAPSHOT), }); await buffer.accept({ - runId: "run_fresh", + runId: "run_stale_c", envId: "env_a", orgId: "org_1", payload: JSON.stringify(SNAPSHOT), @@ -98,11 +105,11 @@ describe("runStaleSweepOnce — testcontainers", () => { // the threshold without actually waiting in real time. const futureNow = Date.now() + 5 * 60 * 1000; - const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, { - ...deps, + ...spies.deps, getBuffer: () => buffer, now: () => futureNow, }, @@ -111,22 +118,21 @@ describe("runStaleSweepOnce — testcontainers", () => { expect(result.envsScanned).toBe(2); expect(result.entriesScanned).toBe(3); expect(result.staleCount).toBe(3); - // All three entries have dwell ~5min, all exceed the 1-min - // threshold; each emits one counter tick + one warning. - expect(recordedStaleEnvIds.sort()).toEqual( - ["env_a", "env_a", "env_b"].sort(), - ); - expect(warnings).toHaveLength(3); - for (const w of warnings) { + // All three entries exceed the threshold; each emits one + // counter tick + one warning. + expect(spies.staleEntryCount).toBe(3); + expect(spies.warnings).toHaveLength(3); + for (const w of spies.warnings) { expect(w.message).toBe("mollifier.stale_entry"); expect(w.fields.staleThresholdMs).toBe(60 * 1000); expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000); } // Snapshot drives the alertable gauge — env_a has 2 stale - // entries, env_b has 1. Both must appear so a future alert can - // identify which env is paging. - expect(snapshots).toHaveLength(1); - expect(Object.fromEntries(snapshots[0])).toEqual({ + // entries, env_b has 1. Per-env detail is still passed to + // `reportStaleEntrySnapshot` for forensic value even though the + // gauge itself aggregates the total. + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ env_a: 2, env_b: 1, }); @@ -151,13 +157,13 @@ describe("runStaleSweepOnce — testcontainers", () => { orgId: "org_1", payload: JSON.stringify(SNAPSHOT), }); - const { deps, snapshots } = spyDeps(); + const spies = spyDeps(); await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, - { ...deps, getBuffer: () => buffer }, + { ...spies.deps, getBuffer: () => buffer }, ); - expect(snapshots).toHaveLength(1); - expect(Object.fromEntries(snapshots[0])).toEqual({ env_a: 0 }); + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ env_a: 0 }); } finally { await buffer.close(); } @@ -179,14 +185,14 @@ describe("runStaleSweepOnce — testcontainers", () => { orgId: "org_1", payload: JSON.stringify(SNAPSHOT), }); - const { deps, recordedStaleEnvIds, warnings } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, - { ...deps, getBuffer: () => buffer }, + { ...spies.deps, getBuffer: () => buffer }, ); expect(result.staleCount).toBe(0); - expect(recordedStaleEnvIds).toEqual([]); - expect(warnings).toEqual([]); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); } finally { await buffer.close(); } @@ -215,10 +221,10 @@ describe("runStaleSweepOnce — testcontainers", () => { payload: JSON.stringify(SNAPSHOT), }); const futureNow = Date.now() + 5 * 60 * 1000; - const { deps } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, - { ...deps, getBuffer: () => buffer, now: () => futureNow }, + { ...spies.deps, getBuffer: () => buffer, now: () => futureNow }, ); expect(result.orgsScanned).toBe(2); expect(result.envsScanned).toBe(2); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e461fddf6c5..244d2b014ed 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -599,7 +599,21 @@ export class RunEngine { { friendlyId: snapshot.friendlyId }, ); const existing = await prisma.taskRun.findFirst({ where: { id } }); - if (existing) return existing; + if (existing) { + // Only treat the conflict as idempotent when the existing + // row is ALREADY canceled. If a non-canceled row landed + // first (e.g. the drainer's normal `engine.trigger` replay + // path raced ahead of the cancel) we surface a conflict + // rather than silently reporting "cancelled" — the run is + // genuinely live and the caller must decide between + // engine.cancelRun() and skipping. + if (existing.status === "CANCELED") { + return existing; + } + throw new Error( + `createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`, + ); + } } throw err; } diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts index 0a541b5349e..71223ce0773 100644 --- a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -230,4 +230,65 @@ describe("RunEngine.createCancelledRun", () => { } }, ); + + // Regression: the P2002-on-id idempotency path used to return ANY + // existing row, which would silently report success even if a live + // (non-CANCELED) row landed first. The guard now requires the + // existing row's status to be CANCELED; anything else surfaces a + // conflict so the caller can route to engine.cancelRun() or skip. + containerTest( + "P2002 conflict with non-CANCELED existing row throws (does not silently succeed)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const id = RunId.fromFriendlyId(friendlyId); + + // Plant a live (non-CANCELED) row with the same id so the + // cancelled-run INSERT hits P2002 and the guard finds a row + // that ISN'T CANCELED. + await prisma.taskRun.create({ + data: { + id, + friendlyId, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + status: "PENDING", + runtimeEnvironmentId: env.id, + projectId: env.project.id, + organizationId: env.organizationId, + queue: "task/test-task", + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + engine: "V2", + }, + }); + + await expect( + engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt: new Date(), + cancelReason: "Should not silently overwrite a live row", + }), + ).rejects.toThrow(/createCancelledRun conflict.*PENDING/); + } finally { + await engine.quit(); + } + }, + ); }); From 3453923a83c20964c21a072782f561187780613e Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 15:44:06 +0100 Subject: [PATCH 3/5] test(webapp): isolate mollifierDrainerWorker test from runEngine singleton MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Importing the production drainer wiring transitively loads \`~/v3/runEngine.server\`, whose top-level \`singleton(...)\` eagerly constructs a RunEngine. The constructor spins up Prisma + Redis workers that try to connect to localhost — in CI (no PG, no Redis) that produces an unhandled \`PrismaClientInitializationError\` which fails the run even though every assertion passes. Mock the runEngine and prisma modules so the unit test exercises only the bootstrap's error classification, not a live engine. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/test/mollifierDrainerWorker.test.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/webapp/test/mollifierDrainerWorker.test.ts b/apps/webapp/test/mollifierDrainerWorker.test.ts index e5f38229d8f..0d4e931fd83 100644 --- a/apps/webapp/test/mollifierDrainerWorker.test.ts +++ b/apps/webapp/test/mollifierDrainerWorker.test.ts @@ -1,4 +1,17 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; + +// Importing `~/v3/mollifier/mollifierDrainer.server` (below) transitively +// loads `~/v3/runEngine.server`, whose top-level `singleton(...)` call +// eagerly constructs a RunEngine. That spins up Prisma + Redis workers +// that try to connect to localhost — which in CI (no PG, no Redis) +// produces an unhandled `PrismaClientInitializationError` that fails +// the test run even though the assertions all pass. Mocking the +// runEngine module short-circuits the singleton so no worker starts. +vi.mock("~/v3/runEngine.server", () => ({ engine: {} })); +// Same problem: prisma.server.ts's top-level singleton tries to open a +// PG client. The test never makes a query; an empty stub is enough. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + import { MollifierConfigurationError } from "~/v3/mollifier/mollifierDrainer.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; From c6fa61f89aa033dbe5b1ba8a69acb975ab6a9f73 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 16:12:22 +0100 Subject: [PATCH 4/5] test(webapp): bump mollifierStaleSweep redisTest timeouts to 20s Container startup + the sweep loop can exceed Vitest's 5s default on CI runners (passes in ~1.7-2s locally). Matches the explicit \`{ timeout: 20_000 }\` other mollifier redisTests carry across the project. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/test/mollifierStaleSweep.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts index 6b57931407f..cc452aff9f6 100644 --- a/apps/webapp/test/mollifierStaleSweep.test.ts +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -74,6 +74,7 @@ describe("runStaleSweepOnce — unit", () => { describe("runStaleSweepOnce — testcontainers", () => { redisTest( "flags every entry whose dwell exceeds the stale threshold", + { timeout: 20_000 }, async ({ redisOptions }) => { const buffer = new MollifierBuffer({ redisOptions }); try { @@ -144,6 +145,7 @@ describe("runStaleSweepOnce — testcontainers", () => { redisTest( "snapshot reports zero for envs that have entries but none stale (clears latched alerts)", + { timeout: 20_000 }, async ({ redisOptions }) => { // Critical for alert behaviour: a previous sweep reported env_a // stale, alert fired, drainer caught up. The next sweep must @@ -172,6 +174,7 @@ describe("runStaleSweepOnce — testcontainers", () => { redisTest( "leaves fresh entries alone (dwell below threshold)", + { timeout: 20_000 }, async ({ redisOptions }) => { // Regression guard for the inequality direction. A bug that flipped // `dwellMs > threshold` to `dwellMs >= threshold` would flag every @@ -201,6 +204,7 @@ describe("runStaleSweepOnce — testcontainers", () => { redisTest( "scans across multiple orgs", + { timeout: 20_000 }, async ({ redisOptions }) => { // Phase-3 design has org-level fairness in the drainer; the sweep // must walk every org/env, not just the first one it finds. If a From 242ba7357ead4288b9682513c479363dfdfe73b0 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 17:32:17 +0100 Subject: [PATCH 5/5] fix(webapp,run-engine): replay-layer Devin follow-ups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/app/entry.server.tsx | 11 +++- .../services/triggerFailedTask.server.ts | 27 ++++++++ .../run-engine/src/engine/index.ts | 27 ++++++++ .../engine/tests/createFailedTaskRun.test.ts | 65 +++++++++++++++++++ 4 files changed, 129 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index ca53d03eb67..9996eb7b30a 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -31,8 +31,17 @@ import { // on webapp startup. The singleton's initializer wires start (gated on // `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors // runsReplicationInstance. +// +// IMPORTANT: do NOT replace this with `void sessionsReplicationInstance;`. +// `apps/webapp/package.json` declares `"sideEffects": false`, so esbuild +// treats `void ;` as a pure expression statement and tree-shakes +// the entire import — the singleton's initializer never fires and the +// sessions→ClickHouse logical replication slot stops being consumed. Assigning +// to globalThis is an unambiguous side effect the bundler must preserve. See +// TRI-9864 for the incident write-up. import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; -void sessionsReplicationInstance; +(globalThis as Record).__sessionsReplicationInstance = + sessionsReplicationInstance; const ABORT_DELAY = 30000; diff --git a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts index 5f985b684c1..a26f6c3e74d 100644 --- a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEventRepository } from "~/v3/eventRepository/index.server"; +import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server"; import { DefaultQueueManager } from "../concerns/queues.server"; import type { TriggerTaskRequest } from "../types"; @@ -176,6 +177,14 @@ export class TriggerFailedTaskService { event.setAttribute("runId", failedRunFriendlyId); event.failWithError(taskRunError); + // `emitRunFailedEvent: false` because this call site owns the + // trace-event lifecycle via the outer `traceEvent({ + // incomplete: false, isError: true })`. Letting the engine + // emit `runFailed` here would race the + // `completeFailedRunEvent` listener against the outer trace + // event's own completion write for the same (traceId, spanId). + // We re-trigger the alerts side directly after the trace + // event closes, below. return await this.engine.createFailedTaskRun({ friendlyId: failedRunFriendlyId, environment: { @@ -200,12 +209,30 @@ export class TriggerFailedTaskService { spanId: event.spanId, traceContext: traceContext as Record, taskEventStore: store, + emitRunFailedEvent: false, ...(queueName !== undefined && { queue: queueName }), ...(lockedQueueId !== undefined && { lockedQueueId }), }); } ); + // Alerts side of `runFailed` — the engine emit was suppressed + // above so the trace-event completion isn't double-written; we + // still need the alert pipeline to fire so customers' ERROR + // channels see the failure. Best-effort: a failed enqueue logs + // but doesn't block returning the friendlyId, mirroring the + // engine handler's behaviour at runEngineHandlers.server.ts:81. + try { + await PerformTaskRunAlertsService.enqueue(failedRun.id); + } catch (alertsError) { + logger.warn("TriggerFailedTaskService: alert enqueue failed", { + taskId: request.taskId, + friendlyId: failedRun.friendlyId, + error: + alertsError instanceof Error ? alertsError.message : String(alertsError), + }); + } + return failedRun.friendlyId; } catch (createError) { const createErrorMsg = diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 244d2b014ed..ee8cf06cf3c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1051,6 +1051,7 @@ export class RunEngine { taskEventStore, queue: queueOverride, lockedQueueId: lockedQueueIdOverride, + emitRunFailedEvent = true, }: { friendlyId: string; environment: { @@ -1078,6 +1079,19 @@ export class RunEngine { queue?: string; /** Resolved TaskQueue.id when the task is locked to a specific queue. */ lockedQueueId?: string; + /** + * Whether to emit the `runFailed` engine-bus event. Defaults to true. + * + * Set to `false` when the caller is ALREADY managing the trace-event + * lifecycle for this run via `repository.traceEvent({ incomplete: false, + * isError: true, ... })`. In that path the outer trace event handles + * span completion itself; emitting `runFailed` from here causes the + * `runFailed` → `completeFailedRunEvent` handler to write a second + * completion row for the same (traceId, spanId), racing with the + * outer trace event's own write. The alert side of `runFailed` is + * preserved by emitting from the caller after `traceEvent` returns. + */ + emitRunFailedEvent?: boolean; }): Promise { return startSpan( this.tracer, @@ -1160,6 +1174,19 @@ export class RunEngine { // exceed-limit failures) land in PG silently — visible in the // dashboard list but never reaching customers' configured // ERROR alert channels. + // + // Gated by `emitRunFailedEvent` so call sites that already wrap + // this inside `repository.traceEvent({ incomplete: false, + // isError: true })` can opt out — the outer trace event writes + // the completion row itself, and a second write via + // `completeFailedRunEvent` would race against it. Callers that + // disable the emit are responsible for triggering the alerts + // side themselves (e.g. by calling + // `PerformTaskRunAlertsService.enqueue` directly after the + // trace event closes). + if (!emitRunFailedEvent) { + return taskRun; + } this.eventBus.emit("runFailed", { time: taskRun.completedAt ?? new Date(), run: { diff --git a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts index 0619eeffc2f..84d33baa87d 100644 --- a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts +++ b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts @@ -108,4 +108,69 @@ describe("RunEngine.createFailedTaskRun", () => { await engine.quit(); } }); + + // The TriggerFailedTaskService.call() path wraps createFailedTaskRun + // inside `repository.traceEvent({ incomplete: false, isError: true })` + // which already writes the completion row for the (traceId, spanId). + // Emitting `runFailed` from here would cause the + // `completeFailedRunEvent` handler to race a second write against + // the same span — the `emitRunFailedEvent: false` opt-out is what + // suppresses the emit. The PG row + alert side stay correct because + // the caller enqueues `PerformTaskRunAlertsService.enqueue(run.id)` + // directly after the trace event closes. + containerTest( + "emitRunFailedEvent: false suppresses the bus emit but still creates the PG row", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50 }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier: "outer-trace-event-test", + payload: "{}", + payloadType: "application/json", + error: { type: "STRING_ERROR", raw: "outer trace event manages span" }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + emitRunFailedEvent: false, + }); + + // PG row landed (caller still gets a usable TaskRun). + expect(failed.status).toBe("SYSTEM_FAILURE"); + expect(failed.friendlyId).toBe(friendlyId); + + // Bus emit was suppressed. + expect(failedEvents).toHaveLength(0); + } finally { + await engine.quit(); + } + }, + ); });