Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 150 additions & 9 deletions apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,99 @@
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { tryCatch } from "@trigger.dev/core/utils";
import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas";
import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { $replica } from "~/db.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/common.server";
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";

/** Route params for this endpoint: `runId` is validated only as a string. */
const ParamsSchema = z.object({ runId: z.string() });

/**
 * GET handler for a run's metadata (Phase A6).
 *
 * Previously this route only exposed PUT (update), so a GET on the same URL
 * produced a Remix "no loader" 400. This loader responds with
 * `{ metadata, metadataType }`, read from the Postgres replica row when one
 * exists and otherwise from the mollifier buffer snapshot.
 *
 * Responses:
 * - 401 when the API key cannot be authenticated.
 * - 400 when the route params fail `ParamsSchema`.
 * - 200 with `{ metadata, metadataType }` when the run is found in either store.
 * - 404 when neither store knows the run.
 */
export async function loader({ request, params }: LoaderFunctionArgs) {
  const auth = await authenticateApiRequest(request);
  if (!auth) {
    return json({ error: "Invalid or Missing API Key" }, { status: 401 });
  }

  const paramsResult = ParamsSchema.safeParse(params);
  if (!paramsResult.success) {
    return json({ error: "Invalid or missing run ID" }, { status: 400 });
  }

  const { runId } = paramsResult.data;
  const { id: environmentId, organizationId } = auth.environment;

  // Postgres first: the lookup is scoped to the caller's environment.
  const row = await $replica.taskRun.findFirst({
    where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
    select: { metadata: true, metadataType: true },
  });
  if (row) {
    const { metadata, metadataType } = row;
    return json({ metadata, metadataType }, { status: 200 });
  }

  // No row in Postgres: consult the mollifier fallback lookup.
  const snapshot = await findRunByIdWithMollifierFallback({
    runId,
    environmentId,
    organizationId,
  });
  if (!snapshot) {
    return json({ error: "Run not found" }, { status: 404 });
  }

  // A snapshot may lack metadata fields; default them so the response shape
  // always carries both keys.
  return json(
    {
      metadata: snapshot.metadata ?? null,
      metadataType: snapshot.metadataType ?? "application/json",
    },
    { status: 200 }
  );
}

/**
 * Applies parent/root metadata operations to `targetRunId`, best-effort.
 *
 * The operations are sent as the top-level `operations` of a synthetic body,
 * so the existing metadata service applies them to `targetRunId` directly.
 * If that call throws, the same operations are offered to the buffered-run
 * mutation helper in case the target only exists as a buffer snapshot.
 *
 * These operations are auxiliary to the caller's own metadata update, so no
 * failure from either path is surfaced: the returned promise always resolves.
 *
 * @param targetRunId - Run to apply the operations to; no-op when undefined or empty.
 * @param operations - Operations to apply; no-op when undefined or empty.
 * @param env - Full authenticated environment, passed through to the service
 *   unchanged (avoids the unsafe cast a narrowed `{ id, organizationId }`
 *   shape would need).
 */
async function routeOperationsToRun(
  targetRunId: string | undefined,
  operations: RunMetadataChangeOperation[] | undefined,
  env: AuthenticatedEnvironment
): Promise<void> {
  if (!targetRunId || !operations || operations.length === 0) return;

  // Try the existing service first; this is how parent/root operations have
  // always been applied, so that path is preserved.
  const [serviceError] = await tryCatch(
    updateMetadataService.call(targetRunId, { operations }, env)
  );
  if (!serviceError) return;

  // The service threw (e.g. "Cannot update metadata for a completed run").
  // Offer the operations to the buffered snapshot instead. The call is wrapped
  // in tryCatch so a rejection here cannot escape: the caller awaits this
  // inside Promise.all after its primary mutation has already succeeded, and
  // a rejection would turn that success into a failed request.
  await tryCatch(
    applyMetadataMutationToBufferedRun({
      runId: targetRunId,
      body: { operations },
    })
  );
}

const { action } = createActionApiRoute(
{
params: ParamsSchema,
Expand All @@ -18,23 +102,80 @@ const { action } = createActionApiRoute(
method: "PUT",
},
async ({ authentication, body, params }) => {
const [error, result] = await tryCatch(
updateMetadataService.call(params.runId, body, authentication.environment)
);
const env = authentication.environment;
const runId = params.runId;

if (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: error.status ?? 422 });
// PG-canonical path. If the run is in PG, the existing service
// owns the full request shape including parent/root operations,
// metadataVersion CAS, batching, validation — none of which the
// buffer side needs to reimplement.
const [pgError, pgResult] = await tryCatch(
updateMetadataService.call(runId, body, env)
);
if (pgError) {
if (pgError instanceof ServiceValidationError) {
return json({ error: pgError.message }, { status: pgError.status ?? 422 });
}

return json({ error: "Internal Server Error" }, { status: 500 });
}
if (pgResult) {
return json(pgResult, { status: 200 });
}

if (!result) {
// PG miss. Target run is either buffered or genuinely absent.
const bufferOutcome = await applyMetadataMutationToBufferedRun({
runId,
body: { metadata: body.metadata, operations: body.operations },
});

if (bufferOutcome.kind === "not_found") {
return json({ error: "Task Run not found" }, { status: 404 });
}
if (bufferOutcome.kind === "busy") {
// Entry is materialising. Best path is to retry the PG call —
// the row may be visible now. We don't waste a roundtrip in
// the happy path, but a 503 here would be customer-visible
// breakage for legitimately-burst workloads. Hand back 503 with
// a retry hint; SDK retry policy converges.
return json({ error: "Run materialising, retry shortly" }, { status: 503 });
}
if (bufferOutcome.kind === "version_exhausted") {
// Pathological contention — many concurrent metadata writers on
// the same buffered runId. Surface as 503 rather than silently
// dropping the request.
return json({ error: "Metadata write contention; retry shortly" }, { status: 503 });
}

// Buffered metadata mutation succeeded. Fan parent/root operations
// out to their respective runs (parent/root are typically PG-
// materialised by the time the child is buffered, so the existing
// service handles them; if they're also buffered, the helper
// recurses through the buffered mutation path).
const bufferedEntry = await findRunByIdWithMollifierFallback({
runId,
environmentId: env.id,
organizationId: env.organizationId,
});
if (bufferedEntry) {
await Promise.all([
routeOperationsToRun(bufferedEntry.parentTaskRunId, body.parentOperations, env),
// The PG service routes rootOperations to
// `taskRun.rootTaskRun?.id ?? taskRun.id` — the actual root, not
// the parent. The snapshot carries the root's *friendlyId*
// (parentTaskRunId is an internal id; root is friendlyId because
// it's what the engine passes through). Use it; if absent,
// route to the run itself (matches PG's self-fallback) rather
// than misrouting to the parent for grandchild → child → root
// hierarchies.
routeOperationsToRun(
bufferedEntry.rootTaskRunFriendlyId ?? runId,
body.rootOperations,
env,
),
]);
}

return json(result, { status: 200 });
return json({ metadata: bufferOutcome.newMetadata }, { status: 200 });
}
);

Expand Down
94 changes: 51 additions & 43 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import { z } from "zod";
import { prisma } from "~/db.server";
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { logger } from "~/services/logger.server";
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";

/** Route params for this endpoint: `runId` is validated only as a string. */
const ParamsSchema = z.object({ runId: z.string() });

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

// Authenticate the request
const authenticationResult = await authenticateApiRequest(request);
if (!authenticationResult) {
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
Expand All @@ -32,59 +32,67 @@ export async function action({ request, params }: ActionFunctionArgs) {

try {
const anyBody = await request.json();

const body = AddTagsRequestBody.safeParse(anyBody);
if (!body.success) {
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
}

const run = await prisma.taskRun.findFirst({
where: {
friendlyId: parsedParams.data.runId,
runtimeEnvironmentId: authenticationResult.environment.id,
},
select: {
runTags: true,
},
});

const existingTags = run?.runTags ?? [];

//remove duplicate tags from the new tags
const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags;
const newTags = bodyTags.filter((tag) => {
if (tag.trim().length === 0) return false;
return !existingTags.includes(tag);
});

if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) {
return json(
{
error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${
existingTags.length + newTags.length
}. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`,
},
{ status: 422 }
);
}
const nonEmptyTags = bodyTags.filter((t) => t.trim().length > 0);

if (newTags.length === 0) {
if (nonEmptyTags.length === 0) {
return json({ message: "No new tags to add" }, { status: 200 });
}

await prisma.taskRun.update({
where: {
friendlyId: parsedParams.data.runId,
runtimeEnvironmentId: authenticationResult.environment.id,
},
data: {
runTags: {
push: newTags,
},
const env = authenticationResult.environment;
const outcome = await mutateWithFallback({
runId: parsedParams.data.runId,
environmentId: env.id,
organizationId: env.organizationId,
bufferPatch: { type: "append_tags", tags: nonEmptyTags },
pgMutation: async (taskRun) => {
const existing = taskRun.runTags ?? [];
const newTags = nonEmptyTags.filter((t) => !existing.includes(t));

if (existing.length + newTags.length > MAX_TAGS_PER_RUN) {
return json(
{
error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${
existing.length + newTags.length
}. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`,
},
{ status: 422 }
);
}
if (newTags.length === 0) {
return json({ message: "No new tags to add" }, { status: 200 });
}
await prisma.taskRun.update({
where: {
id: taskRun.id,
runtimeEnvironmentId: env.id,
},
data: { runTags: { push: newTags } },
});
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
},
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
// against existing snapshot tags atomically. MAX_TAGS_PER_RUN
// enforcement is skipped on the buffered side — the drainer's
// engine.trigger writes the PG row without enforcement either,
// matching today's pre-buffer trigger semantics. A future
// refinement could push the limit check into the Lua.
synthesisedResponse: () =>
json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }),
abortSignal: getRequestAbortSignal(),
});

return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
if (outcome.kind === "not_found") {
return json({ error: "Run not found" }, { status: 404 });
}
if (outcome.kind === "timed_out") {
return json({ error: "Run materialisation timed out" }, { status: 503 });
}
return outcome.response;
} catch (error) {
logger.error("Failed to add run tags", { error });
return json({ error: "Something went wrong, please try again." }, { status: 500 });
Expand Down
Loading