diff --git a/.server-changes/remove-worker-create-endpoint.md b/.server-changes/remove-worker-create-endpoint.md new file mode 100644 index 00000000000..dd7c2041876 --- /dev/null +++ b/.server-changes/remove-worker-create-endpoint.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: breaking +--- + +Remove the unused worker group management API endpoints (GET and POST /api/v1/workers). diff --git a/.server-changes/worker-queue-length-always-reported.md b/.server-changes/worker-queue-length-always-reported.md new file mode 100644 index 00000000000..ccce193d9f5 --- /dev/null +++ b/.server-changes/worker-queue-length-always-reported.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Optionally report worker queue length metrics continuously (enabled per-service via the RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED env var) so a queue's depth keeps being emitted even when nothing is dequeuing from it. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index f793654dd3a..9cc9b18229e 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -810,6 +810,12 @@ const EnvironmentSchema = z RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200), RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000), + // Off by default. Enable on a single service (e.g. the engine worker) so only one + // instance reports worker queue length, rather than every replica. + RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED: z.string().default("0"), + RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS: z.coerce.number().int().default(30_000), + // Comma-separated cloud providers to exclude from worker queue length observation. 
+ RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS: z.string().default("digitalocean"), RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000), RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10), RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/routes/api.v1.workers.ts b/apps/webapp/app/routes/api.v1.workers.ts deleted file mode 100644 index 4008d64f1a9..00000000000 --- a/apps/webapp/app/routes/api.v1.workers.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { json, TypedResponse } from "@remix-run/server-runtime"; -import { - WorkersCreateRequestBody, - WorkersCreateResponseBody, - WorkersListResponseBody, -} from "@trigger.dev/core/v3"; -import { - createActionApiRoute, - createLoaderApiRoute, -} from "~/services/routeBuilders/apiBuilder.server"; -import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server"; - -export const loader = createLoaderApiRoute( - { - corsStrategy: "all", - findResource: async () => 1, // This is a dummy function, we don't need to find a resource - }, - async ({ - authentication, - }): Promise> => { - if (authentication.environment.project.engine !== "V2") { - return json({ error: "Not supported for V1 projects" }, { status: 400 }); - } - - const service = new WorkerGroupService(); - const workers = await service.listWorkerGroups({ - projectId: authentication.environment.projectId, - }); - - return json( - workers.map((w) => ({ - type: w.type, - name: w.name, - description: w.description, - isDefault: w.id === authentication.environment.project.defaultWorkerGroupId, - updatedAt: w.updatedAt, - })) - ); - } -); - -export const { action } = createActionApiRoute( - { - corsStrategy: "all", - body: WorkersCreateRequestBody, - }, - async ({ - authentication, - body, - }): Promise> => { - if (authentication.environment.project.engine !== "V2") { - return json({ error: "Not supported" }, { 
status: 400 }); - } - - const service = new WorkerGroupService(); - const { workerGroup, token } = await service.createWorkerGroup({ - projectId: authentication.environment.projectId, - organizationId: authentication.environment.organizationId, - name: body.name, - description: body.description, - }); - - return json({ - token: { - plaintext: token.plaintext, - }, - workerGroup: { - name: workerGroup.name, - description: workerGroup.description, - }, - }); - } -); diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index e2c8ad85e94..06cb591a0b4 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -2,6 +2,7 @@ import { RunEngine } from "@internal/run-engine"; import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { createBatchGlobalRateLimiter } from "~/runEngine/concerns/batchGlobalRateLimiter.server"; +import { SCHEDULED_WORKER_QUEUE_SUFFIX } from "~/runEngine/concerns/workerQueueSplit.server"; import { logger } from "~/services/logger.server"; import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server"; import { singleton } from "~/utils/singleton"; @@ -121,6 +122,18 @@ function createRunEngine() { }, tracer, meter, + workerQueueObserver: { + enabled: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_ENABLED === "1", + intervalMs: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_INTERVAL_MS, + // Also observe the scheduled split variant of each worker queue. The suffix + // naming convention lives in the webapp, so it is passed in here. 
+ additionalQueueSuffixes: [SCHEDULED_WORKER_QUEUE_SUFFIX], + excludedCloudProviders: env.RUN_ENGINE_WORKER_QUEUE_OBSERVER_EXCLUDED_CLOUD_PROVIDERS.split( + "," + ) + .map((provider) => provider.trim()) + .filter(Boolean), + }, defaultMaxTtl: env.RUN_ENGINE_DEFAULT_MAX_TTL, heartbeatTimeoutsMs: { PENDING_EXECUTING: env.RUN_ENGINE_TIMEOUT_PENDING_EXECUTING, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a6a20b5b9fd..5a733e0c1c2 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -33,7 +33,7 @@ import { import { Worker } from "@trigger.dev/redis-worker"; import { assertNever } from "assert-never"; import { EventEmitter } from "node:events"; -import { setTimeout } from "node:timers/promises"; +import { setInterval, setTimeout } from "node:timers/promises"; import { BatchQueue } from "../batch-queue/index.js"; import type { BatchItem, @@ -100,6 +100,7 @@ export class RunEngine { private heartbeatTimeouts: HeartbeatTimeouts; private repairSnapshotTimeoutMs: number; private batchQueue: BatchQueue; + private workerQueueObserverAbortController?: AbortController; prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; @@ -474,6 +475,96 @@ export class RunEngine { machines: this.options.machines, billingCache: this.billingCache, }); + + this.#startWorkerQueueObserver(); + } + + /** + * Refreshes the set of worker queues observed by the `runqueue.workerQueue.length` + * gauge from the WorkerInstanceGroup records, so the gauge reports each worker queue's + * length even when nothing is dequeuing from it. Includes hidden groups; excludes + * groups whose cloud provider is configured to be excluded (groups with no cloud + * provider are always included). + * + * Only MANAGED groups are observed. 
UNMANAGED groups are created per project + * (each with its own masterQueue), so observing them would grow the set, and the + * per-tick Redis fanout, with the number of self-hosted-worker projects rather than + * with the managed regions this gauge is meant to track. + */ + async refreshWorkerQueueObservation() { + const suffixes = this.options.workerQueueObserver?.additionalQueueSuffixes ?? []; + const excludedCloudProviders = new Set( + (this.options.workerQueueObserver?.excludedCloudProviders ?? []).map((p) => p.toLowerCase()) + ); + + // Read from the replica: this is a periodic metrics-only read and worker groups change + // rarely, so a little replication lag is fine and keeps it off the primary. + const workerGroups = await this.readOnlyPrisma.workerInstanceGroup.findMany({ + where: { type: "MANAGED" }, + select: { masterQueue: true, cloudProvider: true }, + }); + + const workerQueues: string[] = []; + + for (const { masterQueue, cloudProvider } of workerGroups) { + if (cloudProvider && excludedCloudProviders.has(cloudProvider.toLowerCase())) { + continue; + } + + workerQueues.push(masterQueue); + + for (const suffix of suffixes) { + workerQueues.push(`${masterQueue}${suffix}`); + } + } + + this.runQueue.setObservableWorkerQueues(workerQueues); + } + + #startWorkerQueueObserver() { + if (!this.options.workerQueueObserver?.enabled) { + return; + } + + const intervalMs = this.options.workerQueueObserver.intervalMs ?? 30_000; + this.workerQueueObserverAbortController = new AbortController(); + + this.#runWorkerQueueObserver( + intervalMs, + this.workerQueueObserverAbortController.signal + ).catch((error) => { + this.logger.error("Worker queue observer loop crashed", { + error: error instanceof Error ? 
error.message : String(error), + }); + }); + } + + async #runWorkerQueueObserver(intervalMs: number, signal: AbortSignal) { + const refresh = async () => { + try { + await this.refreshWorkerQueueObservation(); + } catch (error) { + this.logger.error("Failed to refresh worker queue observation", { + error: error instanceof Error ? error.message : String(error), + }); + } + }; + + // Refresh once immediately so a freshly started instance reports queue lengths + // without waiting for the first interval, then keep it fresh on an interval. + await refresh(); + + try { + for await (const _ of setInterval(intervalMs, null, { signal })) { + await refresh(); + } + } catch (error) { + if (error instanceof Error && error.name !== "AbortError") { + throw error; + } + + this.logger.debug("Worker queue observer stopped"); + } } //MARK: - Run functions @@ -1322,8 +1413,11 @@ export class RunEngine { blockingPop?: boolean; blockingPopTimeoutSeconds?: number; }): Promise { - if (!skipObserving) { - // We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues + // We only do this with "prod" worker queues because we don't want to observe dev (e.g. + // environment) worker queues. When the worker queue observer is enabled it is the source + // of truth for the observed set (and applies the cloud-provider exclusions), so the + // per-dequeue registration is skipped. 
+ if (!skipObserving && !this.options.workerQueueObserver?.enabled) { this.runQueue.registerObservableWorkerQueue(workerQueue); } @@ -2061,6 +2155,9 @@ export class RunEngine { async quit() { try { + // stop the worker queue observer loop + this.workerQueueObserverAbortController?.abort(); + //stop the run queue await this.runQueue.quit(); await this.worker.stop(); diff --git a/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts new file mode 100644 index 00000000000..ffa63699474 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/workerQueueObservation.test.ts @@ -0,0 +1,165 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; +import { createTestMetricsMeter } from "./helpers/replicaTestHelpers.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +const WORKER_QUEUE_LENGTH_METRIC = "runqueue.workerQueue.length"; +const WORKER_QUEUE_ATTRIBUTE = "runqueue.workerQueue"; + +describe("RunEngine worker queue observation", () => { + containerTest( + "reports worker queue length from WorkerInstanceGroup records without any dequeue", + async ({ prisma, redisOptions }) => { + const { meter, getCounterValue } = createTestMetricsMeter(); + + // Seeds a MANAGED WorkerInstanceGroup with masterQueue "default" (no cloud provider). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // A hidden worker group should still be observed. 
+ await prisma.workerInstanceGroup.create({ + data: { + name: "hidden-region", + masterQueue: "hidden-region", + type: "MANAGED", + hidden: true, + cloudProvider: "aws", + token: { create: { tokenHash: "hidden_region_token_hash" } }, + }, + }); + + // A DigitalOcean worker group should be excluded from observation. + await prisma.workerInstanceGroup.create({ + data: { + name: "do-region", + masterQueue: "do-region", + type: "MANAGED", + hidden: true, + cloudProvider: "digitalocean", + token: { create: { tokenHash: "do_region_token_hash" } }, + }, + }); + + // An UNMANAGED (per-project, self-hosted) worker group should not be observed. + await prisma.workerInstanceGroup.create({ + data: { + name: "unmanaged-region", + masterQueue: "unmanaged-region", + type: "UNMANAGED", + token: { create: { tokenHash: "unmanaged_region_token_hash" } }, + }, + }); + + const engine = new RunEngine({ + prisma, + // This test only exercises enqueue + processMasterQueue + the observer gauge, so keep + // the engine lean: no execution workers or batch consumers to start up and tear down. 
+ worker: { + redis: redisOptions, + disabled: true, + shutdownTimeoutMs: 2000, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + batchQueue: { + redis: redisOptions, + consumerEnabled: false, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + workerQueueObserver: { + enabled: true, + intervalMs: 60_000, + additionalQueueSuffixes: [":scheduled"], + excludedCloudProviders: ["digitalocean"], + }, + }); + + const enqueueTo = async (workerQueue: string, count: number, prefix: string) => { + for (let i = 0; i < count; i++) { + await engine.runQueue.enqueueMessage({ + env: authenticatedEnvironment, + message: { + runId: `${prefix}_${i}`, + taskIdentifier: "task/my-task", + orgId: authenticatedEnvironment.organization.id, + projectId: authenticatedEnvironment.project.id, + environmentId: authenticatedEnvironment.id, + environmentType: "PRODUCTION", + queue: "task/my-task", + timestamp: Date.now(), + attempt: 0, + }, + workerQueue, + skipDequeueProcessing: true, + }); + } + }; + + const lengthOf = (workerQueue: string) => + getCounterValue(WORKER_QUEUE_LENGTH_METRIC, { + [WORKER_QUEUE_ATTRIBUTE]: workerQueue, + }); + + try { + // Keep the total under the environment concurrency limit (10) so every message moves + // into its worker queue list (processMasterQueueForEnvironment is concurrency-gated). + const defaultBacklog = 3; + const scheduledBacklog = 2; + const hiddenBacklog = 2; + const doBacklog = 1; + const unmanagedBacklog = 1; + + // Build a backlog across several worker queues, then move them into the worker queue + // lists, but never dequeue. 
+ await enqueueTo("default", defaultBacklog, "r_default"); + await enqueueTo("default:scheduled", scheduledBacklog, "r_scheduled"); + await enqueueTo("hidden-region", hiddenBacklog, "r_hidden"); + await enqueueTo("do-region", doBacklog, "r_do"); + await enqueueTo("unmanaged-region", unmanagedBacklog, "r_unmanaged"); + await engine.runQueue.processMasterQueueForEnvironment( + authenticatedEnvironment.id, + defaultBacklog + scheduledBacklog + hiddenBacklog + doBacklog + unmanagedBacklog + ); + + // Observe the worker queues derived from the WorkerInstanceGroup records. No dequeue + // has happened, so this is the only thing that registers them for observation. + await engine.refreshWorkerQueueObservation(); + + // Reported: the default queue, its scheduled split variant, and the hidden group. + expect(await lengthOf("default")).toBe(defaultBacklog); + expect(await lengthOf("default:scheduled")).toBe(scheduledBacklog); + expect(await lengthOf("hidden-region")).toBe(hiddenBacklog); + + // Excluded: the DigitalOcean group is not observed even though it has a backlog. + expect(await lengthOf("do-region")).toBe(0); + + // Excluded: the UNMANAGED (per-project) group is not observed even with a backlog. + expect(await lengthOf("unmanaged-region")).toBe(0); + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 98c722e8c74..a65eca347c6 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -211,6 +211,31 @@ export type RunEngineOptions = { * the since snapshot is not yet on the replica, before falling back to the primary. * Set maxMs to 0 (or any value <= 0) to skip the replica retry and go straight to the primary. 
*/ readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number }; + /** + * Periodically refreshes the set of worker queues observed by the + * `runqueue.workerQueue.length` gauge from the MANAGED WorkerInstanceGroup records, so + * the gauge reports those worker queues' lengths even when this instance is not + * dequeuing from them (a dequeue is otherwise the only thing that registers a worker + * queue for observation). When enabled the observer is the source of truth for the + * observed set, so the per-dequeue registration is skipped. Disabled by default; the + * caller opts in via `enabled`. + */ + workerQueueObserver?: { + enabled?: boolean; + /** How often to refresh the observed worker queue set from the database (ms). Default: 30_000. */ + intervalMs?: number; + /** + * Extra suffix variants to also observe for each worker queue, e.g. the scheduled + * split queue suffix. The suffix value lives with the caller that owns the naming + * convention rather than in the engine. Default: []. + */ + additionalQueueSuffixes?: string[]; + /** + * Worker groups whose `cloudProvider` is in this list are not observed. Groups with + * no `cloudProvider` are always observed. Matched case-insensitively. Default: []. + */ + excludedCloudProviders?: string[]; + }; tracer: Tracer; meter?: Meter; logger?: Logger; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index c695cacad07..9808f96f41a 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -318,6 +318,16 @@ export class RunQueue { this._observableWorkerQueues.add(workerQueue); } + /** + * Replaces the full set of worker queues observed by the `runqueue.workerQueue.length` + * gauge. Used by a periodic observer that derives the set from the current worker + * groups, so the observed set stays correct (and prunes queues that no longer exist) + * independent of dequeue activity. 
+ */ + public setObservableWorkerQueues(workerQueues: string[]) { + this._observableWorkerQueues = new Set(workerQueues); + } + async #updateWorkerQueueLength(observableResult: ObservableResult) { for (const workerQueue of this._observableWorkerQueues) { const workerQueueLength = await this.redis.llen(this.keys.workerQueueKey(workerQueue));