diff --git a/.server-changes/mollifier.md b/.server-changes/mollifier.md new file mode 100644 index 0000000000..399ad5c650 --- /dev/null +++ b/.server-changes/mollifier.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 60c234402d..9996eb7b30 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server"; import { PassThrough } from "stream"; import * as Worker from "~/services/worker.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; +import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server"; import { bootstrap } from "./bootstrap"; import { LocaleContextProvider } from "./components/primitives/LocaleProvider"; import { @@ -228,6 +229,7 @@ Worker.init().catch((error) => { }); initMollifierDrainerWorker(); +initMollifierStaleSweepWorker(); bootstrap().catch((error) => { logError(error); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d1a257048e..b4f4879fd8 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z // Separate switch for the drainer (consumer side) so it can be split // off onto a dedicated worker service. Unset → inherits // TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to - // flip two switches. In multi-replica deployments, set this to "0" - // explicitly on every replica except the one dedicated drainer - // service — otherwise every replica's polling loop races for the - // same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill - // switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a - // no-op because the gate-side singleton refuses to construct a - // buffer when the system is off. + // flip two switches. Multi-replica drainers are correct — `popAndMarkDraining` + // is an atomic ZPOPMIN + status flip in one Lua call, so only one replica + // can win any given entry — but inefficient: polling load (SMEMBERS + + // per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY` + // is per-process so engine load also multiplies. Splitting the drainer + // onto a dedicated worker keeps that traffic off the request-serving + // replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch; + // setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a + // no-op because the gate-side singleton refuses to construct a buffer + // when the system is off. TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"), TRIGGER_MOLLIFIER_REDIS_HOST: z @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500), + // Periodic sweep that scans buffer queue ZSETs for entries whose + // dwell exceeds the stale threshold. Independent of the drainer — + // its job is exactly to make a stuck/offline drainer visible to + // ops. Defaults: enabled when the mollifier is enabled, run every + // 5 minutes, alert on anything that's been dwelling for 5+ minutes + // (matches the sweep interval — "anything still here when we + // check" is the simplest threshold that converges). + TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z + .string() + .default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), + TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), + TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce .number() diff --git a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts index 5f985b684c..a26f6c3e74 100644 --- a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEventRepository } from "~/v3/eventRepository/index.server"; +import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server"; import { DefaultQueueManager } from "../concerns/queues.server"; import type { TriggerTaskRequest } from "../types"; @@ -176,6 +177,14 @@ export class TriggerFailedTaskService { event.setAttribute("runId", failedRunFriendlyId); event.failWithError(taskRunError); + // `emitRunFailedEvent: false` because this call site owns the + // trace-event lifecycle via the outer `traceEvent({ + // incomplete: false, isError: true })`. Letting the engine + // emit `runFailed` here would race the + // `completeFailedRunEvent` listener against the outer trace + // event's own completion write for the same (traceId, spanId). + // We re-trigger the alerts side directly after the trace + // event closes, below. return await this.engine.createFailedTaskRun({ friendlyId: failedRunFriendlyId, environment: { @@ -200,12 +209,30 @@ export class TriggerFailedTaskService { spanId: event.spanId, traceContext: traceContext as Record, taskEventStore: store, + emitRunFailedEvent: false, ...(queueName !== undefined && { queue: queueName }), ...(lockedQueueId !== undefined && { lockedQueueId }), }); } ); + // Alerts side of `runFailed` — the engine emit was suppressed + // above so the trace-event completion isn't double-written; we + // still need the alert pipeline to fire so customers' ERROR + // channels see the failure. Best-effort: a failed enqueue logs + // but doesn't block returning the friendlyId, mirroring the + // engine handler's behaviour at runEngineHandlers.server.ts:81. + try { + await PerformTaskRunAlertsService.enqueue(failedRun.id); + } catch (alertsError) { + logger.warn("TriggerFailedTaskService: alert enqueue failed", { + taskId: request.taskId, + friendlyId: failedRun.friendlyId, + error: + alertsError instanceof Error ? alertsError.message : String(alertsError), + }); + } + return failedRun.friendlyId; } catch (createError) { const createErrorMsg = diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts index 139aeaf9a6..fc75210be3 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts @@ -1,10 +1,15 @@ -import { createHash } from "node:crypto"; -import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker"; +import { MollifierDrainer } from "@trigger.dev/redis-worker"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; +import { engine as runEngine } from "~/v3/runEngine.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { getMollifierBuffer } from "./mollifierBuffer.server"; -import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server"; +import { + createDrainerHandler, + isRetryablePgError, +} from "./mollifierDrainerHandler.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; // Distinct error class for the deterministic "fail loud at boot" throws // below. The bootstrap in `mollifierDrainerWorker.server.ts` catches @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error { } } -function initializeMollifierDrainer(): MollifierDrainer { +function initializeMollifierDrainer(): MollifierDrainer { const buffer = getMollifierBuffer(); if (!buffer) { // Unreachable in normal config: getMollifierDrainer() gates on the @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, }); - // Phase 1 handler: no-op ack. The trigger has ALREADY been written to - // Postgres via engine.trigger (dual-write at the call site). Popping + - // acking here proves the dequeue mechanism works end-to-end without - // duplicating the work. Phase 2 will replace this with an engine.trigger - // replay that performs the actual Postgres write. - const drainer = new MollifierDrainer({ + const drainer = new MollifierDrainer({ buffer, - handler: async (input) => { - // Hash the (re-serialised, canonical) payload on the drain side rather - // than on the trigger hot path. Burst-time CPU stays with engine.trigger; - // the drainer is the natural place for the audit-equivalence checksum. - // Re-serialisation is identity for the BufferedTriggerPayload shape - // (only strings/numbers/plain objects), so this hash matches what the - // call site wrote into Redis. - const reserialised = serialiseSnapshot(input.payload); - const payloadHash = createHash("sha256").update(reserialised).digest("hex"); - logger.info("mollifier.drained", { - runId: input.runId, - envId: input.envId, - orgId: input.orgId, - taskId: input.payload.taskId, - attempts: input.attempts, - ageMs: Date.now() - input.createdAt.getTime(), - payloadBytes: reserialised.length, - payloadHash, - }); - }, + handler: createDrainerHandler({ engine: runEngine, prisma }), concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY, maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK, - // A no-op handler shouldn't throw, but if something does (e.g. an - // unexpected deserialise failure), don't loop — let it FAIL terminally - // so the entry is observable in metrics. - isRetryable: () => false, + isRetryable: isRetryablePgError, }); return drainer; @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer // handler registration, leaving a narrow window where a SIGTERM landing // between `start()` and `process.once("SIGTERM", ...)` would skip the // graceful stop. The split is intentional. -export function getMollifierDrainer(): MollifierDrainer | null { +export function getMollifierDrainer(): MollifierDrainer | null { if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null; return singleton("mollifierDrainer", initializeMollifierDrainer); } diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts new file mode 100644 index 0000000000..741f1afed0 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -0,0 +1,170 @@ +import { context, trace, TraceFlags } from "@opentelemetry/api"; +import type { RunEngine } from "@internal/run-engine"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker"; +import { startSpan } from "~/v3/tracing.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; + +const tracer = trace.getTracer("mollifier-drainer"); + +export function isRetryablePgError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const msg = err.message ?? ""; + // Prisma surfaces P1001 ("Can't reach database server") via two + // different error classes — `PrismaClientKnownRequestError` exposes + // it as `err.code`, `PrismaClientInitializationError` exposes it as + // `err.errorCode`. Check both so reconnection-time errors retry + // regardless of which class fires. + const code = (err as { code?: string }).code; + const errorCode = (err as { errorCode?: string }).errorCode; + if (code === "P2024") return true; + if (code === "P1001" || errorCode === "P1001") return true; + if (msg.includes("Can't reach database server")) return true; + if (msg.includes("Connection lost")) return true; + if (msg.includes("ECONNRESET")) return true; + return false; +} + +export function createDrainerHandler(deps: { + engine: RunEngine; + prisma: PrismaClientOrTransaction; +}): MollifierDrainerHandler { + return async (input) => { + const dwellMs = Date.now() - input.createdAt.getTime(); + + // Re-attach to the trace started by the caller's mollifier.queued span + // (its traceId + spanId were captured into the snapshot at buffer time). + // Without this the drainer would emit mollifier.drained in a brand-new + // trace and the engine.trigger instrumentation would inherit an empty + // active context — leaving the run-detail page with only the root span. + const snapshotTraceId = + typeof input.payload.traceId === "string" ? input.payload.traceId : undefined; + const snapshotSpanId = + typeof input.payload.spanId === "string" ? input.payload.spanId : undefined; + + const parentContext = + snapshotTraceId && snapshotSpanId + ? trace.setSpanContext(context.active(), { + traceId: snapshotTraceId, + spanId: snapshotSpanId, + traceFlags: TraceFlags.SAMPLED, + isRemote: true, + }) + : context.active(); + + // Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call + // landed on this entry while it was QUEUED, the snapshot carries + // `cancelledAt` + `cancelReason`. Skip the normal materialise path + // and write a CANCELED PG row directly. The existing runCancelled + // handler writes the TaskEvent. + const cancelledAtStr = + typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined; + if (cancelledAtStr) { + const cancelReason = + typeof input.payload.cancelReason === "string" + ? input.payload.cancelReason + : "Canceled by user"; + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained.cancelled", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("mollifier.cancel_bifurcation", true); + span.setAttribute("taskRunId", input.runId); + await deps.engine.createCancelledRun( + { + snapshot: input.payload as any, + cancelledAt: new Date(cancelledAtStr), + cancelReason, + }, + deps.prisma, + ); + }); + }); + return; + } + + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("taskRunId", input.runId); + + try { + await deps.engine.trigger(input.payload as any, deps.prisma); + } catch (err) { + // The retryable-PG class re-throws so the drainer's outer + // worker loop can `buffer.requeue` (handled in + // `MollifierDrainer.drainOne`). For non-retryable failures we + // write a terminal SYSTEM_FAILURE row to PG via the engine's + // existing `createFailedTaskRun` (used by batch-trigger for + // the same purpose) so the customer sees the run in their + // dashboard / SDK instead of silently losing it when the + // buffer entry TTLs out. If THAT insert also fails (PG truly + // unreachable), rethrow so the drainer's outer catch falls + // through to its existing `buffer.fail` terminal-marker path. + if (isRetryablePgError(err)) { + throw err; + } + const reason = err instanceof Error ? err.message : String(err); + span.setAttribute("mollifier.terminal_failure_reason", reason); + const snapshot = input.payload as Record; + const env = snapshot.environment as + | { + id: string; + type: any; + project: { id: string }; + organization: { id: string }; + } + | undefined; + if (!env) { + // Snapshot too malformed to even construct a TaskRun row. + // Drainer's outer catch will buffer.fail this entry. + throw err; + } + try { + await deps.engine.createFailedTaskRun({ + friendlyId: input.runId, + environment: env, + taskIdentifier: String(snapshot.taskIdentifier ?? ""), + payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined, + payloadType: + typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined, + error: { + type: "STRING_ERROR", + raw: `Mollifier drainer terminal failure: ${reason}`, + }, + parentTaskRunId: + typeof snapshot.parentTaskRunId === "string" + ? snapshot.parentTaskRunId + : undefined, + rootTaskRunId: + typeof snapshot.rootTaskRunId === "string" + ? snapshot.rootTaskRunId + : undefined, + depth: typeof snapshot.depth === "number" ? snapshot.depth : 0, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true, + traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined, + spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, + taskEventStore: + typeof snapshot.taskEventStore === "string" + ? snapshot.taskEventStore + : undefined, + queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined, + lockedQueueId: + typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined, + }); + } catch (writeErr) { + // Class A — PG itself is failing. Rethrow the original + // error so the drainer falls back to buffer.fail. Include + // the write error in the log line at the drainer layer. + throw err; + } + } + }); + }); + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts new file mode 100644 index 0000000000..c2f94495cc --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts @@ -0,0 +1,149 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { logger as defaultLogger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +import { + recordStaleEntry as defaultRecordStaleEntry, + reportStaleEntrySnapshot as defaultReportStaleEntrySnapshot, +} from "./mollifierTelemetry.server"; + +// One pass of the sweep scans every env's queue ZSET. The per-env page +// is bounded so a single pathological env can't make the sweep run +// unboundedly long. +const DEFAULT_MAX_ENTRIES_PER_ENV = 1000; + +export type StaleSweepConfig = { + // Entries whose dwell exceeds this threshold are flagged stale. Set + // it well below `entryTtlSeconds * 1000` so ops have lead time before + // TTL-induced silent loss; the default (half of entryTtlSeconds) + // matches the cadence in the plan doc. + staleThresholdMs: number; + maxEntriesPerEnv?: number; +}; + +export type StaleSweepDeps = { + getBuffer?: () => MollifierBuffer | null; + // No `envId` arg — `envId` is a high-cardinality metric attribute and + // is intentionally not emitted as a metric label. The structured warn + // log below carries envId for forensic drill-down. + recordStaleEntry?: () => void; + reportStaleEntrySnapshot?: (snapshot: Map) => void; + logger?: { warn: (message: string, fields: Record) => void }; + now?: () => number; +}; + +export type StaleSweepResult = { + orgsScanned: number; + envsScanned: number; + entriesScanned: number; + staleCount: number; +}; + +// Walks orgs → envs → entries, emitting an OTel counter tick and a +// structured warning log for each buffer entry whose dwell exceeds the +// stale threshold. Read-only: the sweep does NOT remove or salvage +// entries; that decision is deferred to a separate retention-policy +// change. The signal here exists so ops sees the drainer falling +// behind well before TTL-induced loss kicks in. +export async function runStaleSweepOnce( + config: StaleSweepConfig, + deps: StaleSweepDeps = {}, +): Promise { + const getBuffer = deps.getBuffer ?? getMollifierBuffer; + const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry; + const reportSnapshot = + deps.reportStaleEntrySnapshot ?? defaultReportStaleEntrySnapshot; + const log = deps.logger ?? defaultLogger; + const now = (deps.now ?? Date.now)(); + const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV; + + const buffer = getBuffer(); + if (!buffer) { + // Replace any previous snapshot with empty so a previously-paging + // env doesn't stay latched if mollifier is turned off mid-flight. + reportSnapshot(new Map()); + return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 }; + } + + const orgs = await buffer.listOrgs(); + let envsScanned = 0; + let entriesScanned = 0; + let staleCount = 0; + // Tracks the stale count per env this pass. Includes zero counts for + // envs that have entries but none stale — that's what lets the gauge + // drop back to 0 when the drainer catches up. Envs absent from this + // map are also absent from the new snapshot, clearing any latched + // alerts on envs that have fully drained. + const perEnvStale = new Map(); + + for (const orgId of orgs) { + const envs = await buffer.listEnvsForOrg(orgId); + for (const envId of envs) { + envsScanned += 1; + let envStale = 0; + const entries = await buffer.listEntriesForEnv(envId, maxEntries); + for (const entry of entries) { + entriesScanned += 1; + const dwellMs = now - entry.createdAt.getTime(); + if (dwellMs > config.staleThresholdMs) { + recordStale(); + log.warn("mollifier.stale_entry", { + runId: entry.runId, + envId, + orgId, + dwellMs, + staleThresholdMs: config.staleThresholdMs, + }); + envStale += 1; + } + } + perEnvStale.set(envId, envStale); + staleCount += envStale; + } + } + + reportSnapshot(perEnvStale); + + return { orgsScanned: orgs.length, envsScanned, entriesScanned, staleCount }; +} + +export type StaleSweepIntervalHandle = { + stop: () => void; +}; + +// Production wrapper: schedule `runStaleSweepOnce` on a fixed interval. +// One pass at a time — if a sweep is still running when the timer fires +// the next tick is skipped (a backed-up Redis would otherwise queue +// overlapping sweeps that all log the same stale entries). +export function startStaleSweepInterval( + config: StaleSweepConfig & { intervalMs: number }, + deps: StaleSweepDeps = {}, +): StaleSweepIntervalHandle { + let stopped = false; + let inFlight = false; + + const tick = async () => { + if (stopped || inFlight) return; + inFlight = true; + try { + await runStaleSweepOnce(config, deps); + } catch (err) { + const log = deps.logger ?? defaultLogger; + log.warn("mollifier.stale_sweep.failed", { + err: err instanceof Error ? err.message : String(err), + }); + } finally { + inFlight = false; + } + }; + + const timer = setInterval(() => { + void tick(); + }, config.intervalMs); + + return { + stop: () => { + stopped = true; + clearInterval(timer); + }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index 0fe302584c..f9c7ca72f1 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -15,3 +15,87 @@ export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason ...(reason ? { reason } : {}), }); } + +// Counts subscriptions hitting `/realtime/v1/runs/` for a run that +// lives only in the mollifier buffer (no PG row yet). The route opens +// the Electric stream anyway so the eventual drainer-INSERT propagates +// to the client; this counter is the signal of how often customers +// subscribe inside the buffered window. +export const realtimeBufferedSubscriptionsCounter = meter.createCounter( + "mollifier.realtime_subscriptions.buffered", + { + description: + "Realtime subscriptions opened against a runId that exists only in the mollifier buffer", + }, +); + +// No `envId` attribute — `envId` is a banned high-cardinality metric +// label per the repo's OTel rules. The structured warn log emitted +// alongside the counter tick (in `mollifierStaleSweep.server.ts`) +// carries the envId / orgId / runId for forensic drill-down; the +// metric stays an aggregate. +export function recordRealtimeBufferedSubscription(): void { + realtimeBufferedSubscriptionsCounter.add(1); +} + +// Counts buffer entries that have been waiting in the queue ZSET longer +// than the configured stale threshold. Useful for historical "stale +// events over time" views, but not directly alertable on its own — a +// single stuck entry observed by N sweep ticks adds N to the counter, +// so `rate()` over an alerting window reflects (entries × ticks), not +// "entries that are stale right now". +export const staleEntriesCounter = meter.createCounter( + "mollifier.stale_entries", + { + description: + "Mollifier buffer entries whose dwell exceeds the stale threshold (per sweep pass)", + }, +); + +// No `envId` attribute — see comment above. +export function recordStaleEntry(): void { + staleEntriesCounter.add(1); +} + +// Alertable signal: the total count of stale entries observed by the +// latest sweep. The sweep snapshots the full picture on each pass so +// the gauge drops back to 0 when the drainer catches up instead of +// staying latched. Recommended alert: +// mollifier_stale_entries_current > 0 for 5m +export const staleEntriesGauge = meter.createObservableGauge( + "mollifier.stale_entries.current", + { + description: + "Buffer entries whose dwell exceeds the stale threshold, as observed by the latest sweep pass", + }, +); + +let latestStaleTotal = 0; + +export function reportStaleEntrySnapshot(snapshot: Map): void { + // Sum across envs. Per-env breakdown is intentionally NOT emitted as + // a metric label (high-cardinality); the structured warn log lines + // from the sweep carry per-env detail for ops to drill down. + let total = 0; + for (const count of snapshot.values()) { + total += count; + } + latestStaleTotal = total; +} + +meter.addBatchObservableCallback( + (result) => { + result.observe(staleEntriesGauge, latestStaleTotal); + }, + [staleEntriesGauge], +); + +// Electric SQL's shape-stream protocol adds a `handle=` query param on +// every reconnect after the initial GET. Gating the realtime-buffered +// log/counter on its absence keeps the signal at one tick per +// subscription instead of one tick per ~20s live-poll iteration — +// without it the counter would over-count by the long-poll factor. +export function isInitialBufferedSubscriptionRequest(url: string | URL): boolean { + const u = typeof url === "string" ? new URL(url) : url; + return !u.searchParams.has("handle"); +} diff --git a/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts new file mode 100644 index 0000000000..5325018baf --- /dev/null +++ b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts @@ -0,0 +1,47 @@ +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { signalsEmitter } from "~/services/signals.server"; +import { + startStaleSweepInterval, + type StaleSweepIntervalHandle, +} from "./mollifier/mollifierStaleSweep.server"; + +declare global { + // eslint-disable-next-line no-var + var __mollifierStaleSweepRegistered__: boolean | undefined; + // eslint-disable-next-line no-var + var __mollifierStaleSweepHandle__: StaleSweepIntervalHandle | undefined; +} + +/** + * Bootstraps the mollifier stale-entry sweep. + * + * Independent of the drainer — its purpose is to alert when entries are + * piling up despite the drainer being supposedly healthy, so it runs + * any time the mollifier itself is enabled (gated separately from + * `TRIGGER_MOLLIFIER_DRAINER_ENABLED`). The sweep is read-only: it + * counts and logs stale entries but does not remove or salvage them. + * + * The Remix dev server re-evaluates `entry.server.tsx` on every change, + * so the registration guard + handle cache make the bootstrap + * idempotent across hot reloads. + */ +export function initMollifierStaleSweepWorker(): void { + if (env.TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED !== "1") return; + if (global.__mollifierStaleSweepRegistered__) return; + + logger.debug("Initializing mollifier stale-entry sweep", { + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + const handle = startStaleSweepInterval({ + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + signalsEmitter.on("SIGTERM", handle.stop); + signalsEmitter.on("SIGINT", handle.stop); + global.__mollifierStaleSweepRegistered__ = true; + global.__mollifierStaleSweepHandle__ = handle; +} diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts new file mode 100644 index 0000000000..6f66cf2ab7 --- /dev/null +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -0,0 +1,206 @@ +import { describe, expect, it, vi } from "vitest"; +import { trace } from "@opentelemetry/api"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { + createDrainerHandler, + isRetryablePgError, +} from "~/v3/mollifier/mollifierDrainerHandler.server"; + +describe("isRetryablePgError", () => { + it("returns true for P2024 (connection pool timeout)", () => { + const err = Object.assign(new Error("Timed out fetching a new connection"), { + code: "P2024", + }); + expect(isRetryablePgError(err)).toBe(true); + }); + + it("returns true for generic connection-lost messages", () => { + expect(isRetryablePgError(new Error("Connection lost"))).toBe(true); + expect(isRetryablePgError(new Error("Can't reach database server"))).toBe(true); + }); + + it("returns false for validation errors", () => { + expect(isRetryablePgError(new Error("Invalid payload"))).toBe(false); + }); + + it("returns false for non-Error inputs", () => { + expect(isRetryablePgError("string error")).toBe(false); + expect(isRetryablePgError({ message: "object" })).toBe(false); + }); +}); + +describe("createDrainerHandler", () => { + it("invokes engine.trigger with the deserialised snapshot", async () => { + const trigger = vi.fn(async () => ({ friendlyId: "run_x" })); + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", payload: "{}" }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(trigger).toHaveBeenCalledOnce(); + const callArg = trigger.mock.calls[0][0] as { taskIdentifier: string }; + expect(callArg.taskIdentifier).toBe("t"); + }); + + it("re-attaches the snapshot's traceId so engine.trigger inherits the original trace", async () => { + // Captures the active traceId at the moment engine.trigger is invoked. + // Without context propagation it would be a fresh traceId, leaving the + // run-detail page with only the root span. + let observedTraceId: string | undefined; + const trigger = vi.fn(async () => { + observedTraceId = trace.getActiveSpan()?.spanContext().traceId; + return { friendlyId: "run_x" }; + }); + + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + const snapshotTraceId = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + const snapshotSpanId = "bbbbbbbbbbbbbbbb"; + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { + taskIdentifier: "t", + traceId: snapshotTraceId, + spanId: snapshotSpanId, + }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(observedTraceId).toBe(snapshotTraceId); + }); + + it("rethrows retryable PG errors so MollifierDrainer requeues the entry", async () => { + const err = new Error("Can't reach database server"); + const trigger = vi.fn(async () => { + throw err; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("Can't reach database server"); + // Retryable: we do NOT write a SYSTEM_FAILURE row, the entry should + // be requeued for another shot. + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); + + const envFixture = { + id: "env_a", + type: "DEVELOPMENT", + project: { id: "proj_1" }, + organization: { id: "org_1" }, + }; + + it("writes a SYSTEM_FAILURE PG row when engine.trigger fails non-retryably", async () => { + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => ({ + id: "internal", + friendlyId: "run_x", + })); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).resolves.toBeUndefined(); + + expect(trigger).toHaveBeenCalledOnce(); + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + const arg = createFailedTaskRun.mock.calls[0][0] as { error: { raw: string } }; + expect(arg.error.raw).toContain("validation failed"); + }); + + it("rethrows the original error when createFailedTaskRun also fails (PG genuinely unreachable)", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(async () => { + throw new Error("connection refused"); + }); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + // Drainer's outer drainOne loop now decides retry vs buffer.fail. + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + }); + + it("rethrows the original error when the snapshot lacks an environment block", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" /* no environment */ }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/webapp/test/mollifierDrainerWorker.test.ts b/apps/webapp/test/mollifierDrainerWorker.test.ts index e5f38229d8..0d4e931fd8 100644 --- a/apps/webapp/test/mollifierDrainerWorker.test.ts +++ b/apps/webapp/test/mollifierDrainerWorker.test.ts @@ -1,4 +1,17 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; + +// Importing `~/v3/mollifier/mollifierDrainer.server` (below) transitively +// loads `~/v3/runEngine.server`, whose top-level `singleton(...)` call +// eagerly constructs a RunEngine. That spins up Prisma + Redis workers +// that try to connect to localhost — which in CI (no PG, no Redis) +// produces an unhandled `PrismaClientInitializationError` that fails +// the test run even though the assertions all pass. Mocking the +// runEngine module short-circuits the singleton so no worker starts. +vi.mock("~/v3/runEngine.server", () => ({ engine: {} })); +// Same problem: prisma.server.ts's top-level singleton tries to open a +// PG client. The test never makes a query; an empty stub is enough. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + import { MollifierConfigurationError } from "~/v3/mollifier/mollifierDrainer.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts new file mode 100644 index 0000000000..cc452aff9f --- /dev/null +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -0,0 +1,241 @@ +import { describe, expect, it, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { runStaleSweepOnce } from "~/v3/mollifier/mollifierStaleSweep.server"; + +const SNAPSHOT = { + taskIdentifier: "hello-world", + payload: '{"x":1}', + payloadType: "application/json", + traceContext: {}, +}; + +function spyDeps() { + // Counter ticks — metric carries no `envId` label (high-cardinality) + // so the spy is a simple call count. Per-env detail lives on the + // structured warn log and the snapshot map. + let staleEntryCount = 0; + const snapshots: Array> = []; + const warnings: Array<{ message: string; fields: Record }> = []; + return { + get staleEntryCount() { + return staleEntryCount; + }, + snapshots, + warnings, + deps: { + recordStaleEntry: () => { + staleEntryCount += 1; + }, + reportStaleEntrySnapshot: (snapshot: Map) => { + // Clone so post-sweep assertions see what was reported *at that + // call site*, not whatever subsequent passes mutate the source + // map into. + snapshots.push(new Map(snapshot)); + }, + logger: { + warn: (message: string, fields: Record) => { + warnings.push({ message, fields }); + }, + }, + }, + }; +} + +describe("runStaleSweepOnce — unit", () => { + it("returns zeros when the buffer is null", async () => { + // Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer + // singleton is null and the sweep is a no-op. We don't want it to + // emit a metric (or throw) just because mollifier is disabled. + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 1000 }, + { ...spies.deps, getBuffer: () => null }, + ); + expect(result).toEqual({ + orgsScanned: 0, + envsScanned: 0, + entriesScanned: 0, + staleCount: 0, + }); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); + const snapshots = spies.snapshots; + // An empty snapshot is still reported so any previously-paging env + // (from a prior sweep before mollifier was disabled) clears. + expect(snapshots).toHaveLength(1); + expect(snapshots[0].size).toBe(0); + }); +}); + +describe("runStaleSweepOnce — testcontainers", () => { + redisTest( + "flags every entry whose dwell exceeds the stale threshold", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + // Three entries across two envs in the same org. The sweep below + // runs against a `now` advanced by 5 minutes, so all three have + // dwell ~5min and ALL THREE are stale against a 1-minute + // threshold — there is no "fresh" entry in this scenario. The + // assertions below pin the all-three-stale shape. + await buffer.accept({ + runId: "run_stale_a", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_stale_b", + envId: "env_b", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_stale_c", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + // Yank the system clock forward 5 minutes for the sweep — way + // past the threshold below. The `now` deps seam lets us drive + // the threshold without actually waiting in real time. + const futureNow = Date.now() + 5 * 60 * 1000; + + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { + ...spies.deps, + getBuffer: () => buffer, + now: () => futureNow, + }, + ); + + expect(result.envsScanned).toBe(2); + expect(result.entriesScanned).toBe(3); + expect(result.staleCount).toBe(3); + // All three entries exceed the threshold; each emits one + // counter tick + one warning. + expect(spies.staleEntryCount).toBe(3); + expect(spies.warnings).toHaveLength(3); + for (const w of spies.warnings) { + expect(w.message).toBe("mollifier.stale_entry"); + expect(w.fields.staleThresholdMs).toBe(60 * 1000); + expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000); + } + // Snapshot drives the alertable gauge — env_a has 2 stale + // entries, env_b has 1. Per-env detail is still passed to + // `reportStaleEntrySnapshot` for forensic value even though the + // gauge itself aggregates the total. + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ + env_a: 2, + env_b: 1, + }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "snapshot reports zero for envs that have entries but none stale (clears latched alerts)", + { timeout: 20_000 }, + async ({ redisOptions }) => { + // Critical for alert behaviour: a previous sweep reported env_a + // stale, alert fired, drainer caught up. The next sweep must + // report `env_a -> 0` so the gauge drops below the alert + // threshold instead of staying latched at the last stale value. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_just_arrived", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const spies = spyDeps(); + await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...spies.deps, getBuffer: () => buffer }, + ); + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ env_a: 0 }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "leaves fresh entries alone (dwell below threshold)", + { timeout: 20_000 }, + async ({ redisOptions }) => { + // Regression guard for the inequality direction. A bug that flipped + // `dwellMs > threshold` to `dwellMs >= threshold` would flag every + // entry the first time the sweep runs after a perfectly synchronised + // accept call — the dashboard would page on every burst. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_fresh_only", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...spies.deps, getBuffer: () => buffer }, + ); + expect(result.staleCount).toBe(0); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "scans across multiple orgs", + { timeout: 20_000 }, + async ({ redisOptions }) => { + // Phase-3 design has org-level fairness in the drainer; the sweep + // must walk every org/env, not just the first one it finds. If a + // future refactor collapsed listOrgs/listEnvsForOrg into a single + // env-flat list this test catches a regression there. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_x", + envId: "env_x", + orgId: "org_x", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_y", + envId: "env_y", + orgId: "org_y", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const spies = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...spies.deps, getBuffer: () => buffer, now: () => futureNow }, + ); + expect(result.orgsScanned).toBe(2); + expect(result.envsScanned).toBe(2); + expect(result.staleCount).toBe(2); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index da42247111..ee8cf06cf3 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -450,6 +450,176 @@ export class RunEngine { //MARK: - Run functions + /** + * Writes a TaskRun row in CANCELED state directly, bypassing the trigger + * pipeline. Used by the mollifier drainer when a cancel API call lands on + * a buffered run before it materialises (Q4 mollifier-cancel design). + * + * Skips: queue insertion (no execution), waitpoint creation (single- + * triggerAndWait can't enter the buffer; F4 bypass), concurrency + * reservation. Emits `runCancelled` so the existing TaskEvent handler + * writes the cancellation event row — the only side effect PG-side cancel + * has today per audit. + * + * Idempotent: if a row with the same friendlyId already exists (double + * drainer pop after requeue), Prisma's P2002 unique-constraint violation + * is caught and the existing row is returned. The duplicate runCancelled + * emission is skipped — the original drain's emit already wrote the + * TaskEvent. + */ + async createCancelledRun( + { + snapshot, + cancelledAt, + cancelReason, + }: { + snapshot: TriggerParams; + cancelledAt: Date; + cancelReason: string; + }, + tx?: PrismaClientOrTransaction, + ): Promise { + const prisma = tx ?? this.prisma; + return startSpan(this.tracer, "createCancelledRun", async (span) => { + span.setAttribute("friendlyId", snapshot.friendlyId); + span.setAttribute("taskIdentifier", snapshot.taskIdentifier); + const id = RunId.fromFriendlyId(snapshot.friendlyId); + const error: TaskRunError = { type: "STRING_ERROR", raw: cancelReason }; + + try { + const taskRun = await prisma.taskRun.create({ + data: { + id, + engine: "V2", + status: "CANCELED", + friendlyId: snapshot.friendlyId, + runtimeEnvironmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + organizationId: snapshot.environment.organization.id, + projectId: snapshot.environment.project.id, + idempotencyKey: snapshot.idempotencyKey, + idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt, + idempotencyKeyOptions: snapshot.idempotencyKeyOptions, + taskIdentifier: snapshot.taskIdentifier, + payload: snapshot.payload, + payloadType: snapshot.payloadType, + context: snapshot.context, + traceContext: snapshot.traceContext, + traceId: snapshot.traceId, + spanId: snapshot.spanId, + parentSpanId: snapshot.parentSpanId, + lockedToVersionId: snapshot.lockedToVersionId, + taskVersion: snapshot.taskVersion, + sdkVersion: snapshot.sdkVersion, + cliVersion: snapshot.cliVersion, + concurrencyKey: snapshot.concurrencyKey, + queue: snapshot.queue, + lockedQueueId: snapshot.lockedQueueId, + workerQueue: snapshot.workerQueue, + isTest: snapshot.isTest, + taskEventStore: snapshot.taskEventStore, + // Defensive: the snapshot comes from a cjson-encoded buffer + // payload, where empty Lua tables encode as `{}` not `[]`. If + // the drainer pops a buffered run with no tags, snapshot.tags + // will be an empty object, which Prisma misreads as a relation + // update op. Normalise to a real array (or undefined for the + // empty case). + runTags: Array.isArray(snapshot.tags) && snapshot.tags.length > 0 + ? snapshot.tags + : undefined, + oneTimeUseToken: snapshot.oneTimeUseToken, + parentTaskRunId: snapshot.parentTaskRunId, + rootTaskRunId: snapshot.rootTaskRunId, + replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId, + batchId: snapshot.batch?.id, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion, + depth: snapshot.depth, + seedMetadata: snapshot.seedMetadata, + seedMetadataType: snapshot.seedMetadataType, + metadata: snapshot.metadata, + metadataType: snapshot.metadataType, + machinePreset: snapshot.machine, + scheduleId: snapshot.scheduleId, + scheduleInstanceId: snapshot.scheduleInstanceId, + createdAt: snapshot.createdAt, + bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined, + planType: snapshot.planType, + realtimeStreamsVersion: snapshot.realtimeStreamsVersion, + streamBasinName: snapshot.streamBasinName, + annotations: snapshot.annotations, + completedAt: cancelledAt, + updatedAt: cancelledAt, + error: error as unknown as Prisma.InputJsonValue, + attemptNumber: 0, + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run cancelled before materialisation", + runStatus: "CANCELED", + environmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + projectId: snapshot.environment.project.id, + organizationId: snapshot.environment.organization.id, + }, + }, + }, + }); + + this.eventBus.emit("runCancelled", { + time: cancelledAt, + run: { + id: taskRun.id, + status: taskRun.status, + friendlyId: taskRun.friendlyId, + spanId: taskRun.spanId, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + error, + updatedAt: taskRun.updatedAt, + attemptNumber: taskRun.attemptNumber ?? 0, + }, + organization: { id: snapshot.environment.organization.id }, + project: { id: snapshot.environment.project.id }, + environment: { id: snapshot.environment.id }, + }); + + return taskRun; + } catch (err) { + // P2002 = unique constraint violation. Double-pop after a drainer + // requeue can reach this. Idempotent: return the existing row + // without re-emitting. + if ( + err instanceof Prisma.PrismaClientKnownRequestError && + err.code === "P2002" + ) { + this.logger.info( + "createCancelledRun: row already exists, returning existing (idempotent)", + { friendlyId: snapshot.friendlyId }, + ); + const existing = await prisma.taskRun.findFirst({ where: { id } }); + if (existing) { + // Only treat the conflict as idempotent when the existing + // row is ALREADY canceled. If a non-canceled row landed + // first (e.g. the drainer's normal `engine.trigger` replay + // path raced ahead of the cancel) we surface a conflict + // rather than silently reporting "cancelled" — the run is + // genuinely live and the caller must decide between + // engine.cancelRun() and skipping. + if (existing.status === "CANCELED") { + return existing; + } + throw new Error( + `createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`, + ); + } + } + throw err; + } + }); + } + /** "Triggers" one run. */ async trigger( { @@ -881,6 +1051,7 @@ export class RunEngine { taskEventStore, queue: queueOverride, lockedQueueId: lockedQueueIdOverride, + emitRunFailedEvent = true, }: { friendlyId: string; environment: { @@ -908,6 +1079,19 @@ export class RunEngine { queue?: string; /** Resolved TaskQueue.id when the task is locked to a specific queue. */ lockedQueueId?: string; + /** + * Whether to emit the `runFailed` engine-bus event. Defaults to true. + * + * Set to `false` when the caller is ALREADY managing the trace-event + * lifecycle for this run via `repository.traceEvent({ incomplete: false, + * isError: true, ... })`. In that path the outer trace event handles + * span completion itself; emitting `runFailed` from here causes the + * `runFailed` → `completeFailedRunEvent` handler to write a second + * completion row for the same (traceId, spanId), racing with the + * outer trace event's own write. The alert side of `runFailed` is + * preserved by emitting from the caller after `traceEvent` returns. + */ + emitRunFailedEvent?: boolean; }): Promise { return startSpan( this.tracer, @@ -983,6 +1167,57 @@ export class RunEngine { }); } + // Emit `runFailed` so the alert pipeline picks up the + // SYSTEM_FAILURE row and the event-store handler writes the + // completion event into the trace. Without this the mollifier + // drainer's terminal failures (and batch-trigger's + // exceed-limit failures) land in PG silently — visible in the + // dashboard list but never reaching customers' configured + // ERROR alert channels. + // + // Gated by `emitRunFailedEvent` so call sites that already wrap + // this inside `repository.traceEvent({ incomplete: false, + // isError: true })` can opt out — the outer trace event writes + // the completion row itself, and a second write via + // `completeFailedRunEvent` would race against it. Callers that + // disable the emit are responsible for triggering the alerts + // side themselves (e.g. by calling + // `PerformTaskRunAlertsService.enqueue` directly after the + // trace event closes). + if (!emitRunFailedEvent) { + return taskRun; + } + this.eventBus.emit("runFailed", { + time: taskRun.completedAt ?? new Date(), + run: { + id: taskRun.id, + status: taskRun.status, + spanId: taskRun.spanId, + error, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + updatedAt: taskRun.updatedAt, + // This row never attempted execution — it's a synthesised + // terminal failure. The alert payload's `attemptNumber=0` + // is the signal downstream consumers can use to + // distinguish a never-ran failure from a run that + // exhausted its retries. + attemptNumber: 0, + usageDurationMs: 0, + costInCents: 0, + }, + organization: { + id: environment.organization.id, + }, + project: { + id: environment.project.id, + }, + environment: { + id: environment.id, + }, + }); + return taskRun; }, { diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts new file mode 100644 index 0000000000..71223ce077 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -0,0 +1,294 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +function freshRunId() { + return RunId.generate().friendlyId; +} +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import type { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function baseEngineOptions(redisOptions: Parameters[0]["queue"]["redis"]) { + return { + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x" as const, + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }; +} + +// Phase C1 / Q4 design — engine.createCancelledRun writes a CANCELED +// TaskRun row directly from a buffer snapshot. Verifies the bypass- +// queue / bypass-waitpoint / emit-runCancelled contract. +describe("RunEngine.createCancelledRun", () => { + containerTest( + "writes CANCELED PG row with snapshot fields, completedAt, error", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const cancelledAt = new Date("2026-05-20T12:00:00.000Z"); + const cancelReason = "Canceled by user"; + + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: '{"hello":"world"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: ["test-tag"], + }, + cancelledAt, + cancelReason, + }); + + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + expect(result.id).toBe(RunId.fromFriendlyId(friendlyId)); + expect(result.completedAt?.toISOString()).toBe(cancelledAt.toISOString()); + expect(result.taskIdentifier).toBe("test-task"); + expect(result.runTags).toEqual(["test-tag"]); + expect(result.payload).toBe('{"hello":"world"}'); + const err = result.error as { type?: string; raw?: string }; + expect(err.type).toBe("STRING_ERROR"); + expect(err.raw).toBe(cancelReason); + + // Verify the PG row is canonical (findFirst returns the row). + const stored = await prisma.taskRun.findFirst({ + where: { friendlyId }, + }); + expect(stored).not.toBeNull(); + expect(stored!.status).toBe("CANCELED"); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "emits runCancelled with correct payload", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const cancelledAt = new Date(); + const cancelReason = "Test cancel"; + const friendlyId = freshRunId(); + await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000cccc000000000000", + spanId: "dddd000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt, + cancelReason, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]!.run.status).toBe("CANCELED"); + expect(captured[0]!.run.friendlyId).toBe(friendlyId); + expect(captured[0]!.run.error).toEqual({ type: "STRING_ERROR", raw: cancelReason }); + expect(captured[0]!.organization.id).toBe(env.organization.id); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "idempotent on double-pop: second call returns existing row without re-emitting", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const snapshot = { + friendlyId: freshRunId(), + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000eeee000000000000", + spanId: "ffff000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }; + const cancelledAt = new Date(); + const cancelReason = "Test idempotent"; + + const first = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + const second = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + + expect(second.id).toBe(first.id); + // Only the first call's emit fired; the P2002 path skips re-emission. + expect(captured).toHaveLength(1); + } finally { + await engine.quit(); + } + }, + ); + + // Regression: cjson encodes empty Lua tables as `{}`, not `[]`. When + // the drainer pops a buffered run that never had a tag set, the + // deserialised snapshot's `tags` field is an empty object. The old + // implementation passed it straight into Prisma's `runTags:` field; + // Prisma misread the object as a relation update op and threw + // `Argument 'set' is missing`. The drainer caught the error and + // marked the buffer entry FAILED — so the CANCELED PG row never + // landed. Found while running the Phase F challenge suite. + containerTest( + "tolerates snapshot.tags being an empty object (cjson edge case)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + // Cast through unknown to simulate the cjson-decode output shape + // for an empty Lua table — TypeScript's snapshot type says + // string[], but the buffer Lua delivers {} for the empty case. + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000abcd000000000000", + spanId: "1234000000000000", + queue: "task/test-task", + isTest: false, + tags: {} as unknown as string[], + }, + cancelledAt: new Date(), + cancelReason: "Cancelled — empty tags", + }); + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + // Prisma normalises the absent-tags case to either [] or null + // depending on the column default; assert it's an empty array. + expect(result.runTags).toEqual([]); + } finally { + await engine.quit(); + } + }, + ); + + // Regression: the P2002-on-id idempotency path used to return ANY + // existing row, which would silently report success even if a live + // (non-CANCELED) row landed first. The guard now requires the + // existing row's status to be CANCELED; anything else surfaces a + // conflict so the caller can route to engine.cancelRun() or skip. + containerTest( + "P2002 conflict with non-CANCELED existing row throws (does not silently succeed)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const id = RunId.fromFriendlyId(friendlyId); + + // Plant a live (non-CANCELED) row with the same id so the + // cancelled-run INSERT hits P2002 and the guard finds a row + // that ISN'T CANCELED. + await prisma.taskRun.create({ + data: { + id, + friendlyId, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + status: "PENDING", + runtimeEnvironmentId: env.id, + projectId: env.project.id, + organizationId: env.organizationId, + queue: "task/test-task", + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + engine: "V2", + }, + }); + + await expect( + engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt: new Date(), + cancelReason: "Should not silently overwrite a live row", + }), + ).rejects.toThrow(/createCancelledRun conflict.*PENDING/); + } finally { + await engine.quit(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts new file mode 100644 index 0000000000..84d33baa87 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts @@ -0,0 +1,176 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunEngine.createFailedTaskRun", () => { + containerTest("emits runFailed so the alert pipeline wakes up", async ({ prisma, redisOptions }) => { + // The mollifier drainer (and batch-trigger over-limit path) call + // createFailedTaskRun to write a terminal SYSTEM_FAILURE PG row + // for runs that never actually executed. Without an explicit + // runFailed emit, the row lands silently — the + // runEngineHandlers' `runFailed` listener (which enqueues + // PerformTaskRunAlertsService) never fires, so customers' + // configured TASK_RUN alert channels miss the failure entirely. + // + // Regression intent: if the emit is removed or moved out of + // createFailedTaskRun's success path, this test fails. The + // shape assertions pin the fields the alert delivery service + // reads from the event payload (run.id, run.status, error, + // attemptNumber=0 as the never-ran-marker). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const taskIdentifier = "drainer-terminal-test"; + + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + error: { + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + }); + + expect(failed.status).toBe("SYSTEM_FAILURE"); + + expect(failedEvents).toHaveLength(1); + const event = failedEvents[0]; + expect(event.run.id).toBe(failed.id); + expect(event.run.status).toBe("SYSTEM_FAILURE"); + expect(event.run.spanId).toBe("fedcba9876543210"); + // attemptNumber=0 is the marker that the run never executed — + // it's a synthesised terminal failure, not an exhausted-retries + // failure. Downstream consumers can use this to distinguish. + expect(event.run.attemptNumber).toBe(0); + expect(event.run.usageDurationMs).toBe(0); + expect(event.run.costInCents).toBe(0); + expect(event.run.error).toEqual({ + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }); + expect(event.organization.id).toBe(authenticatedEnvironment.organization.id); + expect(event.project.id).toBe(authenticatedEnvironment.project.id); + expect(event.environment.id).toBe(authenticatedEnvironment.id); + } finally { + await engine.quit(); + } + }); + + // The TriggerFailedTaskService.call() path wraps createFailedTaskRun + // inside `repository.traceEvent({ incomplete: false, isError: true })` + // which already writes the completion row for the (traceId, spanId). + // Emitting `runFailed` from here would cause the + // `completeFailedRunEvent` handler to race a second write against + // the same span — the `emitRunFailedEvent: false` opt-out is what + // suppresses the emit. The PG row + alert side stay correct because + // the caller enqueues `PerformTaskRunAlertsService.enqueue(run.id)` + // directly after the trace event closes. + containerTest( + "emitRunFailedEvent: false suppresses the bus emit but still creates the PG row", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50 }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier: "outer-trace-event-test", + payload: "{}", + payloadType: "application/json", + error: { type: "STRING_ERROR", raw: "outer trace event manages span" }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + emitRunFailedEvent: false, + }); + + // PG row landed (caller still gets a usable TaskRun). + expect(failed.status).toBe("SYSTEM_FAILURE"); + expect(failed.friendlyId).toBe(friendlyId); + + // Bus emit was suppressed. + expect(failedEvents).toHaveLength(0); + } finally { + await engine.quit(); + } + }, + ); +});