diff --git a/.server-changes/idempotent-background-worker-registration.md b/.server-changes/idempotent-background-worker-registration.md new file mode 100644 index 0000000000..abadd47154 --- /dev/null +++ b/.server-changes/idempotent-background-worker-registration.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Make `POST /api/v1/deployments/:deploymentId/background-workers` idempotent (for sequential requests only) so client-side retries no longer collide on the `BackgroundWorker` `(project, env, version)` unique index. Helps make deployments more resilient against the class of indexing failures that surfaces in the dashboard as "Indexing timed out", e.g. during transient database issues. diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index ec3f6d077a..b3052fa65b 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -34,31 +34,8 @@ import { tryCatch } from "@trigger.dev/core/v3"; import { engine } from "../runEngine.server"; import { scheduleEngine } from "../scheduleEngine.server"; -/** - * Strip BackgroundWorkerMetadata down to the slice that's actually read after - * storage. Everything else is duplicated to dedicated columns/tables - * (BackgroundWorker.{contentHash,cliVersion,sdkVersion,runtime,runtimeVersion}, - * BackgroundWorkerTask, BackgroundWorkerFile, TaskQueue, Prompt). Today the - * only post-write reader is changeCurrentDeployment.server.ts, which feeds - * tasks[].schedule into syncDeclarativeSchedules. packageVersion, contentHash, - * and tasks[].filePath are kept solely to satisfy BackgroundWorkerMetadata's - * required fields when the column is parsed back. - */ -export function stripBackgroundWorkerMetadataForStorage( - metadata: BackgroundWorkerMetadata -): Prisma.InputJsonValue { - return { - packageVersion: metadata.packageVersion, - contentHash: metadata.contentHash, - tasks: metadata.tasks - .filter((t) => t.schedule) - .map((t) => ({ - id: t.id, - filePath: t.filePath, - schedule: t.schedule, - })), - }; -} +import { stripBackgroundWorkerMetadataForStorage } from "./stripBackgroundWorkerMetadataForStorage.server"; +export { stripBackgroundWorkerMetadataForStorage }; export class CreateBackgroundWorkerService extends BaseService { private readonly _taskMetaCache: TaskMetadataCache; @@ -356,8 +333,13 @@ async function createWorkerTask( prisma: PrismaClientOrTransaction, tasksToBackgroundFiles?: Map ): Promise { + // Hoisted so the P2002 catch branch can return the same entry shape. + let queue: TaskQueue | undefined; + let resolvedTriggerSource: "SCHEDULED" | "AGENT" | "STANDARD" | undefined; + let resolvedTtl: string | null | undefined; + try { - let queue = queues.find((queue) => queue.name === task.queue?.name); + queue = queues.find((queue) => queue.name === task.queue?.name); if (!queue) { // Create a TaskQueue @@ -374,14 +356,14 @@ async function createWorkerTask( ); } - const resolvedTriggerSource = + resolvedTriggerSource = task.triggerSource === "schedule" ? ("SCHEDULED" as const) : task.triggerSource === "agent" ? ("AGENT" as const) : ("STANDARD" as const); - const resolvedTtl = + resolvedTtl = typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null; await prisma.backgroundWorkerTask.create({ @@ -418,10 +400,26 @@ async function createWorkerTask( if (error instanceof Prisma.PrismaClientKnownRequestError) { // The error code for unique constraint violation in Prisma is P2002 if (error.code === "P2002") { - logger.warn("Task already exists", { + // Retry landing after the first attempt's row was already written. + const existing = await prisma.backgroundWorkerTask.findFirst({ + where: { workerId: worker.id, slug: task.id }, + select: { id: true }, + }); + + logger.warn("Attempted to recreate background worker task", { task, worker, }); + + if (existing && queue && resolvedTriggerSource && resolvedTtl !== undefined) { + return { + slug: task.id, + ttl: resolvedTtl, + triggerSource: resolvedTriggerSource, + queueId: queue.id, + queueName: queue.name, + }; + } } else { logger.error("Prisma Error creating background worker task", { error: { diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts index ff041359bb..d240af3411 100644 --- a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts @@ -1,6 +1,9 @@ import { CreateBackgroundWorkerRequestBody, logger, tryCatch } from "@trigger.dev/core/v3"; -import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; -import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database"; +import type { + BackgroundWorker, + PrismaClientOrTransaction, + WorkerDeployment, +} from "@trigger.dev/database"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { type TaskMetadataCache } from "~/services/taskMetadataCache.server"; import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server"; @@ -8,9 +11,9 @@ import { BaseService, ServiceValidationError } from "./baseService.server"; import { createBackgroundFiles, createWorkerResources, - stripBackgroundWorkerMetadataForStorage, syncDeclarativeSchedules, } from "./createBackgroundWorker.server"; +import { findOrCreateBackgroundWorker } from "./createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server"; import { TimeoutDeploymentService } from "./timeoutDeployment.server"; import { env } from "~/env.server"; @@ -50,6 +53,11 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { }); if (!deployment) { + logger.warn("createDeploymentBackgroundWorker: deployment not found", { + deploymentId, + environmentId: environment.id, + projectId: environment.projectId, + }); return; } @@ -69,26 +77,44 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { } } + // Late-retry idempotency: if a worker was registered by a prior fully- + // successful attempt and the deployment already moved past BUILDING, return + // that worker so the CLI can finalize instead of seeing a 5xx. + if (deployment.workerId) { + const linkedWorker = await this._prisma.backgroundWorker.findFirst({ + where: { id: deployment.workerId }, + }); + if (linkedWorker) { + return linkedWorker; + } + } + if (deployment.status !== "BUILDING") { + logger.warn("createDeploymentBackgroundWorker: deployment not in BUILDING state", { + deploymentId, + deploymentStatus: deployment.status, + environmentId: environment.id, + projectId: environment.projectId, + }); return; } - const backgroundWorker = await this._prisma.backgroundWorker.create({ - data: { - ...BackgroundWorkerId.generate(), - version: deployment.version, - runtimeEnvironmentId: environment.id, - projectId: environment.projectId, - metadata: stripBackgroundWorkerMetadataForStorage(body.metadata), - contentHash: body.metadata.contentHash, - cliVersion: body.metadata.cliPackageVersion, - sdkVersion: body.metadata.packageVersion, - supportsLazyAttempts: body.supportsLazyAttempts, - engine: body.engine, - runtime: body.metadata.runtime, - runtimeVersion: body.metadata.runtimeVersion, - }, - }); + const [findOrCreateError, backgroundWorker] = await tryCatch( + findOrCreateBackgroundWorker(environment, deployment, body, this._prisma) + ); + + if (findOrCreateError) { + // Definitive failures (e.g. contentHash drift) surface as + // `ServiceValidationError` — fail the deployment so the operator sees it + // immediately instead of waiting 8 minutes for the timeout. Transient + // races throw a plain `Error` and propagate as 5xx without failing. + if (findOrCreateError instanceof ServiceValidationError) { + // `#failBackgroundWorkerDeployment` already throws its argument; the + // outer `throw` covers the non-SVE branch. + await this.#failBackgroundWorkerDeployment(deployment, findOrCreateError); + } + throw findOrCreateError; + } //upgrade the project to engine "V2" if it's not already if (environment.project.engine === "V1" && body.engine === "V2") { @@ -188,10 +214,11 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { throw serviceError; } - // Link the deployment with the background worker - await this._prisma.workerDeployment.update({ + // Guarded BUILDING → DEPLOYING transition. `updateMany` for optimistic concurrency control + const { count: updatedCount } = await this._prisma.workerDeployment.updateMany({ where: { id: deployment.id, + status: "BUILDING", }, data: { status: "DEPLOYING", @@ -203,6 +230,18 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { }, }); + if (updatedCount === 0) { + logger.warn( + "createDeploymentBackgroundWorker: deployment no longer in BUILDING state, skipping DEPLOYING transition", + { + deploymentId, + environmentId: environment.id, + projectId: environment.projectId, + } + ); + return backgroundWorker; + } + await TimeoutDeploymentService.enqueue( deployment.id, "DEPLOYING", @@ -215,9 +254,14 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { } async #failBackgroundWorkerDeployment(deployment: WorkerDeployment, error: Error) { - await this._prisma.workerDeployment.update({ + // Guarded BUILDING → FAILED transition, symmetric with the BUILDING → DEPLOYING + // transition in `call()`. With idempotent retries, two attempts can run side-by-side; + // without the predicate, one attempt's failure could downgrade the deployment after + // the other already flipped it to DEPLOYING, leaving it stuck in FAILED with a worker. + const { count: updatedCount } = await this._prisma.workerDeployment.updateMany({ where: { id: deployment.id, + status: "BUILDING", }, data: { status: "FAILED", @@ -229,7 +273,20 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService { }, }); - await TimeoutDeploymentService.dequeue(deployment.id, this._prisma); + if (updatedCount === 0) { + logger.warn( + "failBackgroundWorkerDeployment: deployment moved out of BUILDING during call, skipping FAILED transition", + { + deploymentId: deployment.id, + originalError: error.message, + } + ); + } else { + // Only dequeue the timeout if we actually flipped to FAILED — otherwise a + // sibling attempt may have just enqueued it as part of a successful + // BUILDING → DEPLOYING transition. + await TimeoutDeploymentService.dequeue(deployment.id, this._prisma); + } throw error; } diff --git a/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server.ts new file mode 100644 index 0000000000..7a2dcb06c3 --- /dev/null +++ b/apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server.ts @@ -0,0 +1,81 @@ +import type { CreateBackgroundWorkerRequestBody } from "@trigger.dev/core/v3"; +import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic"; +import { + isUniqueConstraintError, + type BackgroundWorker, + type PrismaClientOrTransaction, + type WorkerDeployment, +} from "@trigger.dev/database"; +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { ServiceValidationError } from "../common.server"; +import { stripBackgroundWorkerMetadataForStorage } from "../stripBackgroundWorkerMetadataForStorage.server"; + +/** + * Idempotent on `(project, environment, version)` for sequential calls, not concurrent calls. + * + * Failure shapes the caller distinguishes: + * - `ServiceValidationError` (409): definitive — contentHash drift means a different build + * is being pushed under the same deployment version, so the caller should fail the + * deployment instead of waiting for a timeout. + * - Plain `Error`: transient — two attempts raced the `create()` call and the loser caught + * the unique-index violation. The caller should propagate this as 5xx so the CLI's + * retry/backoff hits findFirst on the next attempt and returns the winner's row. + */ +export async function findOrCreateBackgroundWorker( + environment: AuthenticatedEnvironment, + deployment: WorkerDeployment, + body: CreateBackgroundWorkerRequestBody, + prisma: PrismaClientOrTransaction +): Promise { + const existing = await prisma.backgroundWorker.findFirst({ + where: { + projectId: environment.projectId, + runtimeEnvironmentId: environment.id, + version: deployment.version, + }, + }); + + if (existing && existing.contentHash === body.metadata.contentHash) { + return existing; + } + + if (existing) { + throw new ServiceValidationError( + "A background worker for this deployment version already exists with a different content hash", + 409 + ); + } + + try { + return await prisma.backgroundWorker.create({ + data: { + ...BackgroundWorkerId.generate(), + version: deployment.version, + runtimeEnvironmentId: environment.id, + projectId: environment.projectId, + metadata: stripBackgroundWorkerMetadataForStorage(body.metadata), + contentHash: body.metadata.contentHash, + cliVersion: body.metadata.cliPackageVersion, + sdkVersion: body.metadata.packageVersion, + supportsLazyAttempts: body.supportsLazyAttempts, + engine: body.engine, + runtime: body.metadata.runtime, + runtimeVersion: body.metadata.runtimeVersion, + }, + }); + } catch (error) { + // Concurrent attempts raced past `findFirst` and both reached `create`. Surface + // a clear, non-Prisma error so the 5xx the caller returns isn't an opaque + // P2002 — the CLI's retry will then hit `findFirst` and find the winner's row. + // Intentionally NOT a ServiceValidationError so the caller doesn't fail-deploy + // on a transient race. + if ( + isUniqueConstraintError(error, ["projectId", "runtimeEnvironmentId", "version"]) + ) { + throw new Error( + "Concurrent background worker registration detected for this deployment version; please retry" + ); + } + throw error; + } +} diff --git a/apps/webapp/app/v3/services/stripBackgroundWorkerMetadataForStorage.server.ts b/apps/webapp/app/v3/services/stripBackgroundWorkerMetadataForStorage.server.ts new file mode 100644 index 0000000000..a740bd2c1d --- /dev/null +++ b/apps/webapp/app/v3/services/stripBackgroundWorkerMetadataForStorage.server.ts @@ -0,0 +1,28 @@ +import { BackgroundWorkerMetadata } from "@trigger.dev/core/v3"; +import { Prisma } from "@trigger.dev/database"; + +/** + * Strip BackgroundWorkerMetadata down to the slice that's actually read after + * storage. Everything else is duplicated to dedicated columns/tables + * (BackgroundWorker.{contentHash,cliVersion,sdkVersion,runtime,runtimeVersion}, + * BackgroundWorkerTask, BackgroundWorkerFile, TaskQueue, Prompt). Today the + * only post-write reader is changeCurrentDeployment.server.ts, which feeds + * tasks[].schedule into syncDeclarativeSchedules. packageVersion, contentHash, + * and tasks[].filePath are kept solely to satisfy BackgroundWorkerMetadata's + * required fields when the column is parsed back. + */ +export function stripBackgroundWorkerMetadataForStorage( + metadata: BackgroundWorkerMetadata +): Prisma.InputJsonValue { + return { + packageVersion: metadata.packageVersion, + contentHash: metadata.contentHash, + tasks: metadata.tasks + .filter((t) => t.schedule) + .map((t) => ({ + id: t.id, + filePath: t.filePath, + schedule: t.schedule, + })), + }; +} diff --git a/apps/webapp/test/findOrCreateBackgroundWorker.test.ts b/apps/webapp/test/findOrCreateBackgroundWorker.test.ts new file mode 100644 index 0000000000..8b047001b8 --- /dev/null +++ b/apps/webapp/test/findOrCreateBackgroundWorker.test.ts @@ -0,0 +1,183 @@ +import { containerTest } from "@internal/testcontainers"; +import { CreateBackgroundWorkerRequestBody } from "@trigger.dev/core/v3"; +import { PrismaClient } from "@trigger.dev/database"; +import { describe, expect, vi } from "vitest"; +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; +import { findOrCreateBackgroundWorker } from "~/v3/services/createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server"; + +vi.setConfig({ testTimeout: 30_000 }); + +async function seedDeployment(prisma: PrismaClient, version = "20260528.1") { + const slug = `s${Math.random().toString(36).slice(2, 10)}`; + const organization = await prisma.organization.create({ + data: { title: slug, slug }, + }); + + const project = await prisma.project.create({ + data: { + name: slug, + slug, + organizationId: organization.id, + externalRef: slug, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug, + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: slug, + pkApiKey: slug, + shortcode: slug, + }, + }); + + const deployment = await prisma.workerDeployment.create({ + data: { + friendlyId: `deployment_${slug}`, + shortCode: slug, + contentHash: "h_initial", + status: "BUILDING", + version, + projectId: project.id, + environmentId: environment.id, + }, + }); + + // The helper only reads `id` and `projectId`; the rest of the type is + // unused here so we cast a partial. + const authEnv = { id: environment.id, projectId: project.id } as AuthenticatedEnvironment; + + return { project, environment, deployment, authEnv }; +} + +function bodyWithHash(contentHash: string): CreateBackgroundWorkerRequestBody { + return { + localOnly: false, + metadata: { + contentHash, + packageVersion: "0.0.0", + cliPackageVersion: "0.0.0", + tasks: [], + queues: [], + sourceFiles: [], + runtime: "node", + runtimeVersion: "21.0.0", + }, + engine: "V2", + supportsLazyAttempts: true, + }; +} + +describe("findOrCreateBackgroundWorker", () => { + containerTest("creates a new BackgroundWorker on the first call", async ({ prisma }) => { + const { authEnv, deployment } = await seedDeployment(prisma); + + const worker = await findOrCreateBackgroundWorker( + authEnv, + deployment, + bodyWithHash("h1"), + prisma + ); + + expect(worker.projectId).toBe(authEnv.projectId); + expect(worker.runtimeEnvironmentId).toBe(authEnv.id); + expect(worker.version).toBe(deployment.version); + expect(worker.contentHash).toBe("h1"); + + const rowCount = await prisma.backgroundWorker.count({ + where: { projectId: authEnv.projectId, version: deployment.version }, + }); + expect(rowCount).toBe(1); + }); + + containerTest( + "returns the existing row on a second call with the same contentHash (no duplicate)", + async ({ prisma }) => { + const { authEnv, deployment } = await seedDeployment(prisma); + + const first = await findOrCreateBackgroundWorker( + authEnv, + deployment, + bodyWithHash("h1"), + prisma + ); + const second = await findOrCreateBackgroundWorker( + authEnv, + deployment, + bodyWithHash("h1"), + prisma + ); + + expect(second.id).toBe(first.id); + + const rowCount = await prisma.backgroundWorker.count({ + where: { projectId: authEnv.projectId, version: deployment.version }, + }); + expect(rowCount).toBe(1); + } + ); + + containerTest( + "throws 409 ServiceValidationError when an existing row has a different contentHash", + async ({ prisma }) => { + const { authEnv, deployment } = await seedDeployment(prisma); + + await findOrCreateBackgroundWorker(authEnv, deployment, bodyWithHash("h1"), prisma); + + await expect( + findOrCreateBackgroundWorker(authEnv, deployment, bodyWithHash("h2"), prisma) + ).rejects.toMatchObject({ + name: "ServiceValidationError", + status: 409, + }); + + // Also assert the constructor so callers can `catch (e instanceof ServiceValidationError)`. + await expect( + findOrCreateBackgroundWorker(authEnv, deployment, bodyWithHash("h2"), prisma) + ).rejects.toBeInstanceOf(ServiceValidationError); + } + ); + + containerTest( + "concurrent create race surfaces as plain Error (not ServiceValidationError)", + async ({ prisma }) => { + // The class distinction matters: the V4 service uses `instanceof ServiceValidationError` + // to decide whether to fail-deploy. A transient race must not fail-deploy. + const { authEnv, deployment } = await seedDeployment(prisma); + + const [first, second] = await Promise.allSettled([ + findOrCreateBackgroundWorker(authEnv, deployment, bodyWithHash("h-race"), prisma), + findOrCreateBackgroundWorker(authEnv, deployment, bodyWithHash("h-race"), prisma), + ]); + + const fulfilled = [first, second].filter((r) => r.status === "fulfilled"); + const rejected = [first, second].filter( + (r): r is PromiseRejectedResult => r.status === "rejected" + ); + + // Exactly one row in the database regardless of who won. + const rowCount = await prisma.backgroundWorker.count({ + where: { projectId: authEnv.projectId, version: deployment.version }, + }); + expect(rowCount).toBe(1); + + // If the schedule produced an actual race (one wins, one loses), the loser + // must surface a non-SVE error. If the schedule serialised them by accident, + // both fulfilled is also acceptable — this test is about the error *class*, + // not the rate of races. + if (rejected.length > 0) { + expect(fulfilled).toHaveLength(1); + for (const r of rejected) { + expect(r.reason).not.toBeInstanceOf(ServiceValidationError); + expect((r.reason as Error).message).toMatch(/concurrent/i); + } + } else { + expect(fulfilled).toHaveLength(2); + } + } + ); +});