diff --git a/.server-changes/plan-aware-compute-migration.md b/.server-changes/plan-aware-compute-migration.md new file mode 100644 index 00000000000..bc42ccc6473 --- /dev/null +++ b/.server-changes/plan-aware-compute-migration.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Gradually roll out a new run execution backend to a configurable percentage of organizations. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 1282127cb20..72191c1d0c5 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -43,6 +43,10 @@ import { registerRunChangeNotifierHandlers } from "./services/realtime/runChange import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; (globalThis as Record).__sessionsReplicationInstance = sessionsReplicationInstance; +import { globalFlagsRegistry } from "./v3/globalFlagsRegistry.server"; +(globalThis as Record).__globalFlagsRegistry = globalFlagsRegistry; +import { workerRegionRegistry } from "./v3/workerRegions.server"; +(globalThis as Record).__workerRegionRegistry = workerRegionRegistry; const ABORT_DELAY = 30000; diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c12b29a6080..d2021f4fe32 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -158,6 +158,12 @@ const EnvironmentSchema = z WORKER_SCHEMA: z.string().default("graphile_worker"), WORKER_CONCURRENCY: z.coerce.number().int().default(10), WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + // How often each replica reloads the global flags snapshot from the DB. + // Sets kill/ramp propagation latency. + GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().min(1000).default(5000), + // Max time the first trigger blocks waiting for the initial flags load + // before falling back to defaults (off = container, the safe direction). + GLOBAL_FLAGS_READY_TIMEOUT_MS: z.coerce.number().int().min(0).default(5000), WORKER_ENABLED: z.string().default("true"), GRACEFUL_SHUTDOWN_TIMEOUT: z.coerce.number().int().default(60000), DISABLE_SSE: z.string().optional(), diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 00290b36203..fec8dabdb0e 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -15,7 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; -import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { findRunByIdWithMollifierFallback, @@ -49,6 +49,7 @@ const commonRunSelect = { depth: true, scheduleId: true, workerQueue: true, + region: true, lockedToVersion: { select: { version: true, @@ -520,7 +521,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, metadata, - region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined, + region: regionForDisplay(run.region, run.workerQueue), }; } @@ -684,6 +685,7 @@ export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { // API response's `region` to undefined instead of advertising a // misleading "main" region for a not-yet-assigned buffered run). workerQueue: buffered.workerQueue ?? "", + region: buffered.region ?? "", parentTaskRun: null, rootTaskRun: null, childRuns: [], diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index 8d1b8c13883..3594aa71cea 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -11,7 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; -import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; @@ -260,7 +260,7 @@ export class NextRunListPresenter { name: run.queue.replace("task/", ""), type: run.queue.startsWith("task/") ? "task" : "custom", }, - region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined, + region: regionForDisplay(run.region, run.workerQueue), taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; }), diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index bd0ac5c540a..98ee75cda39 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -303,11 +303,17 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { + // masterQueue is unique and IS the run's backing queue, so this finds + // the group the run actually ran on. masterQueue: baseWorkerQueue(run.workerQueue), }, }); - region = workerGroup ?? null; + // Show the stamped geo region as the name so a migrated run never reveals + // its compute backing; fall back to the group name for unstamped runs. + region = workerGroup + ? { name: run.region ?? workerGroup.name, location: workerGroup.location } + : null; } // Only AGENT-tagged runs (chat.agent and friends) can be session-bound, @@ -513,6 +519,7 @@ export class SpanPresenter extends BasePresenter { }, engine: true, workerQueue: true, + region: true, error: true, output: true, outputType: true, diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 631bd5ece52..03bfdaccc65 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -18,7 +18,7 @@ import { type SyntheticReplayTaskRun, } from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; -import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; import { ReplayRunData } from "~/v3/replayTask"; @@ -52,6 +52,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxDurationInSeconds: true, machinePreset: true, workerQueue: true, + region: true, ttl: true, idempotencyKey: true, runTags: true, @@ -163,6 +164,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxDurationInSeconds: buffered.maxDurationInSeconds ?? null, machinePreset: buffered.machinePreset ?? null, workerQueue: buffered.workerQueue ?? null, + region: buffered.region ?? null, ttl: buffered.ttl ?? null, idempotencyKey: buffered.idempotencyKey ?? null, runTags: buffered.runTags, @@ -210,7 +212,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxAttempts: run.maxAttempts, maxDurationSeconds: run.maxDurationInSeconds, machinePreset: run.machinePreset, - region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue), + region: + environment.type === "DEVELOPMENT" + ? undefined + : regionForDisplay(run.region, run.workerQueue), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts new file mode 100644 index 00000000000..c885a80356d --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts @@ -0,0 +1,77 @@ +import { hashBucket } from "~/utils/computeBucket"; + +/** Subset of the global flags snapshot this resolver reads. */ +export type ComputeMigrationFlags = { + computeMigrationEnabled?: boolean; + computeMigrationFreePercentage?: number; + computeMigrationPaidPercentage?: number; +}; + +type MigrationDecisionInput = { + planType: string | undefined; + orgId: string; + orgFeatureFlags: Record | null | undefined; + flags: ComputeMigrationFlags | undefined; +}; + +/** + * Whether this org should run on the compute backing. Shared by the trigger-time + * transform and the deploy-time template decision so a migrated org always gets a + * compute template. Precedence: per-org override (both directions) wins; otherwise + * global enable + the plan's percentage bucket. Enterprise and unknown plans are + * never enrolled by percentage (override only). The sole opt-out is the per-org + * `computeMigrationEnabled: false`. + */ +export function isOrgMigrated({ + planType, + orgId, + orgFeatureFlags, + flags, +}: MigrationDecisionInput): boolean { + const override = orgFeatureFlags?.["computeMigrationEnabled"]; + if (override === false) return false; + if (override === true) return true; + + if (!(flags?.computeMigrationEnabled ?? false)) return false; + + const pct = + planType === "free" + ? flags?.computeMigrationFreePercentage ?? 0 + : planType === "paid" + ? flags?.computeMigrationPaidPercentage ?? 0 + : 0; // enterprise / undefined + + return hashBucket(orgId) < pct; +} + +type ResolveInput = MigrationDecisionInput & { + baseWorkerQueue: string | undefined; + baseEnableFastPath: boolean; + region: string | undefined; // geo of the base queue (same whether migrated or not) + backing: { workerQueue: string; enableFastPath: boolean } | undefined; + envType: string; +}; + +/** + * Produce the target descriptor `{ workerQueue, region, enableFastPath }` for a + * run. When the org is migrated and the region has a compute backing, the queue + * and fast-path setting come from the MICROVM backing group; `region` is the geo + * either way. Same-geo swap (us-east-1 -> us-east-1-next): any explicit placement + * is a geography preference, honored by staying in-region. Applied after region + * resolution, mirroring the scheduled-split. + */ +export function resolveComputeMigration({ + baseWorkerQueue, + baseEnableFastPath, + region, + backing, + envType, + ...decision +}: ResolveInput): { workerQueue: string | undefined; region: string | undefined; enableFastPath: boolean } { + const passthrough = { workerQueue: baseWorkerQueue, region, enableFastPath: baseEnableFastPath }; + if (baseWorkerQueue === undefined) return passthrough; + if (envType === "DEVELOPMENT") return passthrough; + if (!isOrgMigrated(decision)) return passthrough; + if (!backing) return passthrough; + return { workerQueue: backing.workerQueue, region, enableFastPath: backing.enableFastPath }; +} diff --git a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts index af0d4a819a1..a2bf5df87b3 100644 --- a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts +++ b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts @@ -33,6 +33,19 @@ export function baseWorkerQueue(workerQueue: string | null | undefined): string return colon === -1 ? workerQueue : workerQueue.slice(0, colon); } +/** + * User-facing region for read surfaces: the explicit geo region if set, else the + * region derived from the worker queue, else undefined. Use everywhere a run's + * region is displayed so an empty queue never surfaces as `""` and all surfaces + * agree. Not for query keys — those want the raw worker queue, not this fallback. + */ +export function regionForDisplay( + region: string | null | undefined, + workerQueue: string | null | undefined +): string | undefined { + return region || (workerQueue ? baseWorkerQueue(workerQueue) : undefined); +} + /** `TriggerSource` value used for runs originating from a schedule. */ const SCHEDULE_TRIGGER_SOURCE = "schedule"; diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 77057990db9..1c2936e89aa 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -38,6 +38,9 @@ import { resolveScheduledQueueSplitEnabled, workerQueueForRun, } from "../concerns/workerQueueSplit.server"; +import { resolveComputeMigration } from "../concerns/computeMigration.server"; +import { workerRegionRegistry, backingForQueue, regionForQueue } from "~/v3/workerRegions.server"; +import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; import { publishClaim as publishMollifierClaim, releaseClaim as releaseMollifierClaim, @@ -358,6 +361,31 @@ export class RunEngineTriggerTaskService { const baseWorkerQueue = workerQueueResult?.masterQueue; const enableFastPath = workerQueueResult?.enableFastPath ?? false; + // Rewrite the region to its compute backing for migration-enrolled orgs, + // from the in-memory flag snapshot (no DB query). The isLoaded gates only + // block during cold start so the first request can't serve a default over + // a real flag; once warm they're a synchronous no-op. + if (!globalFlagsRegistry.isLoaded) { + await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + if (!workerRegionRegistry.isLoaded) { + await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + const workerGroups = workerRegionRegistry.current() ?? []; + const region = baseWorkerQueue ? regionForQueue(baseWorkerQueue, workerGroups) : undefined; + const backing = baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined; + const migrated = resolveComputeMigration({ + baseWorkerQueue, + baseEnableFastPath: enableFastPath, + region, + backing, + planType, + orgId: environment.organization.id, + orgFeatureFlags: environment.organization.featureFlags as Record | null, + flags: globalFlagsRegistry.current(), + envType: environment.type, + }); + // Build annotations for this run const triggerSource = options.triggerSource ?? "api"; const triggerAction = options.triggerAction ?? "trigger"; @@ -386,13 +414,13 @@ export class RunEngineTriggerTaskService { globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1", }); const workerQueue = - baseWorkerQueue !== undefined + migrated.workerQueue !== undefined ? workerQueueForRun({ - workerQueue: baseWorkerQueue, + workerQueue: migrated.workerQueue, rootTriggerSource: annotations.rootTriggerSource, splitEnabled: scheduledQueueSplitEnabled, }) - : baseWorkerQueue; + : migrated.workerQueue; try { return await this.traceEventConcern.traceRun( @@ -491,7 +519,8 @@ export class RunEngineTriggerTaskService { queueName, lockedQueueId, workerQueue, - enableFastPath, + region: migrated.region, + enableFastPath: migrated.enableFastPath, lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined, delayUntil, ttl, @@ -569,7 +598,8 @@ export class RunEngineTriggerTaskService { queueName, lockedQueueId, workerQueue, - enableFastPath, + region: migrated.region, + enableFastPath: migrated.enableFastPath, lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined, delayUntil, ttl, @@ -718,6 +748,7 @@ export class RunEngineTriggerTaskService { queueName: string; lockedQueueId?: string; workerQueue?: string; + region?: string; enableFastPath: boolean; lockedToBackgroundWorker?: { id: string; version: string; sdkVersion: string; cliVersion: string }; delayUntil?: Date; @@ -771,6 +802,7 @@ export class RunEngineTriggerTaskService { queue: args.queueName, lockedQueueId: args.lockedQueueId, workerQueue: args.workerQueue, + region: args.region, enableFastPath: args.enableFastPath, isTest: args.body.options?.test ?? false, delayUntil: args.delayUntil, diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index d6055c21b17..06223c488f8 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -1122,7 +1122,8 @@ export class RunsReplicationService { event === "delete" ? 1 : 0, // _is_deleted run.concurrencyKey ?? "", // concurrency_key run.bulkActionGroupIds ?? [], // bulk_action_group_ids - baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`) + baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (raw - operators slice by this) + run.region ?? "", // region (geo for customers) run.maxDurationInSeconds ?? null, // max_duration_in_seconds annotations?.triggerSource ?? "", // trigger_source annotations?.rootTriggerSource ?? "", // root_trigger_source diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 304777b39e0..88e792b4a40 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -204,6 +204,7 @@ export class ClickHouseRunsRepository implements IRunsRepository { machinePreset: true, queue: true, workerQueue: true, + region: true, annotations: true, }, }); @@ -377,7 +378,9 @@ function applyRunFiltersToQueryBuilder( } if (options.regions && options.regions.length > 0) { - queryBuilder.where("worker_queue IN {regions: Array(String)}", { regions: options.regions }); + queryBuilder.where("if(region != '', region, worker_queue) IN {regions: Array(String)}", { + regions: options.regions, + }); } if (options.machines && options.machines.length > 0) { diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 74963bc3ff2..d40e5cf8c0a 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -108,6 +108,7 @@ export type ListedRun = Prisma.TaskRunGetPayload<{ machinePreset: true; queue: true; workerQueue: true; + region: true; annotations: true; }; }>; diff --git a/apps/webapp/app/utils/computeBucket.ts b/apps/webapp/app/utils/computeBucket.ts new file mode 100644 index 00000000000..7a948c57db7 --- /dev/null +++ b/apps/webapp/app/utils/computeBucket.ts @@ -0,0 +1,15 @@ +/** + * Deterministic 0-99 bucket for an org id, stable across processes and deploys. + * FNV-1a (non-crypto): we only need determinism + uniform spread, not collision + * resistance. Used for nested percentage rollout: `hashBucket(orgId) < percentage`. + * Ramping the percentage down keeps a strict subset (the low buckets), so an org + * never flaps in and out as the dial moves. + */ +export function hashBucket(orgId: string): number { + let hash = 0x811c9dc5; // FNV offset basis + for (let i = 0; i < orgId.length; i++) { + hash ^= orgId.charCodeAt(i); + hash = Math.imul(hash, 0x01000193) >>> 0; + } + return hash % 100; +} diff --git a/apps/webapp/app/utils/reloadingRegistry.server.ts b/apps/webapp/app/utils/reloadingRegistry.server.ts new file mode 100644 index 00000000000..431557f60bb --- /dev/null +++ b/apps/webapp/app/utils/reloadingRegistry.server.ts @@ -0,0 +1,143 @@ +import pRetry from "p-retry"; +import { Counter, Gauge } from "prom-client"; +import { metricsRegister } from "~/metrics.server"; +import { logger } from "~/services/logger.server"; +import { signalsEmitter } from "~/services/signals.server"; + +const loadFailures = new Counter({ + name: "reloading_registry_load_failures_total", + help: "Failed loads of a reloading registry", + labelNames: ["name"], + registers: [metricsRegister], +}); + +const lastSuccessfulLoadAt = new Gauge({ + name: "reloading_registry_last_successful_load_timestamp_seconds", + help: "Unix time of the last successful registry load (staleness signal)", + labelNames: ["name"], + registers: [metricsRegister], +}); + +export type ReloadingRegistry = { + isReady: Promise; + readonly isLoaded: boolean; + current(): T | undefined; + reload(): Promise; + waitUntilReady(timeoutMs: number): Promise; + stop(): void; +}; + +export type ReloadingRegistryOptions = { + /** Tag for metrics + logs. */ + name: string; + /** Loads the full snapshot from the source of truth. */ + load: () => Promise; + /** How often to reload after the first successful load. */ + intervalMs: number; + /** Startup retry config; defaults to forever with backoff. */ + retry?: { retries?: number }; + /** Start the background load + interval at construction. Default true; set false to keep inert (e.g. tests). */ + autoStart?: boolean; +}; + +/** + * In-memory snapshot loaded at startup and refreshed on an interval. Reads are + * synchronous (`current()`); the first read should gate on `waitUntilReady` so a + * cold replica never serves a default over a real value. Mirrors the datastore / + * LLM-pricing registries. Interval-only: no pub/sub (a follow-up if sub-second + * propagation is ever needed). + */ +export function createReloadingRegistry(opts: ReloadingRegistryOptions): ReloadingRegistry { + let snapshot: T | undefined; + let loaded = false; + let started = false; + let loadSeq = 0; + let resolveReady!: () => void; + const isReady = new Promise((resolve) => { + resolveReady = resolve; + }); + + async function doLoad() { + const seq = ++loadSeq; + const next = await opts.load(); + if (seq < loadSeq) return; // a newer load started while we were awaiting; don't clobber + snapshot = next; + lastSuccessfulLoadAt.set({ name: opts.name }, Date.now() / 1000); + if (!loaded) { + loaded = true; + resolveReady(); + } + } + + let interval: ReturnType | undefined; + + if (opts.autoStart !== false) { + started = true; + + const startup = pRetry(() => doLoad(), { + forever: opts.retry?.retries === undefined, + retries: opts.retry?.retries, + minTimeout: 1_000, + maxTimeout: 60_000, + factor: 2, + onFailedAttempt: (error) => { + loadFailures.inc({ name: opts.name }); + logger.warn("[ReloadingRegistry] startup load failed, retrying", { + name: opts.name, + attemptNumber: error.attemptNumber, + retriesLeft: error.retriesLeft, + error: error.message, + }); + }, + }); + startup.catch((err) => { + logger.error("[ReloadingRegistry] startup load gave up", { + name: opts.name, + error: err instanceof Error ? err.message : String(err), + }); + }); + + interval = setInterval(() => { + doLoad().catch((err) => { + loadFailures.inc({ name: opts.name }); + logger.warn("[ReloadingRegistry] reload failed", { + name: opts.name, + error: err instanceof Error ? err.message : String(err), + }); + }); + }, opts.intervalMs); + interval.unref(); // never keep the process alive; SIGTERM still clears it + } else { + resolveReady(); // inert: any direct `await isReady` resolves immediately + } + + function stop() { + if (interval) clearInterval(interval); + } + signalsEmitter.on("SIGTERM", stop); + signalsEmitter.on("SIGINT", stop); + + return { + isReady, + get isLoaded() { + return loaded; + }, + current: () => snapshot, + reload: doLoad, + async waitUntilReady(timeoutMs: number) { + if (!started || loaded || timeoutMs <= 0) return; + let timer: ReturnType | undefined; + try { + await Promise.race([ + isReady, + new Promise((resolve) => { + timer = setTimeout(resolve, timeoutMs); + }), + ]); + } finally { + if (timer) clearTimeout(timer); + } + }, + stop, + }; +} diff --git a/apps/webapp/app/v3/featureFlags.ts b/apps/webapp/app/v3/featureFlags.ts index 3066f2dda01..18abaae67f1 100644 --- a/apps/webapp/app/v3/featureFlags.ts +++ b/apps/webapp/app/v3/featureFlags.ts @@ -11,6 +11,9 @@ export const FEATURE_FLAG = { mollifierEnabled: "mollifierEnabled", workerQueueScheduledSplitEnabled: "workerQueueScheduledSplitEnabled", realtimeBackend: "realtimeBackend", + computeMigrationEnabled: "computeMigrationEnabled", + computeMigrationFreePercentage: "computeMigrationFreePercentage", + computeMigrationPaidPercentage: "computeMigrationPaidPercentage", } as const; export const FeatureFlagCatalog = { @@ -27,6 +30,13 @@ export const FeatureFlagCatalog = { // globally and per-org (org wins). Defaults to "electric" when unset. // "shadow" serves Electric but diffs the native path in the background. [FEATURE_FLAG.realtimeBackend]: z.enum(["electric", "native", "shadow"]), + // Strict z.boolean() (not z.coerce.boolean()): coercion turns the string "false" + // into true, which would silently flip this kill switch / per-org exclude the wrong + // way if written as a string via the admin PAT route. The admin toggle sends a real + // boolean, so this only rejects the dangerous stringified case. + [FEATURE_FLAG.computeMigrationEnabled]: z.boolean(), + [FEATURE_FLAG.computeMigrationFreePercentage]: z.coerce.number().int().min(0).max(100), + [FEATURE_FLAG.computeMigrationPaidPercentage]: z.coerce.number().int().min(0).max(100), }; export type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/globalFlagsRegistry.server.ts b/apps/webapp/app/v3/globalFlagsRegistry.server.ts new file mode 100644 index 00000000000..0fd4a0cb504 --- /dev/null +++ b/apps/webapp/app/v3/globalFlagsRegistry.server.ts @@ -0,0 +1,20 @@ +import { singleton } from "~/utils/singleton"; +import { env } from "~/env.server"; +import { flags } from "~/v3/featureFlags.server"; +import type { FeatureFlagCatalog } from "~/v3/featureFlags"; +import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; + +/** + * In-memory snapshot of the global feature flags, refreshed every + * GLOBAL_FLAGS_RELOAD_INTERVAL_MS. `flags()` reads the DB-backed global values + * (no per-org overrides). Read synchronously on the trigger hot path; callers + * gate the first read on `waitUntilReady`. + */ +export const globalFlagsRegistry = singleton("globalFlagsRegistry", () => + createReloadingRegistry>({ + name: "global-flags", + intervalMs: env.GLOBAL_FLAGS_RELOAD_INTERVAL_MS, + autoStart: process.env.NODE_ENV !== "test", // only auto-poll outside tests + load: () => flags(), + }) +); diff --git a/apps/webapp/app/v3/mollifier/readFallback.server.ts b/apps/webapp/app/v3/mollifier/readFallback.server.ts index 21dd6c23957..d684741219f 100644 --- a/apps/webapp/app/v3/mollifier/readFallback.server.ts +++ b/apps/webapp/app/v3/mollifier/readFallback.server.ts @@ -85,6 +85,7 @@ export type SyntheticRun = { runtimeEnvironmentId: string | undefined; engine: "V2"; workerQueue: string | undefined; + region: string | undefined; queue: string | undefined; concurrencyKey: string | undefined; machinePreset: string | undefined; @@ -222,6 +223,7 @@ export async function findRunByIdWithMollifierFallback( asString(environment?.id) ?? entry.envId, engine: "V2", workerQueue: asString(snapshot.workerQueue), + region: asString(snapshot.region), queue: asString(snapshot.queue), concurrencyKey: asString(snapshot.concurrencyKey), machinePreset: asString(snapshot.machine), diff --git a/apps/webapp/app/v3/querySchemas.ts b/apps/webapp/app/v3/querySchemas.ts index 947e6e8e468..304664800ea 100644 --- a/apps/webapp/app/v3/querySchemas.ts +++ b/apps/webapp/app/v3/querySchemas.ts @@ -192,12 +192,13 @@ export const runsSchema: TableSchema = { }, region: { name: "region", - clickhouseName: "worker_queue", + clickhouseName: "region", ...column("String", { description: "Region", example: "us-east-1", }), - expression: "if(startsWith(worker_queue, 'cm'), NULL, worker_queue)", + // No whereTransform: the expression drives WHERE too, so pre-region rows still match. + expression: "multiIf(region != '', region, startsWith(worker_queue, 'cm'), NULL, worker_queue)", }, // Timing diff --git a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts index c972952b471..16968b01d40 100644 --- a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts +++ b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts @@ -9,6 +9,10 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { ServiceValidationError } from "./baseService.server"; import { FailDeploymentService } from "./failDeployment.server"; import { resolveComputeAccess } from "../regionAccess.server"; +import { isOrgMigrated } from "~/runEngine/concerns/computeMigration.server"; +import { backingForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; +import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; +import { getEntitlement } from "~/services/platform.v3.server"; type TemplateCreationMode = "required" | "shadow" | "skip"; @@ -145,10 +149,10 @@ export class ComputeTemplateCreationService { where: { id: projectId }, select: { defaultWorkerGroup: { - select: { workloadType: true }, + select: { workloadType: true, masterQueue: true }, }, organization: { - select: { featureFlags: true }, + select: { id: true, featureFlags: true }, }, }, }); @@ -161,6 +165,43 @@ export class ComputeTemplateCreationService { return "required"; } + // Migrated orgs route runs to the compute backing even though their stored + // default is still the container region, so they need a compute template too. + // shadow mode: never fail a deploy over a backing the org didn't opt into. + if (!workerRegionRegistry.isLoaded) { + await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + const defaultQueue = project.defaultWorkerGroup?.masterQueue; + if (defaultQueue && backingForQueue(defaultQueue, workerRegionRegistry.current() ?? [])) { + if (!globalFlagsRegistry.isLoaded) { + await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + const decision = { + orgId: project.organization.id, + orgFeatureFlags: project.organization.featureFlags as Record | null, + flags: globalFlagsRegistry.current(), + }; + // Per-org override needs no plan; only the percentage path does. So skip the + // external entitlement lookup unless it could matter, and degrade gracefully + // if it throws - a shadow-template check must never fail a deploy. + let migrated = isOrgMigrated({ ...decision, planType: undefined }); + if (!migrated && (decision.flags?.computeMigrationEnabled ?? false)) { + let planType: string | undefined; + try { + planType = (await getEntitlement(project.organization.id))?.plan?.type; + } catch (error) { + logger.warn("compute migration: entitlement lookup failed; skipping shadow template", { + organizationId: project.organization.id, + error: error instanceof Error ? error.message : String(error), + }); + } + migrated = isOrgMigrated({ ...decision, planType }); + } + if (migrated) { + return "shadow"; + } + } + const hasComputeAccess = await resolveComputeAccess(prisma, project.organization.featureFlags); if (hasComputeAccess) { diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 7975694b5e4..ed2292e32b2 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -68,7 +68,9 @@ export class ReplayTaskRunService extends BaseService { authenticatedEnvironment.type === "DEVELOPMENT"; const region = ignoreRegion ? undefined - : overrideOptions.region ?? baseWorkerQueue(existingTaskRun.workerQueue); + : overrideOptions.region || + existingTaskRun.region || + baseWorkerQueue(existingTaskRun.workerQueue); try { const taskQueue = await this._prisma.taskQueue.findFirst({ diff --git a/apps/webapp/app/v3/workerRegions.server.ts b/apps/webapp/app/v3/workerRegions.server.ts new file mode 100644 index 00000000000..e925af11afc --- /dev/null +++ b/apps/webapp/app/v3/workerRegions.server.ts @@ -0,0 +1,73 @@ +import { type WorkloadType } from "@trigger.dev/database"; +import { prisma } from "~/db.server"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; + +export type WorkerGroupRegionRow = { + masterQueue: string; + region: string | null; + workloadType: WorkloadType; + hidden: boolean; + enableFastPath: boolean; +}; + +/** + * Reverse map: a stored worker queue -> its user-facing geo region. A backing + * queue (e.g. "us-east-1-next") returns the region it backs ("us-east-1"); + * anything unknown or with no region set passes through unchanged (so container + * queues and not-yet-labelled groups behave exactly as before). + */ +export function regionForQueue(queue: string, groups: WorkerGroupRegionRow[]): string { + const self = groups.find((g) => g.masterQueue === queue); + return self?.region ?? queue; +} + +/** + * Forward map: the compute (MICROVM) backing queue for the region that `queue` + * belongs to, or undefined if the region has no compute backing. `queue` is the + * resolved (container) worker queue; we look up its region, then find a visible + * MICROVM group in the same region. Returns the backing group's queue and its + * `enableFastPath` so the caller adopts the backing's fast-path setting. + */ +export function backingForQueue( + queue: string, + groups: WorkerGroupRegionRow[] +): { workerQueue: string; enableFastPath: boolean } | undefined { + const self = groups.find((g) => g.masterQueue === queue); + const region = self?.region; + if (!region) return undefined; + const backing = groups.find( + (g) => + g.workloadType === "MICROVM" && + g.region === region && + !g.hidden && + g.masterQueue !== queue + ); + if (!backing) return undefined; + return { workerQueue: backing.masterQueue, enableFastPath: backing.enableFastPath }; +} + +/** + * In-memory snapshot of every worker group's (queue, region, type, hidden), + * refreshed on an interval. Read synchronously on the hot path; callers gate the + * first read on `waitUntilReady`. DB-backed source of truth for region<->backing + * resolution (replaces the old env-var backing map). + */ +export const workerRegionRegistry = singleton("workerRegionRegistry", () => + createReloadingRegistry({ + name: "worker-region", + intervalMs: env.GLOBAL_FLAGS_RELOAD_INTERVAL_MS, + autoStart: process.env.NODE_ENV !== "test", // only auto-poll outside tests + load: () => + prisma.workerInstanceGroup.findMany({ + select: { + masterQueue: true, + region: true, + workloadType: true, + hidden: true, + enableFastPath: true, + }, + }), + }) +); diff --git a/apps/webapp/test/computeBucket.test.ts b/apps/webapp/test/computeBucket.test.ts new file mode 100644 index 00000000000..a3d0c9c72ba --- /dev/null +++ b/apps/webapp/test/computeBucket.test.ts @@ -0,0 +1,46 @@ +import { describe, it, expect } from "vitest"; +import cuid from "cuid"; +import { hashBucket } from "~/utils/computeBucket"; + +describe("hashBucket", () => { + it("returns a stable value in [0, 100) for the same id", () => { + const a = hashBucket("org_abc"); + const b = hashBucket("org_abc"); + expect(a).toBe(b); + expect(a).toBeGreaterThanOrEqual(0); + expect(a).toBeLessThan(100); + }); + + it("is nested: the set enrolled at 1% is a subset of the set at 5%", () => { + const ids = Array.from({ length: 5000 }, (_, i) => `org_${i}`); + const at1 = new Set(ids.filter((id) => hashBucket(id) < 1)); + const at5 = ids.filter((id) => hashBucket(id) < 5); + for (const id of at1) { + expect(at5).toContain(id); + } + }); + + it("distributes roughly uniformly", () => { + const ids = Array.from({ length: 10000 }, (_, i) => `org_${i}`); + const under10 = ids.filter((id) => hashBucket(id) < 10).length; + expect(under10).toBeGreaterThan(700); + expect(under10).toBeLessThan(1300); + }); + + // Org ids are `@default(cuid())` primary keys (e.g. "cjld2cjxh0000qzrmn831i7rn"), + // not the synthetic sequential ids above. cuids share a "c" prefix + timestamp/counter + // structure, so verify the hash still spreads *real-shaped* ids evenly across deciles + // (so a percentage dial maps to ~that fraction of actual orgs, not just of the id space). + it("distributes cuids evenly across all 10 deciles", () => { + const ids = Array.from({ length: 20000 }, () => cuid()); + const counts = new Array(10).fill(0); + for (const id of ids) { + counts[Math.floor(hashBucket(id) / 10)]++; + } + // Expected ~2000 per decile; allow a wide band so it isn't flaky. + for (const count of counts) { + expect(count).toBeGreaterThan(1700); + expect(count).toBeLessThan(2300); + } + }); +}); diff --git a/apps/webapp/test/computeMigration.test.ts b/apps/webapp/test/computeMigration.test.ts new file mode 100644 index 00000000000..0c914f2e2ae --- /dev/null +++ b/apps/webapp/test/computeMigration.test.ts @@ -0,0 +1,106 @@ +import { describe, it, expect } from "vitest"; +import { + isOrgMigrated, + resolveComputeMigration, +} from "~/runEngine/concerns/computeMigration.server"; + +describe("isOrgMigrated", () => { + const base = { + planType: "free" as string | undefined, + orgFeatureFlags: {} as Record, + flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100 }, + }; + + it("migrates a free org at 100%", () => { + expect(isOrgMigrated({ ...base, orgId: "org_x" })).toBe(true); + }); + it("does not migrate when globally disabled", () => { + expect( + isOrgMigrated({ ...base, orgId: "org_x", flags: { computeMigrationEnabled: false, computeMigrationFreePercentage: 100 } }) + ).toBe(false); + }); + it("per-org override false excludes even at 100%", () => { + expect( + isOrgMigrated({ ...base, orgId: "org_x", orgFeatureFlags: { computeMigrationEnabled: false } }) + ).toBe(false); + }); + it("per-org override true enrolls even when globally off", () => { + expect( + isOrgMigrated({ + ...base, + orgId: "org_x", + orgFeatureFlags: { computeMigrationEnabled: true }, + flags: { computeMigrationEnabled: false, computeMigrationFreePercentage: 0 }, + }) + ).toBe(true); + }); + it("paid uses the paid dial", () => { + expect( + isOrgMigrated({ + planType: "paid", + orgId: "org_x", + orgFeatureFlags: {}, + flags: { computeMigrationEnabled: true, computeMigrationPaidPercentage: 100 }, + }) + ).toBe(true); + }); + it("enterprise is never enrolled by percentage", () => { + expect( + isOrgMigrated({ + planType: "enterprise", + orgId: "org_x", + orgFeatureFlags: {}, + flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100, computeMigrationPaidPercentage: 100 }, + }) + ).toBe(false); + }); + it("undefined planType is not enrolled", () => { + expect( + isOrgMigrated({ planType: undefined, orgId: "org_x", orgFeatureFlags: {}, flags: { computeMigrationEnabled: true } }) + ).toBe(false); + }); +}); + +describe("resolveComputeMigration", () => { + const enrolled = { + planType: "free", + orgFeatureFlags: {}, + flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100 }, + envType: "PRODUCTION", + baseEnableFastPath: false, + region: "us-east-1", + }; + const backing = { workerQueue: "us-east-1-next", enableFastPath: true }; + + it("swaps to the compute backing for an enrolled free org", () => { + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing }) + ).toEqual({ workerQueue: "us-east-1-next", region: "us-east-1", enableFastPath: true }); + }); + it("leaves the queue unchanged when there is no backing for the region (EU)", () => { + expect( + resolveComputeMigration({ + ...enrolled, + baseWorkerQueue: "eu-central-1", + region: "eu-central-1", + orgId: "org_x", + backing: undefined, + }) + ).toEqual({ workerQueue: "eu-central-1", region: "eu-central-1", enableFastPath: false }); + }); + it("does not migrate DEVELOPMENT", () => { + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing, envType: "DEVELOPMENT" }) + ).toEqual({ workerQueue: "us-east-1", region: "us-east-1", enableFastPath: false }); + }); + it("leaves a non-enrolled org untouched", () => { + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing, flags: { computeMigrationEnabled: false } }) + ).toEqual({ workerQueue: "us-east-1", region: "us-east-1", enableFastPath: false }); + }); + it("undefined baseWorkerQueue passes through", () => { + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: undefined, region: undefined, orgId: "org_x", backing }) + ).toEqual({ workerQueue: undefined, region: undefined, enableFastPath: false }); + }); +}); diff --git a/apps/webapp/test/reloadingRegistry.test.ts b/apps/webapp/test/reloadingRegistry.test.ts new file mode 100644 index 00000000000..4b01f34a017 --- /dev/null +++ b/apps/webapp/test/reloadingRegistry.test.ts @@ -0,0 +1,135 @@ +import { describe, it, expect, vi } from "vitest"; +import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; + +describe("createReloadingRegistry", () => { + it("current() is undefined before load, snapshot after isReady", async () => { + const reg = createReloadingRegistry({ + name: "test-a", + intervalMs: 10_000, + load: async () => ({ value: 42 }), + }); + expect(reg.current()).toBeUndefined(); + await reg.isReady; + expect(reg.isLoaded).toBe(true); + expect(reg.current()).toEqual({ value: 42 }); + reg.stop(); + }); + + it("waitUntilReady resolves once loaded", async () => { + const reg = createReloadingRegistry({ + name: "test-b", + intervalMs: 10_000, + load: async () => 1, + }); + await reg.waitUntilReady(1000); + expect(reg.current()).toBe(1); + reg.stop(); + }); + + it("waitUntilReady times out (and stays unloaded) when load never succeeds", async () => { + const reg = createReloadingRegistry({ + name: "test-c", + intervalMs: 10_000, + retry: { retries: 0 }, + load: async () => { + throw new Error("db down"); + }, + }); + await reg.waitUntilReady(50); + expect(reg.isLoaded).toBe(false); + expect(reg.current()).toBeUndefined(); + reg.stop(); + }); + + it("reload() picks up a changed value", async () => { + let v = 1; + const reg = createReloadingRegistry({ + name: "test-d", + intervalMs: 10_000, + load: async () => v, + }); + await reg.isReady; + expect(reg.current()).toBe(1); + v = 2; + await reg.reload(); + expect(reg.current()).toBe(2); + reg.stop(); + }); + + it("newer load wins even if an older load resolves later", async () => { + // load hands the test a deferred resolver per call so completion order is controllable. + const deferred: Array<(value: number) => void> = []; + const reg = createReloadingRegistry({ + name: "test-e", + intervalMs: 10_000, + load: () => + new Promise((resolve) => { + deferred.push(resolve); + }), + }); + + // deferred[0] is the startup load; let it complete with an initial value. + deferred[0](0); + await reg.isReady; + + // start two overlapping loads; don't await yet (deferred[1] older, deferred[2] newer) + const older = reg.reload(); + const newer = reg.reload(); + + // resolve the NEWER load first, then the OLDER load last + deferred[2](2); + deferred[1](1); + await Promise.all([older, newer]); + + // the older load completing last must NOT clobber the newer snapshot + expect(reg.current()).toBe(2); + reg.stop(); + }); + + it("autoStart:false stays inert and non-blocking", async () => { + let loadCalls = 0; + const reg = createReloadingRegistry({ + name: "test-inert", + intervalMs: 10_000, + autoStart: false, + load: async () => { + loadCalls++; + return 1; + }, + }); + expect(reg.isLoaded).toBe(false); + expect(reg.current()).toBeUndefined(); + await reg.waitUntilReady(10_000); // must resolve ~immediately, not wait 10s + expect(reg.isLoaded).toBe(false); + expect(loadCalls).toBe(0); // never hit the DB/load + reg.stop(); + }); + + it("waitUntilReady clears its timeout when ready wins", async () => { + const clearSpy = vi.spyOn(global, "clearTimeout"); + // load resolves only when the test releases it, so waitUntilReady runs the + // race while still unloaded (it would return early if already loaded) + let releaseLoad!: () => void; + const loadGate = new Promise((resolve) => { + releaseLoad = resolve; + }); + const reg = createReloadingRegistry({ + name: "test-f", + intervalMs: 10_000, + load: async () => { + await loadGate; + return 1; + }, + }); + + // long timeout so isReady is what actually wins the race + const waiting = reg.waitUntilReady(10_000); + releaseLoad(); + await reg.isReady; + await waiting; + + expect(clearSpy).toHaveBeenCalled(); + clearSpy.mockRestore(); + reg.stop(); + }); +}); diff --git a/apps/webapp/test/runsReplicationService.part1.test.ts b/apps/webapp/test/runsReplicationService.part1.test.ts index 5a085944a61..9fe4402b541 100644 --- a/apps/webapp/test/runsReplicationService.part1.test.ts +++ b/apps/webapp/test/runsReplicationService.part1.test.ts @@ -82,6 +82,8 @@ describe("RunsReplicationService (part 1/7)", () => { traceId: "1234", spanId: "1234", queue: "test", + workerQueue: "us-east-1-next", + region: "us-east-1", runtimeEnvironmentId: runtimeEnvironment.id, projectId: project.id, organizationId: organization.id, @@ -121,6 +123,9 @@ describe("RunsReplicationService (part 1/7)", () => { trigger_source: "api", root_trigger_source: "dashboard", is_warm_start: 1, + // worker_queue stays the raw backing (operators); region is the geo (customers) + worker_queue: "us-east-1-next", + region: "us-east-1", }) ); diff --git a/apps/webapp/test/workerRegions.test.ts b/apps/webapp/test/workerRegions.test.ts new file mode 100644 index 00000000000..7befdfee842 --- /dev/null +++ b/apps/webapp/test/workerRegions.test.ts @@ -0,0 +1,42 @@ +import { describe, it, expect } from "vitest"; +import { regionForQueue, backingForQueue, type WorkerGroupRegionRow } from "~/v3/workerRegions.server"; + +const groups: WorkerGroupRegionRow[] = [ + { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false, enableFastPath: false }, + { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: false, enableFastPath: true }, + { masterQueue: "eu-central-1", region: "eu-central-1", workloadType: "CONTAINER", hidden: false, enableFastPath: false }, +]; + +describe("regionForQueue", () => { + it("maps a backing queue to its region", () => { + expect(regionForQueue("us-east-1-next", groups)).toBe("us-east-1"); + }); + it("maps a container queue to its own region", () => { + expect(regionForQueue("us-east-1", groups)).toBe("us-east-1"); + }); + it("passes an unknown queue through unchanged", () => { + expect(regionForQueue("mystery", groups)).toBe("mystery"); + }); + it("passes through when a group has no region set", () => { + expect(regionForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false, enableFastPath: false }])).toBe("x"); + }); +}); + +describe("backingForQueue", () => { + it("finds the MICROVM backing for a region with one", () => { + expect(backingForQueue("us-east-1", groups)).toEqual({ workerQueue: "us-east-1-next", enableFastPath: true }); + }); + it("returns undefined for a region with no compute backing (EU)", () => { + expect(backingForQueue("eu-central-1", groups)).toBeUndefined(); + }); + it("returns undefined when the queue's group has no region", () => { + expect(backingForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false, enableFastPath: false }])).toBeUndefined(); + }); + it("ignores hidden MICROVM groups", () => { + const g: WorkerGroupRegionRow[] = [ + { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false, enableFastPath: false }, + { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: true, enableFastPath: true }, + ]; + expect(backingForQueue("us-east-1", g)).toBeUndefined(); + }); +}); diff --git a/internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql b/internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql new file mode 100644 index 00000000000..ed55cada852 --- /dev/null +++ b/internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql @@ -0,0 +1,7 @@ +-- +goose Up +ALTER TABLE trigger_dev.task_runs_v2 +ADD COLUMN IF NOT EXISTS region String DEFAULT ''; + +-- +goose Down +ALTER TABLE trigger_dev.task_runs_v2 +DROP COLUMN IF EXISTS region; diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index 76c2267b073..72eab055eec 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -83,6 +83,7 @@ describe("Task Runs V2", () => { "concurrency_key_1234", // concurrency_key ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -215,6 +216,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -270,6 +272,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -372,6 +375,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -482,6 +486,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -536,7 +541,8 @@ describe("Task Runs V2", () => { 0, "", [], - "", + "", // worker_queue + "", // region null, "", "", @@ -597,7 +603,8 @@ describe("Task Runs V2", () => { 0, "", [], - "", + "", // worker_queue + "", // region null, "", "", @@ -652,7 +659,8 @@ describe("Task Runs V2", () => { 0, "", [], - "", + "", // worker_queue + "", // region null, "", "", diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 2201f4baf6e..633f6e668be 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -48,6 +48,7 @@ export const TaskRunV2 = z.object({ concurrency_key: z.string().default(""), bulk_action_group_ids: z.array(z.string()).default([]), worker_queue: z.string().default(""), + region: z.string().default(""), max_duration_in_seconds: z.number().int().nullish(), trigger_source: z.string().default(""), root_trigger_source: z.string().default(""), @@ -108,6 +109,7 @@ export const TASK_RUN_COLUMNS = [ "concurrency_key", "bulk_action_group_ids", "worker_queue", + "region", "max_duration_in_seconds", "trigger_source", "root_trigger_source", @@ -175,6 +177,7 @@ export type TaskRunFieldTypes = { concurrency_key: string; bulk_action_group_ids: string[]; worker_queue: string; + region: string; max_duration_in_seconds: number | null; trigger_source: string; root_trigger_source: string; @@ -313,6 +316,7 @@ export type TaskRunInsertArray = [ concurrency_key: string, bulk_action_group_ids: string[], worker_queue: string, + region: string, max_duration_in_seconds: number | null, trigger_source: string, root_trigger_source: string, diff --git a/internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql b/internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql new file mode 100644 index 00000000000..2ff22825549 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "WorkerInstanceGroup" ADD COLUMN IF NOT EXISTS "region" TEXT; diff --git a/internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql b/internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql new file mode 100644 index 00000000000..e30bb37484a --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN IF NOT EXISTS "region" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 337a6059ebd..2aeb27e3038 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -915,6 +915,9 @@ model TaskRun { /// The main queue that this run is part of workerQueue String @default("main") @map("masterQueue") + /// User-facing geo region, stamped at trigger; workerQueue is where it actually ran. + region String? + /// @deprecated secondaryMasterQueue String? @@ -1532,6 +1535,9 @@ model WorkerInstanceGroup { workloadType WorkloadType @default(CONTAINER) + /// Geo region; container + MICROVM groups for one geo share it. Set-once once it has runs. + region String? + /// When true, runs enqueued to this worker queue may skip the intermediate queue /// and be pushed directly to the worker queue when concurrency is available. enableFastPath Boolean @default(false) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 2b434a86eec..84941560a59 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -689,6 +689,7 @@ export class RunEngine { cliVersion, concurrencyKey, workerQueue, + region, enableFastPath, queue, lockedQueueId, @@ -857,6 +858,7 @@ export class RunEngine { queue, lockedQueueId, workerQueue, + region, isTest, delayUntil, queuedAt, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 0077478318b..98c722e8c74 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -247,6 +247,7 @@ export type TriggerParams = { cliVersion?: string; concurrencyKey?: string; workerQueue?: string; + region?: string; /** When true, the run queue may push directly to the worker queue if concurrency is available. * Gated per WorkerInstanceGroup (production) or always true (development). */ enableFastPath?: boolean;