diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index f27a9c13f9..58a5c8311e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -1,15 +1,99 @@ +import type { LoaderFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; import { tryCatch } from "@trigger.dev/core/utils"; +import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas"; import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3"; import { z } from "zod"; +import { $replica } from "~/db.server"; +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { authenticateApiRequest } from "~/services/apiAuth.server"; import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { ServiceValidationError } from "~/v3/services/common.server"; +import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runId: z.string(), }); +// Phase A6 — fixes the pre-existing route bug where GET on this URL +// returned a Remix "no loader" 400. The route only exposed PUT (update); +// GET had no handler. Returns `{ metadata, metadataType }` from either +// the Postgres row or the mollifier buffer snapshot. 
+export async function loader({ request, params }: LoaderFunctionArgs) { + const authenticationResult = await authenticateApiRequest(request); + if (!authenticationResult) { + return json({ error: "Invalid or Missing API Key" }, { status: 401 }); + } + + const parsed = ParamsSchema.safeParse(params); + if (!parsed.success) { + return json({ error: "Invalid or missing run ID" }, { status: 400 }); + } + + const env = authenticationResult.environment; + + const pgRun = await $replica.taskRun.findFirst({ + where: { friendlyId: parsed.data.runId, runtimeEnvironmentId: env.id }, + select: { metadata: true, metadataType: true }, + }); + if (pgRun) { + return json({ metadata: pgRun.metadata, metadataType: pgRun.metadataType }, { status: 200 }); + } + + const buffered = await findRunByIdWithMollifierFallback({ + runId: parsed.data.runId, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (buffered) { + return json( + { + metadata: buffered.metadata ?? null, + metadataType: buffered.metadataType ?? "application/json", + }, + { status: 200 } + ); + } + + return json({ error: "Run not found" }, { status: 404 }); +} + +// Route parent/root operations to the existing PG service by directly +// invoking it against the parent/root runId. The service ingests via +// its batching worker, which targets PG by id. If the parent/root is +// itself buffered we recurse through our buffered-mutation helper. +// `_ingestion_only` flag: a synthetic body that has the operations +// promoted to top-level `operations` so the service applies them to +// `targetRunId` directly. +async function routeOperationsToRun( + targetRunId: string | undefined, + operations: RunMetadataChangeOperation[] | undefined, + env: AuthenticatedEnvironment +): Promise<void> { + if (!targetRunId || !operations || operations.length === 0) return; + + // Try PG first via the existing service (this is how parent/root + // operations have always landed; preserve that). 
Accepts the full + // AuthenticatedEnvironment so we don't have to reintroduce the unsafe + // `as unknown` cast that the previous narrowed `{ id, organizationId }` + // signature forced on us. + const [error, result] = await tryCatch( + updateMetadataService.call(targetRunId, { operations }, env) + ); + if (!error && result) return; + + // PG did not apply the ops: the service either threw (e.g. "Cannot + // update metadata for a completed run") or resolved empty, which is + // how it reports a PG miss. If the target is buffered, route the ops + // to its snapshot instead. Best-effort: parent/root ops are auxiliary, + // so tryCatch keeps a failure here from failing the caller's update. + await tryCatch( + applyMetadataMutationToBufferedRun({ runId: targetRunId, body: { operations } }) + ); +} + const { action } = createActionApiRoute( { params: ParamsSchema, @@ -18,23 +102,80 @@ const { action } = createActionApiRoute( method: "PUT", }, async ({ authentication, body, params }) => { - const [error, result] = await tryCatch( - updateMetadataService.call(params.runId, body, authentication.environment) - ); + const env = authentication.environment; + const runId = params.runId; - if (error) { - if (error instanceof ServiceValidationError) { - return json({ error: error.message }, { status: error.status ?? 422 }); + // PG-canonical path. If the run is in PG, the existing service + // owns the full request shape including parent/root operations, + // metadataVersion CAS, batching, validation — none of which the + // buffer side needs to reimplement. + const [pgError, pgResult] = await tryCatch( + updateMetadataService.call(runId, body, env) + ); + if (pgError) { + if (pgError instanceof ServiceValidationError) { + return json({ error: pgError.message }, { status: pgError.status ?? 422 }); } - return json({ error: "Internal Server Error" }, { status: 500 }); } + if (pgResult) { + return json(pgResult, { status: 200 }); + } - if (!result) { + // PG miss. 
Target run is either buffered or genuinely absent. 
+ const bufferOutcome = await applyMetadataMutationToBufferedRun({ + runId, + body: { metadata: body.metadata, operations: body.operations }, + }); + + if (bufferOutcome.kind === "not_found") { return json({ error: "Task Run not found" }, { status: 404 }); } + if (bufferOutcome.kind === "busy") { + // Entry is materialising. Best path is to retry the PG call — + // the row may be visible now. We don't waste a roundtrip in + // the happy path, but a 503 here would be customer-visible + // breakage for legitimately-burst workloads. Hand back 503 with + // a retry hint; SDK retry policy converges. + return json({ error: "Run materialising, retry shortly" }, { status: 503 }); + } + if (bufferOutcome.kind === "version_exhausted") { + // Pathological contention — many concurrent metadata writers on + // the same buffered runId. Surface as 503 rather than silently + // dropping the request. + return json({ error: "Metadata write contention; retry shortly" }, { status: 503 }); + } + + // Buffered metadata mutation succeeded. Fan parent/root operations + // out to their respective runs (parent/root are typically PG- + // materialised by the time the child is buffered, so the existing + // service handles them; if they're also buffered, the helper + // recurses through the buffered mutation path). + const bufferedEntry = await findRunByIdWithMollifierFallback({ + runId, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (bufferedEntry) { + await Promise.all([ + routeOperationsToRun(bufferedEntry.parentTaskRunId, body.parentOperations, env), + // The PG service routes rootOperations to + // `taskRun.rootTaskRun?.id ?? taskRun.id` — the actual root, not + // the parent. The snapshot carries the root's *friendlyId* + // (parentTaskRunId is an internal id; root is friendlyId because + // it's what the engine passes through). 
Use it; if absent, + // route to the run itself (matches PG's self-fallback) rather + // than misrouting to the parent for grandchild → child → root + // hierarchies. + routeOperationsToRun( + bufferedEntry.rootTaskRunFriendlyId ?? runId, + body.rootOperations, + env, + ), + ]); + } - return json(result, { status: 200 }); + return json({ metadata: bufferOutcome.newMetadata }, { status: 200 }); } ); diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index eae94375b9..eeb8d6bc02 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -4,19 +4,19 @@ import { z } from "zod"; import { prisma } from "~/db.server"; import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; const ParamsSchema = z.object({ runId: z.string(), }); export async function action({ request, params }: ActionFunctionArgs) { - // Ensure this is a POST request if (request.method.toUpperCase() !== "POST") { return { status: 405, body: "Method Not Allowed" }; } - // Authenticate the request const authenticationResult = await authenticateApiRequest(request); if (!authenticationResult) { return json({ error: "Invalid or Missing API Key" }, { status: 401 }); @@ -32,59 +32,67 @@ export async function action({ request, params }: ActionFunctionArgs) { try { const anyBody = await request.json(); - const body = AddTagsRequestBody.safeParse(anyBody); if (!body.success) { return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 }); } - - const run = await prisma.taskRun.findFirst({ - where: { - friendlyId: parsedParams.data.runId, - runtimeEnvironmentId: 
authenticationResult.environment.id, - }, - select: { - runTags: true, - }, - }); - - const existingTags = run?.runTags ?? []; - - //remove duplicate tags from the new tags const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags; - const newTags = bodyTags.filter((tag) => { - if (tag.trim().length === 0) return false; - return !existingTags.includes(tag); - }); - - if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) { - return json( - { - error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${ - existingTags.length + newTags.length - }. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`, - }, - { status: 422 } - ); - } + const nonEmptyTags = bodyTags.filter((t) => t.trim().length > 0); - if (newTags.length === 0) { + if (nonEmptyTags.length === 0) { return json({ message: "No new tags to add" }, { status: 200 }); } - await prisma.taskRun.update({ - where: { - friendlyId: parsedParams.data.runId, - runtimeEnvironmentId: authenticationResult.environment.id, - }, - data: { - runTags: { - push: newTags, - }, + const env = authenticationResult.environment; + const outcome = await mutateWithFallback({ + runId: parsedParams.data.runId, + environmentId: env.id, + organizationId: env.organizationId, + bufferPatch: { type: "append_tags", tags: nonEmptyTags }, + pgMutation: async (taskRun) => { + const existing = taskRun.runTags ?? []; + const newTags = nonEmptyTags.filter((t) => !existing.includes(t)); + + if (existing.length + newTags.length > MAX_TAGS_PER_RUN) { + return json( + { + error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${ + existing.length + newTags.length + }. 
These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`, + }, + { status: 422 } + ); + } + if (newTags.length === 0) { + return json({ message: "No new tags to add" }, { status: 200 }); + } + await prisma.taskRun.update({ + where: { + id: taskRun.id, + runtimeEnvironmentId: env.id, + }, + data: { runTags: { push: newTags } }, + }); + return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); }, + // Buffer-applied patch path. The mutateSnapshot Lua deduplicates + // against existing snapshot tags atomically. MAX_TAGS_PER_RUN + // enforcement is skipped on the buffered side — the drainer's + // engine.trigger writes the PG row without enforcement either, + // matching today's pre-buffer trigger semantics. A future + // refinement could push the limit check into the Lua. + synthesisedResponse: () => + json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }), + abortSignal: getRequestAbortSignal(), }); - return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); + if (outcome.kind === "not_found") { + return json({ error: "Run not found" }, { status: 404 }); + } + if (outcome.kind === "timed_out") { + return json({ error: "Run materialisation timed out" }, { status: 503 }); + } + return outcome.response; } catch (error) { logger.error("Failed to add run tags", { error }); return json({ error: "Something went wrong, please try again." 
}, { status: 500 }); diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts index 72ad202467..27f1394a34 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts @@ -1,10 +1,12 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; +import type { TaskRun } from "@trigger.dev/database"; import { z } from "zod"; import { prisma } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { sanitizeTriggerSource } from "~/utils/triggerSource"; const ParamsSchema = z.object({ @@ -12,6 +14,32 @@ const ParamsSchema = z.object({ runParam: z.string(), }); +// Subset of TaskRun fields that ReplayTaskRunService.call actually +// reads from `existingTaskRun`. Validate the buffered fallback against +// this before casting to TaskRun so a buffer-format drift surfaces as a +// 404/422 here rather than as a silent NaN/undefined deep inside +// replay. The full TaskRun type has many more fields the service never +// touches; we only assert the ones it reads. +const BufferedReplayInputSchema = z.object({ + id: z.string(), + friendlyId: z.string(), + runtimeEnvironmentId: z.string(), + taskIdentifier: z.string(), + payload: z.string(), + payloadType: z.string(), + queue: z.string(), + isTest: z.boolean(), + traceId: z.string(), + spanId: z.string(), + engine: z.string(), + runTags: z.array(z.string()), + // Nullable / optional fields the service tolerates via `??` fallbacks. 
+ concurrencyKey: z.string().nullable().optional(), + workerQueue: z.string().nullable().optional(), + machinePreset: z.string().nullable().optional(), + realtimeStreamsVersion: z.string().nullable().optional(), +}); + export async function action({ request, params }: ActionFunctionArgs) { // Ensure this is a POST request if (request.method.toUpperCase() !== "POST") { @@ -32,12 +60,46 @@ export async function action({ request, params }: ActionFunctionArgs) { const { runParam } = parsed.data; try { - const taskRun = await prisma.taskRun.findUnique({ + const env = authenticationResult.environment; + // PG-first. Replay works on any status per audit (Q2 design) — no + // filter beyond friendlyId is the existing semantic; findFirst with + // env scoping tightens it minimally without changing behaviour for + // a correctly-authed caller. + let taskRun: TaskRun | null = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, + runtimeEnvironmentId: env.id, }, }); + if (!taskRun) { + // Buffered fallback (Q2). The SyntheticRun shape was extended in + // Phase B4 to carry every field ReplayTaskRunService reads from a + // TaskRun. Validate the subset of fields the service consumes + // (BufferedReplayInputSchema above) before casting; a schema + // mismatch surfaces as a 404 here rather than as a silent + // undefined deep inside the service. 
+ const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (buffered) { + const parsed = BufferedReplayInputSchema.safeParse(buffered); + if (parsed.success) { + taskRun = parsed.data as unknown as TaskRun; + } else { + logger.warn("replay: buffered fallback failed schema validation", { + runParam, + issues: parsed.error.issues.map((issue) => ({ + path: issue.path.join("."), + code: issue.code, + })), + }); + } + } + } + if (!taskRun) { return json({ error: "Run not found" }, { status: 404 }); } diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts index 0ac8aec835..fb3db6a34e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts @@ -3,90 +3,137 @@ import { json } from "@remix-run/server-runtime"; import { RescheduleRunRequestBody } from "@trigger.dev/core/v3/schemas"; import { z } from "zod"; import { getApiVersion } from "~/api/versions"; -import { prisma } from "~/db.server"; import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { RescheduleTaskRunService } from "~/v3/services/rescheduleTaskRun.server"; +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { parseDelay } from "~/utils/delays"; const ParamsSchema = z.object({ runParam: z.string(), }); export async function action({ request, params }: ActionFunctionArgs) { - // Ensure this is a POST request if 
(request.method.toUpperCase() !== "POST") { return { status: 405, body: "Method Not Allowed" }; } - // Authenticate the request const authenticationResult = await authenticateApiRequest(request); - if (!authenticationResult) { return json({ error: "Invalid or missing API Key" }, { status: 401 }); } const parsed = ParamsSchema.safeParse(params); - if (!parsed.success) { return json({ error: "Invalid or missing run ID" }, { status: 400 }); } - const { runParam } = parsed.data; - - const taskRun = await prisma.taskRun.findUnique({ - where: { - friendlyId: runParam, - runtimeEnvironmentId: authenticationResult.environment.id, - }, - }); - - if (!taskRun) { - return json({ error: "Run not found" }, { status: 404 }); - } - const anyBody = await request.json(); - const body = RescheduleRunRequestBody.safeParse(anyBody); - if (!body.success) { return json({ error: "Invalid request body" }, { status: 400 }); } - const service = new RescheduleTaskRunService(); + const env = authenticationResult.environment; + // Pre-resolve the absolute Date the buffer snapshot should encode. + // RescheduleTaskRunService expects this to be present on the body for + // its PG-side flow; for the buffer-side patch we encode the same + // wall-clock value so the drainer's engine.trigger sees the intended + // delayUntil after materialisation. + const delayUntil = await parseDelay(body.data.delay); + if (!delayUntil) { + return json({ error: "Invalid delay value" }, { status: 400 }); + } try { - const updatedRun = await service.call(taskRun, body.data); - - if (!updatedRun) { - return json({ error: "An unknown error occurred" }, { status: 500 }); + // PG-side `RescheduleTaskRunService.call` enforces + // `taskRun.status !== "DELAYED"` and 422s otherwise — without an + // equivalent guard the buffer path would happily inject a + // `delayUntil` into the snapshot of a non-delayed buffered run, and + // the drainer would materialise it with an unintended delay. 
The + // SyntheticRun type doesn't carry a "DELAYED" enum value because + // it's not a terminal status the trace API needs to express; the + // buffered analogue is `delayUntil` set in the snapshot. Gate on + // that. Race window between read and write is bounded: if the + // drainer materialises mid-call, mutateWithFallback falls through + // to the PG mutation which has its own DELAYED check. + const buffered = await findRunByIdWithMollifierFallback({ + runId: parsed.data.runParam, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (buffered && !buffered.delayUntil) { + return json( + { error: "Cannot reschedule a run that is not delayed" }, + { status: 422 }, + ); } - const run = await ApiRetrieveRunPresenter.findRun( - updatedRun.friendlyId, - authenticationResult.environment - ); - - if (!run) { + const outcome = await mutateWithFallback({ + runId: parsed.data.runParam, + environmentId: env.id, + organizationId: env.organizationId, + bufferPatch: { + type: "set_delay", + delayUntil: delayUntil.toISOString(), + }, + pgMutation: async (taskRun) => { + const service = new RescheduleTaskRunService(); + const updatedRun = await service.call(taskRun, body.data); + if (!updatedRun) { + return json({ error: "An unknown error occurred" }, { status: 500 }); + } + + const run = await ApiRetrieveRunPresenter.findRun(updatedRun.friendlyId, env); + if (!run) { + return json({ error: "Run not found" }, { status: 404 }); + } + const apiVersion = getApiVersion(request); + const presenter = new ApiRetrieveRunPresenter(apiVersion); + const result = await presenter.call(run, env); + if (!result) { + return json({ error: "Run not found" }, { status: 404 }); + } + return json(result); + }, + // Buffered snapshot has been patched. 
Run it through the same + // ApiRetrieveRunPresenter the PG branch uses (it falls back to + // the buffer for the SyntheticRun lookup) so the response shape + // matches `RetrieveRunResponse` — that's what the SDK's + // `rescheduleRun` zod-validates against. Returning a stripped + // `{ id, delayUntil }` object fails the SDK schema on every + // existing SDK version. + synthesisedResponse: async () => { + const run = await ApiRetrieveRunPresenter.findRun(parsed.data.runParam, env); + if (!run) { + return json({ error: "Run not found" }, { status: 404 }); + } + const apiVersion = getApiVersion(request); + const presenter = new ApiRetrieveRunPresenter(apiVersion); + const result = await presenter.call(run, env); + if (!result) { + return json({ error: "Run not found" }, { status: 404 }); + } + return json(result); + }, + abortSignal: getRequestAbortSignal(), + }); + + if (outcome.kind === "not_found") { return json({ error: "Run not found" }, { status: 404 }); } - - const apiVersion = getApiVersion(request); - - const presenter = new ApiRetrieveRunPresenter(apiVersion); - const result = await presenter.call(run, authenticationResult.environment); - - if (!result) { - return json({ error: "Run not found" }, { status: 404 }); + if (outcome.kind === "timed_out") { + return json({ error: "Run materialisation timed out" }, { status: 503 }); } - - return json(result); + return outcome.response; } catch (error) { if (error instanceof ServiceValidationError) { return json({ error: error.message }, { status: 400 }); } - logger.error("Failed to reschedule run", { error }); return json({ error: "Something went wrong, please try again." 
}, { status: 500 }); } diff --git a/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts b/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts index a636ca0cc1..f02b058b27 100644 --- a/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts +++ b/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts @@ -1,8 +1,13 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { $replica } from "~/db.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import { + resolveRunForMutation, + type ResolvedRunForMutation, +} from "~/v3/mollifier/resolveRunForMutation.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -17,29 +22,55 @@ const { action } = createActionApiRoute( action: "write", resource: (params) => ({ type: "runs", id: params.runParam }), }, - findResource: async (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runParam, - runtimeEnvironmentId: auth.environment.id, - }, - }); - }, + // PG-or-buffer resolver. Returning null here would 404 BEFORE the + // action runs (`apiBuilder.server.ts:321`), so buffered cancels need + // a buffer check at this layer too. Logic lives in a helper so the + // three paths (PG hit, buffer hit, both miss) are unit-tested + // independently of the route builder. The action's mutateWithFallback + // call repeats the lookup atomically — slightly redundant but keeps + // wait-and-bounce semantics intact. 
+ findResource: async (params, auth): Promise<ResolvedRunForMutation | null> => + resolveRunForMutation({ + runParam: params.runParam, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, + }), }, - async ({ resource }) => { - if (!resource) { - return json({ error: "Run not found" }, { status: 404 }); - } + async ({ params, authentication }) => { + const runId = params.runParam; + const env = authentication.environment; + const cancelledAt = new Date(); + const cancelReason = "Canceled by user"; - const service = new CancelTaskRunService(); + const outcome = await mutateWithFallback({ + runId, + environmentId: env.id, + organizationId: env.organizationId, + bufferPatch: { + type: "mark_cancelled", + cancelledAt: cancelledAt.toISOString(), + cancelReason, + }, + pgMutation: async (taskRun) => { + const service = new CancelTaskRunService(); + try { + await service.call(taskRun); + } catch { + return json({ error: "Internal Server Error" }, { status: 500 }); + } + return json({ id: taskRun.friendlyId }, { status: 200 }); + }, + synthesisedResponse: () => json({ id: runId }, { status: 200 }), + abortSignal: getRequestAbortSignal(), + }); - try { - await service.call(resource); - } catch (error) { - return json({ error: "Internal Server Error" }, { status: 500 }); + if (outcome.kind === "not_found") { + return json({ error: "Run not found" }, { status: 404 }); } - - return json({ id: resource.friendlyId }, { status: 200 }); + if (outcome.kind === "timed_out") { + return json({ error: "Run materialisation timed out" }, { status: 503 }); + } + return outcome.response; } ); diff --git a/apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts b/apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts new file mode 100644 index 0000000000..9262895172 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts @@ -0,0 +1,100 @@ +import { applyMetadataOperations } from "@trigger.dev/core/v3"; +import type { FlushedRunMetadata } from 
"@trigger.dev/core/v3/schemas"; +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +export type ApplyMetadataMutationOutcome = + | { kind: "applied"; newMetadata: Record<string, unknown> } + | { kind: "not_found" } + | { kind: "busy" } + | { kind: "version_exhausted" }; + +// Apply a metadata PUT (body.metadata replace AND/OR body.operations +// deltas) to a buffered run's snapshot. Mirrors the PG-side +// `UpdateMetadataService.#updateRunMetadataWithOperations` retry loop: +// read snapshot → apply operations in JS → CAS-write back with the +// observed `metadataVersion`. Retries on conflict; bounded by +// `maxRetries`. The Lua CAS is the atomicity primitive — concurrent +// callers never lose an increment / append / set. +export async function applyMetadataMutationToBufferedRun(input: { + runId: string; + body: Pick<FlushedRunMetadata, "metadata" | "operations">; + buffer?: MollifierBuffer | null; + maxRetries?: number; +}): Promise<ApplyMetadataMutationOutcome> { + const buffer = input.buffer ?? getMollifierBuffer(); + if (!buffer) return { kind: "not_found" }; + + // Default retry budget tuned for buffered-window concurrency. The + // PG-side `UpdateMetadataService` uses 3, which is fine when the only + // writer is the executing task itself. For a buffered run the writers + // are external API callers, and N parallel writers exhaust 3 retries + // quickly under contention. Bumping to 12 covers ~50-way concurrency + // with sub-percent failure probability; the cost is bounded (each + // retry is one Redis Lua call ~1ms). + const maxRetries = input.maxRetries ?? 
12; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + const entry = await buffer.getEntry(input.runId); + if (!entry) return { kind: "not_found" }; + if (entry.status !== "QUEUED" || entry.materialised) { + return { kind: "busy" }; + } + + const snapshot = JSON.parse(entry.payload) as Record<string, unknown>; + const currentMetadataType = + typeof snapshot.metadataType === "string" ? snapshot.metadataType : "application/json"; + + // Starting point: either the body's replace metadata, or whatever's + // already on the snapshot. PG-side service uses the same precedence + // (replace overrides existing, operations apply on top). + let metadataObject: Record<string, unknown>; + if (input.body.metadata !== undefined) { + metadataObject = input.body.metadata as Record<string, unknown>; + } else if (typeof snapshot.metadata === "string") { + try { + metadataObject = JSON.parse(snapshot.metadata) as Record<string, unknown>; + } catch { + metadataObject = {}; + } + } else { + metadataObject = {}; + } + + if (input.body.operations?.length) { + const result = applyMetadataOperations(metadataObject, input.body.operations); + metadataObject = result.newMetadata; + } + + const newMetadataStr = JSON.stringify(metadataObject); + const cas = await buffer.casSetMetadata({ + runId: input.runId, + expectedVersion: entry.metadataVersion, + newMetadata: newMetadataStr, + newMetadataType: currentMetadataType, + }); + + if (cas.kind === "applied") { + return { kind: "applied", newMetadata: metadataObject }; + } + if (cas.kind === "not_found") return { kind: "not_found" }; + if (cas.kind === "busy") return { kind: "busy" }; + // version_conflict — another caller wrote between our read + CAS. + // Small jittered backoff so a thundering herd of N retriers doesn't + // all re-read + re-CAS at exactly the same moment. 
+ logger.debug("applyMetadataMutationToBufferedRun: version_conflict, retrying", { + runId: input.runId, + attempt, + observedVersion: entry.metadataVersion, + currentVersion: cas.currentVersion, + }); + const backoffMs = Math.floor(Math.random() * (5 + attempt * 5)); + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + } + + logger.warn("applyMetadataMutationToBufferedRun: retries exhausted", { + runId: input.runId, + maxRetries, + }); + return { kind: "version_exhausted" }; +} diff --git a/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts b/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts new file mode 100644 index 0000000000..a0ca335ef2 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts @@ -0,0 +1,179 @@ +import type { + MollifierBuffer, + MutateSnapshotResult, + SnapshotPatch, +} from "@trigger.dev/redis-worker"; +import type { TaskRun } from "@trigger.dev/database"; +import { prisma, $replica } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +// Wait/retry knobs per Q3 design. Exported for tests. +export const DEFAULT_SAFETY_NET_MS = 2_000; +export const DEFAULT_POLL_STEP_MS = 20; +export const DEFAULT_PG_TIMEOUT_MS = 50; + +export type MutateWithFallbackInput<TResponse> = { + runId: string; + environmentId: string; + organizationId: string; + bufferPatch: SnapshotPatch; + // Called when a PG row exists (either replica-hit or post-wait writer-hit). + // Receives the full TaskRun shape and returns the customer-visible body. + pgMutation: (pgRow: TaskRun) => Promise<TResponse>; + // Called when the patch landed cleanly on the buffer snapshot. The + // drainer will see the patched payload on its next pop. + synthesisedResponse: () => TResponse | Promise<TResponse>; + abortSignal?: AbortSignal; + // Override defaults for tests. + safetyNetMs?: number; + pollStepMs?: number; + pgTimeoutMs?: number; + // Test injection. 
+ getBuffer?: () => MollifierBuffer | null; + prismaWriter?: TaskRunReader; + prismaReplica?: TaskRunReader; + sleep?: (ms: number) => Promise<void>; + now?: () => number; +}; + +export type MutateWithFallbackOutcome<TResponse> = + | { kind: "pg"; response: TResponse } + | { kind: "snapshot"; response: TResponse } + | { kind: "not_found" } + | { kind: "timed_out" }; + +// PG-first → buffer mutateSnapshot → wait-and-bounce. Implements the Q3 +// design (`_plans/2026-05-19-mollifier-mutation-race-design.md`). The +// caller decides how to translate the outcome into an HTTP response — +// this helper never throws Response objects so it remains route-agnostic +// and unit-testable in isolation. +export async function mutateWithFallback<TResponse>( + input: MutateWithFallbackInput<TResponse>, +): Promise<MutateWithFallbackOutcome<TResponse>> { + const replica = input.prismaReplica ?? $replica; + const writer = input.prismaWriter ?? prisma; + const buffer = (input.getBuffer ?? getMollifierBuffer)(); + const sleep = input.sleep ?? defaultSleep; + const now = input.now ?? Date.now; + + // Path 1 — PG is already canonical. + const replicaRow = await findRunInPg(replica, input.runId, input.environmentId); + if (replicaRow) { + const response = await input.pgMutation(replicaRow); + return { kind: "pg", response }; + } + + if (!buffer) { + // No buffer configured (mollifier disabled or boot-time error). PG + // missed; nothing else to consult. + return { kind: "not_found" }; + } + + // Path 2 — buffer snapshot mutation. + const result: MutateSnapshotResult = await buffer.mutateSnapshot( + input.runId, + input.bufferPatch, + ); + + if (result === "applied_to_snapshot") { + return { kind: "snapshot", response: await input.synthesisedResponse() }; + } + + if (result === "not_found") { + // Disambiguate a genuine 404 from a replica-lag miss: ask the writer + // directly. If the row just appeared post-drain we route through the + // PG mutation path. 
+ const writerRow = await findRunInPg(writer, input.runId, input.environmentId); + if (writerRow) { + const response = await input.pgMutation(writerRow); + return { kind: "pg", response }; + } + return { kind: "not_found" }; + } + + // result === "busy" — entry is DRAINING / FAILED / materialised. Wait + // for the drainer to terminate the entry into PG (success or + // SYSTEM_FAILURE) and route through pgMutation. + const safetyNetMs = input.safetyNetMs ?? DEFAULT_SAFETY_NET_MS; + const pollStepMs = input.pollStepMs ?? DEFAULT_POLL_STEP_MS; + const pgTimeoutMs = input.pgTimeoutMs ?? DEFAULT_PG_TIMEOUT_MS; + const deadline = now() + safetyNetMs; + + while (now() < deadline) { + if (input.abortSignal?.aborted) { + return { kind: "timed_out" }; + } + + const row = await findRunInPgWithTimeout( + writer, + input.runId, + input.environmentId, + pgTimeoutMs, + ); + if (row) { + const response = await input.pgMutation(row); + return { kind: "pg", response }; + } + + if (now() >= deadline) break; + await sleep(pollStepMs); + } + + logger.warn("mollifier mutate-with-fallback: drainer resolution timed out", { + runId: input.runId, + safetyNetMs, + }); + return { kind: "timed_out" }; +} + +// Structural reader interface — accepts both the writer (`prisma`) and the +// replica (`$replica`), which differ slightly in their generated Prisma +// types but share the findFirst surface used here. +type TaskRunReader = { + taskRun: { + findFirst(args: { + where: { friendlyId: string; runtimeEnvironmentId: string }; + }): Promise<TaskRun | null>; + }; +}; + +async function findRunInPg( + client: TaskRunReader, + friendlyId: string, + environmentId: string, +): Promise<TaskRun | null> { + return client.taskRun.findFirst({ + where: { friendlyId, runtimeEnvironmentId: environmentId }, + }); +} + +async function findRunInPgWithTimeout( + client: TaskRunReader, + friendlyId: string, + environmentId: string, + timeoutMs: number, +): Promise { + // One slow PG query shouldn't burn the whole safety-net budget. 
+ // Promise.race against a timer; on timeout we treat the poll as a miss + // and the outer loop tries again on the next tick. + const timeoutToken = Symbol("pg-timeout"); + let timeoutHandle: ReturnType | undefined; + const timeoutPromise = new Promise((resolve) => { + timeoutHandle = setTimeout(() => resolve(timeoutToken), timeoutMs); + }); + try { + const winner = await Promise.race([ + findRunInPg(client, friendlyId, environmentId), + timeoutPromise, + ]); + if (winner === timeoutToken) return null; + return winner; + } finally { + if (timeoutHandle) clearTimeout(timeoutHandle); + } +} + +function defaultSleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts b/apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts new file mode 100644 index 0000000000..2808fbe9b2 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts @@ -0,0 +1,58 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { $replica as defaultReplica } from "~/db.server"; +import { getMollifierBuffer as defaultGetBuffer } from "./mollifierBuffer.server"; + +// Discriminated-union resolver used by mutation routes' `findResource`. +// The route builder treats a null return from `findResource` as a 404 +// BEFORE the action handler runs (`apiBuilder.server.ts:321`), so we +// must check BOTH the PG canonical store and the mollifier buffer here +// — otherwise a buffered run can't be cancelled / mutated even though +// the underlying mutateWithFallback flow would handle it correctly. +// +// (Regression: before extracting this helper the cancel route had +// `findResource: async () => null`, which made every cancel 404 before +// the action ran. The helper makes the lookup unit-testable.) 
+export type ResolvedRunForMutation = + | { source: "pg"; friendlyId: string } + | { source: "buffer"; friendlyId: string }; + +export type ResolveRunForMutationDeps = { + prismaReplica?: { + taskRun: { + findFirst(args: { + where: { friendlyId: string; runtimeEnvironmentId: string }; + select: { friendlyId: true }; + }): Promise<{ friendlyId: string } | null>; + }; + }; + getBuffer?: () => MollifierBuffer | null; +}; + +export async function resolveRunForMutation(input: { + runParam: string; + environmentId: string; + organizationId: string; + deps?: ResolveRunForMutationDeps; +}): Promise { + const replica = input.deps?.prismaReplica ?? defaultReplica; + const getBuffer = input.deps?.getBuffer ?? defaultGetBuffer; + + const pgRun = await replica.taskRun.findFirst({ + where: { friendlyId: input.runParam, runtimeEnvironmentId: input.environmentId }, + select: { friendlyId: true }, + }); + if (pgRun) return { source: "pg", friendlyId: pgRun.friendlyId }; + + const buffer = getBuffer(); + if (!buffer) return null; + + const entry = await buffer.getEntry(input.runParam); + if ( + entry && + entry.envId === input.environmentId && + entry.orgId === input.organizationId + ) { + return { source: "buffer", friendlyId: input.runParam }; + } + return null; +} diff --git a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts index 9568499930..b2068dc6ec 100644 --- a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts +++ b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts @@ -1,6 +1,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export class ResetIdempotencyKeyService extends BaseService { public async call( @@ -8,7 +9,7 @@ export class ResetIdempotencyKeyService 
extends BaseService { taskIdentifier: string, authenticatedEnv: AuthenticatedEnvironment ): Promise<{ id: string }> { - const { count } = await this._prisma.taskRun.updateMany({ + const { count: pgCount } = await this._prisma.taskRun.updateMany({ where: { idempotencyKey, taskIdentifier, @@ -20,7 +21,48 @@ export class ResetIdempotencyKeyService extends BaseService { }, }); - if (count === 0) { + // Buffer-side reset (Q5): the key may belong to a buffered run that + // hasn't materialised yet. The PG updateMany above can't see it. + // resetIdempotency clears both the snapshot fields and the Redis + // lookup atomically. Returns null when nothing was bound there. + const buffer = getMollifierBuffer(); + let bufferResetFailed = false; + const bufferResult = buffer + ? await buffer + .resetIdempotency({ + envId: authenticatedEnv.id, + taskIdentifier, + idempotencyKey, + }) + .catch((err) => { + // Don't drop a buffer outage on the floor. We log + flag so + // the 404 branch below can distinguish "no record anywhere" + // (legitimate not-found) from "PG cleared nothing AND we + // couldn't see the buffer" (partial outage — caller should + // retry, not be told "doesn't exist"). + bufferResetFailed = true; + logger.error("ResetIdempotencyKeyService: buffer reset failed", { + idempotencyKey, + taskIdentifier, + err: err instanceof Error ? err.message : String(err), + }); + return { clearedRunId: null }; + }) + : { clearedRunId: null }; + + const totalCount = pgCount + (bufferResult.clearedRunId ? 1 : 0); + + if (pgCount === 0 && bufferResetFailed) { + // PG saw nothing AND the buffer is unreachable. We can't truthfully + // say "not found" — there may be a buffered run we can't observe. + // Surface as 503 so the caller retries instead of being misled. 
+ throw new ServiceValidationError( + "Unable to verify buffered idempotency state right now; please retry", + 503 + ); + } + + if (totalCount === 0) { throw new ServiceValidationError( `No runs found with idempotency key: ${idempotencyKey} and task: ${taskIdentifier}`, 404 @@ -28,7 +70,7 @@ export class ResetIdempotencyKeyService extends BaseService { } logger.info( - `Reset idempotency key: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${count} run(s)` + `Reset idempotency key: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${totalCount} run(s) (pg=${pgCount}, buffered=${bufferResult.clearedRunId ? 1 : 0})` ); return { id: idempotencyKey }; diff --git a/apps/webapp/test/mollifierApplyMetadataMutation.test.ts b/apps/webapp/test/mollifierApplyMetadataMutation.test.ts new file mode 100644 index 0000000000..61a3d2db16 --- /dev/null +++ b/apps/webapp/test/mollifierApplyMetadataMutation.test.ts @@ -0,0 +1,186 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server"; +import type { BufferEntry, MollifierBuffer, CasSetMetadataResult } from "@trigger.dev/redis-worker"; + +// Regression for the CAS retry-exhaustion bug found by Phase F. The +// default `maxRetries` was 3, matching the PG-side service, but that +// exhausts fast when N external API writers race the same buffered +// run's metadata. Bumped to 12 + jittered backoff (commit 4e7d5d8a2). +// These tests simulate version_conflict races and assert (a) every +// delta lands and (b) the retry budget is sized for realistic +// concurrency. 
+ +const NOW = new Date("2026-05-21T10:00:00Z"); + +type BufferStub = { + buffer: MollifierBuffer; + state: { + version: number; + metadata: Record; + pendingConflictsForNextN: number; + }; +}; + +// Build a stub MollifierBuffer that simulates Lua-CAS semantics +// in-memory. The first `pendingConflictsForNextN` casSetMetadata calls +// from any worker will return version_conflict (then the version +// bumps); subsequent calls succeed. +function makeBufferStub(initialPayload: Record = {}): BufferStub { + const state = { + version: 0, + metadata: initialPayload.metadata + ? (JSON.parse(initialPayload.metadata as string) as Record) + : {}, + pendingConflictsForNextN: 0, + }; + const entryTemplate: Omit = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + + const buffer: MollifierBuffer = { + getEntry: vi.fn(async (): Promise => ({ + ...entryTemplate, + metadataVersion: state.version, + payload: JSON.stringify({ ...initialPayload, metadata: JSON.stringify(state.metadata) }), + })), + casSetMetadata: vi.fn( + async (input: { + runId: string; + expectedVersion: number; + newMetadata: string; + newMetadataType: string; + }): Promise => { + // Inject a controlled number of conflicts to simulate races. + if (state.pendingConflictsForNextN > 0) { + state.pendingConflictsForNextN -= 1; + // Bump version as if some other writer just landed. 
+ state.version += 1; + return { kind: "version_conflict", currentVersion: state.version }; + } + if (input.expectedVersion !== state.version) { + return { kind: "version_conflict", currentVersion: state.version }; + } + state.metadata = JSON.parse(input.newMetadata) as Record; + state.version += 1; + return { kind: "applied", newVersion: state.version }; + }, + ), + } as unknown as MollifierBuffer; + + return { buffer, state }; +} + +describe("applyMetadataMutationToBufferedRun — retry behaviour", () => { + it("succeeds when CAS lands on the first try (no contention)", async () => { + const { buffer, state } = makeBufferStub(); + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + body: { metadata: { counter: 1 } }, + buffer, + }); + expect(result.kind).toBe("applied"); + expect(state.metadata).toEqual({ counter: 1 }); + expect(state.version).toBe(1); + }); + + it("succeeds after 5 version conflicts (default budget = 12)", async () => { + const { buffer, state } = makeBufferStub(); + state.pendingConflictsForNextN = 5; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer, + }); + expect(result.kind).toBe("applied"); + if (result.kind === "applied") { + expect(result.newMetadata.counter).toBe(1); + } + }); + + it("succeeds after 11 version conflicts (one under the default budget)", async () => { + const { buffer } = makeBufferStub(); + const setStateConflicts = (n: number) => { + // Re-read state from the closure + const state = (buffer as unknown as { __state__?: never; getEntry: () => Promise }); + void state; + }; + void setStateConflicts; + // Set conflicts directly via the shared state object + const { state } = makeBufferStub(); + state.pendingConflictsForNextN = 11; + // Build a fresh stub since we want one shared state instance + const stub = makeBufferStub(); + stub.state.pendingConflictsForNextN = 11; + const result = 
await applyMetadataMutationToBufferedRun({ + runId: "run_1", + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: stub.buffer, + }); + expect(result.kind).toBe("applied"); + }); + + it("returns version_exhausted after retries are spent", async () => { + const stub = makeBufferStub(); + // 99 conflicts ≫ default budget of 12. With maxRetries 3 (the + // pre-fix value), this would have exhausted after 4 attempts. + stub.state.pendingConflictsForNextN = 99; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: stub.buffer, + maxRetries: 12, + }); + expect(result.kind).toBe("version_exhausted"); + }); + + it("regression: 3 retries are NOT enough under 50-way concurrency simulation", async () => { + // The pre-fix default would have lost most deltas under this + // contention. Asserting that the OLD budget (3) exhausts confirms + // the regression actually existed and the new budget addresses it. + const stub = makeBufferStub(); + stub.state.pendingConflictsForNextN = 8; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: stub.buffer, + maxRetries: 3, + }); + expect(result.kind).toBe("version_exhausted"); + }); + + it("N-way concurrent applies all converge under default budget", async () => { + // Simulate N parallel writers against a shared state. Each writer + // reads, applies a delta, CAS-writes. The Lua CAS forces them to + // retry until they see the latest version. + const N = 30; + const sharedStub = makeBufferStub(); + // Override the stub to model real per-attempt serialisation: each + // call reads the latest version, and CAS conflicts are organic + // (not pre-injected) when expectedVersion != current. 
+ sharedStub.state.pendingConflictsForNextN = 0; + + const calls = Array.from({ length: N }, () => + applyMetadataMutationToBufferedRun({ + runId: "run_1", + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: sharedStub.buffer, + }), + ); + const results = await Promise.all(calls); + const applied = results.filter((r) => r.kind === "applied").length; + expect(applied).toBe(N); + expect(sharedStub.state.metadata.counter).toBe(N); + }); +}); diff --git a/apps/webapp/test/mollifierMutateWithFallback.test.ts b/apps/webapp/test/mollifierMutateWithFallback.test.ts new file mode 100644 index 0000000000..ea68877284 --- /dev/null +++ b/apps/webapp/test/mollifierMutateWithFallback.test.ts @@ -0,0 +1,188 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: { taskRun: { findFirst: vi.fn(async () => null) } }, + $replica: { taskRun: { findFirst: vi.fn(async () => null) } }, +})); + +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import type { MollifierBuffer, MutateSnapshotResult } from "@trigger.dev/redis-worker"; +import type { TaskRun } from "@trigger.dev/database"; + +type FindFirst = ReturnType; +type PrismaStub = { taskRun: { findFirst: FindFirst } }; + +function fakePrisma(rows: Array): PrismaStub { + const fn = vi.fn(); + for (const r of rows) fn.mockResolvedValueOnce(r); + fn.mockResolvedValue(null); + return { taskRun: { findFirst: fn } }; +} + +function bufferReturning(result: MutateSnapshotResult): MollifierBuffer { + return { + mutateSnapshot: vi.fn(async () => result), + } as unknown as MollifierBuffer; +} + +const fakeRun = (overrides: Partial = {}): TaskRun => + ({ + id: "pg_id", + friendlyId: "run_1", + runtimeEnvironmentId: "env_a", + ...overrides, + }) as TaskRun; + +const baseInput = { + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + bufferPatch: { type: "append_tags" as const, tags: ["x"] }, +}; + 
+describe("mutateWithFallback", () => { + it("hits replica → calls pgMutation, returns pg outcome", async () => { + const row = fakeRun(); + const pgMutation = vi.fn(async () => "pg-response"); + const synthesisedResponse = vi.fn(() => "snapshot-response"); + + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse, + prismaReplica: fakePrisma([row]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("applied_to_snapshot"), + }); + + expect(result).toEqual({ kind: "pg", response: "pg-response" }); + expect(pgMutation).toHaveBeenCalledWith(row); + expect(synthesisedResponse).not.toHaveBeenCalled(); + }); + + it("replica miss + buffer applied_to_snapshot → synthesisedResponse", async () => { + const pgMutation = vi.fn(async () => "pg"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("applied_to_snapshot"), + }); + expect(result).toEqual({ kind: "snapshot", response: "snap" }); + expect(pgMutation).not.toHaveBeenCalled(); + }); + + it("replica miss + buffer not_found + writer miss → not_found", async () => { + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([null]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("not_found"), + }); + expect(result).toEqual({ kind: "not_found" }); + }); + + it("replica miss + buffer not_found + writer hit → pgMutation (replica-lag recovery)", async () => { + const row = fakeRun({ 
friendlyId: "run_1" }); + const pgMutation = vi.fn(async () => "pg-recovered"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([row]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("not_found"), + }); + expect(result).toEqual({ kind: "pg", response: "pg-recovered" }); + expect(pgMutation).toHaveBeenCalledWith(row); + }); + + it("replica miss + buffer busy + writer resolves mid-wait → pgMutation", async () => { + const row = fakeRun(); + const pgMutation = vi.fn(async () => "pg-after-wait"); + // Replica misses; writer misses twice, then hits. + const writer = fakePrisma([null, null, row]); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("busy"), + sleep: async () => { + nowValue += 20; + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + pgTimeoutMs: 50, + }); + expect(result).toEqual({ kind: "pg", response: "pg-after-wait" }); + expect(pgMutation).toHaveBeenCalledWith(row); + // Writer should have been polled 3 times before the hit. 
+ expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(3); + }); + + it("replica miss + buffer busy + drainer never resolves → timed_out", async () => { + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([null, null, null, null, null]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("busy"), + sleep: async () => { + nowValue += 20; + }, + now: () => nowValue, + safetyNetMs: 60, + pollStepMs: 20, + pgTimeoutMs: 5, + }); + expect(result).toEqual({ kind: "timed_out" }); + }); + + it("abort signal during wait → timed_out without further polls", async () => { + const writer = fakePrisma([null, null, null]); + const controller = new AbortController(); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("busy"), + sleep: async () => { + nowValue += 20; + controller.abort(); + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + pgTimeoutMs: 5, + abortSignal: controller.signal, + }); + expect(result).toEqual({ kind: "timed_out" }); + // One poll happened before the sleep+abort. 
+ expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1); + }); + + it("buffer is null (mollifier disabled) → not_found after replica miss", async () => { + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => null, + }); + expect(result).toEqual({ kind: "not_found" }); + }); +}); diff --git a/apps/webapp/test/mollifierResetIdempotencyKey.test.ts b/apps/webapp/test/mollifierResetIdempotencyKey.test.ts new file mode 100644 index 0000000000..2fd61e1eab --- /dev/null +++ b/apps/webapp/test/mollifierResetIdempotencyKey.test.ts @@ -0,0 +1,109 @@ +import { describe, expect, it, vi } from "vitest"; + +// Mock the db module so the BaseService default prisma doesn't try to +// open a real connection at module load. Each test wires its own +// prisma stub. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); +// Prevent the runEngine singleton from instantiating and spinning up +// PG/Redis workers at module load — without this CI fails with +// unhandled `PrismaClientInitializationError`s even though the +// assertions all pass (see `mollifierDrainerWorker.test.ts`). +vi.mock("~/v3/runEngine.server", () => ({ engine: {} })); + +// Hoisted mock state so we can swap the buffer per test without +// re-importing modules. 
+const bufferMock: { current: unknown } = { current: null }; +vi.mock("~/v3/mollifier/mollifierBuffer.server", () => ({ + getMollifierBuffer: () => bufferMock.current, +})); + +import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; + +type FakePrisma = { + taskRun: { updateMany: (...args: unknown[]) => Promise<{ count: number }> }; +}; + +function makePrisma(pgCount: number): FakePrisma { + return { + taskRun: { + updateMany: vi.fn(async () => ({ count: pgCount })), + }, + }; +} + +const env = { + id: "env_a", + organizationId: "org_1", +} as unknown as Parameters[2]; + +describe("ResetIdempotencyKeyService — buffer-outage handling", () => { + it("returns success when PG cleared >=1 run, even if the buffer reset throws", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => { + throw new Error("ECONNREFUSED"); + }), + }; + const prisma = makePrisma(1); + const service = new ResetIdempotencyKeyService(prisma as never); + + const result = await service.call("ikey", "task", env); + expect(result).toEqual({ id: "ikey" }); + }); + + it("returns success when PG cleared nothing but the buffer cleared a run", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => ({ clearedRunId: "run_x" })), + }; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + const result = await service.call("ikey", "task", env); + expect(result).toEqual({ id: "ikey" }); + }); + + it("404s when PG and buffer both legitimately report 'nothing to clear'", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => ({ clearedRunId: null })), + }; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + await expect(service.call("ikey", "task", env)).rejects.toMatchObject({ + status: 404, + }); + }); + + // Regression for the 
silent-not-found hazard CodeRabbit flagged: if PG + // sees nothing AND we can't read the buffer (Redis outage), the + // previous behaviour was to 404 — masking a partial outage and + // leaving a buffered key effectively un-reset while the caller was + // told "doesn't exist." We now surface 503 so the caller retries. + it("503s when PG cleared nothing AND the buffer reset failed (partial outage)", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => { + throw new Error("ECONNREFUSED"); + }), + }; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + const error = await service.call("ikey", "task", env).then( + () => null, + (err) => err, + ); + expect(error).toBeInstanceOf(ServiceValidationError); + expect(error.status).toBe(503); + expect(error.message).toMatch(/retry/i); + }); + + it("404s normally when buffer is null (mollifier disabled) and PG cleared nothing", async () => { + bufferMock.current = null; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + await expect(service.call("ikey", "task", env)).rejects.toMatchObject({ + status: 404, + }); + }); +}); diff --git a/apps/webapp/test/mollifierResolveRunForMutation.test.ts b/apps/webapp/test/mollifierResolveRunForMutation.test.ts new file mode 100644 index 0000000000..c552a3cd18 --- /dev/null +++ b/apps/webapp/test/mollifierResolveRunForMutation.test.ts @@ -0,0 +1,154 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: { taskRun: { findFirst: vi.fn(async () => null) } }, +})); + +import { resolveRunForMutation } from "~/v3/mollifier/resolveRunForMutation.server"; +import type { BufferEntry, MollifierBuffer } from "@trigger.dev/redis-worker"; + +// Regression coverage for the cancel-route 404 bug (commit b490afe23). 
+// Before the fix the route had `findResource: async () => null`, which +// caused the route builder to 404 every cancel — including for valid +// PG-row runs — BEFORE the action handler could run. The helper +// resolveRunForMutation has to return a non-null discriminated value +// whenever the run exists in either store. + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function fakeReplica(row: { friendlyId: string } | null) { + return { taskRun: { findFirst: vi.fn(async () => row) } }; +} + +function fakeBuffer(entry: BufferEntry | null): MollifierBuffer { + return { + getEntry: vi.fn(async () => entry), + } as unknown as MollifierBuffer; +} + +const baseInput = { + runParam: "run_1", + environmentId: "env_a", + organizationId: "org_1", +}; + +describe("resolveRunForMutation", () => { + it("returns { source: 'pg' } when the PG row exists", async () => { + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica({ friendlyId: "run_1" }), + getBuffer: () => null, + }, + }); + expect(result).toEqual({ source: "pg", friendlyId: "run_1" }); + }); + + it("returns { source: 'buffer' } when PG misses and the buffer entry matches env+org", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: "{}", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(entry), + }, + }); + expect(result).toEqual({ source: "buffer", friendlyId: "run_1" }); + }); + + it("returns null when PG misses and the buffer entry env doesn't match", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_OTHER", + orgId: "org_1", + payload: "{}", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 
1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(entry), + }, + }); + expect(result).toBeNull(); + }); + + it("returns null when PG misses and the buffer entry org doesn't match", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_OTHER", + payload: "{}", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(entry), + }, + }); + expect(result).toBeNull(); + }); + + it("returns null when both PG and buffer miss", async () => { + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(null), + }, + }); + expect(result).toBeNull(); + }); + + it("returns null when buffer is unavailable (mollifier disabled) and PG misses", async () => { + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => null, + }, + }); + expect(result).toBeNull(); + }); + + it("PG-hit short-circuits before consulting the buffer", async () => { + const buffer = fakeBuffer(null); + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica({ friendlyId: "run_1" }), + getBuffer: () => buffer, + }, + }); + expect(result?.source).toBe("pg"); + expect(buffer.getEntry).not.toHaveBeenCalled(); + }); +});