Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions apps/sim/lib/billing/core/usage-log.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ describe('recordCumulativeUsage', () => {
return { tx, select, updateSet }
}

/**
 * Reports whether any recorded `tx.execute` call was handed a sql`` template
 * in which at least one static string chunk contains `substring`.
 * Calls whose first argument has no array-valued `strings` are ignored.
 */
const executedSqlContaining = (tx: { execute: ReturnType<typeof vi.fn> }, substring: string) => {
  for (const [firstArg] of tx.execute.mock.calls) {
    const chunks = (firstArg as { strings?: readonly string[] } | null)?.strings
    if (!Array.isArray(chunks)) continue
    if (chunks.some((chunk) => chunk.includes(substring))) return true
  }
  return false
}
Comment thread
icecrasher321 marked this conversation as resolved.

it('inserts the full cumulative on the first flush', async () => {
setupTx(null)
const result = await recordCumulativeUsage({
Expand Down Expand Up @@ -253,4 +260,50 @@ describe('recordCumulativeUsage', () => {
expect(updateSet).not.toHaveBeenCalled()
expect(mockInsert).not.toHaveBeenCalled()
})

it('resolves the billing context before opening the locked transaction, exactly once', async () => {
  setupTx(null)

  await recordCumulativeUsage({
    userId: 'user-1',
    source: 'workspace-chat',
    model: 'claude-opus-4.8',
    cost: 0.3474447,
    eventKey: 'update-cost:msg-1-billing',
  })

  // A single subscription lookup is expected: it happens ahead of the
  // transaction, and the first-flush insert is handed that result rather
  // than resolving again from inside the tx.
  expect(mockGetHighestPrioritySubscription).toHaveBeenCalledTimes(1)

  // The lookup must have been invoked earlier than the transaction was opened.
  const [lookupOrder] = mockGetHighestPrioritySubscription.mock.invocationCallOrder
  const [transactionOrder] = mockTransaction.mock.invocationCallOrder
  expect(lookupOrder).toBeLessThan(transactionOrder)
})

it('stamps the pre-resolved billing context onto the first-flush insert', async () => {
  setupTx(null)

  await recordCumulativeUsage({
    userId: 'user-1',
    source: 'workspace-chat',
    model: 'claude-opus-4.8',
    cost: 0.3474447,
    eventKey: 'update-cost:msg-1-billing',
  })

  // First `values(...)` call -> its first argument (the row list) -> first row.
  const [firstValuesCall] = mockValues.mock.calls
  const [insertedRows] = firstValuesCall
  expect(insertedRows[0]).toMatchObject({
    billingEntityId: 'org-1',
    billingEntityType: 'organization',
  })
})

it('bounds the advisory-lock wait and locks on the 64-bit event-key hash', async () => {
  const { tx } = setupTx({ id: 'row-1', cost: '0.3474447' })

  await recordCumulativeUsage({
    userId: 'user-1',
    source: 'workspace-chat',
    model: 'claude-opus-4.8',
    cost: 0.4662453,
    eventKey: 'update-cost:msg-1-billing',
  })

  // Every one of these fragments must appear in some sql`` template that was
  // passed to tx.execute; collecting the absent ones names the culprit on failure.
  const expectedFragments = ['lock_timeout', 'pg_advisory_xact_lock', 'hashtextextended']
  const missingFragments = expectedFragments.filter(
    (fragment) => !executedSqlContaining(tx, fragment)
  )
  expect(missingFragments).toEqual([])
})
})
36 changes: 32 additions & 4 deletions apps/sim/lib/billing/core/usage-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,16 @@ export interface RecordCumulativeUsageResult {
total: number
}

/**
 * Upper bound, in milliseconds, on how long a cumulative flush may wait for
 * its per-event-key advisory lock — and for any row/index lock taken inside
 * the critical section.
 *
 * The Go mothership allows each UpdateCost POST a 5s deadline, retries 3x
 * with backoff, and then dead-letters the charge under the same idempotency
 * key. A stuck lock holder therefore has to surface as a quick, retryable
 * failure (SQLSTATE 55P03) inside that budget, not as an open-ended wait
 * that keeps pooled connections pinned.
 */
const CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS = 3 * 1_000

/**
* Record a request's CUMULATIVE cost idempotently with monotonic top-up.
*
Expand All @@ -413,6 +423,9 @@ export interface RecordCumulativeUsageResult {
* each flush. A per-key transactional advisory lock serializes concurrent
* flushes so the read-then-write — including the first insert — is race-free
* (no two flushes can both believe they are first and clobber each other).
* The billing context is resolved BEFORE the transaction and the lock wait is
* bounded by `lock_timeout`, keeping the critical section to one SELECT plus
* one INSERT/UPDATE on a single pooled connection.
*
* Because every leg flushes its cumulative and this converges to the max,
* there is no under-billing if the request recovers after a partial flush, no
Expand All @@ -424,9 +437,22 @@ export async function recordCumulativeUsage(
): Promise<RecordCumulativeUsageResult> {
const { userId, workspaceId, source, model, cost, eventKey, metadata } = params

// Resolved before the locked transaction on purpose: resolving inside it
// ran the subscription lookups on the global pool while this tx already
// held a pooled connection plus the advisory lock, so under load N
// first-flush transactions each pinned a connection while waiting for one
// more — starving the pool and queueing every same-key flush (and the Go
// side's retries) behind the stall.
const billingContext = await resolveBillingContext(userId)
Comment thread
icecrasher321 marked this conversation as resolved.

return db.transaction(async (tx) => {
// Serialize all flushes for this request (lock auto-releases at tx end).
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${eventKey}))`)
// Serialize all flushes for this request (lock auto-releases at tx end),
// with a bounded wait so a pathological holder fails this flush fast and
// lets the caller retry instead of hanging the connection.
await tx.execute(
sql`select set_config('lock_timeout', ${`${CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS}ms`}, true)`
)
await tx.execute(sql`select pg_advisory_xact_lock(hashtextextended(${eventKey}, 0))`)

const [existing] = await tx
.select({ id: usageLog.id, cost: usageLog.cost })
Expand All @@ -449,12 +475,14 @@ export async function recordCumulativeUsage(
.set({ cost: newTotal.toString(), metadata: metadata ?? null })
.where(eq(usageLog.id, existing.id))
} else {
// First flush for this request: insert the canonical row (recordUsage
// resolves billing entity/period). Runs in the same tx + advisory lock.
// First flush for this request: insert the canonical row with the
// pre-resolved billing context. Runs in the same tx + advisory lock.
await recordUsage({
userId,
workspaceId,
tx,
billingEntity: billingContext.billingEntity,
billingPeriod: billingContext.billingPeriod,
Comment thread
icecrasher321 marked this conversation as resolved.
entries: [
{
category: 'model',
Expand Down
Loading