diff --git a/.server-changes/v3-engine-retirement-messaging.md b/.server-changes/v3-engine-retirement-messaging.md new file mode 100644 index 0000000000..ca7aceb472 --- /dev/null +++ b/.server-changes/v3-engine-retirement-messaging.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +When the v3 engine is retired, triggering a v3 task and connecting the v3 dev CLI now fail with a clear message pointing to the v4 migration guide instead of failing opaquely. Enforcement is off by default, so self-hosted instances still running v3 are unaffected until they migrate. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index f793654dd3..c04a52cff2 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -535,6 +535,16 @@ const EnvironmentSchema = z // log-only mode before enforcement. DEPRECATE_V3_CLI_DEPLOYS_ENABLED: z.string().default("0"), + // Master switch for the v3 engine (RunEngineVersion.V1) shutdown. When + // enabled it: rejects triggers that resolve to V1 (single, batch, schedule, + // replay, triggerAndWait) with a graceful error pointing at the v4 migration + // guide; closes the legacy `trigger dev` websocket used by v3 CLIs; and turns + // the V1 run-lifecycle background jobs (heartbeat timeout, TTL expiry, retry, + // resume, scheduled fires) into no-ops so abandoned V1 runs stop generating + // database load. v4 (V2) is never affected (every gate also checks the run is + // V1). Defaults to off so self-hosted instances still on V1 keep working. + DEPRECATE_V3_ENABLED: z.string().default("0"), + OBJECT_STORE_BASE_URL: z.string().optional(), OBJECT_STORE_BUCKET: z.string().optional(), OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(), diff --git a/apps/webapp/app/v3/engineDeprecation.server.ts b/apps/webapp/app/v3/engineDeprecation.server.ts new file mode 100644 index 0000000000..7e7dad9654 --- /dev/null +++ b/apps/webapp/app/v3/engineDeprecation.server.ts @@ -0,0 +1,34 @@ +import { env } from "~/env.server"; + +/** + * Graceful sunset of the v3 engine (RunEngineVersion.V1). + * + * v3 maps to engine V1 (MarQS + Graphile); v4 is engine V2 (run-engine). A + * single master flag (DEPRECATE_V3_ENABLED, default off) gates every shutdown + * behaviour so the cloud can flip the switch while self-hosted instances still + * on V1 keep working until they migrate. This mirrors + * DEPRECATE_V3_CLI_DEPLOYS_ENABLED, which already gates deploys. + * + * The flag controls three surfaces: + * 1. Triggers that resolve to V1 are rejected with a graceful error. + * 2. The legacy `trigger dev` websocket (v3 CLIs only) is closed. + * 3. V1 run-lifecycle background jobs become no-ops to shed database load. + * + * Every call site also checks the run/project is actually V1, so v4 (V2) is + * never affected. + */ + +export const V3_MIGRATION_URL = "https://trigger.dev/docs/migrating-from-v3"; + +export const V3_TRIGGER_DEPRECATION_MESSAGE = `Trigger.dev v3 is no longer supported. Please upgrade your project to v4 to keep triggering tasks: ${V3_MIGRATION_URL}`; + +// Sent as a websocket close reason, which is capped at 123 bytes, so keep it short. +export const V3_DEV_DEPRECATION_MESSAGE = `Trigger.dev v3 is no longer supported. Upgrade to v4: ${V3_MIGRATION_URL}`; + +/** + * Whether the v3 (engine V1) shutdown is being enforced. Guard every V1-only + * code path with `isV3Disabled() && ` so v4 is untouched. + */ +export function isV3Disabled(): boolean { + return env.DEPRECATE_V3_ENABLED === "1"; +} diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index aec9ded6f4..2e7fd211bb 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -36,6 +36,7 @@ import { ResumeAttemptService } from "./services/resumeAttempt.server"; import { UpdateFatalRunErrorService } from "./services/updateFatalRunError.server"; import { WorkerGroupTokenService } from "./services/worker/workerGroupTokenService.server"; import { SharedSocketConnection } from "./sharedSocketConnection"; +import { isV3Disabled } from "./engineDeprecation.server"; export const socketIo = singleton("socketIo", initalizeIoServer); @@ -426,6 +427,16 @@ function createSharedQueueConsumerNamespace(io: Server) { clientMessages: ClientToSharedQueueMessages, serverMessages: SharedQueueToClientMessages, onConnection: async (socket, handler, sender, logger) => { + // v3 (engine V1) shutdown: don't start the MarQS shared-queue consumer, so no + // deployed V1 runs are dequeued. This namespace is V1-only; v4 dequeues through + // the run-engine worker path. This is the code-level equivalent of taking the + // v3 coordinator offline. + if (isV3Disabled()) { + logger.warn("Refusing /shared-queue connection: v3 engine is shut down"); + socket.disconnect(true); + return; + } + const sharedSocketConnection = new SharedSocketConnection({ // @ts-ignore - for some reason the built ZodNamespace Server type is not compatible with the Server type here, but only when doing typechecking namespace: sharedQueue.namespace, diff --git a/apps/webapp/app/v3/handleWebsockets.server.ts b/apps/webapp/app/v3/handleWebsockets.server.ts index 2e3b3c05ec..ee7c97d079 100644 --- a/apps/webapp/app/v3/handleWebsockets.server.ts +++ b/apps/webapp/app/v3/handleWebsockets.server.ts @@ -6,6 +6,7 @@ import { singleton } from "../utils/singleton"; import { AuthenticatedSocketConnection } from "./authenticatedSocketConnection.server"; import { Gauge } from "prom-client"; import { metricsRegister } from "~/metrics.server"; +import { isV3Disabled, V3_DEV_DEPRECATION_MESSAGE } from "./engineDeprecation.server"; export const wss = singleton("wss", initalizeWebSocketServer); @@ -58,6 +59,19 @@ async function handleWebSocketConnection(ws: WebSocket, req: IncomingMessage) { const authenticatedEnv = authenticationResult.environment; + // This legacy websocket is only used by the v3 `trigger dev` CLI (v4 uses a + // different dev transport). When the v3 shutdown is on, close it with a + // graceful reason instead of letting the CLI sit connected with no work. + if (isV3Disabled()) { + logger.warn("Rejected deprecated v3 dev CLI websocket connection", { + environmentId: authenticatedEnv.id, + projectId: authenticatedEnv.projectId, + organizationId: authenticatedEnv.organizationId, + }); + ws.close(1008, V3_DEV_DEPRECATION_MESSAGE); + return; + } + const authenticatedConnection = new AuthenticatedSocketConnection( ws, authenticatedEnv, diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts index 4a12046a89..1c0040fa11 100644 --- a/apps/webapp/app/v3/scheduleEngine.server.ts +++ b/apps/webapp/app/v3/scheduleEngine.server.ts @@ -10,6 +10,7 @@ import { TriggerTaskService } from "./services/triggerTask.server"; import { meter, tracer } from "./tracer.server"; import { workerQueue } from "~/services/worker.server"; import { ServiceValidationError } from "./services/common.server"; +import { isV3Disabled } from "./engineDeprecation.server"; export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine); @@ -84,6 +85,18 @@ function createScheduleEngine() { exactScheduleTime, }) => { try { + // v3 (engine V1) shutdown: skip firing schedules for V1 projects so the + // cron doesn't keep doing trigger work just to be rejected. Return success + // so the schedule engine treats it as handled and doesn't retry. v4 is + // unaffected. + if (isV3Disabled() && environment.project.engine === "V1") { + logger.debug("[ScheduleEngine] Skipping scheduled fire for shut-down v3 project", { + taskIdentifier, + scheduleId, + }); + return { success: true }; + } + // This will trigger either v1 or v2 depending on the engine of the project const triggerService = new TriggerTaskService(); diff --git a/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts b/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts index 79cb4fb097..2e9d86916c 100644 --- a/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts +++ b/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts @@ -5,6 +5,7 @@ import { commonWorker } from "../commonWorker.server"; import { BaseService } from "./baseService.server"; import { enqueueRun } from "./enqueueRun.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; +import { isV3Disabled } from "../engineDeprecation.server"; export class EnqueueDelayedRunService extends BaseService { public static async enqueue(runId: string, runAt?: Date) { @@ -75,6 +76,12 @@ export class EnqueueDelayedRunService extends BaseService { return; } + // v3 (engine V1) shutdown: don't enqueue delayed V1 runs into MarQS. v4 is unaffected. + if (isV3Disabled() && run.engine === "V1") { + logger.debug("[EnqueueDelayedRunService] Skipping enqueue for shut-down v3 run", { runId }); + return; + } + if (run.status !== "DELAYED") { logger.debug("Delayed run cannot be enqueued because it's not in DELAYED status", { run, diff --git a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts index 12ccddbf2e..27a8e1e8b0 100644 --- a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts +++ b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts @@ -5,6 +5,7 @@ import { BaseService } from "./baseService.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { tryCatch } from "@trigger.dev/core/utils"; import { getEventRepositoryForStore } from "../eventRepository/index.server"; +import { isV3Disabled } from "../engineDeprecation.server"; export class ExpireEnqueuedRunService extends BaseService { public static async ack(runId: string, tx?: PrismaClientOrTransaction) { @@ -48,6 +49,12 @@ export class ExpireEnqueuedRunService extends BaseService { return; } + // v3 (engine V1) shutdown: skip expiring abandoned V1 runs. v4 is unaffected. + if (isV3Disabled() && run.engine === "V1") { + logger.debug("[ExpireEnqueuedRunService] Skipping expiry for shut-down v3 run", { runId }); + return; + } + if (run.status !== "PENDING") { logger.debug("Run cannot be expired because it's not in PENDING status", { run, diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index d9f5170fc2..a886d12d60 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -5,6 +5,7 @@ import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { BatchTaskRun } from "@trigger.dev/database"; import { workerQueue } from "~/services/worker.server"; +import { isV3Disabled } from "../engineDeprecation.server"; const finishedBatchRunStatuses = ["COMPLETED", "FAILED", "CANCELED"]; @@ -43,6 +44,14 @@ export class ResumeBatchRunService extends BaseService { return "ERROR"; } + // v3 (engine V1) shutdown: don't resume batches for abandoned V1 projects. v4 is unaffected. + if (isV3Disabled() && batchRun.runtimeEnvironment.project.engine === "V1") { + logger.debug("[ResumeBatchRunService] Skipping resume for shut-down v3 batch", { + batchRunId, + }); + return "ERROR"; + } + if (batchRun.batchVersion === "v3") { return await this.#handleV3BatchRun(batchRun); } else { diff --git a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts index a831fdd78c..70f6e1e3cd 100644 --- a/apps/webapp/app/v3/services/resumeTaskDependency.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskDependency.server.ts @@ -3,6 +3,7 @@ import { logger } from "~/services/logger.server"; import { marqs } from "~/v3/marqs/index.server"; import { commonWorker } from "../commonWorker.server"; import { BaseService } from "./baseService.server"; +import { isV3Disabled } from "../engineDeprecation.server"; export class ResumeTaskDependencyService extends BaseService { public async call(dependencyId: string, sourceTaskAttemptId: string) { @@ -32,6 +33,14 @@ export class ResumeTaskDependencyService extends BaseService { return; } + // v3 (engine V1) shutdown: don't resume dependencies for abandoned V1 runs. v4 is unaffected. + if (isV3Disabled() && dependency.taskRun.engine === "V1") { + logger.debug("[ResumeTaskDependencyService] Skipping resume for shut-down v3 run", { + dependencyId, + }); + return; + } + if (dependency.taskRun.runtimeEnvironment.type === "DEVELOPMENT") { return; } diff --git a/apps/webapp/app/v3/services/retryAttempt.server.ts b/apps/webapp/app/v3/services/retryAttempt.server.ts index 6ed83c1080..dd1f8bf908 100644 --- a/apps/webapp/app/v3/services/retryAttempt.server.ts +++ b/apps/webapp/app/v3/services/retryAttempt.server.ts @@ -2,6 +2,7 @@ import { logger } from "~/services/logger.server"; import { commonWorker } from "../commonWorker.server"; import { socketIo } from "../handleSocketIo.server"; import { BaseService } from "./baseService.server"; +import { isV3Disabled } from "../engineDeprecation.server"; export class RetryAttemptService extends BaseService { public async call(runId: string) { @@ -12,6 +13,12 @@ export class RetryAttemptService extends BaseService { return; } + // v3 (engine V1) shutdown: don't retry abandoned V1 runs. v4 is unaffected. + if (isV3Disabled() && taskRun.engine === "V1") { + logger.debug("[RetryAttemptService] Skipping retry for shut-down v3 run", { runId }); + return; + } + socketIo.coordinatorNamespace.emit("READY_FOR_RETRY", { version: "v1", runId, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 7bbaa0dd99..c68171bb34 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -10,7 +10,8 @@ import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskV import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { determineEngineVersion } from "../engineVersion.server"; import { tracer } from "../tracer.server"; -import { WithRunEngine } from "./baseService.server"; +import { isV3Disabled, V3_TRIGGER_DEPRECATION_MESSAGE } from "../engineDeprecation.server"; +import { ServiceValidationError, WithRunEngine } from "./baseService.server"; import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; export type TriggerTaskServiceOptions = { @@ -77,6 +78,14 @@ export class TriggerTaskService extends WithRunEngine { switch (v) { case "V1": { + // v3 (engine V1) is being sunset. When the shutdown is on, reject the + // trigger with a graceful, actionable error instead of creating a V1 + // run. Covers single, batch, schedule, replay, and triggerAndWait, + // which all route through here. + if (isV3Disabled()) { + throw new ServiceValidationError(V3_TRIGGER_DEPRECATION_MESSAGE); + } + return await this.callV1(taskId, environment, body, options); } case "V2": { diff --git a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index c472bff53e..c211b191ea 100644 --- a/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -8,6 +8,7 @@ import { PrismaClientOrTransaction } from "~/db.server"; import { workerQueue } from "~/services/worker.server"; import { socketIo } from "./handleSocketIo.server"; import { TaskRunErrorCodes } from "@trigger.dev/core/v3"; +import { isV3Disabled } from "./engineDeprecation.server"; export class TaskRunHeartbeatFailedService extends BaseService { public async call(runId: string) { @@ -18,6 +19,7 @@ export class TaskRunHeartbeatFailedService extends BaseService { { select: { id: true, + engine: true, friendlyId: true, status: true, lockedAt: true, @@ -49,6 +51,15 @@ export class TaskRunHeartbeatFailedService extends BaseService { return; } + // v3 (engine V1) shutdown: leave abandoned V1 runs as-is instead of doing + // MarQS/DB work to fail or requeue them. v4 (V2) is unaffected. + if (isV3Disabled() && taskRun.engine === "V1") { + logger.debug("[TaskRunHeartbeatFailedService] Skipping heartbeat for shut-down v3 run", { + runId, + }); + return; + } + const service = new FailedTaskRunService(); switch (taskRun.status) {