-
Notifications
You must be signed in to change notification settings - Fork 3.7k
improvement(metrics): emit hosted-key metrics to CloudWatch instead of OTel #4914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+146
−91
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,139 +1,194 @@ | ||
| /** | ||
| * Hosted-key OTel metrics. | ||
| * Hosted-key metrics → CloudWatch. | ||
| * | ||
| * Point events (usage, cost, throttles, queue waits) are emitted as metrics — | ||
| * not spans — so they bypass trace sampling and survive aggregation. Reads the | ||
| * global MeterProvider, which the Next.js app registers in `instrumentation-node.ts` | ||
| * and trigger.dev registers from `trigger.config.ts`; with no provider the API | ||
| * returns a no-op meter, so these recorders are always safe to call. | ||
| * Emitted to CloudWatch (not OTel/Prometheus) because hosted-key work runs in | ||
| * both the long-lived web app and ephemeral trigger.dev workers. CloudWatch | ||
| * aggregates pushed values server-side (additively), so one-shot worker | ||
| * processes don't break aggregation the way cumulative Prometheus counters do | ||
| * (no per-process series collisions, no counter-reset math, no delta plumbing). | ||
| * | ||
| * Labels stay low-cardinality (provider, tool, reason, key). `key` is the env var | ||
| * NAME of the chosen hosted key (e.g. `PERPLEXITY_API_KEY_2`) — never the secret — | ||
| * and the pool is operator-managed, so it's safe to label. Per-workspace/user cost | ||
| * lives exactly in the `usage_log` table — never put those on metric labels. | ||
| * Dimensions stay low-cardinality (Provider, Tool, Key, Reason, Environment) — | ||
| * CloudWatch bills per unique dimension combination. `Key` is the env-var NAME | ||
| * of the chosen hosted key (e.g. `PERPLEXITY_API_KEY_2`), never the secret. | ||
| * Per-workspace/user cost lives in the `usage_log` table, never on a dimension. | ||
| * | ||
| * Records buffer in-process and flush asynchronously via PutMetricData (batched, | ||
| * off the request path). Flushing is automatic — a 5s timer, a buffer-size | ||
| * threshold, and SIGTERM/SIGINT/beforeExit (the exit handlers AWAIT the final | ||
| * drain, so both long-lived app processes and ephemeral trigger.dev workers push | ||
| * their last batch before the process exits). flushHostedKeyMetrics() is also | ||
| * exported for explicit/early draining (e.g. tests). The buffer is hard-capped: | ||
| * if CloudWatch flushing stalls it drops the oldest points rather than growing | ||
| * unbounded. | ||
| */ | ||
|
|
||
| import { type Counter, type Histogram, metrics } from '@opentelemetry/api' | ||
| import { | ||
| CloudWatchClient, | ||
| type MetricDatum, | ||
| PutMetricDataCommand, | ||
| StandardUnit, | ||
| } from '@aws-sdk/client-cloudwatch' | ||
| import { createLogger } from '@sim/logger' | ||
|
|
||
| const logger = createLogger('HostedKeyMetrics') | ||
|
|
||
| const METER_NAME = 'sim.hosted-key' | ||
| const METER_VERSION = '1.0.0' | ||
| const NAMESPACE = 'Sim/HostedKey' | ||
| const MAX_BATCH = 1000 // CloudWatch PutMetricData hard limit per request | ||
| const FLUSH_INTERVAL_MS = 5_000 | ||
| const FLUSH_THRESHOLD = 1000 // flush once the buffer reaches this many points | ||
| const MAX_BUFFER = 10_000 // hard cap; drop oldest beyond this if flushing stalls | ||
|
|
||
| type ThrottleReason = 'billing_actor_limit' | 'upstream_retries_exhausted' | ||
| type QueueReason = 'actor_requests' | 'dimension' | 'queue_position' | ||
| type FailureReason = 'rate_limited' | 'auth' | 'other' | ||
|
|
||
| let meter: ReturnType<typeof metrics.getMeter> | undefined | ||
| let usedCounter: Counter | undefined | ||
| let failedCounter: Counter | undefined | ||
| let costCounter: Counter | undefined | ||
| let throttledCounter: Counter | undefined | ||
| let upstreamRateLimitedCounter: Counter | undefined | ||
| let queueWaitHistogram: Histogram | undefined | ||
| let queueWaitExceededCounter: Counter | undefined | ||
| let unknownModelCostCounter: Counter | undefined | ||
|
|
||
| function getMeter() { | ||
| if (!meter) meter = metrics.getMeter(METER_NAME, METER_VERSION) | ||
| return meter | ||
| } | ||
| // Deployed envs (app + trigger worker) carry static AWS creds; local dev does | ||
| // not. No creds → no-op, so recorders stay always-safe to call (same contract | ||
| // as the previous no-op-meter behavior). | ||
| const ENABLED = Boolean(process.env.AWS_ACCESS_KEY_ID) | ||
|
|
||
| function getUsedCounter() { | ||
| if (!usedCounter) { | ||
| usedCounter = getMeter().createCounter('hosted_key.used', { | ||
| description: 'Successful tool executions backed by a Sim-hosted API key', | ||
| }) | ||
| } | ||
| return usedCounter | ||
| } | ||
| const ENVIRONMENT = | ||
| process.env.OTEL_DEPLOYMENT_ENVIRONMENT || | ||
| process.env.DEPLOYMENT_ENVIRONMENT || | ||
| process.env.NODE_ENV || | ||
| 'development' | ||
|
|
||
| function getFailedCounter() { | ||
| if (!failedCounter) { | ||
| failedCounter = getMeter().createCounter('hosted_key.failed', { | ||
| description: 'Failed tool executions backed by a Sim-hosted API key', | ||
| }) | ||
| } | ||
| return failedCounter | ||
| } | ||
| let client: CloudWatchClient | undefined | ||
| let buffer: MetricDatum[] = [] | ||
| let dropped = 0 | ||
| let timer: ReturnType<typeof setInterval> | undefined | ||
| let handlersRegistered = false | ||
|
TheodoreSpeaks marked this conversation as resolved.
|
||
|
|
||
| function getCostCounter() { | ||
| if (!costCounter) { | ||
| costCounter = getMeter().createCounter('hosted_key.cost_charged', { | ||
| description: 'Dollar cost charged to the billing actor for hosted-key usage', | ||
| unit: 'USD', | ||
| }) | ||
| function getClient(): CloudWatchClient { | ||
| if (!client) { | ||
| client = new CloudWatchClient({ region: process.env.AWS_REGION || 'us-east-1' }) | ||
| } | ||
| return costCounter | ||
| return client | ||
| } | ||
|
|
||
| function getThrottledCounter() { | ||
| if (!throttledCounter) { | ||
| throttledCounter = getMeter().createCounter('hosted_key.throttled', { | ||
| description: 'Rate-limit errors surfaced to the end user (not retried/absorbed)', | ||
| }) | ||
| function ensureBackground(): void { | ||
| if (timer) return | ||
| timer = setInterval(() => { | ||
| void flushHostedKeyMetrics() | ||
| }, FLUSH_INTERVAL_MS) | ||
| timer.unref?.() | ||
| if (!handlersRegistered) { | ||
| handlersRegistered = true | ||
| const onExit = async () => { | ||
| await flushHostedKeyMetrics() | ||
| } | ||
| process.once('SIGTERM', onExit) | ||
| process.once('SIGINT', onExit) | ||
| process.once('beforeExit', onExit) | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| } | ||
| return throttledCounter | ||
| } | ||
|
|
||
| function getUpstreamRateLimitedCounter() { | ||
| if (!upstreamRateLimitedCounter) { | ||
| upstreamRateLimitedCounter = getMeter().createCounter('hosted_key.upstream_rate_limited', { | ||
| description: 'Upstream provider 429s absorbed via retry/backoff', | ||
| }) | ||
| function buildDimensions(labels: Record<string, string | undefined>) { | ||
| const dimensions = [{ Name: 'Environment', Value: ENVIRONMENT }] | ||
| for (const [Name, Value] of Object.entries(labels)) { | ||
| if (Value) dimensions.push({ Name, Value }) | ||
| } | ||
| return upstreamRateLimitedCounter | ||
| return dimensions | ||
| } | ||
|
|
||
| function getQueueWaitHistogram() { | ||
| if (!queueWaitHistogram) { | ||
| queueWaitHistogram = getMeter().createHistogram('hosted_key.queue_wait_duration', { | ||
| description: 'Time a hosted-key acquisition spent waiting in the per-workspace queue/bucket', | ||
| unit: 'ms', | ||
| }) | ||
| function enqueue( | ||
| MetricName: string, | ||
| Value: number, | ||
| Unit: StandardUnit, | ||
| labels: Record<string, string | undefined> | ||
| ): void { | ||
| if (!ENABLED) return | ||
| buffer.push({ | ||
| MetricName, | ||
| Value, | ||
| Unit, | ||
| Timestamp: new Date(), | ||
| Dimensions: buildDimensions(labels), | ||
| }) | ||
| if (buffer.length > MAX_BUFFER) { | ||
| // Flushing has stalled (CloudWatch slow/erroring) — bound memory by dropping | ||
| // the oldest points instead of growing without limit. | ||
| const overflow = buffer.length - MAX_BUFFER | ||
| buffer.splice(0, overflow) | ||
| dropped += overflow | ||
| } | ||
| return queueWaitHistogram | ||
| ensureBackground() | ||
| if (buffer.length >= FLUSH_THRESHOLD) void flushHostedKeyMetrics() | ||
| } | ||
|
|
||
| function getQueueWaitExceededCounter() { | ||
| if (!queueWaitExceededCounter) { | ||
| queueWaitExceededCounter = getMeter().createCounter('hosted_key.queue_wait_exceeded', { | ||
| description: 'Hosted-key acquisitions that exceeded the queue wait cap and fell back to 429', | ||
| }) | ||
| /** Drain the buffer to CloudWatch. Safe to call repeatedly; await before exit. */ | ||
| export async function flushHostedKeyMetrics(): Promise<void> { | ||
| if (dropped > 0) { | ||
| logger.warn('Dropped hosted-key metric points (buffer cap reached)', { dropped }) | ||
| dropped = 0 | ||
| } | ||
| return queueWaitExceededCounter | ||
| } | ||
|
|
||
| function getUnknownModelCostCounter() { | ||
| if (!unknownModelCostCounter) { | ||
| unknownModelCostCounter = getMeter().createCounter('hosted_key.unknown_model_cost', { | ||
| description: 'Hosted-key cost calculations that fell back to a default for an unmapped model', | ||
| }) | ||
| if (!ENABLED || buffer.length === 0) return | ||
| const pending = buffer | ||
| buffer = [] | ||
| for (let i = 0; i < pending.length; i += MAX_BATCH) { | ||
| const MetricData = pending.slice(i, i + MAX_BATCH) | ||
| try { | ||
| await getClient().send(new PutMetricDataCommand({ Namespace: NAMESPACE, MetricData })) | ||
| } catch (err) { | ||
| // Telemetry must never break the request path — log and drop the batch. | ||
| logger.warn('PutMetricData failed; dropping batch', { | ||
| count: MetricData.length, | ||
| error: err instanceof Error ? err.message : String(err), | ||
| }) | ||
| } | ||
| } | ||
| return unknownModelCostCounter | ||
| } | ||
|
|
||
| export const hostedKeyMetrics = { | ||
| recordUsed(labels: { provider: string; tool: string; key: string }) { | ||
| getUsedCounter().add(1, labels) | ||
| enqueue('Used', 1, StandardUnit.Count, { | ||
| Provider: labels.provider, | ||
| Tool: labels.tool, | ||
| Key: labels.key, | ||
| }) | ||
| }, | ||
| recordFailed(labels: { provider: string; tool: string; key: string; reason: FailureReason }) { | ||
| getFailedCounter().add(1, labels) | ||
| enqueue('Failed', 1, StandardUnit.Count, { | ||
| Provider: labels.provider, | ||
| Tool: labels.tool, | ||
| Key: labels.key, | ||
| Reason: labels.reason, | ||
| }) | ||
| }, | ||
| recordCostCharged(costUsd: number, labels: { provider: string; tool: string }) { | ||
| if (costUsd > 0) getCostCounter().add(costUsd, labels) | ||
| // Unit None: CloudWatch has no USD unit; value is dollars. | ||
| if (costUsd > 0) | ||
| enqueue('CostCharged', costUsd, StandardUnit.None, { | ||
| Provider: labels.provider, | ||
| Tool: labels.tool, | ||
| }) | ||
| }, | ||
| recordThrottled(labels: { provider: string; tool: string; reason: ThrottleReason }) { | ||
| getThrottledCounter().add(1, labels) | ||
| enqueue('Throttled', 1, StandardUnit.Count, { | ||
| Provider: labels.provider, | ||
| Tool: labels.tool, | ||
| Reason: labels.reason, | ||
| }) | ||
| }, | ||
| recordUpstreamRateLimited(labels: { tool: string; key: string }) { | ||
| getUpstreamRateLimitedCounter().add(1, labels) | ||
| enqueue('UpstreamRateLimited', 1, StandardUnit.Count, { | ||
| Tool: labels.tool, | ||
| Key: labels.key, | ||
| }) | ||
| }, | ||
| recordQueueWait(durationMs: number, labels: { provider: string; reason: QueueReason }) { | ||
| getQueueWaitHistogram().record(durationMs, labels) | ||
| enqueue('QueueWaitDuration', durationMs, StandardUnit.Milliseconds, { | ||
| Provider: labels.provider, | ||
| Reason: labels.reason, | ||
| }) | ||
| }, | ||
| recordQueueWaitExceeded(labels: { provider: string; reason: QueueReason }) { | ||
| getQueueWaitExceededCounter().add(1, labels) | ||
| enqueue('QueueWaitExceeded', 1, StandardUnit.Count, { | ||
| Provider: labels.provider, | ||
| Reason: labels.reason, | ||
| }) | ||
| }, | ||
| recordUnknownModelCost(labels: { tool: string }) { | ||
| getUnknownModelCostCounter().add(1, labels) | ||
| enqueue('UnknownModelCost', 1, StandardUnit.Count, { Tool: labels.tool }) | ||
| }, | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.