From 8321d69fd7b48d6cf43b359d25bbac70eaac840d Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 12:29:36 +0100 Subject: [PATCH 1/4] feat(run-engine,webapp): always report worker queue length metrics The runqueue.workerQueue.length gauge only observed worker queues that a dequeue had registered, so a queue's depth stopped being reported once dequeues stopped (or was never reported for a queue that backed up before anything dequeued from it). A periodic observer now refreshes the observed set from the WorkerInstanceGroup records instead, so every active worker queue (and its scheduled split variant) keeps reporting its length regardless of dequeue activity. Off by default; enable per-service via an env var. Reads from the replica and skips configured cloud providers. --- .../worker-queue-length-always-reported.md | 6 + apps/webapp/app/env.server.ts | 6 + apps/webapp/app/v3/runEngine.server.ts | 13 ++ .../run-engine/src/engine/index.ts | 97 +++++++++++- .../tests/workerQueueObservation.test.ts | 145 ++++++++++++++++++ .../run-engine/src/engine/types.ts | 25 +++ .../run-engine/src/run-queue/index.ts | 10 ++ 7 files changed, 299 insertions(+), 3 deletions(-) create mode 100644 .server-changes/worker-queue-length-always-reported.md create mode 100644 internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts diff --git a/.server-changes/worker-queue-length-always-reported.md b/.server-changes/worker-queue-length-always-reported.md new file mode 100644 index 00000000000..ccce193d9f5 --- /dev/null +++ b/.server-changes/worker-queue-length-always-reported.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Optionally report worker queue length metrics continuously (enabled per-service via the RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED env var) so a queue's depth keeps being emitted even when nothing is dequeuing from it. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index f793654dd3a..9cc9b18229e 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -810,6 +810,12 @@ const EnvironmentSchema = z RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200), RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000), + // Off by default. Enable on a single service (e.g. the engine worker) so only one + // instance reports worker queue length, rather than every replica. + RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED: z.string().default("0"), + RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS: z.coerce.number().int().default(30_000), + // Comma-separated cloud providers to exclude from worker queue length observation. + RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS: z.string().default("digitalocean"), RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000), RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index e2c8ad85e94..06cb591a0b4 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -2,6 +2,7 @@ import { RunEngine } from "@internal/run-engine"; import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { createBatchGlobalRateLimiter } from "~/runEngine/concerns/batchGlobalRateLimiter.server"; +import { SCHEDULED_WORKER_QUEUE_SUFFIX } from "~/runEngine/concerns/workerQueueSplit.server"; import { logger } from "~/services/logger.server"; import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server"; import { singleton } from "~/utils/singleton"; @@ -121,6 +122,18 @@ function createRunEngine() { }, tracer, meter, + workerQueueObserver: { + enabled: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED === "1", + intervalMs: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS, + // Also observe the scheduled split variant of each worker queue. The suffix + // naming convention lives in the webapp, so it is passed in here. + additionalQueueSuffixes: [SCHEDULED_WORKER_QUEUE_SUFFIX], + excludedCloudProviders: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS.split( + "," + ) + .map((provider) => provider.trim()) + .filter(Boolean), + }, defaultMaxTtl: env.RUN_ENGINE_DEFAULT_MAX_TTL, heartbeatTimeoutsMs: { PENDING_EXECUTING: env.RUN_ENGINE_TIMEOUT_PENDING_EXECUTING, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a6a20b5b9fd..a4685c30d95 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -33,7 +33,7 @@ import { import { Worker } from "@trigger.dev/redis-worker"; import { assertNever } from "assert-never"; import { EventEmitter } from "node:events"; -import { setTimeout } from "node:timers/promises"; +import { setInterval, setTimeout } from "node:timers/promises"; import { BatchQueue } from "../batch-queue/index.js"; import type { BatchItem, @@ -100,6 +100,7 @@ export class RunEngine { private heartbeatTimeouts: HeartbeatTimeouts; private repairSnapshotTimeoutMs: number; private batchQueue: BatchQueue; + private workerQueueObserverAbortController?: AbortController; prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; @@ -474,6 +475,90 @@ export class RunEngine { machines: this.options.machines, billingCache: this.billingCache, }); + + this.#startWorkerQueueObserver(); + } + + /** + * Refreshes the set of worker queues observed by the `runqueue.workerQueue.length` + * gauge from the WorkerInstanceGroup records, so the gauge reports each worker queue's + * length even when nothing is dequeuing from it. Includes hidden groups; excludes + * groups whose cloud provider is configured to be excluded (groups with no cloud + * provider are always included). + */ + async refreshWorkerQueueObservation() { + const suffixes = this.options.workerQueueObserver?.additionalQueueSuffixes ?? []; + const excludedCloudProviders = new Set( + (this.options.workerQueueObserver?.excludedCloudProviders ?? []).map((p) => p.toLowerCase()) + ); + + // Read from the replica: this is a periodic metrics-only read and worker groups change + // rarely, so a little replication lag is fine and keeps it off the primary. + const workerGroups = await this.readOnlyPrisma.workerInstanceGroup.findMany({ + select: { masterQueue: true, cloudProvider: true }, + }); + + const workerQueues: string[] = []; + + for (const { masterQueue, cloudProvider } of workerGroups) { + if (cloudProvider && excludedCloudProviders.has(cloudProvider.toLowerCase())) { + continue; + } + + workerQueues.push(masterQueue); + + for (const suffix of suffixes) { + workerQueues.push(`${masterQueue}${suffix}`); + } + } + + this.runQueue.setObservableWorkerQueues(workerQueues); + } + + #startWorkerQueueObserver() { + if (!this.options.workerQueueObserver?.enabled) { + return; + } + + const intervalMs = this.options.workerQueueObserver.intervalMs ?? 30_000; + this.workerQueueObserverAbortController = new AbortController(); + + this.#runWorkerQueueObserver( + intervalMs, + this.workerQueueObserverAbortController.signal + ).catch((error) => { + this.logger.error("Worker queue observer loop crashed", { + error: error instanceof Error ? error.message : String(error), + }); + }); + } + + async #runWorkerQueueObserver(intervalMs: number, signal: AbortSignal) { + const refresh = async () => { + try { + await this.refreshWorkerQueueObservation(); + } catch (error) { + this.logger.error("Failed to refresh worker queue observation", { + error: error instanceof Error ? error.message : String(error), + }); + } + }; + + // Refresh once immediately so a freshly started instance reports queue lengths + // without waiting for the first interval, then keep it fresh on an interval. + await refresh(); + + try { + for await (const _ of setInterval(intervalMs, null, { signal })) { + await refresh(); + } + } catch (error) { + if (error instanceof Error && error.name !== "AbortError") { + throw error; + } + + this.logger.debug("Worker queue observer stopped"); + } } //MARK: - Run functions @@ -1322,8 +1407,11 @@ export class RunEngine { blockingPop?: boolean; blockingPopTimeoutSeconds?: number; }): Promise { - if (!skipObserving) { - // We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues + // We only do this with "prod" worker queues because we don't want to observe dev (e.g. + // environment) worker queues. When the worker queue observer is enabled it is the source + // of truth for the observed set (and applies the cloud-provider exclusions), so the + // per-dequeue registration is skipped. + if (!skipObserving && !this.options.workerQueueObserver?.enabled) { this.runQueue.registerObservableWorkerQueue(workerQueue); } @@ -2061,6 +2149,9 @@ export class RunEngine { async quit() { try { + // stop the worker queue observer loop + this.workerQueueObserverAbortController?.abort(); + //stop the run queue await this.runQueue.quit(); await this.worker.stop(); diff --git a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts new file mode 100644 index 00000000000..a7af72183fd --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts @@ -0,0 +1,145 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; +import { createTestMetricsMeter } from "./helpers/replicaTestHelpers.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +const WORKER_QUEUE_LENGTH_METRIC = "runqueue.workerQueue.length"; +const WORKER_QUEUE_ATTRIBUTE = "runqueue.workerQueue"; + +describe("RunEngine worker queue observation", () => { + containerTest( + "reports worker queue length from WorkerInstanceGroup records without any dequeue", + async ({ prisma, redisOptions }) => { + const { meter, getCounterValue } = createTestMetricsMeter(); + + // Seeds a MANAGED WorkerInstanceGroup with masterQueue "default" (no cloud provider). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // A hidden worker group should still be observed. + await prisma.workerInstanceGroup.create({ + data: { + name: "hidden-region", + masterQueue: "hidden-region", + type: "MANAGED", + hidden: true, + cloudProvider: "aws", + token: { create: { tokenHash: "hidden_region_token_hash" } }, + }, + }); + + // A DigitalOcean worker group should be excluded from observation. + await prisma.workerInstanceGroup.create({ + data: { + name: "do-region", + masterQueue: "do-region", + type: "MANAGED", + hidden: true, + cloudProvider: "digitalocean", + token: { create: { tokenHash: "do_region_token_hash" } }, + }, + }); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + workerQueueObserver: { + enabled: true, + intervalMs: 60_000, + additionalQueueSuffixes: [":scheduled"], + excludedCloudProviders: ["digitalocean"], + }, + }); + + const enqueueTo = async (workerQueue: string, count: number, prefix: string) => { + for (let i = 0; i < count; i++) { + await engine.runQueue.enqueueMessage({ + env: authenticatedEnvironment, + message: { + runId: `${prefix}_${i}`, + taskIdentifier: "task/my-task", + orgId: authenticatedEnvironment.organization.id, + projectId: authenticatedEnvironment.project.id, + environmentId: authenticatedEnvironment.id, + environmentType: "PRODUCTION", + queue: "task/my-task", + timestamp: Date.now(), + attempt: 0, + }, + workerQueue, + skipDequeueProcessing: true, + }); + } + }; + + const lengthOf = (workerQueue: string) => + getCounterValue(WORKER_QUEUE_LENGTH_METRIC, { + [WORKER_QUEUE_ATTRIBUTE]: workerQueue, + }); + + try { + // Keep the total under the environment concurrency limit (10) so every message moves + // into its worker queue list (processMasterQueueForEnvironment is concurrency-gated). + const defaultBacklog = 3; + const scheduledBacklog = 2; + const hiddenBacklog = 2; + const doBacklog = 2; + + // Build a backlog across several worker queues, then move them into the worker queue + // lists, but never dequeue. + await enqueueTo("default", defaultBacklog, "r_default"); + await enqueueTo("default:scheduled", scheduledBacklog, "r_scheduled"); + await enqueueTo("hidden-region", hiddenBacklog, "r_hidden"); + await enqueueTo("do-region", doBacklog, "r_do"); + await engine.runQueue.processMasterQueueForEnvironment( + authenticatedEnvironment.id, + defaultBacklog + scheduledBacklog + hiddenBacklog + doBacklog + ); + + // Observe the worker queues derived from the WorkerInstanceGroup records. No dequeue + // has happened, so this is the only thing that registers them for observation. + await engine.refreshWorkerQueueObservation(); + + // Reported: the default queue, its scheduled split variant, and the hidden group. + expect(await lengthOf("default")).toBe(defaultBacklog); + expect(await lengthOf("default:scheduled")).toBe(scheduledBacklog); + expect(await lengthOf("hidden-region")).toBe(hiddenBacklog); + + // Excluded: the DigitalOcean group is not observed even though it has a backlog. + expect(await lengthOf("do-region")).toBe(0); + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 98c722e8c74..a65eca347c6 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -211,6 +211,31 @@ export type RunEngineOptions = { * the since snapshot is not yet on the replica, before falling back to the primary. * Set maxMs to 0 (or any value <= 0) to skip the replica retry and go straight to the primary. */ readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number }; + /** + * Periodically refreshes the set of worker queues observed by the + * `runqueue.workerQueue.length` gauge from the WorkerInstanceGroup records, so the + * gauge reports every active worker queue's length even when this instance is not + * dequeuing from them (a dequeue is otherwise the only thing that registers a worker + * queue for observation). When enabled the observer is the source of truth for the + * observed set, so the per-dequeue registration is skipped. Disabled by default; the + * server enables it. + */ + workerQueueObserver?: { + enabled?: boolean; + /** How often to refresh the observed worker queue set from the database (ms). Default: 30_000. */ + intervalMs?: number; + /** + * Extra suffix variants to also observe for each worker queue, e.g. the scheduled + * split queue suffix. The suffix value lives with the caller that owns the naming + * convention rather than in the engine. Default: []. + */ + additionalQueueSuffixes?: string[]; + /** + * Worker groups whose `cloudProvider` is in this list are not observed. Groups with + * no `cloudProvider` are always observed. Matched case-insensitively. Default: []. + */ + excludedCloudProviders?: string[]; + }; tracer: Tracer; meter?: Meter; logger?: Logger; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index c695cacad07..9808f96f41a 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -318,6 +318,16 @@ export class RunQueue { this._observableWorkerQueues.add(workerQueue); } + /** + * Replaces the full set of worker queues observed by the `runqueue.workerQueue.length` + * gauge. Used by a periodic observer that derives the set from the current worker + * groups, so the observed set stays correct (and prunes queues that no longer exist) + * independent of dequeue activity. + */ + public setObservableWorkerQueues(workerQueues: string[]) { + this._observableWorkerQueues = new Set(workerQueues); + } + async #updateWorkerQueueLength(observableResult: ObservableResult) { for (const workerQueue of this._observableWorkerQueues) { const workerQueueLength = await this.redis.llen(this.keys.workerQueueKey(workerQueue)); From ae8531300660f64f37c6de88d6d24d69ad7a9d53 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 12:29:36 +0100 Subject: [PATCH 2/4] chore(webapp): remove unused worker group management API endpoints The GET and POST /api/v1/workers endpoints backed a CLI command group that is no longer registered, so they had no reachable consumer. Remove them. --- .../remove-worker-create-endpoint.md | 6 ++ apps/webapp/app/routes/api.v1.workers.ts | 73 ------------------- 2 files changed, 6 insertions(+), 73 deletions(-) create mode 100644 .server-changes/remove-worker-create-endpoint.md delete mode 100644 apps/webapp/app/routes/api.v1.workers.ts diff --git a/.server-changes/remove-worker-create-endpoint.md b/.server-changes/remove-worker-create-endpoint.md new file mode 100644 index 00000000000..dd7c2041876 --- /dev/null +++ b/.server-changes/remove-worker-create-endpoint.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: breaking +--- + +Remove the unused worker group management API endpoints (GET and POST /api/v1/workers). diff --git a/apps/webapp/app/routes/api.v1.workers.ts b/apps/webapp/app/routes/api.v1.workers.ts deleted file mode 100644 index 4008d64f1a9..00000000000 --- a/apps/webapp/app/routes/api.v1.workers.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { json, TypedResponse } from "@remix-run/server-runtime"; -import { - WorkersCreateRequestBody, - WorkersCreateResponseBody, - WorkersListResponseBody, -} from "@trigger.dev/core/v3"; -import { - createActionApiRoute, - createLoaderApiRoute, -} from "~/services/routeBuilders/apiBuilder.server"; -import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server"; - -export const loader = createLoaderApiRoute( - { - corsStrategy: "all", - findResource: async () => 1, // This is a dummy function, we don't need to find a resource - }, - async ({ - authentication, - }): Promise> => { - if (authentication.environment.project.engine !== "V2") { - return json({ error: "Not supported for V1 projects" }, { status: 400 }); - } - - const service = new WorkerGroupService(); - const workers = await service.listWorkerGroups({ - projectId: authentication.environment.projectId, - }); - - return json( - workers.map((w) => ({ - type: w.type, - name: w.name, - description: w.description, - isDefault: w.id === authentication.environment.project.defaultWorkerGroupId, - updatedAt: w.updatedAt, - })) - ); - } -); - -export const { action } = createActionApiRoute( - { - corsStrategy: "all", - body: WorkersCreateRequestBody, - }, - async ({ - authentication, - body, - }): Promise> => { - if (authentication.environment.project.engine !== "V2") { - return json({ error: "Not supported" }, { status: 400 }); - } - - const service = new WorkerGroupService(); - const { workerGroup, token } = await service.createWorkerGroup({ - projectId: authentication.environment.projectId, - organizationId: authentication.environment.organizationId, - name: body.name, - description: body.description, - }); - - return json({ - token: { - plaintext: token.plaintext, - }, - workerGroup: { - name: workerGroup.name, - description: workerGroup.description, - }, - }); - } -); From 9c5633bdab074c79c3839ff46891f328cb9b720a Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 13:36:22 +0100 Subject: [PATCH 3/4] test(run-engine): keep worker queue observation test lean to avoid CI timeout Disable the execution workers and batch consumers in the worker queue observation test. It only needs enqueue + processMasterQueue + the observer gauge, and the extra workers add Redis connections and make engine.quit() hang on worker shutdown when the shard's Redis is under pressure, timing the test out in CI. --- .../src/engine/tests/workerQueueObservation.test.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts index a7af72183fd..7e84db8781c 100644 --- a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts +++ b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts @@ -45,17 +45,22 @@ describe("RunEngine worker queue observation", () => { const engine = new RunEngine({ prisma, + // This test only exercises enqueue + processMasterQueue + the observer gauge, so keep + // the engine lean: no execution workers or batch consumers to start up and tear down. worker: { redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, + disabled: true, + shutdownTimeoutMs: 2000, }, queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50, }, + batchQueue: { + redis: redisOptions, + consumerEnabled: false, + }, runLock: { redis: redisOptions, }, From 6495e99486c7558f8461fae1f910185cc71f4a71 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 24 Jun 2026 18:18:59 +0100 Subject: [PATCH 4/4] fix(run-engine): observe only managed worker groups in the queue-length gauge --- .../run-engine/src/engine/index.ts | 6 ++++++ .../tests/workerQueueObservation.test.ts | 19 +++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a4685c30d95..5a733e0c1c2 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -485,6 +485,11 @@ export class RunEngine { * length even when nothing is dequeuing from it. Includes hidden groups; excludes * groups whose cloud provider is configured to be excluded (groups with no cloud * provider are always included). + * + * Only MANAGED groups are observed. UNMANAGED groups are created per project + * (masterQueue `-`), so observing them would grow the set, and the + * per-tick Redis fanout, with the number of self-hosted-worker projects rather than + * with the managed regions this gauge is meant to track. */ async refreshWorkerQueueObservation() { const suffixes = this.options.workerQueueObserver?.additionalQueueSuffixes ?? []; @@ -495,6 +500,7 @@ export class RunEngine { // Read from the replica: this is a periodic metrics-only read and worker groups change // rarely, so a little replication lag is fine and keeps it off the primary. const workerGroups = await this.readOnlyPrisma.workerInstanceGroup.findMany({ + where: { type: "MANAGED" }, select: { masterQueue: true, cloudProvider: true }, }); diff --git a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts index 7e84db8781c..ffa63699474 100644 --- a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts +++ b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts @@ -43,6 +43,16 @@ describe("RunEngine worker queue observation", () => { }, }); + // An UNMANAGED (per-project, self-hosted) worker group should not be observed. + await prisma.workerInstanceGroup.create({ + data: { + name: "unmanaged-region", + masterQueue: "unmanaged-region", + type: "UNMANAGED", + token: { create: { tokenHash: "unmanaged_region_token_hash" } }, + }, + }); + const engine = new RunEngine({ prisma, // This test only exercises enqueue + processMasterQueue + the observer gauge, so keep @@ -118,7 +128,8 @@ describe("RunEngine worker queue observation", () => { const defaultBacklog = 3; const scheduledBacklog = 2; const hiddenBacklog = 2; - const doBacklog = 2; + const doBacklog = 1; + const unmanagedBacklog = 1; // Build a backlog across several worker queues, then move them into the worker queue // lists, but never dequeue. @@ -126,9 +137,10 @@ describe("RunEngine worker queue observation", () => { await enqueueTo("default:scheduled", scheduledBacklog, "r_scheduled"); await enqueueTo("hidden-region", hiddenBacklog, "r_hidden"); await enqueueTo("do-region", doBacklog, "r_do"); + await enqueueTo("unmanaged-region", unmanagedBacklog, "r_unmanaged"); await engine.runQueue.processMasterQueueForEnvironment( authenticatedEnvironment.id, - defaultBacklog + scheduledBacklog + hiddenBacklog + doBacklog + defaultBacklog + scheduledBacklog + hiddenBacklog + doBacklog + unmanagedBacklog ); // Observe the worker queues derived from the WorkerInstanceGroup records. No dequeue @@ -142,6 +154,9 @@ describe("RunEngine worker queue observation", () => { // Excluded: the DigitalOcean group is not observed even though it has a backlog. expect(await lengthOf("do-region")).toBe(0); + + // Excluded: the UNMANAGED (per-project) group is not observed even with a backlog. + expect(await lengthOf("unmanaged-region")).toBe(0); } finally { await engine.quit(); }