Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(billing): attribute cost to caller when info available"
  • Loading branch information
icecrasher321 committed Feb 10, 2026
commit 0ff7e57f99ead61c515b14eafc334a5804ce0303
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ export async function POST(
checkDeployment: false, // Resuming existing execution, deployment already checked
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
workspaceId: workflow.workspaceId || undefined,
isResumeContext: true, // Enable billing fallback for paused workflow resumes
})

if (!preprocessResult.success) {
Expand Down
166 changes: 32 additions & 134 deletions apps/sim/lib/execution/preprocessing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,94 +19,6 @@ const BILLING_ERROR_MESSAGES = {
BILLING_ERROR_GENERIC: 'Error resolving billing account',
} as const

/**
* Attempts to resolve billing actor with fallback for resume contexts.
* Returns the resolved actor user ID or null if resolution fails and should block execution.
*
* For resume contexts, this function allows fallback to the workflow owner if workspace
* billing cannot be resolved, ensuring users can complete their paused workflows even
* if billing configuration changes mid-execution.
*
* @returns Object containing actorUserId (null if should block) and shouldBlock flag
*/
async function resolveBillingActorWithFallback(params: {
requestId: string
workflowId: string
workspaceId: string
executionId: string
triggerType: string
workflowRecord: WorkflowRecord
userId: string
isResumeContext: boolean
baseActorUserId: string | null
failureReason: 'null' | 'error'
error?: unknown
loggingSession?: LoggingSession
}): Promise<
{ actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true }
> {
const {
requestId,
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId,
failureReason,
error,
loggingSession,
} = params

if (baseActorUserId) {
return { actorUserId: baseActorUserId, shouldBlock: false }
}

const workflowOwner = workflowRecord.userId?.trim()
if (isResumeContext && workflowOwner) {
const logMessage =
failureReason === 'null'
? '[BILLING_FALLBACK] Workspace billing account is null. Using workflow owner for billing.'
: '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.'

logger.warn(`[${requestId}] ${logMessage}`, {
workflowId,
workspaceId,
fallbackUserId: workflowOwner,
...(error ? { error } : {}),
})

return { actorUserId: workflowOwner, shouldBlock: false }
}

const fallbackUserId = workflowRecord.userId || userId || 'unknown'
const errorMessage =
failureReason === 'null'
? BILLING_ERROR_MESSAGES.BILLING_REQUIRED
: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC

logger.warn(`[${requestId}] ${errorMessage}`, {
workflowId,
workspaceId,
...(error ? { error } : {}),
})

await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage,
loggingSession,
})

return { actorUserId: null, shouldBlock: true }
}

export interface PreprocessExecutionOptions {
// Required fields
workflowId: string
Expand All @@ -123,7 +35,7 @@ export interface PreprocessExecutionOptions {
// Context information
workspaceId?: string // If known, used for billing resolution
loggingSession?: LoggingSession // If provided, will be used for error logging
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
isResumeContext?: boolean // Deprecated: no billing fallback is allowed
useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys)
/** @deprecated No longer used - background/async executions always use deployed state */
useDraftState?: boolean
Expand Down Expand Up @@ -170,7 +82,7 @@ export async function preprocessExecution(
skipUsageLimits = false,
workspaceId: providedWorkspaceId,
loggingSession: providedLoggingSession,
isResumeContext = false,
isResumeContext: _isResumeContext = false,
useAuthenticatedUserAsActor = false,
} = options

Expand Down Expand Up @@ -274,68 +186,54 @@ export async function preprocessExecution(
}

if (!actorUserId) {
actorUserId = workflowRecord.userId || userId
logger.info(`[${requestId}] Using workflow owner as actor: ${actorUserId}`)
}

if (!actorUserId) {
const result = await resolveBillingActorWithFallback({
requestId,
const fallbackUserId = userId || workflowRecord.userId || 'unknown'
logger.warn(`[${requestId}] ${BILLING_ERROR_MESSAGES.BILLING_REQUIRED}`, {
workflowId,
workspaceId,
})

await logPreprocessingError({
workflowId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId: actorUserId,
failureReason: 'null',
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage: BILLING_ERROR_MESSAGES.BILLING_REQUIRED,
loggingSession: providedLoggingSession,
})

if (result.shouldBlock) {
return {
success: false,
error: {
message: 'Unable to resolve billing account',
statusCode: 500,
logCreated: true,
},
}
return {
success: false,
error: {
message: 'Unable to resolve billing account',
statusCode: 500,
logCreated: true,
},
}

actorUserId = result.actorUserId
}
} catch (error) {
logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId })

const result = await resolveBillingActorWithFallback({
requestId,
const fallbackUserId = userId || workflowRecord.userId || 'unknown'
await logPreprocessingError({
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId: null,
failureReason: 'error',
error,
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC,
loggingSession: providedLoggingSession,
})

if (result.shouldBlock) {
return {
success: false,
error: {
message: 'Error resolving billing account',
statusCode: 500,
logCreated: true,
},
}
return {
success: false,
error: {
message: 'Error resolving billing account',
statusCode: 500,
logCreated: true,
},
}

actorUserId = result.actorUserId
}

// ========== STEP 4: Get User Subscription ==========
Expand Down
69 changes: 46 additions & 23 deletions apps/sim/lib/logs/execution/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import type {
WorkflowExecutionSnapshot,
WorkflowState,
} from '@/lib/logs/types'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import type { SerializableExecutionState } from '@/executor/execution/types'

export interface ToolCall {
Expand Down Expand Up @@ -210,16 +209,15 @@ export class ExecutionLogger implements IExecutionLoggerService {

logger.debug(`Completing workflow execution ${executionId}`, { isResume })

// If this is a resume, fetch the existing log to merge data
let existingLog: any = null
if (isResume) {
const [existing] = await db
.select()
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
existingLog = existing
}
const [existingLog] = await db
.select()
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
const billingUserId = this.extractBillingUserId(existingLog?.executionData)
const existingExecutionData = existingLog?.executionData as
| { traceSpans?: TraceSpan[] }
| undefined

// Determine if workflow failed by checking trace spans for errors
// Use the override if provided (for cost-only fallback scenarios)
Expand All @@ -244,7 +242,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
const mergedTraceSpans = isResume
? traceSpans && traceSpans.length > 0
? traceSpans
: existingLog?.executionData?.traceSpans || []
: existingExecutionData?.traceSpans || []
: traceSpans

const filteredTraceSpans = filterForDisplay(mergedTraceSpans)
Expand Down Expand Up @@ -329,7 +327,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)

const limit = before.usageData.limit
Expand Down Expand Up @@ -367,7 +366,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)

const percentBefore =
Expand All @@ -393,15 +393,17 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)
}
} else {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)
}
} catch (e) {
Expand All @@ -410,7 +412,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)
} catch {}
logger.warn('Usage threshold notification check failed (non-fatal)', { error: e })
Expand Down Expand Up @@ -472,6 +475,22 @@ export class ExecutionLogger implements IExecutionLoggerService {
* Updates user stats with cost and token information
* Maintains same logic as original execution logger for billing consistency
*/
private extractBillingUserId(executionData: unknown): string | null {
if (!executionData || typeof executionData !== 'object') {
return null
}

const environment = (executionData as { environment?: { userId?: unknown } }).environment
const userId = environment?.userId

if (typeof userId !== 'string') {
return null
}

const trimmedUserId = userId.trim()
return trimmedUserId.length > 0 ? trimmedUserId : null
}

private async updateUserStats(
workflowId: string | null,
costSummary: {
Expand All @@ -494,7 +513,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
>
},
trigger: ExecutionTrigger['type'],
executionId?: string
executionId?: string,
billingUserId?: string | null
): Promise<void> {
if (!isBillingEnabled) {
logger.debug('Billing is disabled, skipping user stats cost update')
Expand All @@ -512,7 +532,6 @@ export class ExecutionLogger implements IExecutionLoggerService {
}

try {
// Get the workflow record to get workspace and fallback userId
const [workflowRecord] = await db
.select()
.from(workflow)
Expand All @@ -524,12 +543,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
return
}

let billingUserId: string | null = null
if (workflowRecord.workspaceId) {
billingUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
const userId = billingUserId?.trim() || null
if (!userId) {
logger.error('Missing billing actor in execution context; skipping stats update', {
workflowId,
trigger,
executionId,
})
return
}

const userId = billingUserId || workflowRecord.userId
const costToStore = costSummary.totalCost

const existing = await db.select().from(userStats).where(eq(userStats.userId, userId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,6 @@ export class PauseResumeManager {
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
workspaceId: baseSnapshot.metadata.workspaceId,
loggingSession,
isResumeContext: true, // Enable billing fallback for paused workflow resumes
})

if (!preprocessingResult.success) {
Expand Down