Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 146 additions & 91 deletions apps/sim/lib/monitoring/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,139 +1,194 @@
/**
* Hosted-key OTel metrics.
* Hosted-key metrics → CloudWatch.
*
* Point events (usage, cost, throttles, queue waits) are emitted as metrics —
* not spans — so they bypass trace sampling and survive aggregation. Reads the
* global MeterProvider, which the Next.js app registers in `instrumentation-node.ts`
* and trigger.dev registers from `trigger.config.ts`; with no provider the API
* returns a no-op meter, so these recorders are always safe to call.
* Emitted to CloudWatch (not OTel/Prometheus) because hosted-key work runs in
* both the long-lived web app and ephemeral trigger.dev workers. CloudWatch
* aggregates pushed values server-side (additively), so one-shot worker
* processes don't break aggregation the way cumulative Prometheus counters do
* (no per-process series collisions, no counter-reset math, no delta plumbing).
*
* Labels stay low-cardinality (provider, tool, reason, key). `key` is the env var
* NAME of the chosen hosted key (e.g. `PERPLEXITY_API_KEY_2`) — never the secret —
* and the pool is operator-managed, so it's safe to label. The exact per-workspace/user
* cost lives in the `usage_log` table — never put workspace or user identifiers on metric labels.
* Dimensions stay low-cardinality (Provider, Tool, Key, Reason, Environment) —
* CloudWatch bills per unique dimension combination. `Key` is the env-var NAME
* of the chosen hosted key (e.g. `PERPLEXITY_API_KEY_2`), never the secret.
* Per-workspace/user cost lives in the `usage_log` table, never on a dimension.
*
* Records buffer in-process and flush asynchronously via PutMetricData (batched,
* off the request path). Flushing is automatic — a 5s timer, a buffer-size
* threshold, and SIGTERM/SIGINT/beforeExit (the exit handlers AWAIT the final
* drain, so both long-lived app processes and ephemeral trigger.dev workers push
* their last batch before the process exits). flushHostedKeyMetrics() is also
* exported for explicit/early draining (e.g. tests). The buffer is hard-capped:
* if CloudWatch flushing stalls it drops the oldest points rather than growing
* unbounded.
*/

import { type Counter, type Histogram, metrics } from '@opentelemetry/api'
import {
CloudWatchClient,
type MetricDatum,
PutMetricDataCommand,
StandardUnit,
} from '@aws-sdk/client-cloudwatch'
import { createLogger } from '@sim/logger'

const logger = createLogger('HostedKeyMetrics')

const METER_NAME = 'sim.hosted-key'
const METER_VERSION = '1.0.0'
const NAMESPACE = 'Sim/HostedKey'
const MAX_BATCH = 1000 // CloudWatch PutMetricData hard limit per request
const FLUSH_INTERVAL_MS = 5_000
const FLUSH_THRESHOLD = 1000 // flush once the buffer reaches this many points
const MAX_BUFFER = 10_000 // hard cap; drop oldest beyond this if flushing stalls

type ThrottleReason = 'billing_actor_limit' | 'upstream_retries_exhausted'
type QueueReason = 'actor_requests' | 'dimension' | 'queue_position'
type FailureReason = 'rate_limited' | 'auth' | 'other'

let meter: ReturnType<typeof metrics.getMeter> | undefined
let usedCounter: Counter | undefined
let failedCounter: Counter | undefined
let costCounter: Counter | undefined
let throttledCounter: Counter | undefined
let upstreamRateLimitedCounter: Counter | undefined
let queueWaitHistogram: Histogram | undefined
let queueWaitExceededCounter: Counter | undefined
let unknownModelCostCounter: Counter | undefined

/**
 * Returns the module-wide meter, creating it on first use.
 *
 * The meter is requested from the OpenTelemetry `metrics` API under
 * METER_NAME / METER_VERSION and cached in the module-level `meter`
 * variable, so later calls reuse the same instance.
 */
function getMeter() {
  if (meter) return meter
  meter = metrics.getMeter(METER_NAME, METER_VERSION)
  return meter
}
// Deployed envs (app + trigger worker) carry static AWS creds; local dev does
// not. No creds → no-op, so recorders stay always-safe to call (same contract
// as the previous no-op-meter behavior).
const ENABLED = Boolean(process.env.AWS_ACCESS_KEY_ID)
Comment thread
TheodoreSpeaks marked this conversation as resolved.

/**
 * Returns the `hosted_key.used` counter.
 *
 * The counter is created from the shared meter on the first call and cached
 * in the module-level `usedCounter`; subsequent calls return the cached one.
 */
function getUsedCounter() {
  if (usedCounter) return usedCounter
  const description = 'Successful tool executions backed by a Sim-hosted API key'
  usedCounter = getMeter().createCounter('hosted_key.used', { description })
  return usedCounter
}
const ENVIRONMENT =
process.env.OTEL_DEPLOYMENT_ENVIRONMENT ||
process.env.DEPLOYMENT_ENVIRONMENT ||
process.env.NODE_ENV ||
'development'

/**
 * Returns the `hosted_key.failed` counter.
 *
 * The counter is created from the shared meter on the first call and cached
 * in the module-level `failedCounter`; subsequent calls return the cached one.
 */
function getFailedCounter() {
  if (failedCounter) return failedCounter
  const description = 'Failed tool executions backed by a Sim-hosted API key'
  failedCounter = getMeter().createCounter('hosted_key.failed', { description })
  return failedCounter
}
let client: CloudWatchClient | undefined
let buffer: MetricDatum[] = []
let dropped = 0
let timer: ReturnType<typeof setInterval> | undefined
let handlersRegistered = false
Comment thread
TheodoreSpeaks marked this conversation as resolved.

function getCostCounter() {
if (!costCounter) {
costCounter = getMeter().createCounter('hosted_key.cost_charged', {
description: 'Dollar cost charged to the billing actor for hosted-key usage',
unit: 'USD',
})
function getClient(): CloudWatchClient {
if (!client) {
client = new CloudWatchClient({ region: process.env.AWS_REGION || 'us-east-1' })
}
return costCounter
return client
}

function getThrottledCounter() {
if (!throttledCounter) {
throttledCounter = getMeter().createCounter('hosted_key.throttled', {
description: 'Rate-limit errors surfaced to the end user (not retried/absorbed)',
})
function ensureBackground(): void {
if (timer) return
timer = setInterval(() => {
void flushHostedKeyMetrics()
}, FLUSH_INTERVAL_MS)
timer.unref?.()
if (!handlersRegistered) {
handlersRegistered = true
const onExit = async () => {
await flushHostedKeyMetrics()
}
process.once('SIGTERM', onExit)
process.once('SIGINT', onExit)
process.once('beforeExit', onExit)
Comment thread
cursor[bot] marked this conversation as resolved.
}
return throttledCounter
}

function getUpstreamRateLimitedCounter() {
if (!upstreamRateLimitedCounter) {
upstreamRateLimitedCounter = getMeter().createCounter('hosted_key.upstream_rate_limited', {
description: 'Upstream provider 429s absorbed via retry/backoff',
})
function buildDimensions(labels: Record<string, string | undefined>) {
const dimensions = [{ Name: 'Environment', Value: ENVIRONMENT }]
for (const [Name, Value] of Object.entries(labels)) {
if (Value) dimensions.push({ Name, Value })
}
return upstreamRateLimitedCounter
return dimensions
}

function getQueueWaitHistogram() {
if (!queueWaitHistogram) {
queueWaitHistogram = getMeter().createHistogram('hosted_key.queue_wait_duration', {
description: 'Time a hosted-key acquisition spent waiting in the per-workspace queue/bucket',
unit: 'ms',
})
function enqueue(
MetricName: string,
Value: number,
Unit: StandardUnit,
labels: Record<string, string | undefined>
): void {
if (!ENABLED) return
buffer.push({
MetricName,
Value,
Unit,
Timestamp: new Date(),
Dimensions: buildDimensions(labels),
})
if (buffer.length > MAX_BUFFER) {
// Flushing has stalled (CloudWatch slow/erroring) — bound memory by dropping
// the oldest points instead of growing without limit.
const overflow = buffer.length - MAX_BUFFER
buffer.splice(0, overflow)
dropped += overflow
}
return queueWaitHistogram
ensureBackground()
if (buffer.length >= FLUSH_THRESHOLD) void flushHostedKeyMetrics()
}

function getQueueWaitExceededCounter() {
if (!queueWaitExceededCounter) {
queueWaitExceededCounter = getMeter().createCounter('hosted_key.queue_wait_exceeded', {
description: 'Hosted-key acquisitions that exceeded the queue wait cap and fell back to 429',
})
/** Drain the buffer to CloudWatch. Safe to call repeatedly; await before exit. */
export async function flushHostedKeyMetrics(): Promise<void> {
if (dropped > 0) {
logger.warn('Dropped hosted-key metric points (buffer cap reached)', { dropped })
dropped = 0
}
return queueWaitExceededCounter
}

function getUnknownModelCostCounter() {
if (!unknownModelCostCounter) {
unknownModelCostCounter = getMeter().createCounter('hosted_key.unknown_model_cost', {
description: 'Hosted-key cost calculations that fell back to a default for an unmapped model',
})
if (!ENABLED || buffer.length === 0) return
const pending = buffer
buffer = []
for (let i = 0; i < pending.length; i += MAX_BATCH) {
const MetricData = pending.slice(i, i + MAX_BATCH)
try {
await getClient().send(new PutMetricDataCommand({ Namespace: NAMESPACE, MetricData }))
} catch (err) {
// Telemetry must never break the request path — log and drop the batch.
logger.warn('PutMetricData failed; dropping batch', {
count: MetricData.length,
error: err instanceof Error ? err.message : String(err),
})
}
}
return unknownModelCostCounter
}

export const hostedKeyMetrics = {
recordUsed(labels: { provider: string; tool: string; key: string }) {
getUsedCounter().add(1, labels)
enqueue('Used', 1, StandardUnit.Count, {
Provider: labels.provider,
Tool: labels.tool,
Key: labels.key,
})
},
recordFailed(labels: { provider: string; tool: string; key: string; reason: FailureReason }) {
getFailedCounter().add(1, labels)
enqueue('Failed', 1, StandardUnit.Count, {
Provider: labels.provider,
Tool: labels.tool,
Key: labels.key,
Reason: labels.reason,
})
},
recordCostCharged(costUsd: number, labels: { provider: string; tool: string }) {
if (costUsd > 0) getCostCounter().add(costUsd, labels)
// Unit None: CloudWatch has no USD unit; value is dollars.
if (costUsd > 0)
enqueue('CostCharged', costUsd, StandardUnit.None, {
Provider: labels.provider,
Tool: labels.tool,
})
},
recordThrottled(labels: { provider: string; tool: string; reason: ThrottleReason }) {
getThrottledCounter().add(1, labels)
enqueue('Throttled', 1, StandardUnit.Count, {
Provider: labels.provider,
Tool: labels.tool,
Reason: labels.reason,
})
},
recordUpstreamRateLimited(labels: { tool: string; key: string }) {
getUpstreamRateLimitedCounter().add(1, labels)
enqueue('UpstreamRateLimited', 1, StandardUnit.Count, {
Tool: labels.tool,
Key: labels.key,
})
},
recordQueueWait(durationMs: number, labels: { provider: string; reason: QueueReason }) {
getQueueWaitHistogram().record(durationMs, labels)
enqueue('QueueWaitDuration', durationMs, StandardUnit.Milliseconds, {
Provider: labels.provider,
Reason: labels.reason,
})
},
recordQueueWaitExceeded(labels: { provider: string; reason: QueueReason }) {
getQueueWaitExceededCounter().add(1, labels)
enqueue('QueueWaitExceeded', 1, StandardUnit.Count, {
Provider: labels.provider,
Reason: labels.reason,
})
},
recordUnknownModelCost(labels: { tool: string }) {
getUnknownModelCostCounter().add(1, labels)
enqueue('UnknownModelCost', 1, StandardUnit.Count, { Tool: labels.tool })
},
}
Loading