Skip to content

Commit 20dd654

Browse files
fix(billing): prevent deadlock with timeout (#4949)
1 parent 9aa2a51 commit 20dd654

2 files changed

Lines changed: 85 additions & 4 deletions

File tree

apps/sim/lib/billing/core/usage-log.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,13 @@ describe('recordCumulativeUsage', () => {
208208
return { tx, select, updateSet }
209209
}
210210

211+
/** True when any tx.execute call ran a sql`` template containing the substring. */
212+
const executedSqlContaining = (tx: { execute: ReturnType<typeof vi.fn> }, substring: string) =>
213+
tx.execute.mock.calls.some(([arg]) => {
214+
const strings = (arg as { strings?: readonly string[] } | null)?.strings
215+
return Array.isArray(strings) && strings.some((s) => s.includes(substring))
216+
})
217+
211218
it('inserts the full cumulative on the first flush', async () => {
212219
setupTx(null)
213220
const result = await recordCumulativeUsage({
@@ -253,4 +260,50 @@ describe('recordCumulativeUsage', () => {
253260
expect(updateSet).not.toHaveBeenCalled()
254261
expect(mockInsert).not.toHaveBeenCalled()
255262
})
263+
264+
it('resolves the billing context before opening the locked transaction, exactly once', async () => {
265+
setupTx(null)
266+
await recordCumulativeUsage({
267+
userId: 'user-1',
268+
source: 'workspace-chat',
269+
model: 'claude-opus-4.8',
270+
cost: 0.3474447,
271+
eventKey: 'update-cost:msg-1-billing',
272+
})
273+
// One lookup total: pre-resolved outside the tx, and the first-flush
274+
// insert reuses it instead of re-resolving on the pool inside the tx.
275+
expect(mockGetHighestPrioritySubscription).toHaveBeenCalledTimes(1)
276+
expect(mockGetHighestPrioritySubscription.mock.invocationCallOrder[0]).toBeLessThan(
277+
mockTransaction.mock.invocationCallOrder[0]
278+
)
279+
})
280+
281+
it('stamps the pre-resolved billing context onto the first-flush insert', async () => {
282+
setupTx(null)
283+
await recordCumulativeUsage({
284+
userId: 'user-1',
285+
source: 'workspace-chat',
286+
model: 'claude-opus-4.8',
287+
cost: 0.3474447,
288+
eventKey: 'update-cost:msg-1-billing',
289+
})
290+
expect(mockValues.mock.calls[0][0][0]).toMatchObject({
291+
billingEntityId: 'org-1',
292+
billingEntityType: 'organization',
293+
})
294+
})
295+
296+
it('bounds the advisory-lock wait and locks on the 64-bit event-key hash', async () => {
297+
const { tx } = setupTx({ id: 'row-1', cost: '0.3474447' })
298+
await recordCumulativeUsage({
299+
userId: 'user-1',
300+
source: 'workspace-chat',
301+
model: 'claude-opus-4.8',
302+
cost: 0.4662453,
303+
eventKey: 'update-cost:msg-1-billing',
304+
})
305+
expect(executedSqlContaining(tx, 'lock_timeout')).toBe(true)
306+
expect(executedSqlContaining(tx, 'pg_advisory_xact_lock')).toBe(true)
307+
expect(executedSqlContaining(tx, 'hashtextextended')).toBe(true)
308+
})
256309
})

apps/sim/lib/billing/core/usage-log.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,16 @@ export interface RecordCumulativeUsageResult {
405405
total: number
406406
}
407407

408+
/**
409+
* Bounds the wait for the per-event-key advisory lock (and any row/index lock
410+
* waits inside the critical section). The Go mothership gives each UpdateCost
411+
* POST a 5s deadline, retries 3x with backoff, then dead-letters the charge
412+
* keyed on the same idempotency key — so a stuck lock holder must surface as
413+
* a fast, retryable failure (SQLSTATE 55P03) within that budget rather than
414+
* an unbounded wait that pins pooled connections.
415+
*/
416+
const CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS = 3_000
417+
408418
/**
409419
* Record a request's CUMULATIVE cost idempotently with monotonic top-up.
410420
*
@@ -413,6 +423,9 @@ export interface RecordCumulativeUsageResult {
413423
* each flush. A per-key transactional advisory lock serializes concurrent
414424
* flushes so the read-then-write — including the first insert — is race-free
415425
* (no two flushes can both believe they are first and clobber each other).
426+
* The billing context is resolved BEFORE the transaction and the lock wait is
427+
* bounded by `lock_timeout`, keeping the critical section to one SELECT plus
428+
* one INSERT/UPDATE on a single pooled connection.
416429
*
417430
* Because every leg flushes its cumulative and this converges to the max,
418431
* there is no under-billing if the request recovers after a partial flush, no
@@ -424,9 +437,22 @@ export async function recordCumulativeUsage(
424437
): Promise<RecordCumulativeUsageResult> {
425438
const { userId, workspaceId, source, model, cost, eventKey, metadata } = params
426439

440+
// Resolved before the locked transaction on purpose: resolving inside it
441+
// ran the subscription lookups on the global pool while this tx already
442+
// held a pooled connection plus the advisory lock, so under load N
443+
// first-flush transactions each pinned a connection while waiting for one
444+
// more — starving the pool and queueing every same-key flush (and the Go
445+
// side's retries) behind the stall.
446+
const billingContext = await resolveBillingContext(userId)
447+
427448
return db.transaction(async (tx) => {
428-
// Serialize all flushes for this request (lock auto-releases at tx end).
429-
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${eventKey}))`)
449+
// Serialize all flushes for this request (lock auto-releases at tx end),
450+
// with a bounded wait so a pathological holder fails this flush fast and
451+
// lets the caller retry instead of hanging the connection.
452+
await tx.execute(
453+
sql`select set_config('lock_timeout', ${`${CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS}ms`}, true)`
454+
)
455+
await tx.execute(sql`select pg_advisory_xact_lock(hashtextextended(${eventKey}, 0))`)
430456

431457
const [existing] = await tx
432458
.select({ id: usageLog.id, cost: usageLog.cost })
@@ -449,12 +475,14 @@ export async function recordCumulativeUsage(
449475
.set({ cost: newTotal.toString(), metadata: metadata ?? null })
450476
.where(eq(usageLog.id, existing.id))
451477
} else {
452-
// First flush for this request: insert the canonical row (recordUsage
453-
// resolves billing entity/period). Runs in the same tx + advisory lock.
478+
// First flush for this request: insert the canonical row with the
479+
// pre-resolved billing context. Runs in the same tx + advisory lock.
454480
await recordUsage({
455481
userId,
456482
workspaceId,
457483
tx,
484+
billingEntity: billingContext.billingEntity,
485+
billingPeriod: billingContext.billingPeriod,
458486
entries: [
459487
{
460488
category: 'model',

0 commit comments

Comments
 (0)