Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/sim/app/api/jobs/[jobId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export async function GET(
}

if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
response.estimatedDuration = 180000
response.estimatedDuration = 300000
}

return NextResponse.json(response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ export async function POST(
checkDeployment: false, // Resuming existing execution, deployment already checked
skipUsageLimits: true, // Resume is continuation of authorized execution - don't recheck limits
workspaceId: workflow.workspaceId || undefined,
isResumeContext: true, // Enable billing fallback for paused workflow resumes
})

if (!preprocessResult.success) {
Expand Down
166 changes: 32 additions & 134 deletions apps/sim/lib/execution/preprocessing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,94 +19,6 @@ const BILLING_ERROR_MESSAGES = {
BILLING_ERROR_GENERIC: 'Error resolving billing account',
} as const

/**
* Attempts to resolve billing actor with fallback for resume contexts.
* Returns the resolved actor user ID or null if resolution fails and should block execution.
*
* For resume contexts, this function allows fallback to the workflow owner if workspace
* billing cannot be resolved, ensuring users can complete their paused workflows even
* if billing configuration changes mid-execution.
*
* @returns Object containing actorUserId (null if should block) and shouldBlock flag
*/
async function resolveBillingActorWithFallback(params: {
requestId: string
workflowId: string
workspaceId: string
executionId: string
triggerType: string
workflowRecord: WorkflowRecord
userId: string
isResumeContext: boolean
baseActorUserId: string | null
failureReason: 'null' | 'error'
error?: unknown
loggingSession?: LoggingSession
}): Promise<
{ actorUserId: string; shouldBlock: false } | { actorUserId: null; shouldBlock: true }
> {
const {
requestId,
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId,
failureReason,
error,
loggingSession,
} = params

if (baseActorUserId) {
return { actorUserId: baseActorUserId, shouldBlock: false }
}

const workflowOwner = workflowRecord.userId?.trim()
if (isResumeContext && workflowOwner) {
const logMessage =
failureReason === 'null'
? '[BILLING_FALLBACK] Workspace billing account is null. Using workflow owner for billing.'
: '[BILLING_FALLBACK] Exception during workspace billing resolution. Using workflow owner for billing.'

logger.warn(`[${requestId}] ${logMessage}`, {
workflowId,
workspaceId,
fallbackUserId: workflowOwner,
...(error ? { error } : {}),
})

return { actorUserId: workflowOwner, shouldBlock: false }
}

const fallbackUserId = workflowRecord.userId || userId || 'unknown'
const errorMessage =
failureReason === 'null'
? BILLING_ERROR_MESSAGES.BILLING_REQUIRED
: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC

logger.warn(`[${requestId}] ${errorMessage}`, {
workflowId,
workspaceId,
...(error ? { error } : {}),
})

await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage,
loggingSession,
})

return { actorUserId: null, shouldBlock: true }
}

export interface PreprocessExecutionOptions {
// Required fields
workflowId: string
Expand All @@ -123,7 +35,7 @@ export interface PreprocessExecutionOptions {
// Context information
workspaceId?: string // If known, used for billing resolution
loggingSession?: LoggingSession // If provided, will be used for error logging
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
isResumeContext?: boolean // Deprecated: no billing fallback is allowed
useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys)
/** @deprecated No longer used - background/async executions always use deployed state */
useDraftState?: boolean
Expand Down Expand Up @@ -170,7 +82,7 @@ export async function preprocessExecution(
skipUsageLimits = false,
workspaceId: providedWorkspaceId,
loggingSession: providedLoggingSession,
isResumeContext = false,
isResumeContext: _isResumeContext = false,
useAuthenticatedUserAsActor = false,
} = options

Expand Down Expand Up @@ -274,68 +186,54 @@ export async function preprocessExecution(
}

if (!actorUserId) {
actorUserId = workflowRecord.userId || userId
logger.info(`[${requestId}] Using workflow owner as actor: ${actorUserId}`)
}

if (!actorUserId) {
const result = await resolveBillingActorWithFallback({
requestId,
const fallbackUserId = userId || 'unknown'
logger.warn(`[${requestId}] ${BILLING_ERROR_MESSAGES.BILLING_REQUIRED}`, {
workflowId,
workspaceId,
})

await logPreprocessingError({
workflowId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId: actorUserId,
failureReason: 'null',
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage: BILLING_ERROR_MESSAGES.BILLING_REQUIRED,
loggingSession: providedLoggingSession,
})

if (result.shouldBlock) {
return {
success: false,
error: {
message: 'Unable to resolve billing account',
statusCode: 500,
logCreated: true,
},
}
return {
success: false,
error: {
message: 'Unable to resolve billing account',
statusCode: 500,
logCreated: true,
},
}

actorUserId = result.actorUserId
}
} catch (error) {
logger.error(`[${requestId}] Error resolving billing actor`, { error, workflowId })

const result = await resolveBillingActorWithFallback({
requestId,
const fallbackUserId = userId || 'unknown'
await logPreprocessingError({
workflowId,
workspaceId,
executionId,
triggerType,
workflowRecord,
userId,
isResumeContext,
baseActorUserId: null,
failureReason: 'error',
error,
requestId,
userId: fallbackUserId,
workspaceId,
errorMessage: BILLING_ERROR_MESSAGES.BILLING_ERROR_GENERIC,
loggingSession: providedLoggingSession,
})

if (result.shouldBlock) {
return {
success: false,
error: {
message: 'Error resolving billing account',
statusCode: 500,
logCreated: true,
},
}
return {
success: false,
error: {
message: 'Error resolving billing account',
statusCode: 500,
logCreated: true,
},
}

actorUserId = result.actorUserId
}

// ========== STEP 4: Get User Subscription ==========
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/lib/knowledge/documents/document-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { mistralParserTool } from '@/tools/mistral/parser'
const logger = createLogger('DocumentProcessor')

const TIMEOUTS = {
FILE_DOWNLOAD: 180000,
FILE_DOWNLOAD: 600000,
MISTRAL_OCR_API: 120000,
} as const

Expand Down
69 changes: 46 additions & 23 deletions apps/sim/lib/logs/execution/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import type {
WorkflowExecutionSnapshot,
WorkflowState,
} from '@/lib/logs/types'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import type { SerializableExecutionState } from '@/executor/execution/types'

export interface ToolCall {
Expand Down Expand Up @@ -210,16 +209,15 @@ export class ExecutionLogger implements IExecutionLoggerService {

logger.debug(`Completing workflow execution ${executionId}`, { isResume })

// If this is a resume, fetch the existing log to merge data
let existingLog: any = null
if (isResume) {
const [existing] = await db
.select()
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
existingLog = existing
}
const [existingLog] = await db
.select()
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
const billingUserId = this.extractBillingUserId(existingLog?.executionData)
const existingExecutionData = existingLog?.executionData as
| { traceSpans?: TraceSpan[] }
| undefined

// Determine if workflow failed by checking trace spans for errors
// Use the override if provided (for cost-only fallback scenarios)
Expand All @@ -244,7 +242,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
const mergedTraceSpans = isResume
? traceSpans && traceSpans.length > 0
? traceSpans
: existingLog?.executionData?.traceSpans || []
: existingExecutionData?.traceSpans || []
: traceSpans

const filteredTraceSpans = filterForDisplay(mergedTraceSpans)
Expand Down Expand Up @@ -329,7 +327,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)

const limit = before.usageData.limit
Expand Down Expand Up @@ -367,7 +366,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)

const percentBefore =
Expand All @@ -393,15 +393,17 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)
}
} else {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)
}
} catch (e) {
Expand All @@ -410,7 +412,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type'],
executionId
executionId,
billingUserId
)
} catch {}
logger.warn('Usage threshold notification check failed (non-fatal)', { error: e })
Expand Down Expand Up @@ -472,6 +475,22 @@ export class ExecutionLogger implements IExecutionLoggerService {
* Updates user stats with cost and token information
* Maintains same logic as original execution logger for billing consistency
*/
private extractBillingUserId(executionData: unknown): string | null {
if (!executionData || typeof executionData !== 'object') {
return null
}

const environment = (executionData as { environment?: { userId?: unknown } }).environment
const userId = environment?.userId

if (typeof userId !== 'string') {
return null
}

const trimmedUserId = userId.trim()
return trimmedUserId.length > 0 ? trimmedUserId : null
}

private async updateUserStats(
workflowId: string | null,
costSummary: {
Expand All @@ -494,7 +513,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
>
},
trigger: ExecutionTrigger['type'],
executionId?: string
executionId?: string,
billingUserId?: string | null
): Promise<void> {
if (!isBillingEnabled) {
logger.debug('Billing is disabled, skipping user stats cost update')
Expand All @@ -512,7 +532,6 @@ export class ExecutionLogger implements IExecutionLoggerService {
}

try {
// Get the workflow record to get workspace and fallback userId
const [workflowRecord] = await db
.select()
.from(workflow)
Expand All @@ -524,12 +543,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
return
}

let billingUserId: string | null = null
if (workflowRecord.workspaceId) {
billingUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
const userId = billingUserId?.trim() || null
if (!userId) {
logger.error('Missing billing actor in execution context; skipping stats update', {
workflowId,
trigger,
executionId,
})
return
}

const userId = billingUserId || workflowRecord.userId
const costToStore = costSummary.totalCost

const existing = await db.select().from(userStats).where(eq(userStats.userId, userId))
Expand Down
Loading