From f75b741c939d5ea661957cd7569c959771dc19d2 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 20 Jun 2024 20:04:00 +0100 Subject: [PATCH 1/2] add basic uptime heartbeat --- apps/kubernetes-provider/src/index.ts | 18 ++ .../src/uptimeHeartbeat.ts | 218 ++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 apps/kubernetes-provider/src/uptimeHeartbeat.ts diff --git a/apps/kubernetes-provider/src/index.ts b/apps/kubernetes-provider/src/index.ts index a310f63c01b..9f3aad9aed6 100644 --- a/apps/kubernetes-provider/src/index.ts +++ b/apps/kubernetes-provider/src/index.ts @@ -16,13 +16,18 @@ import { import { randomUUID } from "crypto"; import { TaskMonitor } from "./taskMonitor"; import { PodCleaner } from "./podCleaner"; +import { UptimeHeartbeat } from "./uptimeHeartbeat"; const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local"; const NODE_NAME = process.env.NODE_NAME || "local"; const OTEL_EXPORTER_OTLP_ENDPOINT = process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318"; + const POD_CLEANER_INTERVAL_SECONDS = Number(process.env.POD_CLEANER_INTERVAL_SECONDS || "300"); +const UPTIME_HEARTBEAT_URL = process.env.UPTIME_HEARTBEAT_URL; +const UPTIME_INTERVAL_SECONDS = Number(process.env.UPTIME_INTERVAL_SECONDS || "30"); + const logger = new SimpleLogger(`[${NODE_NAME}]`); logger.log(`running in ${RUNTIME_ENV} mode`); @@ -565,3 +570,16 @@ const podCleaner = new PodCleaner({ }); podCleaner.start(); + +if (UPTIME_HEARTBEAT_URL) { + const uptimeHeartbeat = new UptimeHeartbeat({ + runtimeEnv: RUNTIME_ENV, + namespace: "default", + intervalInSeconds: UPTIME_INTERVAL_SECONDS, + pingUrl: UPTIME_HEARTBEAT_URL, + }); + + uptimeHeartbeat.start(); +} else { + logger.log("Uptime heartbeat is disabled, set UPTIME_HEARTBEAT_URL to enable."); +} diff --git a/apps/kubernetes-provider/src/uptimeHeartbeat.ts b/apps/kubernetes-provider/src/uptimeHeartbeat.ts new file mode 100644 index 00000000000..41d9c89386b --- /dev/null +++ b/apps/kubernetes-provider/src/uptimeHeartbeat.ts @@ -0,0 +1,218 @@ +import * as k8s from "@kubernetes/client-node"; +import { SimpleLogger } from "@trigger.dev/core-apps"; + +type UptimeHeartbeatOptions = { + runtimeEnv: "local" | "kubernetes"; + pingUrl: string; + namespace?: string; + intervalInSeconds?: number; + maxPendingRuns?: number; + leadingEdge?: boolean; +}; + +export class UptimeHeartbeat { + private enabled = false; + private namespace = "default"; + private intervalInSeconds = 30; + private maxPendingRuns = 25; + private leadingEdge = true; + + private logger = new SimpleLogger("[UptimeHeartbeat]"); + private k8sClient: { + core: k8s.CoreV1Api; + kubeConfig: k8s.KubeConfig; + }; + + constructor(private opts: UptimeHeartbeatOptions) { + if (opts.namespace) { + this.namespace = opts.namespace; + } + + if (opts.intervalInSeconds) { + this.intervalInSeconds = opts.intervalInSeconds; + } + + if (opts.maxPendingRuns) { + this.maxPendingRuns = opts.maxPendingRuns; + } + + this.k8sClient = this.#createK8sClient(); + } + + #createK8sClient() { + const kubeConfig = new k8s.KubeConfig(); + + if (this.opts.runtimeEnv === "local") { + kubeConfig.loadFromDefault(); + } else if (this.opts.runtimeEnv === "kubernetes") { + kubeConfig.loadFromCluster(); + } else { + throw new Error(`Unsupported runtime environment: ${this.opts.runtimeEnv}`); + } + + return { + core: kubeConfig.makeApiClient(k8s.CoreV1Api), + kubeConfig: kubeConfig, + }; + } + + #isRecord(candidate: unknown): candidate is Record { + if (typeof candidate !== "object" || candidate === null) { + return false; + } else { + return true; + } + } + + #logK8sError(err: unknown, debugOnly = false) { + if (debugOnly) { + this.logger.debug("K8s API Error", err); + } else { + this.logger.error("K8s API Error", err); + } + } + + #handleK8sError(err: unknown) { + if (!this.#isRecord(err) || !this.#isRecord(err.body)) { + this.#logK8sError(err); + return; + } + + this.#logK8sError(err, true); + + if (typeof err.body.message === "string") { + this.#logK8sError({ message: err.body.message }); + return; + } + + this.#logK8sError({ body: err.body }); + } + + async #getTotalPods(opts: { + namespace: string; + fieldSelector?: string; + labelSelector?: string; + }): Promise { + const listReturn = await this.k8sClient.core + .listNamespacedPod( + opts.namespace, + undefined, // pretty + undefined, // allowWatchBookmarks + undefined, // _continue + opts.fieldSelector, + opts.labelSelector, + this.maxPendingRuns * 2, // limit + undefined, // resourceVersion + undefined, // resourceVersionMatch + undefined, // sendInitialEvents + this.intervalInSeconds, // timeoutSeconds, + undefined // watch + ) + .catch(this.#handleK8sError.bind(this)); + + if (!listReturn) { + this.logger.error("Failed to get pods", { opts }); + return 0; + } + + return listReturn.body.items.length; + } + + async #getTotalPendingTasks(): Promise { + return await this.#getTotalPods({ + namespace: this.namespace, + fieldSelector: "status.phase=Pending", + labelSelector: "app=task-run", + }); + } + + async #sendPing() { + this.logger.log("Sending ping"); + + const start = Date.now(); + const controller = new AbortController(); + + const timeoutMs = (this.intervalInSeconds * 1000) / 2; + + const fetchTimeout = setTimeout(() => { + controller.abort(); + }, timeoutMs); + + try { + const response = await fetch(this.opts.pingUrl, { + signal: controller.signal, + }); + + if (!response.ok) { + this.logger.error("Failed to send ping, response not OK", { + status: response.status, + }); + return; + } + + const elapsedMs = Date.now() - start; + this.logger.log("Ping sent", { elapsedMs }); + } catch (error) { + if (error instanceof DOMException && error.name === "AbortError") { + this.logger.log("Ping timeout", { timeoutSeconds: timeoutMs }); + return; + } + + this.logger.error("Failed to send ping", error); + } finally { + clearTimeout(fetchTimeout); + } + } + + async #heartbeat() { + this.logger.log("Performing heartbeat"); + + const start = Date.now(); + + const totalPendingTasks = await this.#getTotalPendingTasks(); + + const elapsedMs = Date.now() - start; + + this.logger.log("Finished heartbeat checks", { elapsedMs }); + + if (totalPendingTasks > this.maxPendingRuns) { + this.logger.log("Too many pending tasks, skipping heartbeat", { totalPendingTasks }); + return; + } + + await this.#sendPing(); + + this.logger.log("Heartbeat done", { totalPendingTasks, elapsedMs }); + } + + async start() { + this.enabled = true; + this.logger.log("Starting"); + + if (this.leadingEdge) { + await this.#heartbeat(); + } + + const heartbeat = setInterval(async () => { + if (!this.enabled) { + clearInterval(heartbeat); + return; + } + + try { + await this.#heartbeat(); + } catch (error) { + this.logger.error("Error while heartbeating", error); + } + }, this.intervalInSeconds * 1000); + } + + async stop() { + if (!this.enabled) { + return; + } + + this.enabled = false; + this.logger.log("Shutting down.."); + } +} From 16a0bc8ba701775ed6006812c25c831ac9552bf8 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 2 Jul 2024 12:24:51 +0100 Subject: [PATCH 2/2] add more heartbeat metrics --- apps/kubernetes-provider/src/index.ts | 8 +- .../src/uptimeHeartbeat.ts | 100 ++++++++++++++---- 2 files changed, 84 insertions(+), 24 deletions(-) diff --git a/apps/kubernetes-provider/src/index.ts b/apps/kubernetes-provider/src/index.ts index 9516845da37..9859c66979c 100644 --- a/apps/kubernetes-provider/src/index.ts +++ b/apps/kubernetes-provider/src/index.ts @@ -26,7 +26,10 @@ const OTEL_EXPORTER_OTLP_ENDPOINT = const POD_CLEANER_INTERVAL_SECONDS = Number(process.env.POD_CLEANER_INTERVAL_SECONDS || "300"); const UPTIME_HEARTBEAT_URL = process.env.UPTIME_HEARTBEAT_URL; -const UPTIME_INTERVAL_SECONDS = Number(process.env.UPTIME_INTERVAL_SECONDS || "30"); +const UPTIME_INTERVAL_SECONDS = Number(process.env.UPTIME_INTERVAL_SECONDS || "60"); +const UPTIME_MAX_PENDING_RUNS = Number(process.env.UPTIME_MAX_PENDING_RUNS || "25"); +const UPTIME_MAX_PENDING_INDECES = Number(process.env.UPTIME_MAX_PENDING_INDECES || "10"); +const UPTIME_MAX_PENDING_ERRORS = Number(process.env.UPTIME_MAX_PENDING_ERRORS || "10"); const logger = new SimpleLogger(`[${NODE_NAME}]`); logger.log(`running in ${RUNTIME_ENV} mode`); @@ -578,6 +581,9 @@ if (UPTIME_HEARTBEAT_URL) { namespace: "default", intervalInSeconds: UPTIME_INTERVAL_SECONDS, pingUrl: UPTIME_HEARTBEAT_URL, + maxPendingRuns: UPTIME_MAX_PENDING_RUNS, + maxPendingIndeces: UPTIME_MAX_PENDING_INDECES, + maxPendingErrors: UPTIME_MAX_PENDING_ERRORS, }); uptimeHeartbeat.start(); diff --git a/apps/kubernetes-provider/src/uptimeHeartbeat.ts b/apps/kubernetes-provider/src/uptimeHeartbeat.ts index 41d9c89386b..2ddee20ae14 100644 --- a/apps/kubernetes-provider/src/uptimeHeartbeat.ts +++ b/apps/kubernetes-provider/src/uptimeHeartbeat.ts @@ -7,14 +7,20 @@ type UptimeHeartbeatOptions = { namespace?: string; intervalInSeconds?: number; maxPendingRuns?: number; + maxPendingIndeces?: number; + maxPendingErrors?: number; leadingEdge?: boolean; }; export class UptimeHeartbeat { private enabled = false; - private namespace = "default"; - private intervalInSeconds = 30; - private maxPendingRuns = 25; + private namespace: string; + + private intervalInSeconds: number; + private maxPendingRuns: number; + private maxPendingIndeces: number; + private maxPendingErrors: number; + private leadingEdge = true; private logger = new SimpleLogger("[UptimeHeartbeat]"); @@ -24,17 +30,12 @@ export class UptimeHeartbeat { }; constructor(private opts: UptimeHeartbeatOptions) { - if (opts.namespace) { - this.namespace = opts.namespace; - } - - if (opts.intervalInSeconds) { - this.intervalInSeconds = opts.intervalInSeconds; - } + this.namespace = opts.namespace ?? "default"; - if (opts.maxPendingRuns) { - this.maxPendingRuns = opts.maxPendingRuns; - } + this.intervalInSeconds = opts.intervalInSeconds ?? 60; + this.maxPendingRuns = opts.maxPendingRuns ?? 25; + this.maxPendingIndeces = opts.maxPendingIndeces ?? 10; + this.maxPendingErrors = opts.maxPendingErrors ?? 10; this.k8sClient = this.#createK8sClient(); } @@ -88,11 +89,11 @@ export class UptimeHeartbeat { this.#logK8sError({ body: err.body }); } - async #getTotalPods(opts: { + async #getPods(opts: { namespace: string; fieldSelector?: string; labelSelector?: string; - }): Promise { + }): Promise | undefined> { const listReturn = await this.k8sClient.core .listNamespacedPod( opts.namespace, @@ -110,22 +111,39 @@ export class UptimeHeartbeat { ) .catch(this.#handleK8sError.bind(this)); - if (!listReturn) { - this.logger.error("Failed to get pods", { opts }); - return 0; - } + return listReturn?.body.items; + } - return listReturn.body.items.length; + async #getPendingIndeces(): Promise | undefined> { + return await this.#getPods({ + namespace: this.namespace, + fieldSelector: "status.phase=Pending", + labelSelector: "app=task-index", + }); } - async #getTotalPendingTasks(): Promise { - return await this.#getTotalPods({ + async #getPendingTasks(): Promise | undefined> { + return await this.#getPods({ namespace: this.namespace, fieldSelector: "status.phase=Pending", labelSelector: "app=task-run", }); } + #countPods(pods: Array): number { + return pods.length; + } + + #filterPendingPods( + pods: Array, + waitingReason: "CreateContainerError" | "RunContainerError" + ): Array { + return pods.filter((pod) => { + const containerStatus = pod.status?.containerStatuses?.[0]; + return containerStatus?.state?.waiting?.reason === waitingReason; + }); + } + async #sendPing() { this.logger.log("Sending ping"); @@ -169,7 +187,23 @@ export class UptimeHeartbeat { const start = Date.now(); - const totalPendingTasks = await this.#getTotalPendingTasks(); + const pendingTasks = await this.#getPendingTasks(); + + if (!pendingTasks) { + this.logger.error("Failed to get pending tasks"); + return; + } + + const totalPendingTasks = this.#countPods(pendingTasks); + + const pendingIndeces = await this.#getPendingIndeces(); + + if (!pendingIndeces) { + this.logger.error("Failed to get pending indeces"); + return; + } + + const totalPendingIndeces = this.#countPods(pendingIndeces); const elapsedMs = Date.now() - start; @@ -180,6 +214,26 @@ export class UptimeHeartbeat { return; } + if (totalPendingIndeces > this.maxPendingIndeces) { + this.logger.log("Too many pending indeces, skipping heartbeat", { totalPendingIndeces }); + return; + } + + const totalCreateContainerErrors = this.#countPods( + this.#filterPendingPods(pendingTasks, "CreateContainerError") + ); + const totalRunContainerErrors = this.#countPods( + this.#filterPendingPods(pendingTasks, "RunContainerError") + ); + + if (totalCreateContainerErrors + totalRunContainerErrors > this.maxPendingErrors) { + this.logger.log("Too many pending tasks with errors, skipping heartbeat", { + totalRunContainerErrors, + totalCreateContainerErrors, + }); + return; + } + await this.#sendPing(); this.logger.log("Heartbeat done", { totalPendingTasks, elapsedMs });