@@ -405,6 +405,16 @@ export interface RecordCumulativeUsageResult {
405405 total : number
406406}
407407
408+ /**
409+ * Bounds the wait for the per-event-key advisory lock (and any row/index lock
410+ * waits inside the critical section). The Go mothership gives each UpdateCost
411+ * POST a 5s deadline, retries 3x with backoff, then dead-letters the charge
412+ * keyed on the same idempotency key — so a stuck lock holder must surface as
413+ * a fast, retryable failure (SQLSTATE 55P03) within that budget rather than
414+ * an unbounded wait that pins pooled connections.
415+ */
416+ const CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS = 3_000
417+
408418/**
409419 * Record a request's CUMULATIVE cost idempotently with monotonic top-up.
410420 *
@@ -413,6 +423,9 @@ export interface RecordCumulativeUsageResult {
413423 * each flush. A per-key transactional advisory lock serializes concurrent
414424 * flushes so the read-then-write — including the first insert — is race-free
415425 * (no two flushes can both believe they are first and clobber each other).
426+ * The billing context is resolved BEFORE the transaction and the lock wait is
427+ * bounded by `lock_timeout`, keeping the critical section to one SELECT plus
428+ * one INSERT/UPDATE on a single pooled connection.
416429 *
417430 * Because every leg flushes its cumulative and this converges to the max,
418431 * there is no under-billing if the request recovers after a partial flush, no
@@ -424,9 +437,22 @@ export async function recordCumulativeUsage(
424437) : Promise < RecordCumulativeUsageResult > {
425438 const { userId, workspaceId, source, model, cost, eventKey, metadata } = params
426439
440+ // Resolved before the locked transaction on purpose: resolving inside it
441+ // ran the subscription lookups on the global pool while this tx already
442+ // held a pooled connection plus the advisory lock, so under load N
443+ // first-flush transactions each pinned a connection while waiting for one
444+ // more — starving the pool and queueing every same-key flush (and the Go
445+ // side's retries) behind the stall.
446+ const billingContext = await resolveBillingContext ( userId )
447+
427448 return db . transaction ( async ( tx ) => {
428- // Serialize all flushes for this request (lock auto-releases at tx end).
429- await tx . execute ( sql `SELECT pg_advisory_xact_lock(hashtext(${ eventKey } ))` )
449+ // Serialize all flushes for this request (lock auto-releases at tx end),
450+ // with a bounded wait so a pathological holder fails this flush fast and
451+ // lets the caller retry instead of hanging the connection.
452+ await tx . execute (
453+ sql `select set_config('lock_timeout', ${ `${ CUMULATIVE_FLUSH_LOCK_TIMEOUT_MS } ms` } , true)`
454+ )
455+ await tx . execute ( sql `select pg_advisory_xact_lock(hashtextextended(${ eventKey } , 0))` )
430456
431457 const [ existing ] = await tx
432458 . select ( { id : usageLog . id , cost : usageLog . cost } )
@@ -449,12 +475,14 @@ export async function recordCumulativeUsage(
449475 . set ( { cost : newTotal . toString ( ) , metadata : metadata ?? null } )
450476 . where ( eq ( usageLog . id , existing . id ) )
451477 } else {
452- // First flush for this request: insert the canonical row (recordUsage
453- // resolves billing entity/period) . Runs in the same tx + advisory lock.
478+ // First flush for this request: insert the canonical row with the
479+ // pre-resolved billing context . Runs in the same tx + advisory lock.
454480 await recordUsage ( {
455481 userId,
456482 workspaceId,
457483 tx,
484+ billingEntity : billingContext . billingEntity ,
485+ billingPeriod : billingContext . billingPeriod ,
458486 entries : [
459487 {
460488 category : 'model' ,
0 commit comments