diff --git a/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx b/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx index facff746c5e..72947c4c8f7 100644 --- a/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx +++ b/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx @@ -1,6 +1,7 @@ import { NoSymbolIcon } from "@heroicons/react/24/solid"; import { DialogClose } from "@radix-ui/react-dialog"; import { Form, useNavigation } from "@remix-run/react"; +import { useEffect, useRef } from "react"; import { Button } from "~/components/primitives/Buttons"; import { DialogContent, DialogHeader } from "~/components/primitives/Dialog"; import { FormButtons } from "~/components/primitives/FormButtons"; @@ -10,14 +11,35 @@ import { SpinnerWhite } from "~/components/primitives/Spinner"; type CancelRunDialogProps = { runFriendlyId: string; redirectPath: string; + // Optional: when provided, close the dialog as soon as the cancel + // action transitions to "loading" (the redirect is in flight). Lets + // the caller control the open state without interfering with the + // form's submit name=value pair the way `<DialogClose>` + // around the submit button does. 
+ onCancelSubmitted?: () => void; }; -export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialogProps) { +export function CancelRunDialog({ + runFriendlyId, + redirectPath, + onCancelSubmitted, +}: CancelRunDialogProps) { const navigation = useNavigation(); const formAction = `/resources/taskruns/${runFriendlyId}/cancel`; const isLoading = navigation.formAction === formAction; + const wasSubmitting = useRef(false); + useEffect(() => { + if (!onCancelSubmitted) return; + if (navigation.state === "submitting" && navigation.formAction === formAction) { + wasSubmitting.current = true; + } else if (wasSubmitting.current && navigation.state !== "submitting") { + wasSubmitting.current = false; + onCancelSubmitted(); + } + }, [navigation.state, navigation.formAction, formAction, onCancelSubmitted]); + return ( Cancel this run? diff --git a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts index 69560c49e88..c95f68e3f2c 100644 --- a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts @@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse"; import { throttle } from "~/utils/throttle"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { deserialiseSnapshot } from "@trigger.dev/redis-worker"; import { tracePubSub } from "~/v3/services/tracePubSub.server"; const PING_INTERVAL = 5_000; @@ -37,17 +39,45 @@ export class RunStreamPresenter { }, }); - if (!run) { + // Fall back to the mollifier buffer when the run isn't in PG yet. 
+ // The buffered run has no execution events to stream, but we still + // attach a trace-pubsub subscription using the snapshot's traceId + // so that the moment the drainer materialises the row and execution + // begins, those events flow to this open SSE connection. Closing + // with 404 would force the dashboard to keep retrying. + let traceId: string | null = run?.traceId ?? null; + if (!traceId) { + const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(runFriendlyId); + if (entry) { + const snapshot = deserialiseSnapshot<{ traceId?: string }>(entry.payload); + if (typeof snapshot.traceId === "string") { + traceId = snapshot.traceId; + } + } + } catch (err) { + logger.warn("RunStreamPresenter buffer fallback failed", { + runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + } + } + } + + if (!traceId) { throw new Response("Not found", { status: 404 }); } + const resolvedRun = { traceId }; logger.info("RunStreamPresenter.start", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); // Subscribe to trace updates - const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId); + const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId); // Only send max every 1 second const throttledSend = throttle( @@ -105,7 +135,7 @@ export class RunStreamPresenter { cleanup: () => { logger.info("RunStreamPresenter.cleanup", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); // Remove message listener @@ -119,13 +149,13 @@ export class RunStreamPresenter { .then(() => { logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); }) .catch((error) => { logger.error("RunStreamPresenter.cleanup.unsubscribe failed", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, error: { name: error.name, message: 
error.message, diff --git a/apps/webapp/app/routes/@.runs.$runParam.ts b/apps/webapp/app/routes/@.runs.$runParam.ts index a52600628d8..c2717418ff2 100644 --- a/apps/webapp/app/routes/@.runs.$runParam.ts +++ b/apps/webapp/app/routes/@.runs.$runParam.ts @@ -4,6 +4,7 @@ import { prisma } from "~/db.server"; import { redirectWithErrorMessage } from "~/models/message.server"; import { requireUser } from "~/services/session.server"; import { impersonate, rootPath, v3RunPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -51,6 +52,26 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Admin impersonation route — bypass org membership so admins can + // open any buffered run by friendlyId, mirroring the existing PG + // behaviour above (no membership filter on the find). + const buffered = await findBufferedRunRedirectInfo({ + runFriendlyId: runParam, + userId: user.id, + skipOrgMembershipCheck: true, + }); + if (buffered) { + return redirect( + impersonate( + v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: runParam } + ) + ) + ); + } return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", { ephemeral: false, }); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx index d55511e7ff5..28bae86406f 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx @@ -88,10 +88,13 @@ import { useReplaceSearchParams } from 
"~/hooks/useReplaceSearchParams"; import { useSearchParams } from "~/hooks/useSearchParam"; import { type Shortcut, useShortcutKeys } from "~/hooks/useShortcutKeys"; import { useHasAdminAccess } from "~/hooks/useUser"; +import { env } from "~/env.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server"; import { RunEnvironmentMismatchError, RunPresenter } from "~/presenters/v3/RunPresenter.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { getImpersonationId } from "~/services/impersonation.server"; import { logger } from "~/services/logger.server"; @@ -277,6 +280,31 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { ); } + // PG miss → try the mollifier buffer. When the gate diverts a trigger + // the run sits in Redis until the drainer materialises it; without + // this fallback the run-detail page 404s for the brief buffered window + // even though the API has accepted the trigger and returned an id. 
+ const buffered = await tryMollifiedRunFallback({ + runFriendlyId: runParam, + organizationSlug, + projectSlug: projectParam, + envSlug: envParam, + userId, + }); + + if (buffered) { + const parent = await getResizableSnapshot(request, resizableSettings.parent.autosaveId); + const tree = await getResizableSnapshot(request, resizableSettings.tree.autosaveId); + + return json({ + run: buffered.run, + trace: buffered.trace, + maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS, + resizable: { parent, tree }, + runsList: null, + }); + } + throw error; } @@ -305,6 +333,52 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); }; +async function tryMollifiedRunFallback(args: { + runFriendlyId: string; + organizationSlug: string; + projectSlug: string; + envSlug: string; + userId: string; +}) { + const project = await findProjectBySlug(args.organizationSlug, args.projectSlug, args.userId); + if (!project) return null; + const environment = await findEnvironmentBySlug(project.id, args.envSlug, args.userId); + if (!environment) return null; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: args.runFriendlyId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (!buffered) return null; + + return { + run: { + id: buffered.friendlyId, + number: 1, + friendlyId: buffered.friendlyId, + traceId: buffered.traceId ?? "", + spanId: buffered.spanId ?? 
"", + status: "PENDING" as const, + isFinished: false, + startedAt: null, + completedAt: null, + logsDeletedAt: null, + rootTaskRun: null, + parentTaskRun: null, + environment: { + id: environment.id, + organizationId: project.organizationId, + type: environment.type, + slug: environment.slug, + userId: undefined, + userName: undefined, + }, + }, + trace: buildSyntheticTraceForBufferedRun(buffered), + }; +} + type LoaderData = SerializeFrom; export default function Page() { @@ -407,23 +481,17 @@ export default function Page() { /> {run.isFinished ? null : ( - - - - - - + )} @@ -587,6 +655,35 @@ function TraceView({ ); } +// Controlled wrapper around the cancel dialog. Owns the Radix open state +// so the dialog closes itself once the cancel action transitions through +// submission. We can't `<DialogClose>`-wrap the submit button +// because Radix's onClick handler swallows the button's name=value pair +// that the form action depends on for `redirectUrl`. +function ControlledCancelRunDialog({ + runFriendlyId, + redirectPath, +}: { + runFriendlyId: string; + redirectPath: string; +}) { + const [open, setOpen] = useState(false); + return ( + + + + + setOpen(false)} + /> + + ); +} + function NoLogsView({ run, resizable }: Pick) { const plan = useCurrentPlan(); const organization = useOrganization(); @@ -616,9 +713,13 @@ function NoLogsView({ run, resizable }: Pick) { >
{daysSinceCompleted === undefined ? ( - + - We tidy up older logs to keep things running smoothly. + This run is queued. Logs will appear here once it begins executing. ) : isWithinLogRetention ? ( diff --git a/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts b/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts index fe267d1f9fa..816b2071ec4 100644 --- a/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts +++ b/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts @@ -2,7 +2,8 @@ import { type LoaderFunctionArgs, redirect } from "@remix-run/server-runtime"; import { z } from "zod"; import { prisma } from "~/db.server"; import { requireUserId } from "~/services/session.server"; -import { v3RunSpanPath } from "~/utils/pathBuilder"; +import { v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ projectRef: z.string(), @@ -44,6 +45,28 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Fall back to the mollifier buffer so a /projects/v3/{ref}/runs/{id} + // share link works during the buffered window. 
+ const buffered = await findBufferedRunRedirectInfo({ + runFriendlyId: validatedParams.runParam, + userId, + }); + if (buffered) { + const url = new URL(request.url); + const searchParams = url.searchParams; + if (!searchParams.has("span") && buffered.spanId) { + searchParams.set("span", buffered.spanId); + } + return redirect( + v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: validatedParams.runParam }, + searchParams + ) + ); + } throw new Response("Not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts index e03787c6200..a3cd5ac4f99 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts @@ -1,4 +1,3 @@ -import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; @@ -7,6 +6,13 @@ import { anyResource, createLoaderApiRoute, } from "~/services/routeBuilders/apiBuilder.server"; +import { logger } from "~/services/logger.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { + isInitialBufferedSubscriptionRequest, + recordRealtimeBufferedSubscription, +} from "~/v3/mollifier/mollifierTelemetry.server"; +import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -18,7 +24,7 @@ export const loader = createLoaderApiRoute( allowJWT: true, corsStrategy: "all", findResource: async (params, authentication) => { - return $replica.taskRun.findFirst({ + const pgRun = await $replica.taskRun.findFirst({ where: { friendlyId: params.runId, runtimeEnvironmentId: authentication.environment.id, @@ -31,6 +37,23 @@ export const loader = createLoaderApiRoute( }, }, }); + + // 
Buffered fallback. If the run is sitting in the mollifier buffer + // (no PG row yet), open the Electric subscription anyway: the + // shape stream returns an empty initial snapshot, and when the + // drainer INSERTs the PG row Electric streams it to the client. + // Without this branch the route 404s, ShapeStream stops on the + // first response, and the hook silently hangs even after the run + // materialises (no auto-recovery). + const bufferedSynthetic = pgRun + ? null + : await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: authentication.environment.id, + organizationId: authentication.environment.organizationId, + }); + + return resolveRealtimeRunResource({ pgRun, bufferedSynthetic }); }, authorization: { action: "read", @@ -48,6 +71,22 @@ export const loader = createLoaderApiRoute( }, }, async ({ authentication, request, resource: run, apiVersion }) => { + // Observability for buffered-window subscriptions. The gate keeps + // the counter at one tick per subscription instead of one tick per + // ~20s live-poll iteration (see `isInitialBufferedSubscriptionRequest`). 
+ const bufferedDwellMs = (run as { __bufferedDwellMs?: number }).__bufferedDwellMs; + if ( + typeof bufferedDwellMs === "number" && + isInitialBufferedSubscriptionRequest(request.url) + ) { + recordRealtimeBufferedSubscription(); + logger.info("mollifier.realtime.buffered_subscription", { + runId: run.friendlyId, + envId: authentication.environment.id, + bufferDwellMs: bufferedDwellMs, + }); + } + return realtimeClient.streamRun( request.url, authentication.environment, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx index 614b668f910..8a3f4dd3a6e 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx @@ -5,6 +5,8 @@ import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server"; import { v3RunParamsSchema } from "~/utils/pathBuilder"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; export const action: ActionFunction = async ({ request, params }) => { const userId = await requireUserId(request); @@ -37,17 +39,53 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - if (!taskRun) { - return jsonWithErrorMessage({}, request, "Run not found"); - } - - if (!taskRun.idempotencyKey) { - return jsonWithErrorMessage({}, request, "This run does not have an idempotency key"); + // Resolve run from PG or the mollifier buffer (Q5). 
For a buffered + // run the snapshot carries the idempotencyKey + taskIdentifier; we + // also need the runtimeEnvironmentId to feed ResetIdempotencyKeyService + // (which clears both PG and the buffer lookup — B6b). + let resolved: + | { idempotencyKey: string; taskIdentifier: string; runtimeEnvironmentId: string } + | null = null; + if (taskRun) { + if (!taskRun.idempotencyKey) { + return jsonWithErrorMessage({}, request, "This run does not have an idempotency key"); + } + resolved = { + idempotencyKey: taskRun.idempotencyKey, + taskIdentifier: taskRun.taskIdentifier, + runtimeEnvironmentId: taskRun.runtimeEnvironmentId, + }; + } else { + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) { + return jsonWithErrorMessage({}, request, "Run not found"); + } + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + return jsonWithErrorMessage({}, request, "Run not found"); + } + const synthetic = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (!synthetic?.idempotencyKey || !synthetic.taskIdentifier) { + return jsonWithErrorMessage({}, request, "This run does not have an idempotency key"); + } + resolved = { + idempotencyKey: synthetic.idempotencyKey, + taskIdentifier: synthetic.taskIdentifier, + runtimeEnvironmentId: entry.envId, + }; } const environment = await prisma.runtimeEnvironment.findUnique({ where: { - id: taskRun.runtimeEnvironmentId, + id: resolved.runtimeEnvironmentId, }, include: { project: { @@ -64,7 +102,7 @@ export const action: ActionFunction = async ({ request, params }) => { const service = new ResetIdempotencyKeyService(); - await service.call(taskRun.idempotencyKey, taskRun.taskIdentifier, { + await service.call(resolved.idempotencyKey, resolved.taskIdentifier, { ...environment, organizationId: 
environment.project.organizationId, organization: environment.project.organization, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts index 66135347253..fd1ec765126 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts @@ -12,6 +12,7 @@ import { import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -59,6 +60,20 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Buffered run has no Session linkage yet. Return 204 so the SDK's + // SSE client treats this as "channel not yet active" and retries + // naturally once the drainer materialises the row. 
+ const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } return new Response("Run not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts index 8d0af728df8..58491dd4298 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts @@ -7,6 +7,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -58,6 +59,22 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Fall through to a buffered-run lookup. A buffered run has no output + // streams yet (execution hasn't started); return 204 with the + // event-stream content-type so the SDK's SSE client treats this as + // "stream not yet active" and retries naturally once the drainer + // materialises the run. 
+ const buffered = await findRunByIdWithMollifierFallback({ + runId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } return new Response("Run not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts index c9480299cc0..430ed5c52f6 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts @@ -7,6 +7,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -60,6 +61,20 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Fall through to a buffered-run lookup. A buffered run has no input + // streams yet; return 204 so the SDK's SSE client treats this as + // "stream not yet active" and retries naturally. 
+ const buffered = await findRunByIdWithMollifierFallback({ + runId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } return new Response("Run not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 09f3f33fcb3..ce80b32e1df 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -82,6 +82,10 @@ import { useHasAdminAccess } from "~/hooks/useUser"; import { useCanViewLogsPage } from "~/hooks/useCanViewLogsPage"; import { redirectWithErrorMessage } from "~/models/message.server"; import { type Span, SpanPresenter, type SpanRun } from "~/presenters/v3/SpanPresenter.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticSpanRun } from "~/v3/mollifier/syntheticSpanRun.server"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { cn } from "~/utils/cn"; @@ -117,6 +121,41 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const presenter = new SpanPresenter(); + const tryBufferFallback = async () => { + // Fall back to the mollifier buffer when the run isn't in PG yet. 
We + // only synthesise a SpanRun for the root span; child spans don't + // exist for a buffered run, so non-root spanParam values resolve to + // "Event not found" (correct behaviour). + const project = await findProjectBySlug(organizationSlug, projectParam, userId); + if (!project) return null; + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) return null; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (!buffered) return null; + if (buffered.spanId !== spanParam) { + // The runId is buffered but this spanId doesn't match the root span. + // Don't toast "Event not found" — that's noisy for the initial-render + // request the dashboard fires before the root span auto-selects. + // 204 No Content matches what the PG path returns for the same case. + return new Response(null, { status: 204 }); + } + + const run = await buildSyntheticSpanRun({ + run: buffered, + environment: { + id: environment.id, + slug: environment.slug, + type: environment.type, + }, + }); + return typedjson({ type: "run" as const, run }); + }; + try { const result = await presenter.call({ projectSlug: projectParam, @@ -127,6 +166,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); if (!result) { + const buffered = await tryBufferFallback(); + if (buffered) return buffered; return redirectWithErrorMessage( v3RunPath( { slug: organizationSlug }, @@ -147,6 +188,9 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { } return typedjson({ type: "span" as const, span: result.span }); } catch (error) { + const buffered = await tryBufferFallback(); + if (buffered) return buffered; + logger.error("Error loading span", { projectParam, organizationSlug, diff --git 
a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx index 4a9581831c9..5000f68dba1 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -24,6 +24,7 @@ import { useProject } from "~/hooks/useProject"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { requireUserId } from "~/services/session.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { cn } from "~/utils/cn"; import { v3RunStreamParamsSchema } from "~/utils/pathBuilder"; @@ -75,6 +76,28 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); if (!run) { + // Buffered run has no realtime streams yet. Resolve the env by slug + // (so the buffer auth check below carries the same scope a PG hit + // would) and return 204 so the SDK's SSE client treats this as + // "stream not yet active" and retries on reconnect once the drainer + // materialises the row. 
+ const env = await $replica.runtimeEnvironment.findFirst({ + where: { slug: envParam, projectId: project.id }, + select: { id: true }, + }); + if (env) { + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: env.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } + } throw new Response("Not Found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index 5c7725c510b..4c5e89c42a7 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -9,6 +9,7 @@ import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { TaskEventKind } from "@trigger.dev/database"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export async function loader({ params, request }: LoaderFunctionArgs) { const user = await requireUser(request); @@ -30,6 +31,45 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run || !run.organizationId) { + // Buffered run has no events to package yet. Return a small gzipped + // placeholder file so the dashboard's "Download logs" button doesn't + // 404 mid-burst. Re-check org membership against the buffer entry's + // orgId so direct URL access can't confirm runId existence across + // orgs (loader-based check on the calling page doesn't apply here). 
+ const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(parsedParams.runParam); + if (entry) { + const member = await prisma.orgMember.findFirst({ + where: { userId: user.id, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + return new Response("Not found", { status: 404 }); + } + const placeholder = new Readable({ + read() { + this.push( + "# This run has not started yet. Logs will be available once it begins executing.\n" + ); + this.push(null); + }, + }); + const compressed = placeholder.pipe(createGzip()); + return new Response(compressed as any, { + status: 200, + headers: { + "Content-Type": "application/octet-stream", + "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`, + "Content-Encoding": "gzip", + }, + }); + } + } catch { + // fall through to 404 on buffer error + } + } return new Response("Not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts index 240d7d3d8ed..c3dff252a73 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts @@ -6,6 +6,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export const cancelSchema = z.object({ redirectUrl: z.string(), @@ -42,15 +43,56 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - if (!taskRun) { + if (taskRun) { + const cancelRunService = new CancelTaskRunService(); + await cancelRunService.call(taskRun); + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled 
run`); + } + + // PG miss — try the mollifier buffer. The customer can hit cancel + // on a buffered run from the dashboard during the burst window. + // Q4 design: snapshot a `mark_cancelled` patch; the drainer's + // bifurcation routes the run to `engine.createCancelledRun` on + // next pop. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) { submission.error = { runParam: ["Run not found"] }; return json(submission); } - const cancelRunService = new CancelTaskRunService(); - await cancelRunService.call(taskRun); + // Dashboard auth: verify the requesting user is a member of the + // buffered run's org. The API path scopes by env id from the + // authenticated request; the dashboard route uses org-membership + // because the URL doesn't carry an envId. + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } - return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + const result = await buffer!.mutateSnapshot(runParam, { + type: "mark_cancelled", + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }); + if (result === "applied_to_snapshot") { + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + if (result === "not_found") { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } + // "busy" — drainer is materialising. Customer can retry; by then the + // PG row exists and the regular cancel path takes over. 
+ return redirectWithErrorMessage( + submission.value.redirectUrl, + request, + "Run is materialising — retry in a moment" + ); } catch (error) { if (error instanceof Error) { logger.error("Failed to cancel run", { diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts index d7acf18e517..f9fa8521354 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts @@ -5,6 +5,8 @@ import { $replica } from "~/db.server"; import { requireUserId } from "~/services/session.server"; import { marqs } from "~/v3/marqs/index.server"; import { engine } from "~/v3/runEngine.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { deserialiseSnapshot } from "@trigger.dev/redis-worker"; const ParamSchema = z.object({ runParam: z.string(), @@ -43,6 +45,55 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Buffered run isn't on a queue yet (it sits in the mollifier buffer + // until the drainer materialises it), so the queue-concurrency fields + // don't apply. Return a minimal "buffered" debug payload from the + // snapshot so the Debug panel can show *something* instead of 404'ing. + const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(runParam); + if (entry) { + // Same org-membership gate as the PG path above. Without it, + // any authenticated user who knows a runId could read the + // buffered run's queue/concurrencyKey snapshot across orgs. 
+ const member = await $replica.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + throw new Response("Not Found", { status: 404 }); + } + const snapshot = deserialiseSnapshot<{ + taskIdentifier?: string; + queue?: string; + concurrencyKey?: string; + }>(entry.payload); + return typedjson({ + engine: "V2" as const, + buffered: true, + run: { + id: entry.runId, + engine: "V2" as const, + friendlyId: entry.runId, + queue: snapshot.queue ?? null, + concurrencyKey: snapshot.concurrencyKey ?? null, + queueTimestamp: entry.createdAt, + runtimeEnvironment: null, + }, + queueConcurrencyLimit: undefined, + envConcurrencyLimit: undefined, + queueCurrentConcurrency: undefined, + envCurrentConcurrency: undefined, + queueReserveConcurrency: undefined, + envReserveConcurrency: undefined, + keys: [], + }); + } + } catch { + // fall through to 404 on buffer error + } + } throw new Response("Not Found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 8a22822d06b..62da62e0478 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -11,6 +11,9 @@ import { requireUser } from "~/services/session.server"; import { sortEnvironments } from "~/utils/environmentSort"; import { v3RunSpanPath } from "~/utils/pathBuilder"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import type { TaskRun } from "@trigger.dev/database"; import parseDuration from "parse-duration"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; @@ 
-33,7 +36,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { Object.fromEntries(new URL(request.url).searchParams) ); - const run = await $replica.taskRun.findFirst({ + let run = await $replica.taskRun.findFirst({ select: { payload: true, payloadType: true, @@ -88,6 +91,74 @@ export async function loader({ request, params }: LoaderFunctionArgs) { where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } }, }); + let synthetic: + | (Awaited> & { __synth: true }) + | undefined; + if (!run) { + // Buffered fallback: read the snapshot and look up the env list via + // the snapshot's organizationId. Without this the Replay dialog + // 404s for runs queued in the mollifier buffer, which dumps the + // user back to the task list. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) throw new Response("Not Found", { status: 404 }); + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) throw new Response("Not Found", { status: 404 }); + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (!buffered) throw new Response("Not Found", { status: 404 }); + synthetic = Object.assign(buffered, { __synth: true as const }); + const orgProject = await $replica.project.findFirst({ + where: { + environments: { some: { id: entry.envId } }, + }, + select: { + slug: true, + environments: { + select: { + id: true, + type: true, + slug: true, + branchName: true, + orgMember: { select: { user: true } }, + }, + where: { + archivedAt: null, + OR: [ + { type: { in: ["PREVIEW", "STAGING", "PRODUCTION"] } }, + { type: "DEVELOPMENT", orgMember: { userId } }, + ], + }, + }, + }, + }); + if (!orgProject) throw new Response("Not Found", { status: 404 }); + run = { + payload: buffered.payload, 
+ payloadType: buffered.payloadType ?? "application/json", + seedMetadata: buffered.seedMetadata ?? null, + seedMetadataType: buffered.seedMetadataType ?? null, + runtimeEnvironmentId: entry.envId, + concurrencyKey: buffered.concurrencyKey ?? null, + maxAttempts: buffered.maxAttempts ?? null, + maxDurationInSeconds: buffered.maxDurationInSeconds ?? null, + machinePreset: buffered.machinePreset ?? null, + workerQueue: buffered.workerQueue ?? null, + ttl: buffered.ttl ?? null, + idempotencyKey: buffered.idempotencyKey ?? null, + runTags: buffered.runTags, + queue: buffered.queue ?? "task/", + taskIdentifier: buffered.taskIdentifier ?? "", + project: orgProject, + } as unknown as typeof run; + } + if (!run) { throw new Response("Not Found", { status: 404 }); } @@ -174,7 +245,7 @@ export const action: ActionFunction = async ({ request, params }) => { } try { - const taskRun = await prisma.taskRun.findFirst({ + const pgRun = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, }, @@ -192,6 +263,45 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); + // Mollifier read-fallback (Q2): if the original isn't in PG yet, + // synthesise a TaskRun from the buffered snapshot. The B4-extended + // SyntheticRun carries every field ReplayTaskRunService reads. We + // also need projectSlug + orgSlug + envSlug for the redirect path, + // so look those up via the snapshot's runtimeEnvironmentId. + let taskRun: + | (TaskRun & { + project: { slug: string; organization: { slug: string } }; + runtimeEnvironment: { slug: string }; + }) + | null = pgRun ?? null; + if (!taskRun) { + const buffer = getMollifierBuffer(); + const entry = buffer ? 
await buffer.getEntry(runParam) : null; + if (entry) { + const synthetic = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (synthetic) { + const envRow = await prisma.runtimeEnvironment.findFirst({ + where: { id: entry.envId }, + select: { + slug: true, + project: { select: { slug: true, organization: { select: { slug: true } } } }, + }, + }); + if (envRow) { + taskRun = { + ...(synthetic as unknown as TaskRun), + project: { slug: envRow.project.slug, organization: { slug: envRow.project.organization.slug } }, + runtimeEnvironment: { slug: envRow.slug }, + }; + } + } + } + } + if (!taskRun) { return redirectWithErrorMessage(submission.value.failedRedirect, request, "Run not found"); } diff --git a/apps/webapp/app/routes/runs.$runParam.ts b/apps/webapp/app/routes/runs.$runParam.ts index b472d7ae8f4..7be799746fd 100644 --- a/apps/webapp/app/routes/runs.$runParam.ts +++ b/apps/webapp/app/routes/runs.$runParam.ts @@ -4,6 +4,7 @@ import { prisma } from "~/db.server"; import { redirectWithErrorMessage } from "~/models/message.server"; import { requireUser } from "~/services/session.server"; import { rootPath, v3RunPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -48,6 +49,26 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Fall back to the mollifier buffer. Without this a customer clicking + // the run link returned by the trigger API gets bounced to the home + // page until the drainer materialises the PG row. 
+ const buffered = await findBufferedRunRedirectInfo({ runFriendlyId: runParam, userId: user.id }); + if (buffered) { + const url = new URL(request.url); + const searchParams = url.searchParams; + if (!searchParams.has("span") && buffered.spanId) { + searchParams.set("span", buffered.spanId); + } + return redirect( + v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: runParam }, + searchParams + ) + ); + } return redirectWithErrorMessage( rootPath(), request, diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index f9c7ca72f1f..094999c1f00 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -31,9 +31,9 @@ export const realtimeBufferedSubscriptionsCounter = meter.createCounter( // No `envId` attribute — `envId` is a banned high-cardinality metric // label per the repo's OTel rules. The structured warn log emitted -// alongside the counter tick (in `mollifierStaleSweep.server.ts`) -// carries the envId / orgId / runId for forensic drill-down; the -// metric stays an aggregate. +// alongside the counter tick (in `mollifierStaleSweep.server.ts` and +// the realtime route's `logger.info`) carries the envId / orgId / +// runId for forensic drill-down; the metric stays an aggregate. 
export function recordRealtimeBufferedSubscription(): void { realtimeBufferedSubscriptionsCounter.add(1); } diff --git a/apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts b/apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts new file mode 100644 index 00000000000..0a84f984530 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts @@ -0,0 +1,57 @@ +import type { SyntheticRun } from "./readFallback.server"; + +// Shape `realtime.v1.runs.$runId.ts`'s findResource hands to the route's +// authorization callback + loader body. The PG-resident case is the +// canonical shape (a TaskRun row with the batch join); the buffered +// case below mirrors it from the synthetic run. +export type RealtimeRunResource = { + id: string; + friendlyId: string; + taskIdentifier: string; + runTags: string[]; + batch: { friendlyId: string } | null; + // Present only when this resource was resolved from the mollifier + // buffer (no PG row yet). Stamped at resolve time so the loader body + // can emit observability for buffered-window subscriptions. The flag + // doubles as the discriminant — PG-sourced resources never carry it. + __bufferedDwellMs?: number; +}; + +export type RealtimeRunResourcePgRun = { + id: string; + friendlyId: string; + taskIdentifier: string; + runTags: string[]; + batch: { friendlyId: string } | null; +}; + +// Given the results of the PG and buffer lookups, produce the resource +// shape the realtime route returns from findResource. PG-first: if the +// run is PG-resident, return it unchanged (the buffered fallback only +// fires when no PG row exists yet). When only the buffer has the run, +// synthesise a matching shape whose `id` is the deterministic value +// engine.trigger will write when the drainer materialises this run — +// this is what lets the Electric subscription's `WHERE id=` match +// the eventual INSERT. 
+export function resolveRealtimeRunResource(input: { + pgRun: RealtimeRunResourcePgRun | null; + bufferedSynthetic: Pick< + SyntheticRun, + "id" | "friendlyId" | "taskIdentifier" | "runTags" | "createdAt" + > | null; + now?: () => number; +}): RealtimeRunResource | null { + if (input.pgRun) return input.pgRun; + if (input.bufferedSynthetic) { + const now = (input.now ?? Date.now)(); + return { + id: input.bufferedSynthetic.id, + friendlyId: input.bufferedSynthetic.friendlyId, + taskIdentifier: input.bufferedSynthetic.taskIdentifier ?? "", + runTags: input.bufferedSynthetic.runTags, + batch: null, + __bufferedDwellMs: now - input.bufferedSynthetic.createdAt.getTime(), + }; + } + return null; +} diff --git a/apps/webapp/test/mollifierRealtimeRunResource.test.ts b/apps/webapp/test/mollifierRealtimeRunResource.test.ts new file mode 100644 index 00000000000..2f53ecb892f --- /dev/null +++ b/apps/webapp/test/mollifierRealtimeRunResource.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server"; + +const pgRun = { + id: "pg_internal_id", + friendlyId: "run_pg_friendly", + taskIdentifier: "hello-world", + runTags: ["a", "b"], + batch: { friendlyId: "batch_1" }, +}; + +const bufferedSynthetic = { + id: "buffered_id", + friendlyId: "run_buffered_id", + taskIdentifier: "hello-world", + runTags: ["c"], + // Six seconds ago against the fixed `now` below. + createdAt: new Date("2026-05-22T12:00:00.000Z"), +}; + +const fixedNow = () => new Date("2026-05-22T12:00:06.000Z").getTime(); + +describe("resolveRealtimeRunResource", () => { + it("returns the PG run unchanged when one exists", () => { + // PG wins even if the buffer also has the entry — the drainer may + // be racing the route call and the PG row is the canonical source. 
+ expect( + resolveRealtimeRunResource({ pgRun, bufferedSynthetic: null }), + ).toEqual(pgRun); + expect( + resolveRealtimeRunResource({ pgRun, bufferedSynthetic }), + ).toEqual(pgRun); + }); + + it("never stamps __bufferedDwellMs on a PG-sourced resource", () => { + // The loader body uses __bufferedDwellMs as a discriminant for + // emitting buffered-subscription observability. A PG-resident run + // must never carry it or every PG subscription would over-count. + const result = resolveRealtimeRunResource({ pgRun, bufferedSynthetic }); + expect(result).not.toHaveProperty("__bufferedDwellMs"); + }); + + it("synthesises a resource from the buffered entry when PG misses", () => { + // Load-bearing assertion: `id` must equal `bufferedSynthetic.id`. + // The realtime route hands this `id` to streamRun, which builds + // Electric's `WHERE id=''` clause. When the drainer materialises + // the run, engine.trigger writes the row with that same id (derived + // deterministically from friendlyId), and Electric streams the + // INSERT to the client. If the synthesised `id` ever drifts from + // what the drainer writes, the customer subscribes to a shape that + // never matches and the hook silently hangs even after materialise. + const result = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic, + now: fixedNow, + }); + expect(result).toEqual({ + id: "buffered_id", + friendlyId: "run_buffered_id", + taskIdentifier: "hello-world", + runTags: ["c"], + batch: null, + __bufferedDwellMs: 6000, + }); + }); + + it("defaults a missing taskIdentifier to empty string", () => { + const result = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic: { ...bufferedSynthetic, taskIdentifier: undefined }, + now: fixedNow, + }); + expect(result?.taskIdentifier).toBe(""); + }); + + it("returns null when neither PG nor buffer have the run", () => { + // This is the genuine not-found case — typo'd runId, deleted run, + // etc. The api-builder maps null to 404. 
Critically, the buffered- + // fallback must NOT promote a missing run to a synthetic resource — + // that would cause Electric to open a shape for a runId that may + // never exist, which is also a silent-hang situation but for a + // different reason. + expect( + resolveRealtimeRunResource({ pgRun: null, bufferedSynthetic: null }), + ).toBeNull(); + }); +}); diff --git a/apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts b/apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts new file mode 100644 index 00000000000..5cf0610b73b --- /dev/null +++ b/apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts @@ -0,0 +1,152 @@ +import { describe, expect, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server"; + +const SNAPSHOT_BASE = { + friendlyId: "run_phase52e2e", + taskIdentifier: "hello-world", + payload: '{"x":1}', + payloadType: "application/json", + traceContext: { traceparent: "00-0123456789abcdef0123456789abcdef-fedcba9876543210-01" }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + queue: "task/hello-world", + tags: ["realtime-e2e"], + depth: 0, + isTest: false, + taskEventStore: "taskEvent", +}; + +// End-to-end: a real MollifierBuffer has an entry, the real +// readFallback helper deserialises it, and the resolveRealtimeRunResource +// helper produces the resource shape the realtime route returns from +// findResource. Regression intent: if any link in the chain breaks — +// buffer interface rename, snapshot field rename, id-derivation drift, +// synthetic-shape change — this test fails. 
The route file itself is +// then a thin glue layer over tested pieces. +describe("realtime buffered-subscription resource resolution (testcontainers)", () => { + redisTest( + "synthesises a resource whose `id` matches RunId.fromFriendlyId", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: SNAPSHOT_BASE.friendlyId, + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT_BASE), + }); + + const bufferedSynthetic = await findRunByIdWithMollifierFallback( + { + runId: SNAPSHOT_BASE.friendlyId, + environmentId: "env_a", + organizationId: "org_1", + }, + { getBuffer: () => buffer }, + ); + expect(bufferedSynthetic).not.toBeNull(); + + const resource = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic, + }); + + // The load-bearing contract: the resolved `id` MUST equal what + // engine.trigger will write to PG.TaskRun.id when the drainer + // materialises this run. Electric's `WHERE id=''` clause + // depends on this match — drift means a silent-hang regression. 
+ expect(resource?.id).toBe(RunId.fromFriendlyId(SNAPSHOT_BASE.friendlyId)); + expect(resource?.friendlyId).toBe(SNAPSHOT_BASE.friendlyId); + expect(resource?.taskIdentifier).toBe("hello-world"); + expect(resource?.runTags).toEqual(["realtime-e2e"]); + expect(resource?.batch).toBeNull(); + expect(resource?.__bufferedDwellMs).toBeTypeOf("number"); + expect(resource?.__bufferedDwellMs).toBeGreaterThanOrEqual(0); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns null when neither PG nor the buffer have the entry", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + const bufferedSynthetic = await findRunByIdWithMollifierFallback( + { + runId: "run_does_not_exist", + environmentId: "env_a", + organizationId: "org_1", + }, + { getBuffer: () => buffer }, + ); + expect(bufferedSynthetic).toBeNull(); + + const resource = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic, + }); + // The api builder relies on this null to emit a real 404 for + // genuinely missing runs. If we ever promote unknown runIds to + // synthetic resources here, the route opens an Electric shape + // for a run that may never exist — a different silent-hang + // failure mode for typos, deleted runs, etc. + expect(resource).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "does not fall back to buffer when PG has the row", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: SNAPSHOT_BASE.friendlyId, + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT_BASE), + }); + + // Simulate the drainer having materialised the run: PG has the + // canonical row, the buffer still has its entry (would be + // ack'd & removed in real ops). 
The resolver must return the + // PG row and NOT carry the __bufferedDwellMs flag — otherwise + // the loader body would emit a buffered-subscription log for a + // run that's actually PG-resident, over-counting the signal. + const pgRun = { + id: RunId.fromFriendlyId(SNAPSHOT_BASE.friendlyId), + friendlyId: SNAPSHOT_BASE.friendlyId, + taskIdentifier: "hello-world", + runTags: ["realtime-e2e"], + batch: null, + }; + + const bufferedSynthetic = await findRunByIdWithMollifierFallback( + { + runId: SNAPSHOT_BASE.friendlyId, + environmentId: "env_a", + organizationId: "org_1", + }, + { getBuffer: () => buffer }, + ); + + const resource = resolveRealtimeRunResource({ pgRun, bufferedSynthetic }); + expect(resource).toEqual(pgRun); + expect(resource).not.toHaveProperty("__bufferedDwellMs"); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/apps/webapp/test/mollifierRealtimeSubscription.test.ts b/apps/webapp/test/mollifierRealtimeSubscription.test.ts new file mode 100644 index 00000000000..0ea0471a5f1 --- /dev/null +++ b/apps/webapp/test/mollifierRealtimeSubscription.test.ts @@ -0,0 +1,46 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { isInitialBufferedSubscriptionRequest } from "~/v3/mollifier/mollifierTelemetry.server"; + +describe("isInitialBufferedSubscriptionRequest", () => { + // Electric's shape-stream protocol returns a `handle=` in + // the first response. The SDK echoes that handle on every reconnect / + // live-poll iteration thereafter. The realtime route logs + + // increments the mollifier.realtime_subscriptions.buffered counter + // only on the initial connect (handle absent) so each subscription + // produces a single observability event instead of one per + // long-poll round-trip (~20s). 
+ it("returns true for the SDK's initial GET (no handle param)", () => { + expect( + isInitialBufferedSubscriptionRequest( + "http://localhost:3030/realtime/v1/runs/run_x?log=full&offset=-1", + ), + ).toBe(true); + }); + + it("returns false for Electric's reconnects (handle present)", () => { + expect( + isInitialBufferedSubscriptionRequest( + "http://localhost:3030/realtime/v1/runs/run_x?handle=100344308-1779&log=full&offset=0_0", + ), + ).toBe(false); + }); + + it("returns false for Electric live-poll reconnects (handle + cursor)", () => { + expect( + isInitialBufferedSubscriptionRequest( + "http://localhost:3030/realtime/v1/runs/run_x?cursor=51020980&handle=100344308&live=true&log=full&offset=0_inf", + ), + ).toBe(false); + }); + + it("accepts a URL instance as well as a string", () => { + const url = new URL("http://localhost:3030/realtime/v1/runs/run_x?log=full"); + expect(isInitialBufferedSubscriptionRequest(url)).toBe(true); + }); +});