Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 149 additions & 3 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";
Expand Down Expand Up @@ -64,13 +68,34 @@ type CommonRelatedRun = Prisma.Result<
"findFirstOrThrow"
>;

type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
// Explicit shape of what findRun() resolves to: everything in
// commonRunSelect (CommonRelatedRun) plus the additional fields the route
// handler reads. It is written out by hand rather than derived with
// ReturnType<typeof findRun>, because findRun may also return a run
// synthesised from the buffer and a derived type would then refer to itself.
type FoundRun = CommonRelatedRun & {
  traceId: string;
  engine: "V1" | "V2";
  taskEventStore: string;

  // Payload, output and error, with their content types.
  payload: string;
  payloadType: string;
  output: string | null;
  outputType: string;
  error: Prisma.JsonValue;

  // Attempts.
  attempts: Array<{ id: string }>;
  attemptNumber: number | null;

  // Related runs, each in the CommonRelatedRun shape.
  parentTaskRun: CommonRelatedRun | null;
  rootTaskRun: CommonRelatedRun | null;
  childRuns: Array<CommonRelatedRun>;
};

export class ApiRetrieveRunPresenter {
constructor(private readonly apiVersion: API_VERSIONS) {}

public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
return $replica.taskRun.findFirst({
public static async findRun(
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: env.id,
Expand Down Expand Up @@ -102,6 +127,23 @@ export class ApiRetrieveRunPresenter {
},
},
});

if (pgRow) return pgRow;

// Postgres miss → fall back to the mollifier buffer. When the gate
// diverted a trigger, the run lives in Redis until the drainer replays
// it through engine.trigger. Synthesise the FoundRun shape so call()
// returns a `QUEUED` (or `FAILED`) response with empty output, no
// attempts, no relations.
const buffered = await findRunByIdWithMollifierFallback({
runId: friendlyId,
environmentId: env.id,
organizationId: env.organizationId,
});

if (!buffered) return null;

return synthesiseFoundRunFromBuffer(buffered);
}

public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
Expand Down Expand Up @@ -475,3 +517,107 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
}
}

// Helpers for building a FoundRun-shaped object from a buffered (mollified)
// run; synthesiseFoundRunFromBuffer below is the entry point. The run is in
// the Redis buffer; engine.trigger hasn't created the Postgres row yet, so
// every field that comes from execution state (output, attempts, cost,
// relations) takes a default, and completedAt is only set when the buffered
// run was cancelled. The presenter's call() handles QUEUED-state runs
// without surprise.

/**
 * Translates a buffered run's status into the `TaskRunStatus` reported for
 * the synthesised run.
 *
 * `"FAILED"` maps to `"SYSTEM_FAILURE"`, `"CANCELED"` maps to `"CANCELED"`,
 * and every other buffered status maps to `"PENDING"`.
 *
 * @param status - Status carried by the buffered run.
 * @returns The equivalent task run status.
 */
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
  if (status === "FAILED") {
    return "SYSTEM_FAILURE";
  }
  if (status === "CANCELED") {
    return "CANCELED";
  }
  // Anything that is neither failed nor cancelled is reported as pending.
  return "PENDING";
}

// `TaskRun.payload` is a `String?` column on the PG path, so a buffered
// snapshot's `payload` is expected to be a string in production. Other
// shapes are coerced rather than dropped: a missing payload becomes "",
// any other non-string value is JSON-stringified (as the trigger path would
// serialise it), and a value that cannot be serialised becomes "". Both
// coercion outcomes are logged so format drift is visible to ops without
// the read path throwing.
function synthesisePayload(buffered: SyntheticRun): string {
  const raw = buffered.payload;

  if (typeof raw === "string") {
    return raw;
  }
  if (raw == null) {
    return "";
  }

  // Context attached to whichever log line is emitted below.
  const logContext = () => ({
    runFriendlyId: buffered.friendlyId,
    payloadType: typeof raw,
  });

  let encoded: unknown;
  try {
    encoded = JSON.stringify(raw);
    logger.warn(
      "ApiRetrieveRunPresenter: buffered snapshot.payload non-string coerced",
      logContext()
    );
  } catch {
    logger.error(
      "ApiRetrieveRunPresenter: buffered snapshot.payload unserialisable",
      logContext()
    );
    return "";
  }

  // JSON.stringify yields undefined for values such as functions; treat
  // that as an empty payload.
  return typeof encoded === "string" ? encoded : "";
}

/**
 * Builds a `FoundRun` from a buffered run that has no Postgres row.
 *
 * Identity, trigger-time options and tags are copied from the buffered run,
 * with fallbacks where the buffered value is absent. The run is reported
 * with no start time, output, attempts, schedule, batch or related runs, and
 * with zero cost and usage. `completedAt` is the cancellation time, if any.
 *
 * @param buffered - The buffered run to present.
 * @returns A `FoundRun` with the same key order as before.
 */
function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
  const status = bufferedStatusToTaskRunStatus(buffered.status);

  // A buffered error is flattened to a single "code: message" string error.
  let error: Prisma.JsonValue = null;
  if (buffered.error) {
    const { code, message } = buffered.error;
    error = { type: "STRING_ERROR", raw: `${code}: ${message}` };
  }

  // Only string metadata is passed through; any other shape is dropped.
  const metadata: Prisma.JsonValue =
    typeof buffered.metadata === "string" ? buffered.metadata : null;

  const cancelledAt = buffered.cancelledAt ?? null;
  const lockedVersion = buffered.lockedToVersion;

  return {
    // `id` must be the internal cuid (the Prisma TaskRun.id column), not the
    // user-facing `run_xxx` friendlyId: per the original note, downstream
    // logging keyed off `taskRun.id` correlates with other systems via the
    // cuid, and `SyntheticRun` carries both for that reason.
    id: buffered.id,
    friendlyId: buffered.friendlyId,
    status,
    taskIdentifier: buffered.taskIdentifier ?? "",
    createdAt: buffered.createdAt,
    startedAt: null,
    // With no cancellation, the run has not changed since it was created.
    updatedAt: cancelledAt ?? buffered.createdAt,
    completedAt: cancelledAt,
    expiredAt: null,
    delayUntil: buffered.delayUntil ?? null,
    metadata,
    metadataType: buffered.metadataType ?? "application/json",
    ttl: buffered.ttl ?? null,
    costInCents: 0,
    baseCostInCents: 0,
    usageDurationMs: 0,
    idempotencyKey: buffered.idempotencyKey ?? null,
    idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
    isTest: buffered.isTest,
    depth: buffered.depth,
    scheduleId: null,
    lockedToVersion: lockedVersion ? { version: lockedVersion } : null,
    resumeParentOnCompletion: buffered.resumeParentOnCompletion,
    batch: null,
    runTags: buffered.tags,
    traceId: buffered.traceId ?? "",
    payload: synthesisePayload(buffered),
    payloadType: buffered.payloadType ?? "application/json",
    output: null,
    outputType: "application/json",
    error,
    attempts: [],
    attemptNumber: null,
    engine: "V2",
    taskEventStore: "taskEvent",
    workerQueue: buffered.workerQueue ?? "main",
    parentTaskRun: null,
    rootTaskRun: null,
    childRuns: [],
  };
}
85 changes: 72 additions & 13 deletions apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,101 @@ import {
} from "~/services/routeBuilders/apiBuilder.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";

// Route params for `api.v1.runs.$runId.spans.$spanId`; both are validated
// only as strings here.
const ParamsSchema = z.object({
runId: z.string(),
spanId: z.string(),
});

// Phase A2 — the resource resolved for this route, tagged by where the run
// was found: `"pg"` carries the row from findPgRun, `"buffer"` carries the
// run returned by findRunByIdWithMollifierFallback.
//
// A buffered run has a single valid spanId: the queued span recorded at
// gate time, which is reused as the run's root spanId when the drainer
// materialises it. Requests for any other spanId get a deterministic 404.
// The queued span itself is answered with a minimal synthesised shape, so
// the customer's SDK sees the same 200 contract as for a freshly-triggered
// run.
type ResolvedRun =
  | {
      source: "pg";
      run: NonNullable<Awaited<ReturnType<typeof findPgRun>>>;
    }
  | {
      source: "buffer";
      run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>>;
    };

/**
 * Queries `$replica` for the first `taskRun` whose friendly ID and runtime
 * environment both match.
 *
 * @param runId - The run's friendly ID, matched against `friendlyId`.
 * @param environmentId - Matched against `runtimeEnvironmentId`.
 * @returns Whatever `taskRun.findFirst` resolves to for that filter.
 */
async function findPgRun(runId: string, environmentId: string) {
  const where = { friendlyId: runId, runtimeEnvironmentId: environmentId };
  return $replica.taskRun.findFirst({ where });
}

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: (params, auth) => {
return $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: auth.environment.id,
},
findResource: async (params, auth): Promise<ResolvedRun | null> => {
const pgRun = await findPgRun(params.runId, auth.environment.id);
if (pgRun) return { source: "pg", run: pgRun };

const buffered = await findRunByIdWithMollifierFallback({
runId: params.runId,
environmentId: auth.environment.id,
organizationId: auth.environment.organizationId,
});
if (buffered) return { source: "buffer", run: buffered };

return null;
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => {
resource: (resolved) => {
if (resolved.source === "pg") {
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
}
return anyResource(resources);
}
const run = resolved.run;
const resources = [
{ type: "runs", id: run.friendlyId },
{ type: "tasks", id: run.taskIdentifier },
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
...run.tags.map((tag) => ({ type: "tags", id: tag })),
];
if (run.batchId) {
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
}
return anyResource(resources);
},
},
},
async ({ params, resource: run, authentication }) => {
async ({ params, resource: resolved, authentication }) => {
if (resolved.source === "buffer") {
// Buffered runs have exactly one valid spanId — the queued span the
// mollifier gate recorded at trigger time, which becomes the run's
// root spanId once the drainer materialises. Any other spanId is a
// deterministic 404. The matching spanId returns a minimal shape
// representing "span exists, no execution data yet."
if (resolved.run.spanId !== params.spanId) {
return json({ error: "Span not found" }, { status: 404 });
}
return json(
{
spanId: resolved.run.spanId,
parentId: resolved.run.parentSpanId ?? null,
runId: resolved.run.friendlyId,
message: resolved.run.taskIdentifier ?? "",
isError: false,
isPartial: resolved.run.status !== "CANCELED",
isCancelled: resolved.run.status === "CANCELED",
level: "TRACE",
startTime: resolved.run.createdAt,
durationMs: 0,
},
{ status: 200 }
);
}

const run = resolved.run;
const eventRepository = await getEventRepositoryForStore(
run.taskEventStore,
authentication.environment.organization.id
Expand Down
Loading