Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/v3-engine-retirement-messaging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

When the v3 engine is retired, triggering a v3 task and connecting the v3 dev CLI now fail with a clear message pointing to the v4 migration guide instead of failing opaquely. Enforcement is off by default, so self-hosted instances still running v3 are unaffected until they migrate.
10 changes: 10 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,16 @@ const EnvironmentSchema = z
// log-only mode before enforcement.
DEPRECATE_V3_CLI_DEPLOYS_ENABLED: z.string().default("0"),

// Master switch for the v3 engine (RunEngineVersion.V1) shutdown. When
// enabled it: rejects triggers that resolve to V1 (single, batch, schedule,
// replay, triggerAndWait) with a graceful error pointing at the v4 migration
// guide; closes the legacy `trigger dev` websocket used by v3 CLIs; and turns
// the V1 run-lifecycle background jobs (heartbeat timeout, TTL expiry, retry,
// resume, scheduled fires) into no-ops so abandoned V1 runs stop generating
// database load. v4 (V2) is never affected (every gate also checks the run is
// V1). Defaults to off so self-hosted instances still on V1 keep working.
DEPRECATE_V3_ENABLED: z.string().default("0"),

OBJECT_STORE_BASE_URL: z.string().optional(),
OBJECT_STORE_BUCKET: z.string().optional(),
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
Expand Down
34 changes: 34 additions & 0 deletions apps/webapp/app/v3/engineDeprecation.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { env } from "~/env.server";

/**
* Graceful sunset of the v3 engine (RunEngineVersion.V1).
*
* v3 maps to engine V1 (MarQS + Graphile); v4 is engine V2 (run-engine). A
* single master flag (DEPRECATE_V3_ENABLED, default off) gates every shutdown
* behaviour so the cloud can flip the switch while self-hosted instances still
* on V1 keep working until they migrate. This mirrors
* DEPRECATE_V3_CLI_DEPLOYS_ENABLED, which already gates deploys.
*
* The flag controls three surfaces:
* 1. Triggers that resolve to V1 are rejected with a graceful error.
* 2. The legacy `trigger dev` websocket (v3 CLIs only) is closed.
* 3. V1 run-lifecycle background jobs become no-ops to shed database load.
*
* Every call site also checks the run/project is actually V1, so v4 (V2) is
* never affected.
*/

export const V3_MIGRATION_URL = "https://trigger.dev/docs/migrating-from-v3";

export const V3_TRIGGER_DEPRECATION_MESSAGE = `Trigger.dev v3 is no longer supported. Please upgrade your project to v4 to keep triggering tasks: ${V3_MIGRATION_URL}`;

// Sent as a websocket close reason, which is capped at 123 bytes, so keep it short.
export const V3_DEV_DEPRECATION_MESSAGE = `Trigger.dev v3 is no longer supported. Upgrade to v4: ${V3_MIGRATION_URL}`;

/**
* Whether the v3 (engine V1) shutdown is being enforced. Guard every V1-only
* code path with `isV3Disabled() && <run/project is V1>` so v4 is untouched.
*/
export function isV3Disabled(): boolean {
return env.DEPRECATE_V3_ENABLED === "1";
}
11 changes: 11 additions & 0 deletions apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { ResumeAttemptService } from "./services/resumeAttempt.server";
import { UpdateFatalRunErrorService } from "./services/updateFatalRunError.server";
import { WorkerGroupTokenService } from "./services/worker/workerGroupTokenService.server";
import { SharedSocketConnection } from "./sharedSocketConnection";
import { isV3Disabled } from "./engineDeprecation.server";

export const socketIo = singleton("socketIo", initalizeIoServer);

Expand Down Expand Up @@ -426,6 +427,16 @@ function createSharedQueueConsumerNamespace(io: Server) {
clientMessages: ClientToSharedQueueMessages,
serverMessages: SharedQueueToClientMessages,
onConnection: async (socket, handler, sender, logger) => {
// v3 (engine V1) shutdown: don't start the MarQS shared-queue consumer, so no
// deployed V1 runs are dequeued. This namespace is V1-only; v4 dequeues through
// the run-engine worker path. This is the code-level equivalent of taking the
// v3 coordinator offline.
if (isV3Disabled()) {
logger.warn("Refusing /shared-queue connection: v3 engine is shut down");
socket.disconnect(true);
return;
}

const sharedSocketConnection = new SharedSocketConnection({
// @ts-ignore - for some reason the built ZodNamespace Server type is not compatible with the Server type here, but only when doing typechecking
namespace: sharedQueue.namespace,
Expand Down
14 changes: 14 additions & 0 deletions apps/webapp/app/v3/handleWebsockets.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { singleton } from "../utils/singleton";
import { AuthenticatedSocketConnection } from "./authenticatedSocketConnection.server";
import { Gauge } from "prom-client";
import { metricsRegister } from "~/metrics.server";
import { isV3Disabled, V3_DEV_DEPRECATION_MESSAGE } from "./engineDeprecation.server";

export const wss = singleton("wss", initalizeWebSocketServer);

Expand Down Expand Up @@ -58,6 +59,19 @@ async function handleWebSocketConnection(ws: WebSocket, req: IncomingMessage) {

const authenticatedEnv = authenticationResult.environment;

// This legacy websocket is only used by the v3 `trigger dev` CLI (v4 uses a
// different dev transport). When the v3 shutdown is on, close it with a
// graceful reason instead of letting the CLI sit connected with no work.
if (isV3Disabled()) {
logger.warn("Rejected deprecated v3 dev CLI websocket connection", {
environmentId: authenticatedEnv.id,
projectId: authenticatedEnv.projectId,
organizationId: authenticatedEnv.organizationId,
});
ws.close(1008, V3_DEV_DEPRECATION_MESSAGE);
return;
}

const authenticatedConnection = new AuthenticatedSocketConnection(
ws,
authenticatedEnv,
Expand Down
13 changes: 13 additions & 0 deletions apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { TriggerTaskService } from "./services/triggerTask.server";
import { meter, tracer } from "./tracer.server";
import { workerQueue } from "~/services/worker.server";
import { ServiceValidationError } from "./services/common.server";
import { isV3Disabled } from "./engineDeprecation.server";

export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);

Expand Down Expand Up @@ -84,6 +85,18 @@ function createScheduleEngine() {
exactScheduleTime,
}) => {
try {
// v3 (engine V1) shutdown: skip firing schedules for V1 projects so the
// cron doesn't keep doing trigger work just to be rejected. Return success
// so the schedule engine treats it as handled and doesn't retry. v4 is
// unaffected.
if (isV3Disabled() && environment.project.engine === "V1") {
logger.debug("[ScheduleEngine] Skipping scheduled fire for shut-down v3 project", {
taskIdentifier,
scheduleId,
});
return { success: true };
}

// This will trigger either v1 or v2 depending on the engine of the project
const triggerService = new TriggerTaskService();

Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { commonWorker } from "../commonWorker.server";
import { BaseService } from "./baseService.server";
import { enqueueRun } from "./enqueueRun.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { isV3Disabled } from "../engineDeprecation.server";

export class EnqueueDelayedRunService extends BaseService {
public static async enqueue(runId: string, runAt?: Date) {
Expand Down Expand Up @@ -75,6 +76,12 @@ export class EnqueueDelayedRunService extends BaseService {
return;
}

// v3 (engine V1) shutdown: don't enqueue delayed V1 runs into MarQS. v4 is unaffected.
if (isV3Disabled() && run.engine === "V1") {
logger.debug("[EnqueueDelayedRunService] Skipping enqueue for shut-down v3 run", { runId });
return;
}

if (run.status !== "DELAYED") {
logger.debug("Delayed run cannot be enqueued because it's not in DELAYED status", {
run,
Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/services/expireEnqueuedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { BaseService } from "./baseService.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { tryCatch } from "@trigger.dev/core/utils";
import { getEventRepositoryForStore } from "../eventRepository/index.server";
import { isV3Disabled } from "../engineDeprecation.server";

export class ExpireEnqueuedRunService extends BaseService {
public static async ack(runId: string, tx?: PrismaClientOrTransaction) {
Expand Down Expand Up @@ -48,6 +49,12 @@ export class ExpireEnqueuedRunService extends BaseService {
return;
}

// v3 (engine V1) shutdown: skip expiring abandoned V1 runs. v4 is unaffected.
if (isV3Disabled() && run.engine === "V1") {
logger.debug("[ExpireEnqueuedRunService] Skipping expiry for shut-down v3 run", { runId });
return;
}

if (run.status !== "PENDING") {
logger.debug("Run cannot be expired because it's not in PENDING status", {
run,
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { BaseService } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { BatchTaskRun } from "@trigger.dev/database";
import { workerQueue } from "~/services/worker.server";
import { isV3Disabled } from "../engineDeprecation.server";

const finishedBatchRunStatuses = ["COMPLETED", "FAILED", "CANCELED"];

Expand Down Expand Up @@ -43,6 +44,14 @@ export class ResumeBatchRunService extends BaseService {
return "ERROR";
}

// v3 (engine V1) shutdown: don't resume batches for abandoned V1 projects. v4 is unaffected.
if (isV3Disabled() && batchRun.runtimeEnvironment.project.engine === "V1") {
logger.debug("[ResumeBatchRunService] Skipping resume for shut-down v3 batch", {
batchRunId,
});
return "ERROR";
}

if (batchRun.batchVersion === "v3") {
return await this.#handleV3BatchRun(batchRun);
} else {
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/v3/services/resumeTaskDependency.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { commonWorker } from "../commonWorker.server";
import { BaseService } from "./baseService.server";
import { isV3Disabled } from "../engineDeprecation.server";

export class ResumeTaskDependencyService extends BaseService {
public async call(dependencyId: string, sourceTaskAttemptId: string) {
Expand Down Expand Up @@ -32,6 +33,14 @@ export class ResumeTaskDependencyService extends BaseService {
return;
}

// v3 (engine V1) shutdown: don't resume dependencies for abandoned V1 runs. v4 is unaffected.
if (isV3Disabled() && dependency.taskRun.engine === "V1") {
logger.debug("[ResumeTaskDependencyService] Skipping resume for shut-down v3 run", {
dependencyId,
});
return;
}

if (dependency.taskRun.runtimeEnvironment.type === "DEVELOPMENT") {
return;
}
Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/services/retryAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { logger } from "~/services/logger.server";
import { commonWorker } from "../commonWorker.server";
import { socketIo } from "../handleSocketIo.server";
import { BaseService } from "./baseService.server";
import { isV3Disabled } from "../engineDeprecation.server";

export class RetryAttemptService extends BaseService {
public async call(runId: string) {
Expand All @@ -12,6 +13,12 @@ export class RetryAttemptService extends BaseService {
return;
}

// v3 (engine V1) shutdown: don't retry abandoned V1 runs. v4 is unaffected.
if (isV3Disabled() && taskRun.engine === "V1") {
logger.debug("[RetryAttemptService] Skipping retry for shut-down v3 run", { runId });
return;
}

socketIo.coordinatorNamespace.emit("READY_FOR_RETRY", {
version: "v1",
runId,
Expand Down
11 changes: 10 additions & 1 deletion apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskV
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { determineEngineVersion } from "../engineVersion.server";
import { tracer } from "../tracer.server";
import { WithRunEngine } from "./baseService.server";
import { isV3Disabled, V3_TRIGGER_DEPRECATION_MESSAGE } from "../engineDeprecation.server";
import { ServiceValidationError, WithRunEngine } from "./baseService.server";
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";

export type TriggerTaskServiceOptions = {
Expand Down Expand Up @@ -77,6 +78,14 @@ export class TriggerTaskService extends WithRunEngine {

switch (v) {
case "V1": {
// v3 (engine V1) is being sunset. When the shutdown is on, reject the
// trigger with a graceful, actionable error instead of creating a V1
// run. Covers single, batch, schedule, replay, and triggerAndWait,
// which all route through here.
if (isV3Disabled()) {
throw new ServiceValidationError(V3_TRIGGER_DEPRECATION_MESSAGE);
}

return await this.callV1(taskId, environment, body, options);
}
case "V2": {
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { PrismaClientOrTransaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";
import { socketIo } from "./handleSocketIo.server";
import { TaskRunErrorCodes } from "@trigger.dev/core/v3";
import { isV3Disabled } from "./engineDeprecation.server";

export class TaskRunHeartbeatFailedService extends BaseService {
public async call(runId: string) {
Expand All @@ -18,6 +19,7 @@ export class TaskRunHeartbeatFailedService extends BaseService {
{
select: {
id: true,
engine: true,
friendlyId: true,
status: true,
lockedAt: true,
Expand Down Expand Up @@ -49,6 +51,15 @@ export class TaskRunHeartbeatFailedService extends BaseService {
return;
}

// v3 (engine V1) shutdown: leave abandoned V1 runs as-is instead of doing
// MarQS/DB work to fail or requeue them. v4 (V2) is unaffected.
if (isV3Disabled() && taskRun.engine === "V1") {
logger.debug("[TaskRunHeartbeatFailedService] Skipping heartbeat for shut-down v3 run", {
runId,
});
return;
}

const service = new FailedTaskRunService();

switch (taskRun.status) {
Expand Down