Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/dequeue-region-gate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

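Add a `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` setting: a comma-separated list of worker queues (or base regions) whose worker dequeue requests are refused, so their runs stay queued instead of being handed to workers that can't run them. Listing a base region (e.g. `eu-central-1`) also gates its `:scheduled` split; listing a full worker queue (e.g. `eu-central-1:scheduled`) gates only that queue.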
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ const EnvironmentSchema = z
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: z.string().optional(),
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
Expand Down
29 changes: 29 additions & 0 deletions apps/webapp/app/runEngine/concerns/dequeueGate.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { getMeter } from "@internal/tracing";
import { env } from "~/env.server";
import {
baseWorkerQueue,
matchesDisabledWorkerQueue,
parseDisabledWorkerQueues,
} from "./workerQueueSplit.server";

const meter = getMeter("run-engine-dequeue-gate");

const blockedDequeueCounter = meter.createCounter("run_engine.dequeue.blocked", {
description:
"Count of worker dequeue requests refused because the worker queue is gated off via RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES",
});

const disabledWorkerQueues = parseDisabledWorkerQueues(
env.RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES
);

/**
 * Reports whether dequeueing from `workerQueue` is currently gated off.
 *
 * The decision is delegated to `matchesDisabledWorkerQueue`, using the set
 * parsed from `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` at module load.
 *
 * @param workerQueue - The worker queue a dequeue request targets.
 * @returns `true` when the queue matches the disabled set.
 */
export function isWorkerQueueDequeueDisabled(workerQueue: string): boolean {
  const gatedOff = matchesDisabledWorkerQueue(workerQueue, disabledWorkerQueues);
  return gatedOff;
}

/**
 * Increments the `run_engine.dequeue.blocked` counter by one for a refused
 * dequeue request.
 *
 * @param workerQueue - The worker queue whose dequeue request was refused;
 *   reported as `worker_queue`, with `baseWorkerQueue(workerQueue)` reported
 *   as `region`.
 */
export function recordBlockedDequeue(workerQueue: string): void {
  const attributes = {
    worker_queue: workerQueue,
    region: baseWorkerQueue(workerQueue),
  };

  blockedDequeueCounter.add(1, attributes);
}
22 changes: 22 additions & 0 deletions apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,25 @@ export function workerQueueForClass(

return masterQueue;
}

/**
 * Turns a comma-separated list of worker queue names into a set.
 *
 * Each entry is trimmed of surrounding whitespace, and entries that are empty
 * after trimming are discarded. A missing value yields an empty set.
 *
 * @param raw - The raw comma-separated value, or `undefined` when unset.
 * @returns The distinct, trimmed, non-empty entries in first-seen order.
 */
export function parseDisabledWorkerQueues(raw: string | undefined): Set<string> {
  const queues = new Set<string>();
  const source = raw ?? "";

  for (const piece of source.split(",")) {
    const name = piece.trim();
    if (name) {
      queues.add(name);
    }
  }

  return queues;
}

/**
 * Decides whether a worker queue is covered by a set of disabled entries.
 *
 * A queue matches when either its exact name or the value returned by
 * `baseWorkerQueue(workerQueue)` is present in the set. An empty set never
 * matches.
 *
 * @param workerQueue - The worker queue name to check.
 * @param disabledWorkerQueues - Entries that are gated off.
 * @returns `true` when the queue is covered by the disabled set.
 */
export function matchesDisabledWorkerQueue(
  workerQueue: string,
  disabledWorkerQueues: ReadonlySet<string>
): boolean {
  // Nothing is gated: skip the lookups entirely.
  if (disabledWorkerQueues.size === 0) {
    return false;
  }

  // An exact entry wins without needing to derive the base queue.
  if (disabledWorkerQueues.has(workerQueue)) {
    return true;
  }

  return disabledWorkerQueues.has(baseWorkerQueue(workerQueue));
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import { singleton } from "~/utils/singleton";
import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
import { machinePresetFromName } from "~/v3/machinePresets.server";
import { workerQueueForClass } from "~/runEngine/concerns/workerQueueSplit.server";
import {
isWorkerQueueDequeueDisabled,
recordBlockedDequeue,
} from "~/runEngine/concerns/dequeueGate.server";
import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";

const authenticatedWorkerInstanceCache = singleton(
Expand Down Expand Up @@ -377,11 +381,16 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
runnerId?: string;
queueClass?: WorkerQueueClass;
}): Promise<DequeuedMessage[]> {
// Derive the actual queue from this worker's own masterQueue + class, so a
// token can only ever reach its own region's queues (default or :scheduled).
const workerQueue = workerQueueForClass(this.masterQueue, queueClass);

if (isWorkerQueueDequeueDisabled(workerQueue)) {
recordBlockedDequeue(workerQueue);
return [];
}

return await this._engine.dequeueFromWorkerQueue({
consumerId: this.workerInstanceId,
workerQueue: workerQueueForClass(this.masterQueue, queueClass),
workerQueue,
workerId: this.workerInstanceId,
runnerId,
});
Expand Down
44 changes: 44 additions & 0 deletions apps/webapp/test/workerQueueSplit.server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { describe, expect, it } from "vitest";
import {
matchesDisabledWorkerQueue,
parseDisabledWorkerQueues,
} from "~/runEngine/concerns/workerQueueSplit.server";

// Covers parsing of the raw comma-separated setting into a set of entries.
describe("parseDisabledWorkerQueues", () => {
  it("returns an empty set for undefined or empty input", () => {
    // Unset, empty, and separator-only values must all parse to nothing.
    const blankInputs = [undefined, "", " , ,"];

    for (const input of blankInputs) {
      expect(parseDisabledWorkerQueues(input).size).toBe(0);
    }
  });

  it("splits, trims, and drops empties", () => {
    const entries = Array.from(
      parseDisabledWorkerQueues(" eu-central-1 , us-east-1:scheduled ,, ")
    );

    expect(entries).toEqual(["eu-central-1", "us-east-1:scheduled"]);
  });
});

// Covers matching of a worker queue against the parsed disabled set, both by
// exact name and by base region.
describe("matchesDisabledWorkerQueue", () => {
  it("never matches when the disabled set is empty", () => {
    const noneDisabled = parseDisabledWorkerQueues(undefined);

    for (const queue of ["eu-central-1", "eu-central-1:scheduled"]) {
      expect(matchesDisabledWorkerQueue(queue, noneDisabled)).toBe(false);
    }
  });

  it("gates the base region and its scheduled split when the base region is listed", () => {
    const regionDisabled = parseDisabledWorkerQueues("eu-central-1");

    for (const queue of ["eu-central-1", "eu-central-1:scheduled"]) {
      expect(matchesDisabledWorkerQueue(queue, regionDisabled)).toBe(true);
    }
  });

  it("leaves other regions alone", () => {
    const regionDisabled = parseDisabledWorkerQueues("eu-central-1");

    for (const queue of ["us-east-1", "us-east-1:scheduled"]) {
      expect(matchesDisabledWorkerQueue(queue, regionDisabled)).toBe(false);
    }
  });

  it("gates only the scheduled split when a full worker queue is listed", () => {
    const scheduledDisabled = parseDisabledWorkerQueues("eu-central-1:scheduled");
    const scheduledMatch = matchesDisabledWorkerQueue("eu-central-1:scheduled", scheduledDisabled);
    const baseMatch = matchesDisabledWorkerQueue("eu-central-1", scheduledDisabled);

    expect(scheduledMatch).toBe(true);
    expect(baseMatch).toBe(false);
  });
});