From 7084891c9bdea11a1eaff226a24fb9b1d204dd43 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 10 Jun 2026 13:02:04 -0700 Subject: [PATCH] fix(billing): prevent deadlock with timeout --- apps/sim/lib/billing/core/usage-log.test.ts | 53 +++++++++++++++++++++ apps/sim/lib/billing/core/usage-log.ts | 36 ++++++++++++-- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/apps/sim/lib/billing/core/usage-log.test.ts b/apps/sim/lib/billing/core/usage-log.test.ts index 66a6175744..448cb2eab4 100644 --- a/apps/sim/lib/billing/core/usage-log.test.ts +++ b/apps/sim/lib/billing/core/usage-log.test.ts @@ -208,6 +208,13 @@ describe('recordCumulativeUsage', () => { return { tx, select, updateSet } } + /** True when any tx.execute call ran a sql`` template containing the substring. */ + const executedSqlContaining = (tx: { execute: ReturnType }, substring: string) => + tx.execute.mock.calls.some(([arg]) => { + const strings = (arg as { strings?: readonly string[] } | null)?.strings + return Array.isArray(strings) && strings.some((s) => s.includes(substring)) + }) + it('inserts the full cumulative on the first flush', async () => { setupTx(null) const result = await recordCumulativeUsage({ @@ -253,4 +260,50 @@ describe('recordCumulativeUsage', () => { expect(updateSet).not.toHaveBeenCalled() expect(mockInsert).not.toHaveBeenCalled() }) + + it('resolves the billing context before opening the locked transaction, exactly once', async () => { + setupTx(null) + await recordCumulativeUsage({ + userId: 'user-1', + source: 'workspace-chat', + model: 'claude-opus-4.8', + cost: 0.3474447, + eventKey: 'update-cost:msg-1-billing', + }) + // One lookup total: pre-resolved outside the tx, and the first-flush + // insert reuses it instead of re-resolving on the pool inside the tx. + expect(mockGetHighestPrioritySubscription).toHaveBeenCalledTimes(1) + expect(mockGetHighestPrioritySubscription.mock.invocationCallOrder[0]).toBeLessThan( + mockTransaction.mock.invocationCallOrder[0] + ) + }) + + it('stamps the pre-resolved billing context onto the first-flush insert', async () => { + setupTx(null) + await recordCumulativeUsage({ + userId: 'user-1', + source: 'workspace-chat', + model: 'claude-opus-4.8', + cost: 0.3474447, + eventKey: 'update-cost:msg-1-billing', + }) + expect(mockValues.mock.calls[0][0][0]).toMatchObject({ + billingEntityId: 'org-1', + billingEntityType: 'organization', + }) + }) + + it('bounds the advisory-lock wait and locks on the 64-bit event-key hash', async () => { + const { tx } = setupTx({ id: 'row-1', cost: '0.3474447' }) + await recordCumulativeUsage({ + userId: 'user-1', + source: 'workspace-chat', + model: 'claude-opus-4.8', + cost: 0.4662453, + eventKey: 'update-cost:msg-1-billing', + }) + expect(executedSqlContaining(tx, 'lock_timeout')).toBe(true) + expect(executedSqlContaining(tx, 'pg_advisory_xact_lock')).toBe(true) + expect(executedSqlContaining(tx, 'hashtextextended')).toBe(true) + }) }) diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index 083ee7de9a..5d654723d4 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -405,6 +405,16 @@ export interface RecordCumulativeUsageResult { total: number } +/** + * Bounds the wait for the per-event-key advisory lock (and any row/index lock + * waits inside the critical section). The Go mothership gives each UpdateCost + * POST a 5s deadline, retries 3x with backoff, then dead-letters the charge + * keyed on the same idempotency key — so a stuck lock holder must surface as + * a fast, retryable failure (SQLSTATE 55P03) within that budget rather than + * an unbounded wait that pins pooled connections. + */ +const CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS = 3_000 + /** * Record a request's CUMULATIVE cost idempotently with monotonic top-up. * @@ -413,6 +423,9 @@ export interface RecordCumulativeUsageResult { * each flush. A per-key transactional advisory lock serializes concurrent * flushes so the read-then-write — including the first insert — is race-free * (no two flushes can both believe they are first and clobber each other). + * The billing context is resolved BEFORE the transaction and the lock wait is + * bounded by `lock_timeout`, keeping the critical section to one SELECT plus + * one INSERT/UPDATE on a single pooled connection. * * Because every leg flushes its cumulative and this converges to the max, * there is no under-billing if the request recovers after a partial flush, no @@ -424,9 +437,22 @@ export async function recordCumulativeUsage( ): Promise { const { userId, workspaceId, source, model, cost, eventKey, metadata } = params + // Resolved before the locked transaction on purpose: resolving inside it + // ran the subscription lookups on the global pool while this tx already + // held a pooled connection plus the advisory lock, so under load N + // first-flush transactions each pinned a connection while waiting for one + // more — starving the pool and queueing every same-key flush (and the Go + // side's retries) behind the stall. + const billingContext = await resolveBillingContext(userId) + return db.transaction(async (tx) => { - // Serialize all flushes for this request (lock auto-releases at tx end). - await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${eventKey}))`) + // Serialize all flushes for this request (lock auto-releases at tx end), + // with a bounded wait so a pathological holder fails this flush fast and + // lets the caller retry instead of hanging the connection. + await tx.execute( + sql`select set_config('lock_timeout', ${`${CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS}ms`}, true)` + ) + await tx.execute(sql`select pg_advisory_xact_lock(hashtextextended(${eventKey}, 0))`) const [existing] = await tx .select({ id: usageLog.id, cost: usageLog.cost }) @@ -449,12 +475,14 @@ export async function recordCumulativeUsage( .set({ cost: newTotal.toString(), metadata: metadata ?? null }) .where(eq(usageLog.id, existing.id)) } else { - // First flush for this request: insert the canonical row (recordUsage - // resolves billing entity/period). Runs in the same tx + advisory lock. + // First flush for this request: insert the canonical row with the + // pre-resolved billing context. Runs in the same tx + advisory lock. await recordUsage({ userId, workspaceId, tx, + billingEntity: billingContext.billingEntity, + billingPeriod: billingContext.billingPeriod, entries: [ { category: 'model',