diff --git a/.server-changes/dequeue-region-gate.md b/.server-changes/dequeue-region-gate.md
new file mode 100644
index 00000000000..d4f9d6979c4
--- /dev/null
+++ b/.server-changes/dequeue-region-gate.md
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: feature
+---
+
+Add a `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` setting that refuses worker dequeue requests for the listed worker queues (or base regions), so their runs stay queued instead of being handed to workers that can't run them.
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index f793654dd3a..f2c11aba415 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -809,6 +809,8 @@ const EnvironmentSchema = z
     RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
     RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
     RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
+    // Comma-separated worker queues (or base regions) whose dequeue requests are refused.
+    RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: z.string().optional(),
     RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
     RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
     RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
diff --git a/apps/webapp/app/runEngine/concerns/dequeueGate.server.ts b/apps/webapp/app/runEngine/concerns/dequeueGate.server.ts
new file mode 100644
index 00000000000..1068cd4c19d
--- /dev/null
+++ b/apps/webapp/app/runEngine/concerns/dequeueGate.server.ts
@@ -0,0 +1,39 @@
+import { getMeter } from "@internal/tracing";
+import { env } from "~/env.server";
+import {
+  baseWorkerQueue,
+  matchesDisabledWorkerQueue,
+  parseDisabledWorkerQueues,
+} from "./workerQueueSplit.server";
+
+const meter = getMeter("run-engine-dequeue-gate");
+
+const blockedDequeueCounter = meter.createCounter("run_engine.dequeue.blocked", {
+  description:
+    "Count of worker dequeue requests refused because the worker queue is gated off via RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES",
+});
+
+// Parsed once, when this module is first evaluated.
+const disabledWorkerQueues = parseDisabledWorkerQueues(
+  env.RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES
+);
+
+/**
+ * Reports whether dequeues for `workerQueue` are switched off, i.e. whether
+ * the queue itself or its base worker queue is listed in
+ * `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES`.
+ */
+export function isWorkerQueueDequeueDisabled(workerQueue: string): boolean {
+  return matchesDisabledWorkerQueue(workerQueue, disabledWorkerQueues);
+}
+
+/**
+ * Adds one to the `run_engine.dequeue.blocked` counter, tagged with the refused
+ * worker queue and its base worker queue (reported as `region`).
+ */
+export function recordBlockedDequeue(workerQueue: string): void {
+  blockedDequeueCounter.add(1, {
+    worker_queue: workerQueue,
+    region: baseWorkerQueue(workerQueue),
+  });
+}
diff --git a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
index a2bf5df87b3..c309e79e02e 100644
--- a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
+++ b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
@@ -122,3 +122,33 @@ export function workerQueueForClass(
 
   return masterQueue;
 }
+
+/**
+ * Parses a comma-separated list of worker queues (or base regions) into a set,
+ * trimming whitespace and dropping empty entries. `undefined` yields an empty set.
+ */
+export function parseDisabledWorkerQueues(raw: string | undefined): Set<string> {
+  return new Set(
+    (raw ?? "")
+      .split(",")
+      .map((entry) => entry.trim())
+      .filter(Boolean)
+  );
+}
+
+/**
+ * Returns true when `workerQueue` itself, or its base worker queue as computed
+ * by `baseWorkerQueue`, is present in `disabledWorkerQueues`.
+ */
+export function matchesDisabledWorkerQueue(
+  workerQueue: string,
+  disabledWorkerQueues: ReadonlySet<string>
+): boolean {
+  if (disabledWorkerQueues.size === 0) {
+    return false;
+  }
+
+  return (
+    disabledWorkerQueues.has(workerQueue) || disabledWorkerQueues.has(baseWorkerQueue(workerQueue))
+  );
+}
diff --git a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
index cb4a90cc597..7f52e32d5c3 100644
--- a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
+++ b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
@@ -28,6 +28,10 @@ import { singleton } from "~/utils/singleton";
 import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
 import { machinePresetFromName } from "~/v3/machinePresets.server";
 import { workerQueueForClass } from "~/runEngine/concerns/workerQueueSplit.server";
+import {
+  isWorkerQueueDequeueDisabled,
+  recordBlockedDequeue,
+} from "~/runEngine/concerns/dequeueGate.server";
 import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";
 
 const authenticatedWorkerInstanceCache = singleton(
@@ -377,11 +381,20 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
     runnerId?: string;
     queueClass?: WorkerQueueClass;
   }): Promise<DequeuedMessage[]> {
     // Derive the actual queue from this worker's own masterQueue + class, so a
     // token can only ever reach its own region's queues (default or :scheduled).
+    const workerQueue = workerQueueForClass(this.masterQueue, queueClass);
+
+    // A gated queue hands out nothing and returns before the engine is called;
+    // the refusal is counted so it shows up in metrics.
+    if (isWorkerQueueDequeueDisabled(workerQueue)) {
+      recordBlockedDequeue(workerQueue);
+      return [];
+    }
+
     return await this._engine.dequeueFromWorkerQueue({
       consumerId: this.workerInstanceId,
-      workerQueue: workerQueueForClass(this.masterQueue, queueClass),
+      workerQueue,
       workerId: this.workerInstanceId,
       runnerId,
     });
diff --git a/apps/webapp/test/workerQueueSplit.server.test.ts b/apps/webapp/test/workerQueueSplit.server.test.ts
new file mode 100644
index 00000000000..c8182b83105
--- /dev/null
+++ b/apps/webapp/test/workerQueueSplit.server.test.ts
@@ -0,0 +1,44 @@
+import { describe, expect, it } from "vitest";
+import {
+  matchesDisabledWorkerQueue,
+  parseDisabledWorkerQueues,
+} from "~/runEngine/concerns/workerQueueSplit.server";
+
+describe("parseDisabledWorkerQueues", () => {
+  it("returns an empty set for undefined or empty input", () => {
+    expect(parseDisabledWorkerQueues(undefined).size).toBe(0);
+    expect(parseDisabledWorkerQueues("").size).toBe(0);
+    expect(parseDisabledWorkerQueues(" , ,").size).toBe(0);
+  });
+
+  it("splits, trims, and drops empties", () => {
+    const parsed = parseDisabledWorkerQueues(" eu-central-1 , us-east-1:scheduled ,, ");
+    expect([...parsed]).toEqual(["eu-central-1", "us-east-1:scheduled"]);
+  });
+});
+
+describe("matchesDisabledWorkerQueue", () => {
+  it("never matches when the disabled set is empty", () => {
+    const empty = parseDisabledWorkerQueues(undefined);
+    expect(matchesDisabledWorkerQueue("eu-central-1", empty)).toBe(false);
+    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", empty)).toBe(false);
+  });
+
+  it("gates the base region and its scheduled split when the base region is listed", () => {
+    const disabled = parseDisabledWorkerQueues("eu-central-1");
+    expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(true);
+    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
+  });
+
+  it("leaves other regions alone", () => {
+    const disabled = parseDisabledWorkerQueues("eu-central-1");
+    expect(matchesDisabledWorkerQueue("us-east-1", disabled)).toBe(false);
+    expect(matchesDisabledWorkerQueue("us-east-1:scheduled", disabled)).toBe(false);
+  });
+
+  it("gates only the scheduled split when a full worker queue is listed", () => {
+    const disabled = parseDisabledWorkerQueues("eu-central-1:scheduled");
+    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
+    expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(false);
+  });
+});