diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index a392866afc..2361d2dc4e 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -15,6 +15,10 @@ import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { + findRunByIdWithMollifierFallback, + type SyntheticRun, +} from "~/v3/mollifier/readFallback.server"; import { generatePresignedUrl } from "~/v3/objectStore.server"; import { tracer } from "~/v3/tracer.server"; import { startSpanWithEnv } from "~/v3/tracing.server"; @@ -64,13 +68,34 @@ type CommonRelatedRun = Prisma.Result< "findFirstOrThrow" >; -type FoundRun = NonNullable>>; +// Full shape returned by findRun() — the commonRunSelect fields plus the +// extras the route handler reads. Declared explicitly (not inferred via +// ReturnType) so findRun can return a synthesised buffered +// run without the type becoming self-referential. +type FoundRun = CommonRelatedRun & { + traceId: string; + payload: string; + payloadType: string; + output: string | null; + outputType: string; + error: Prisma.JsonValue; + attempts: { id: string }[]; + attemptNumber: number | null; + engine: "V1" | "V2"; + taskEventStore: string; + parentTaskRun: CommonRelatedRun | null; + rootTaskRun: CommonRelatedRun | null; + childRuns: CommonRelatedRun[]; +}; export class ApiRetrieveRunPresenter { constructor(private readonly apiVersion: API_VERSIONS) {} - public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) { - return $replica.taskRun.findFirst({ + public static async findRun( + friendlyId: string, + env: AuthenticatedEnvironment, + ): Promise { + const pgRow = await $replica.taskRun.findFirst({ where: { friendlyId, runtimeEnvironmentId: env.id, @@ -102,6 +127,23 @@ export class ApiRetrieveRunPresenter { }, }, }); + + if (pgRow) return pgRow; + + // Postgres miss → fall back to the mollifier buffer. When the gate + // diverted a trigger, the run lives in Redis until the drainer replays + // it through engine.trigger. Synthesise the FoundRun shape so call() + // returns a `QUEUED` (or `FAILED`) response with empty output, no + // attempts, no relations. + const buffered = await findRunByIdWithMollifierFallback({ + runId: friendlyId, + environmentId: env.id, + organizationId: env.organizationId, + }); + + if (!buffered) return null; + + return synthesiseFoundRunFromBuffer(buffered); } public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) { @@ -475,3 +517,107 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction { return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger"; } } + +// Build a FoundRun-shaped object from a buffered (mollified) run. The run +// is in the Redis buffer; engine.trigger hasn't created the Postgres row +// yet, so every field that comes from execution state (output, attempts, +// completedAt, cost, relations) takes a default. The presenter's call() +// handles QUEUED-state runs without surprise. +function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus { + switch (status) { + case "FAILED": + return "SYSTEM_FAILURE"; + case "CANCELED": + return "CANCELED"; + default: + return "PENDING"; + } +} + +// The PG path stores `TaskRun.payload` as `String?`, so in production +// the buffered snapshot's `payload` is always a string. We defensively +// coerce other types instead of silently dropping them: an object gets +// JSON-stringified (matches how the trigger path would serialise it), +// anything truly unrenderable falls back to an empty string. The log +// line surfaces format drift to ops without crashing the read path. +function synthesisePayload(buffered: SyntheticRun): string { + const payload = buffered.payload; + if (typeof payload === "string") return payload; + if (payload === undefined || payload === null) return ""; + try { + const serialised = JSON.stringify(payload); + logger.warn("ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced", { + runFriendlyId: buffered.friendlyId, + payloadType: typeof payload, + }); + return typeof serialised === "string" ? serialised : ""; + } catch { + logger.error("ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable", { + runFriendlyId: buffered.friendlyId, + payloadType: typeof payload, + }); + return ""; + } +} + +function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { + const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status); + + const errorJson: Prisma.JsonValue = buffered.error + ? { + type: "STRING_ERROR", + raw: `${buffered.error.code}: ${buffered.error.message}`, + } + : null; + + const metadata: Prisma.JsonValue = + typeof buffered.metadata === "string" ? buffered.metadata : null; + + return { + // `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId` + // is the user-facing `run_xxx` token. Downstream logging keyed off + // `taskRun.id` correlates with other systems via the cuid — using + // the friendlyId here breaks log correlation. `SyntheticRun` carries + // the cuid alongside the friendlyId for exactly this reason + // (RunId.fromFriendlyId in readFallback.server.ts). + id: buffered.id, + friendlyId: buffered.friendlyId, + status, + taskIdentifier: buffered.taskIdentifier ?? "", + createdAt: buffered.createdAt, + startedAt: null, + updatedAt: buffered.cancelledAt ?? buffered.createdAt, + completedAt: buffered.cancelledAt ?? null, + expiredAt: null, + delayUntil: buffered.delayUntil ?? null, + metadata, + metadataType: buffered.metadataType ?? "application/json", + ttl: buffered.ttl ?? null, + costInCents: 0, + baseCostInCents: 0, + usageDurationMs: 0, + idempotencyKey: buffered.idempotencyKey ?? null, + idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null, + isTest: buffered.isTest, + depth: buffered.depth, + scheduleId: null, + lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null, + resumeParentOnCompletion: buffered.resumeParentOnCompletion, + batch: null, + runTags: buffered.tags, + traceId: buffered.traceId ?? "", + payload: synthesisePayload(buffered), + payloadType: buffered.payloadType ?? "application/json", + output: null, + outputType: "application/json", + error: errorJson, + attempts: [], + attemptNumber: null, + engine: "V2", + taskEventStore: "taskEvent", + workerQueue: buffered.workerQueue ?? "main", + parentTaskRun: null, + rootTaskRun: null, + childRuns: [], + }; +} diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts index be0d12087b..cc48faf5d8 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts @@ -9,42 +9,101 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runId: z.string(), spanId: z.string(), }); +// Phase A2 — discriminated union for PG vs buffered runs. Buffered runs +// only have one valid spanId (the queued span recorded at gate time and +// reused as the run's root spanId when the drainer materialises). Any +// other spanId returns a deterministic 404; the queued span returns a +// minimal synthesised shape so the customer's SDK sees the same 200 +// contract they'd get for a freshly-triggered run. +type ResolvedRun = + | { source: "pg"; run: Awaited> & {} } + | { source: "buffer"; run: NonNullable>> }; + +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + }); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema, allowJWT: true, corsStrategy: "all", - findResource: (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runId, - runtimeEnvironmentId: auth.environment.id, - }, + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runId, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, }); + if (buffered) return { source: "buffer", run: buffered }; + + return null; }, shouldRetryNotFound: true, authorization: { action: "read", - resource: (run) => { + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; const resources = [ { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; - if (run.batchId) { - resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); - } return anyResource(resources); }, }, }, - async ({ params, resource: run, authentication }) => { + async ({ params, resource: resolved, authentication }) => { + if (resolved.source === "buffer") { + // Buffered runs have exactly one valid spanId — the queued span the + // mollifier gate recorded at trigger time, which becomes the run's + // root spanId once the drainer materialises. Any other spanId is a + // deterministic 404. The matching spanId returns a minimal shape + // representing "span exists, no execution data yet." + if (resolved.run.spanId !== params.spanId) { + return json({ error: "Span not found" }, { status: 404 }); + } + return json( + { + spanId: resolved.run.spanId, + parentId: resolved.run.parentSpanId ?? null, + runId: resolved.run.friendlyId, + message: resolved.run.taskIdentifier ?? "", + isError: false, + isPartial: resolved.run.status !== "CANCELED", + isCancelled: resolved.run.status === "CANCELED", + level: "TRACE", + startTime: resolved.run.createdAt, + durationMs: 0, + }, + { status: 200 } + ); + } + + const run = resolved.run; const eventRepository = await getEventRepositoryForStore( run.taskEventStore, authentication.environment.organization.id diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts index 77e6a4df04..b87bae396c 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.trace.ts @@ -8,41 +8,112 @@ import { } from "~/services/routeBuilders/apiBuilder.server"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runId: z.string(), // This is the run friendly ID }); +// Discriminator on the resolved resource — `pg` is the real Prisma TaskRun +// row, `buffer` is a synthesised shape from the mollifier buffer for runs +// whose drainer hasn't yet materialised them. The handler renders an empty +// trace for buffered runs so the customer sees the same 200 shape they'd +// get for a freshly-triggered PG run with no spans yet (matches the +// pass-through control case in scripts/mollifier-api-parity.sh). +type ResolvedRun = + | { source: "pg"; run: Awaited> & {} } + | { source: "buffer"; run: NonNullable>> }; + +async function findPgRun(runId: string, environmentId: string) { + return $replica.taskRun.findFirst({ + where: { friendlyId: runId, runtimeEnvironmentId: environmentId }, + }); +} + export const loader = createLoaderApiRoute( { params: ParamsSchema, allowJWT: true, corsStrategy: "all", - findResource: (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runId, - runtimeEnvironmentId: auth.environment.id, - }, + findResource: async (params, auth): Promise => { + const pgRun = await findPgRun(params.runId, auth.environment.id); + if (pgRun) return { source: "pg", run: pgRun }; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, }); + if (buffered) return { source: "buffer", run: buffered }; + + return null; }, shouldRetryNotFound: true, authorization: { action: "read", - resource: (run) => { + resource: (resolved) => { + if (resolved.source === "pg") { + const run = resolved.run; + const resources = [ + { type: "runs", id: run.friendlyId }, + { type: "tasks", id: run.taskIdentifier }, + ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ]; + if (run.batchId) { + resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); + } + return anyResource(resources); + } + const run = resolved.run; const resources = [ { type: "runs", id: run.friendlyId }, - { type: "tasks", id: run.taskIdentifier }, - ...run.runTags.map((tag) => ({ type: "tags", id: tag })), + ...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []), + ...run.tags.map((tag) => ({ type: "tags", id: tag })), ]; - if (run.batchId) { - resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) }); - } return anyResource(resources); }, }, }, - async ({ resource: run, authentication }) => { + async ({ resource: resolved, authentication }) => { + if (resolved.source === "buffer") { + // Buffered runs have no events ingested yet — the drainer hasn't + // materialised the PG row and the worker hasn't started executing. + // Synthesise a single partial span that satisfies the SDK's + // RetrieveRunTraceResponseBody schema (rootSpan is non-nullable). + const buffered = resolved.run; + return json( + { + trace: { + traceId: buffered.traceId ?? "", + rootSpan: { + id: buffered.spanId ?? "", + runId: buffered.friendlyId, + data: { + message: buffered.taskIdentifier ?? "", + taskSlug: buffered.taskIdentifier ?? undefined, + events: [], + startTime: buffered.createdAt, + duration: 0, + isError: false, + // Cancelled is a terminal state — the span shouldn't + // signal "still in progress" once it's been cancelled. + // Mirrors the sibling api.v1.runs.$runId.spans.$spanId.ts + // and syntheticTrace.server.ts logic. + isPartial: buffered.status !== "CANCELED", + isCancelled: buffered.status === "CANCELED", + level: "TRACE", + queueName: buffered.queue ?? undefined, + machinePreset: buffered.machinePreset ?? undefined, + }, + children: [], + }, + }, + }, + { status: 200 } + ); + } + + const run = resolved.run; const eventRepository = await getEventRepositoryForStore( run.taskEventStore, authentication.environment.organization.id diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts index 790e52bee4..d14246786c 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts @@ -1,8 +1,10 @@ -import type { ActionFunctionArgs } from "@remix-run/server-runtime"; +import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; import { z } from "zod"; +import { $replica } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { CreateTaskRunAttemptService } from "~/v3/services/createTaskRunAttempt.server"; @@ -11,6 +13,55 @@ const ParamsSchema = z.object({ runParam: z.string(), }); +// Phase A5 — fixes the pre-existing route bug where GET on this URL +// returned a Remix "no loader" 400 with an internal error message. The +// route only exposed `action` (POST creates a new attempt); GET had no +// handler, so any well-intentioned SDK probe hit the framework error +// instead of a proper API response. +// +// Returns `{ attempts: [] }` for both PG and buffered runs. The detailed +// attempt list belongs on the v3 retrieve endpoint, not here — this is +// the dual of the POST that creates attempts, and the empty-list shape +// gives the parity script a stable contract to assert against. +export async function loader({ request, params }: LoaderFunctionArgs) { + const authenticationResult = await authenticateApiRequest(request); + if (!authenticationResult) { + return json({ error: "Invalid or Missing API Key" }, { status: 401 }); + } + + const parsed = ParamsSchema.safeParse(params); + if (!parsed.success) { + return json({ error: "Invalid or missing run ID" }, { status: 400 }); + } + + const { runParam } = parsed.data; + const env = authenticationResult.environment; + + // Verify the run belongs to the authenticated environment before + // returning the parity-empty list. The response body is empty either + // way, but other run-scoped endpoints (spans, trace, retrieve) all + // 404 on cross-env access; matching that here means a third party + // can't distinguish "run exists" from "doesn't exist" via this + // endpoint either. PG-first then buffer fallback, consistent with + // the other read paths. + const pgRun = await $replica.taskRun.findFirst({ + where: { friendlyId: runParam, runtimeEnvironmentId: env.id }, + select: { id: true }, + }); + if (!pgRun) { + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (!buffered) { + return json({ error: "Run not found" }, { status: 404 }); + } + } + + return json({ attempts: [] }, { status: 200 }); +} + export async function action({ request, params }: ActionFunctionArgs) { // Authenticate the request const authenticationResult = await authenticateApiRequest(request); diff --git a/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts new file mode 100644 index 0000000000..e316846d70 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticRedirectInfo.server.ts @@ -0,0 +1,119 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +// Use the webapp-side wrapper (not `deserialiseSnapshot` from +// @trigger.dev/redis-worker directly) so this file shares a single +// deserialisation path with readFallback.server.ts. The two are +// behaviourally identical today (both wrap `JSON.parse`), but pinning +// the shared helper keeps the two read-side modules from drifting if +// snapshot encoding ever changes. +import { deserialiseMollifierSnapshot } from "./mollifierSnapshot.server"; + +// Validated subset of a mollifier snapshot — just the fields needed to +// rebuild a canonical run-detail URL for a buffered run. Anything else +// in the payload is ignored. `safeParse` against this schema replaces +// the ad-hoc `as Record` + `typeof === "string"` checks +// that the redirect path used to do by hand; missing or wrong-typed +// fields collapse into a single `parsed.success === false` branch. +const BufferedSnapshotSchema = z.object({ + spanId: z.string().optional(), + environment: z.object({ + slug: z.string(), + project: z.object({ slug: z.string() }), + organization: z.object({ slug: z.string() }), + }), +}); + +export type BufferedRunRedirectInfo = { + organizationSlug: string; + projectSlug: string; + environmentSlug: string; + spanId: string | undefined; +}; + +export type FindBufferedRunRedirectInfoDeps = { + getBuffer?: () => MollifierBuffer | null; + prismaClient?: PrismaClientOrTransaction; +}; + +// Resolve the org/project/env slugs needed to build the canonical run-detail +// URL for a buffered run. Used by the short-URL redirect routes +// (`runs.$runParam`, `@.runs.$runParam`, `projects.v3.$projectRef.runs.$runParam`) +// so a customer clicking the trigger-API-returned run link doesn't 404 +// during the buffered window. +// +// Authorisation: PG query confirms the requesting user belongs to the +// organisation the buffer entry says owns the run. Without this check a +// known runId would leak slugs. +export async function findBufferedRunRedirectInfo( + args: { + runFriendlyId: string; + userId: string; + // Admin impersonation paths bypass org-membership; mirrors the existing + // PG-side admin route behaviour (`@.runs.$runParam` doesn't filter by + // org membership in the PG query either). + skipOrgMembershipCheck?: boolean; + }, + deps: FindBufferedRunRedirectInfoDeps = {}, +): Promise { + const buffer = (deps.getBuffer ?? getMollifierBuffer)(); + const prismaClient = deps.prismaClient ?? prisma; + if (!buffer) return null; + + let entry; + try { + entry = await buffer.getEntry(args.runFriendlyId); + } catch (err) { + logger.warn("buffered redirect: buffer.getEntry failed", { + runFriendlyId: args.runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + if (!entry) return null; + + if (!args.skipOrgMembershipCheck) { + const member = await prismaClient.orgMember.findFirst({ + where: { userId: args.userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) return null; + } + + let raw: unknown; + try { + raw = deserialiseMollifierSnapshot(entry.payload); + } catch (err) { + logger.warn("buffered redirect: snapshot deserialise failed", { + runFriendlyId: args.runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + return null; + } + + const parsed = BufferedSnapshotSchema.safeParse(raw); + if (!parsed.success) { + // Either the snapshot is from a different writer that doesn't carry + // environment slugs (in which case we genuinely can't build a URL) + // or a buffer-format drift snuck through. Log at debug; the caller + // 404s and the user sees the standard not-found page, not a 500. + logger.debug("buffered redirect: snapshot shape mismatch", { + runFriendlyId: args.runFriendlyId, + issues: parsed.error.issues.map((issue) => ({ + path: issue.path.join("."), + code: issue.code, + })), + }); + return null; + } + + return { + organizationSlug: parsed.data.environment.organization.slug, + projectSlug: parsed.data.environment.project.slug, + environmentSlug: parsed.data.environment.slug, + spanId: parsed.data.spanId, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts new file mode 100644 index 0000000000..bfbcdae36d --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts @@ -0,0 +1,165 @@ +import { prettyPrintPacket, RunAnnotations } from "@trigger.dev/core/v3"; +import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; +import { + extractIdempotencyKeyScope, + getUserProvidedIdempotencyKey, +} from "@trigger.dev/core/v3/serverOnly"; +import { MachinePresetName } from "@trigger.dev/core/v3/schemas"; +import type { SpanRun } from "~/presenters/v3/SpanPresenter.server"; +import type { SyntheticRun } from "./readFallback.server"; + +// `SyntheticRun.machinePreset` is sourced from the snapshot payload as +// a plain string, but `SpanRun.machinePreset` is the narrowed enum. +// Validate against the canonical enum so an unknown / stale preset +// string collapses to undefined rather than fighting the type checker. +function narrowMachinePreset(value: string | undefined): SpanRun["machinePreset"] { + if (value === undefined) return undefined; + const parsed = MachinePresetName.safeParse(value); + return parsed.success ? parsed.data : undefined; +} + +// Synthesise a SpanRun-shaped object from a buffered run so the run-detail +// page's right-side details panel renders identically to a PG-resident +// run. The shape matches `SpanPresenter.getRun`'s return value exactly; +// buffered-irrelevant fields (output, error, attempts, schedule, session, +// region, batch) are filled with sensible defaults. +// +// Pretty-printing for payload and metadata mirrors SpanPresenter so the +// UI receives data in the same shape. Buffered runs cannot use the +// `application/store` packet path (no R2 object yet) so we treat raw +// snapshot fields as inline packets. +export async function buildSyntheticSpanRun(args: { + run: SyntheticRun; + environment: { id: string; slug: string; type: "PRODUCTION" | "DEVELOPMENT" | "STAGING" | "PREVIEW" }; +}): Promise { + const { run, environment } = args; + + const payload = + typeof run.payload !== "undefined" && run.payload !== null + ? await prettyPrintPacket(run.payload, run.payloadType ?? undefined) + : undefined; + + const metadata = run.metadata + ? await prettyPrintPacket(run.metadata, run.metadataType, { + filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"], + }) + : undefined; + + const idempotencyShape = { + idempotencyKey: run.idempotencyKey ?? null, + idempotencyKeyExpiresAt: null, + idempotencyKeyOptions: run.idempotencyKeyOptions ?? null, + }; + + const idempotencyKey = getUserProvidedIdempotencyKey(idempotencyShape); + const idempotencyKeyScope = extractIdempotencyKeyScope(idempotencyShape); + const idempotencyKeyStatus: SpanRun["idempotencyKeyStatus"] = idempotencyKey + ? "active" + : idempotencyKeyScope + ? "inactive" + : undefined; + + const taskKind = RunAnnotations.safeParse(run.annotations).data?.taskKind; + const isAgentRun = taskKind === "AGENT"; + + const queueName = run.queue ?? "task/"; + const isCancelled = run.status === "CANCELED"; + return { + id: run.id, + friendlyId: run.friendlyId, + status: isCancelled ? "CANCELED" : "PENDING", + statusReason: isCancelled ? run.cancelReason ?? undefined : undefined, + createdAt: run.createdAt, + startedAt: null, + executedAt: null, + updatedAt: run.cancelledAt ?? run.createdAt, + delayUntil: run.delayUntil ?? null, + expiredAt: null, + completedAt: run.cancelledAt ?? null, + logsDeletedAt: null, + ttl: run.ttl ?? null, + taskIdentifier: run.taskIdentifier ?? "", + version: undefined, + sdkVersion: undefined, + runtime: undefined, + runtimeVersion: undefined, + isTest: run.isTest, + replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId ?? null, + environmentId: environment.id, + idempotencyKey, + idempotencyKeyExpiresAt: null, + idempotencyKeyScope, + idempotencyKeyStatus, + debounce: null, + schedule: undefined, + queue: { + name: queueName, + isCustomQueue: !queueName.startsWith("task/"), + concurrencyKey: run.concurrencyKey ?? null, + }, + tags: run.runTags, + baseCostInCents: 0, + costInCents: 0, + totalCostInCents: 0, + usageDurationMs: 0, + isFinished: false, + isRunning: false, + isError: false, + isAgentRun, + payload, + payloadType: run.payloadType ?? "application/json", + output: undefined, + outputType: "application/json", + error: undefined, + relationships: { + root: run.rootTaskRunFriendlyId + ? { + friendlyId: run.rootTaskRunFriendlyId, + spanId: "", + taskIdentifier: "", + createdAt: run.createdAt, + isParent: run.parentTaskRunFriendlyId === run.rootTaskRunFriendlyId, + } + : undefined, + parent: run.parentTaskRunFriendlyId + ? { + friendlyId: run.parentTaskRunFriendlyId, + spanId: "", + taskIdentifier: "", + } + : undefined, + }, + context: JSON.stringify( + { + task: { + id: run.taskIdentifier ?? "", + }, + run: { + id: run.friendlyId, + createdAt: run.createdAt, + isTest: run.isTest, + }, + environment: { + id: environment.id, + slug: environment.slug, + type: environment.type, + }, + }, + null, + 2, + ), + metadata, + maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds), + batch: undefined, + session: undefined, + engine: "V2", + region: null, + workerQueue: run.workerQueue ?? "", + traceId: run.traceId ?? "", + spanId: run.spanId ?? "", + isCached: false, + machinePreset: narrowMachinePreset(run.machinePreset), + taskEventStore: "taskEvent", + externalTraceId: undefined, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts new file mode 100644 index 0000000000..acde2ccee9 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts @@ -0,0 +1,66 @@ +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; +import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView"; +import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; +import type { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; +import type { SyntheticRun } from "./readFallback.server"; + +// Build a single-span trace for a buffered run so the run-detail page +// renders a meaningful timeline before the drainer materialises the +// row. Mirrors the shape produced by `RunPresenter` when its trace +// store lookup returns no spans, so the dashboard consumer treats the +// buffered run identically to a freshly enqueued PG run that hasn't +// emitted any events yet. +export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { + const spanId = run.spanId ?? ""; + const isCancelled = run.status === "CANCELED"; + const span: SpanSummary = { + id: spanId, + parentId: run.parentSpanId, + runId: run.friendlyId, + data: { + message: run.taskIdentifier ?? "Task", + style: { icon: "task", variant: "primary" }, + events: [], + startTime: run.createdAt, + duration: 0, + isError: false, + isPartial: !isCancelled, + isCancelled, + isDebug: false, + level: "TRACE", + }, + }; + + const tree = createTreeFromFlatItems([span], spanId); + const treeRootStartTimeMs = tree?.data.startTime.getTime() ?? 0; + const totalDuration = Math.max(tree?.data.duration ?? 0, millisecondsToNanoseconds(1)); + + const events = tree + ? flattenTree(tree).map((n) => { + const offset = millisecondsToNanoseconds( + n.data.startTime.getTime() - treeRootStartTimeMs + ); + return { + ...n, + data: { + ...n.data, + timelineEvents: createTimelineSpanEventsFromSpanEvents(n.data.events, false, treeRootStartTimeMs), + duration: n.data.isPartial ? null : n.data.duration, + offset, + isRoot: n.id === spanId, + }, + }; + }) + : []; + + return { + rootSpanStatus: (isCancelled ? "completed" : "executing") as "executing" | "completed" | "failed", + events, + duration: totalDuration, + rootStartedAt: tree?.data.startTime, + startedAt: null, + queuedDuration: undefined, + overridesBySpanId: undefined, + linkedRunIdBySpanId: {} as Record, + }; +} diff --git a/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts new file mode 100644 index 0000000000..a996b9de69 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticRedirectInfo.test.ts @@ -0,0 +1,197 @@ +import { describe, expect, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; + +const SNAPSHOT = { + spanId: "span_1", + environment: { + slug: "dev", + project: { slug: "hello-world-bN7m" }, + organization: { slug: "references-6120" }, + }, +}; + +function fakePrisma(member: { id: string } | null) { + return { + orgMember: { findFirst: vi.fn(async () => member) }, + } as unknown as Parameters[1]["prismaClient"]; +} + +describe("findBufferedRunRedirectInfo (testcontainers)", () => { + redisTest("returns slugs + spanId for a real buffer entry when user is a member", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_1", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_1", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toEqual({ + organizationSlug: "references-6120", + projectSlug: "hello-world-bN7m", + environmentSlug: "dev", + spanId: "span_1", + }); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when no buffer entry exists for the runId", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_missing", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when the user is not an org member (default check enforced)", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_2", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_2", userId: "user_other" }, + { getBuffer: () => buffer, prismaClient: fakePrisma(null) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("skips the org-membership check when skipOrgMembershipCheck is set (admin path)", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_3", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const findFirst = vi.fn(); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_3", userId: "user_admin", skipOrgMembershipCheck: true }, + { + getBuffer: () => buffer, + prismaClient: { orgMember: { findFirst } } as unknown as Parameters[1]["prismaClient"], + }, + ); + expect(info?.organizationSlug).toBe("references-6120"); + expect(findFirst).not.toHaveBeenCalled(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when snapshot is malformed JSON", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_4", + envId: "env_a", + orgId: "org_1", + payload: "{not-json", + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_4", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns null when snapshot lacks org/project slugs", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_5", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ spanId: "s", environment: { slug: "dev" } }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_5", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }); + + redisTest("returns info with undefined spanId when snapshot has no spanId", async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_6", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ environment: SNAPSHOT.environment }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_6", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info?.spanId).toBeUndefined(); + expect(info?.environmentSlug).toBe("dev"); + } finally { + await buffer.close(); + } + }); + + redisTest( + "rejects snapshots where a slug is the wrong type (Zod guard, not just typeof)", + async ({ redisOptions }) => { + // Regression for the pre-Zod implementation: the slug check was + // `typeof slug !== "string"` so any string passed, including ones + // that should've been rejected on shape grounds. The Zod schema + // gives us full structural validation — a `slug: 42` (number) + // collapses into the parse-fail branch like any other shape + // mismatch and we return null instead of leaking a half-built + // redirect URL. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_real_7", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify({ + environment: { + slug: 42, + project: { slug: "p" }, + organization: { slug: "o" }, + }, + }), + }); + const info = await findBufferedRunRedirectInfo( + { runFriendlyId: "run_real_7", userId: "user_1" }, + { getBuffer: () => buffer, prismaClient: fakePrisma({ id: "member_1" }) }, + ); + expect(info).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/apps/webapp/test/mollifierSyntheticSpanRun.test.ts b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts new file mode 100644 index 0000000000..68c3c4cfc4 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticSpanRun.test.ts @@ -0,0 +1,158 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticSpanRun } from "~/v3/mollifier/syntheticSpanRun.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: ["a", "b"], + runTags: ["a", "b"], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV = { + id: "env_a", + slug: "dev", + type: "DEVELOPMENT" as const, +}; + +describe("buildSyntheticSpanRun", () => { + it("populates the core identity fields from the snapshot", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.id).toBe("run_internal_1"); + expect(synth.friendlyId).toBe("run_friendly_1"); + expect(synth.taskIdentifier).toBe("hello-world"); + expect(synth.traceId).toBe("trace_1"); + expect(synth.spanId).toBe("span_1"); + expect(synth.environmentId).toBe("env_a"); + expect(synth.engine).toBe("V2"); + expect(synth.workerQueue).toBe("worker-queue-1"); + }); + + it("reports PENDING status and the non-final flags", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.status).toBe("PENDING"); + expect(synth.isFinished).toBe(false); + expect(synth.isRunning).toBe(false); + expect(synth.isError).toBe(false); + expect(synth.startedAt).toBeNull(); + expect(synth.completedAt).toBeNull(); + }); + + it("pretty-prints the JSON payload from the snapshot", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ payload: { message: "hi" }, payloadType: "application/json" }), + environment: ENV, + }); + // prettyPrintPacket round-trips JSON with 2-space indent. + expect(synth.payload).toContain('"message": "hi"'); + expect(synth.payloadType).toBe("application/json"); + }); + + it("forwards runTags onto `tags` exactly", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ runTags: ["alpha", "beta"] }), + environment: ENV, + }); + expect(synth.tags).toEqual(["alpha", "beta"]); + }); + + it("classifies the queue name as custom when it does not start with 'task/'", async () => { + const taskQueue = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ queue: "task/hello-world" }), + environment: ENV, + }); + expect(taskQueue.queue.isCustomQueue).toBe(false); + + const customQueue = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ queue: "my-custom" }), + environment: ENV, + }); + expect(customQueue.queue.isCustomQueue).toBe(true); + }); + + it("derives idempotency status from the snapshot key/options", async () => { + const withKey = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ idempotencyKey: "abc", idempotencyKeyOptions: ["scope"] }), + environment: ENV, + }); + expect(withKey.idempotencyKey).toBe("abc"); + expect(withKey.idempotencyKeyStatus).toBe("active"); + + const noKey = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ idempotencyKey: undefined, idempotencyKeyOptions: undefined }), + environment: ENV, + }); + expect(noKey.idempotencyKeyStatus).toBeUndefined(); + }); + + it("fills relationship metadata from parent/root snapshot fields when present", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun({ + parentTaskRunFriendlyId: "run_parent", + rootTaskRunFriendlyId: "run_root", + }), + environment: ENV, + }); + expect(synth.relationships.parent?.friendlyId).toBe("run_parent"); + expect(synth.relationships.root?.friendlyId).toBe("run_root"); + expect(synth.relationships.root?.isParent).toBe(false); + }); + + it("returns no relationship objects when the snapshot has no parent/root", async () => { + const synth = await buildSyntheticSpanRun({ + run: makeSyntheticRun(), + environment: ENV, + }); + expect(synth.relationships.parent).toBeUndefined(); + expect(synth.relationships.root).toBeUndefined(); + }); + + it("flags the synthetic run as 'not cached' since cache lookup did not match it", async () => { + const synth = await buildSyntheticSpanRun({ run: makeSyntheticRun(), environment: ENV }); + expect(synth.isCached).toBe(false); + }); +});