diff --git a/.server-changes/supervisor-pod-count-backpressure.md b/.server-changes/supervisor-pod-count-backpressure.md new file mode 100644 index 00000000000..f45411b04a5 --- /dev/null +++ b/.server-changes/supervisor-pod-count-backpressure.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +The supervisor can pause dequeuing when the Kubernetes cluster is saturated, based on the cluster's total pod count. Opt-in and off by default. diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts new file mode 100644 index 00000000000..8857372a042 --- /dev/null +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts @@ -0,0 +1,95 @@ +import { describe, it, expect } from "vitest"; +import { parsePodCount, K8sPodCountSignalSource } from "./k8sPodCountSignalSource.js"; + +describe("parsePodCount", () => { + it("reads the pods object count", () => { + const text = [ + "# HELP apiserver_storage_objects Number of stored objects", + "# TYPE apiserver_storage_objects gauge", + 'apiserver_storage_objects{resource="pods"} 8421', + 'apiserver_storage_objects{resource="configmaps"} 17', + ].join("\n"); + expect(parsePodCount(text)).toBe(8421); + }); + + it("is tolerant of extra labels in any order", () => { + const text = 'apiserver_storage_objects{group="",resource="pods",extra="x"} 12'; + expect(parsePodCount(text)).toBe(12); + }); + + it("parses scientific notation", () => { + const text = 'apiserver_storage_objects{resource="pods"} 1.2e+04'; + expect(parsePodCount(text)).toBe(12000); + }); + + it("throws when the pods metric is absent", () => { + const text = 'apiserver_storage_objects{resource="configmaps"} 17'; + expect(() => parsePodCount(text)).toThrow(/not found/); + }); + + it("throws on a non-finite value (e.g. 
1e999)", () => { + const text = 'apiserver_storage_objects{resource="pods"} 1e999'; + expect(() => parsePodCount(text)).toThrow(); + }); + + it("throws on a negative value", () => { + const text = 'apiserver_storage_objects{resource="pods"} -5'; + expect(() => parsePodCount(text)).toThrow(); + }); +}); + +function metrics(count: number): string { + return `apiserver_storage_objects{resource="pods"} ${count}`; +} + +describe("K8sPodCountSignalSource", () => { + it("engages at the engage threshold and reports the count", async () => { + const counts: number[] = []; + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => metrics(10000), + engageThreshold: 10000, + releaseThreshold: 5000, + reportPodCount: (c) => counts.push(c), + }); + const verdict = await source.read(); + expect(verdict.engaged).toBe(true); + expect(typeof verdict.ts).toBe("number"); + expect(counts).toEqual([10000]); + }); + + it("does not engage below the engage threshold", async () => { + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => metrics(9999), + engageThreshold: 10000, + releaseThreshold: 5000, + }); + expect((await source.read()).engaged).toBe(false); + }); + + it("stays engaged in the hysteresis band, releases only below release threshold", async () => { + let count = 10000; + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => metrics(count), + engageThreshold: 10000, + releaseThreshold: 5000, + }); + expect((await source.read()).engaged).toBe(true); // engage + count = 7000; + expect((await source.read()).engaged).toBe(true); // band -> still engaged + count = 4999; + expect((await source.read()).engaged).toBe(false); // below release -> off + count = 7000; + expect((await source.read()).engaged).toBe(false); // band again -> stays off + }); + + it("propagates scrape failures (monitor fails open on throw)", async () => { + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => { + throw new 
// Finds the `apiserver_storage_objects` sample whose `resource` label is exactly
// "pods" in a Prometheus text-format scrape and captures the sample's value token.
// The label has to follow `{` or `,` directly, so a label whose name merely ends
// in "resource" (e.g. `subresource="pods"`) is not mistaken for it.
const POD_COUNT_RE =
  /^apiserver_storage_objects\{(?:[^}\n]*,)?[ \t]*resource="pods"[^}\n]*\}[ \t]+(\S+)/m;

// Plain decimal or scientific notation. Rejects tokens such as `NaN`, `+Inf` or `12abc`.
const POD_COUNT_VALUE_RE = /^[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?$/;

/**
 * Extracts the stored-pod-object count from an apiserver `/metrics` scrape.
 *
 * @param metricsText - Raw Prometheus text exposition body.
 * @returns The value of the first `apiserver_storage_objects{resource="pods"}` sample.
 * @throws Error if the sample is absent, if its value is not a finite decimal
 *   number, or if the value is negative.
 */
export function parsePodCount(metricsText: string): number {
  const raw = POD_COUNT_RE.exec(metricsText)?.[1];
  if (raw === undefined) {
    throw new Error('apiserver_storage_objects{resource="pods"} not found in metrics');
  }

  // Validate the whole token first so trailing garbage is not silently truncated.
  const value = POD_COUNT_VALUE_RE.test(raw) ? Number(raw) : Number.NaN;
  if (!Number.isFinite(value)) {
    // Also covers overflow such as `1e999`, which Number() turns into Infinity.
    throw new Error(`unparseable pod count: ${raw}`);
  }
  if (value < 0) {
    // The Kubernetes metrics reference documents -1 as the value published when
    // the apiserver failed to fetch the count; surface that as its own error
    // instead of reporting the metric as missing.
    throw new Error(`apiserver reported a negative pod count: ${raw}`);
  }
  return value;
}
/**
 * Builds a function that scrapes the apiserver's Prometheus /metrics endpoint.
 * One lightweight aggregate read - not a pod listing. Requires the service
 * account to be granted GET on the /metrics non-resource URL.
 *
 * @param timeoutMs - Socket inactivity timeout applied to each scrape request.
 * @returns An async function that resolves with the response body on a 2xx
 *   status. It rejects when the kubeconfig has no current cluster, on a non-2xx
 *   status, on a request or response error, on timeout, or when the connection
 *   closes before the response is complete.
 */
export function createApiserverMetricsFetcher(timeoutMs: number): () => Promise<string> {
  const kubeConfig = getKubeConfig();

  return async () => {
    const cluster = kubeConfig.getCurrentCluster();
    if (!cluster) {
      throw new Error("no current cluster in kubeconfig");
    }
    // Drop trailing slashes so a server such as "https://host:6443/" does not
    // produce the path "//metrics".
    const url = new URL(`${cluster.server.replace(/\/+$/, "")}/metrics`);
    const opts: https.RequestOptions = {
      method: "GET",
      protocol: url.protocol,
      hostname: url.hostname,
      port: url.port,
      path: url.pathname,
    };
    // applyToHTTPSOptions sets the cluster CA, client cert/key, and auth headers
    // (incl. exec plugins) on the request - so TLS verifies against the cluster
    // CA, not the system store. The fetch-options path attaches the CA as an
    // https.Agent, which global fetch (undici) ignores.
    await kubeConfig.applyToHTTPSOptions(opts);

    return new Promise<string>((resolve, reject) => {
      const req = https.request(opts, (res) => {
        const status = res.statusCode ?? 0;
        let body = "";
        res.setEncoding("utf8");
        res.on("data", (chunk) => {
          body += chunk;
        });
        res.on("end", () => {
          if (status >= 200 && status < 300) {
            resolve(body);
          } else {
            reject(new Error(`apiserver /metrics scrape failed: ${status}`));
          }
        });
        // A connection that closes after the headers but before the body is
        // complete never emits "end"; the failure is reported on the response,
        // not the request. Settle here so the promise cannot hang forever.
        // Rejecting after an earlier resolve/reject is a no-op.
        res.on("error", reject);
        res.on("close", () => {
          if (!res.complete) {
            reject(
              new Error("apiserver /metrics scrape failed: connection closed before response completed")
            );
          }
        });
      });
      // Without this a hung connect/TLS/read never settles, and the monitor's
      // refreshInFlight guard would freeze the source (silent fail-open).
      req.setTimeout(timeoutMs, () => {
        req.destroy(new Error(`apiserver /metrics scrape timed out after ${timeoutMs}ms`));
      });
      req.on("error", reject);
      req.end();
    });
  };
}
+const base = { + TRIGGER_API_URL: "http://localhost:3030", + TRIGGER_WORKER_TOKEN: "test-token", + MANAGED_WORKER_SECRET: "test-secret", + OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318", +}; + +describe("Env superRefine - backpressure source awareness", () => { + it("pod-count source can be enabled without a Redis host", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true", + }) + ).not.toThrow(); + }); + + it("redis source requires a Redis host", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", + }) + ).toThrow(); + }); + + it("both sources can be enabled together (with a Redis host)", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST: "localhost", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true", + }) + ).not.toThrow(); + }); + + it("rejects pod-count release >= engage when the source is enabled", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: "100", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: "100", + }) + ).toThrow(); + }); +}); diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 99d440820a7..10e8f9b62b3 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -3,7 +3,7 @@ import { env as stdEnv } from "std-env"; import { z } from "zod"; import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; -const Env = z +export const Env = z .object({ // This will come from `spec.nodeName` in k8s TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), @@ -73,6 +73,18 @@ const Env = z TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false), + 
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: BoolEnv.default(false), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN: BoolEnv.default(true), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000), + // Hard timeout on the apiserver /metrics scrape. A hung request would otherwise + // never settle and freeze the monitor's refresh loop (fail-open silently). + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_SCRAPE_TIMEOUT_MS: z.coerce + .number() + .int() + .positive() + .default(10_000), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), @@ -312,6 +324,18 @@ const Env = z TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false), }) .superRefine((data, ctx) => { + if ( + data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED && + data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= + data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE + ) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: + "TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE", + path: ["TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE"], + }); + } if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { ctx.addIssue({ code: z.ZodIssueCode.custom, diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 647549da7d1..d2bc82c13b9 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -20,8 +20,8 @@ import { CheckpointClient, isKubernetesEnvironment, } from "@trigger.dev/core/v3/serverOnly"; -import { createK8sApi } from "./clients/kubernetes.js"; -import { collectDefaultMetrics, Histogram } from "prom-client"; +import { createK8sApi, createApiserverMetricsFetcher } from "./clients/kubernetes.js"; +import { 
collectDefaultMetrics, Gauge, Histogram } from "prom-client"; import { register } from "./metrics.js"; import { PodCleaner } from "./services/podCleaner.js"; import { FailedPodHandler } from "./services/failedPodHandler.js"; @@ -36,6 +36,7 @@ import { Redis } from "ioredis"; import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; import { BackpressureMetrics } from "./backpressure/backpressureMetrics.js"; +import { K8sPodCountSignalSource } from "./backpressure/k8sPodCountSignalSource.js"; import { fromContext, recordPhaseSince, @@ -72,7 +73,7 @@ class ManagedSupervisor { private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; private readonly tracing?: OtlpTraceService; - private readonly backpressureMonitor?: BackpressureMonitor; + private readonly backpressureMonitors: BackpressureMonitor[] = []; private readonly backpressureRedis?: Redis; private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); @@ -213,6 +214,7 @@ class ManagedSupervisor { ); } + // Redis-verdict source (external aggregator). Keeps existing metric names. 
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { this.backpressureRedis = new Redis({ host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, @@ -225,27 +227,61 @@ class ManagedSupervisor { this.backpressureRedis.on("error", (error) => this.logger.error("Backpressure redis error", { error: error.message }) ); - - this.backpressureMonitor = new BackpressureMonitor({ - enabled: true, - source: new RedisBackpressureSignalSource( - this.backpressureRedis, - env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY - ), + this.backpressureMonitors.push( + new BackpressureMonitor({ + enabled: true, + source: new RedisBackpressureSignalSource( + this.backpressureRedis, + env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY + ), + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + logger: this.logger, + metrics: new BackpressureMetrics({ register }), + }) + ); + this.logger.log("🛑 Dequeue backpressure enabled (redis source)", { + key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, - maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, - rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, - logger: this.logger, - metrics: new BackpressureMetrics({ register }), }); + } - this.logger.log("🛑 Dequeue backpressure enabled", { - key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, - refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, - maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, - rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, - dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + // Pod-count source (in-process apiserver scrape). Namespaced metrics so the + // redis source's metric names are preserved. 
+ if (env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED) { + // RELEASE < ENGAGE is enforced in env.ts (superRefine), so it's valid here. + const podCountGauge = new Gauge({ + name: "supervisor_cluster_pod_count", + help: "Total pod objects stored in the cluster, scraped for backpressure", + registers: [register], + }); + this.backpressureMonitors.push( + new BackpressureMonitor({ + enabled: true, + source: new K8sPodCountSignalSource({ + fetchMetrics: createApiserverMetricsFetcher( + env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_SCRAPE_TIMEOUT_MS + ), + engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, + releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, + reportPodCount: (count) => podCountGauge.set(count), + }), + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN, + logger: this.logger, + metrics: new BackpressureMetrics({ register, prefix: "supervisor_backpressure_pod_count" }), + }) + ); + this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", { + engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, + release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN, }); } @@ -272,14 +308,14 @@ class ManagedSupervisor { dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR, // Freeze scale-up while backpressure is hard-engaged (not during the resume // ramp). Undefined when backpressure is disabled → no effect on scaling. - shouldPauseScaling: () => this.backpressureMonitor?.isEngaged() ?? 
false, + shouldPauseScaling: () => this.backpressureMonitors.some((m) => m.isEngaged()), }, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS, preDequeue: async () => { - // Synchronous, hot-path-safe cached read; undefined when backpressure is disabled. - const skipForBackpressure = this.backpressureMonitor?.shouldSkipDequeue() ?? false; + // Synchronous, hot-path-safe cached read; false when no monitors are active. + const skipForBackpressure = this.backpressureMonitors.some((m) => m.shouldSkipDequeue()); if (!env.RESOURCE_MONITOR_ENABLED || this.isKubernetes) { // Resource monitor is not used in k8s; backpressure is the only gate there. @@ -674,7 +710,7 @@ class ManagedSupervisor { this.logger.log("Starting up"); // Optional services - this.backpressureMonitor?.start(); + this.backpressureMonitors.forEach((m) => m.start()); await this.podCleaner?.start(); await this.failedPodHandler?.start(); await this.metricsServer?.start(); @@ -702,7 +738,7 @@ class ManagedSupervisor { await this.workerSession.stop(); // Optional services - this.backpressureMonitor?.stop(); + this.backpressureMonitors.forEach((m) => m.stop()); await this.backpressureRedis?.quit(); await this.podCleaner?.stop(); await this.failedPodHandler?.stop();