Skip to content
Prev Previous commit
Next Next commit
fix(logs): preserve fallback diagnostics semantics
Keep successful fallback output and accumulated cost intact while tightening progress-write draining and deduplicating trace span counting for diagnostics helpers.
  • Loading branch information
test committed Mar 13, 2026
commit c6d91952ef38da05370fa75fae975919ace16859
16 changes: 1 addition & 15 deletions apps/sim/lib/logs/execution/diagnostics.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { countTraceSpans } from '@/lib/logs/execution/trace-span-count'
import type { ExecutionFinalizationPath } from '@/lib/logs/types'
import { isExecutionFinalizationPath } from '@/lib/logs/types'

Expand All @@ -14,21 +15,6 @@ type ExecutionData = {
finalizationPath?: unknown
}

/**
 * Counts every entry in a trace-span tree, including nested descendants.
 *
 * Each array element contributes one to the total regardless of its type.
 * An element's `children` are only descended into when the element is a
 * non-null object whose `children` property is an array.
 *
 * @param traceSpans - Top-level spans; anything that is not an array counts as zero.
 * @returns The total number of spans at all nesting levels.
 */
function countTraceSpans(traceSpans: unknown[] | undefined): number {
  if (!Array.isArray(traceSpans) || traceSpans.length === 0) {
    return 0
  }

  let total = 0

  traceSpans.forEach((span) => {
    // The span itself always counts, even when it is not an object.
    total += 1

    if (span && typeof span === 'object' && 'children' in span && Array.isArray(span.children)) {
      total += countTraceSpans(span.children as unknown[])
    }
  })

  return total
}

export function buildExecutionDiagnostics(params: {
status: string
level?: string | null
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/lib/logs/execution/logger.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { databaseMock, loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, test, vi } from 'vitest'
import { ExecutionLogger } from './logger'
import { ExecutionLogger } from '@/lib/logs/execution/logger'

vi.mock('@sim/db', () => databaseMock)
Comment thread
PlaneInABottle marked this conversation as resolved.

Expand Down
9 changes: 1 addition & 8 deletions apps/sim/lib/logs/execution/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction'
import { filterForDisplay } from '@/lib/core/utils/display-filters'
import { emitWorkflowExecutionCompleted } from '@/lib/logs/events'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { countTraceSpans } from '@/lib/logs/execution/trace-span-count'
import type {
BlockOutputData,
ExecutionEnvironment,
Expand All @@ -49,14 +50,6 @@ export interface ToolCall {

const logger = createLogger('ExecutionLogger')

/**
 * Counts all spans in a trace-span tree, descending through `children`.
 *
 * @param traceSpans - Top-level spans; a missing, non-array, or empty value yields 0.
 * @returns The number of spans at every nesting level combined.
 */
function countTraceSpans(traceSpans?: TraceSpan[]): number {
  if (!Array.isArray(traceSpans) || traceSpans.length === 0) {
    return 0
  }

  let total = 0

  traceSpans.forEach((span) => {
    // One for the span itself plus everything nested beneath it.
    total += 1 + countTraceSpans(span.children)
  })

  return total
}

export class ExecutionLogger implements IExecutionLoggerService {
private buildCompletedExecutionData(params: {
existingExecutionData?: WorkflowExecutionLog['executionData']
Expand Down
127 changes: 127 additions & 0 deletions apps/sim/lib/logs/execution/logging-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,64 @@ describe('LoggingSession completion retries', () => {
expect(session.hasCompleted()).toBe(true)
})

// When the first finalize call rejects and the second resolves, the last call
// must still carry the caller's final output and the fallback finalization path.
it('preserves successful final output during fallback completion', async () => {
  const session = new LoggingSession('workflow-1', 'execution-5', 'api', 'req-1')

  // First persistence attempt rejects; the following one resolves.
  completeWorkflowExecutionMock.mockRejectedValueOnce(new Error('success finalize failed'))
  completeWorkflowExecutionMock.mockResolvedValueOnce({})

  const completion = session.safeComplete({ finalOutput: { ok: true, stage: 'done' } })
  await expect(completion).resolves.toBeUndefined()

  const expectedLastCall = expect.objectContaining({
    executionId: 'execution-5',
    finalOutput: { ok: true, stage: 'done' },
    finalizationPath: 'fallback_completed',
  })

  expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(expectedLastCall)
})

// When the first finalize call rejects and the second resolves, the last call
// must report the session's accumulated cost totals in its cost summary.
it('preserves accumulated cost during fallback completion', async () => {
  const session = new LoggingSession('workflow-1', 'execution-6', 'api', 'req-1') as any

  // Fresh object per use so the session never shares a reference between fields.
  const tokenUsage = () => ({ input: 11, output: 13, total: 24 })

  // Seed the session's private cost state as if cost had already been accumulated and flushed.
  session.accumulatedCost = {
    total: 12,
    input: 5,
    output: 7,
    tokens: tokenUsage(),
    models: {
      'test-model': {
        input: 5,
        output: 7,
        total: 12,
        tokens: tokenUsage(),
      },
    },
  }
  session.costFlushed = true

  // First persistence attempt rejects; the following one resolves.
  completeWorkflowExecutionMock.mockRejectedValueOnce(new Error('success finalize failed'))
  completeWorkflowExecutionMock.mockResolvedValueOnce({})

  const completion = session.safeComplete({ finalOutput: { ok: true } })
  await expect(completion).resolves.toBeUndefined()

  const expectedCostSummary = expect.objectContaining({
    totalCost: 12,
    totalInputCost: 5,
    totalOutputCost: 7,
    totalTokens: 24,
  })

  expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
    expect.objectContaining({
      executionId: 'execution-6',
      costSummary: expectedCostSummary,
    })
  )
})

it('persists failed error semantics when completeWithError receives non-error trace spans', async () => {
const session = new LoggingSession('workflow-1', 'execution-4', 'api', 'req-1')
const traceSpans = [
Expand Down Expand Up @@ -294,6 +352,75 @@ describe('LoggingSession completion retries', () => {
expect(session.complete).toHaveBeenCalledTimes(1)
})

// Checks that safeComplete() does not call complete() while the cost flush
// triggered through onBlockComplete() is still pending, and that each is
// called exactly once after the flush is released.
it('drains fire-and-forget cost flushes before terminal completion', async () => {
// Manually controlled promise that stands in for an in-flight cost flush.
let releaseFlush: (() => void) | undefined
const flushPromise = new Promise<void>((resolve) => {
releaseFlush = resolve
})

// Cast to any so the test can replace the session's flushAccumulatedCost and complete members.
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
session.flushAccumulatedCost = vi.fn(() => flushPromise)
session.complete = vi.fn().mockResolvedValue(undefined)

// Report a block completion that carries cost data; the final assertions
// expect this to have invoked flushAccumulatedCost exactly once.
await session.onBlockComplete('block-2', 'Transform', 'function', {
endedAt: '2025-01-01T00:00:01.000Z',
output: { value: true },
cost: { total: 1, input: 1, output: 0 },
tokens: { input: 1, output: 0, total: 1 },
model: 'test-model',
})

// Start completion without awaiting it so the pending flush can be observed.
const completionPromise = session.safeComplete({ finalOutput: { ok: true } })

// Yield a single microtask turn before inspecting the spy.
// NOTE(review): one turn may be too short for safeComplete() to reach
// complete() even without draining — confirm this assertion can actually fail.
await Promise.resolve()

expect(session.complete).not.toHaveBeenCalled()

// Let the stubbed flush settle; completion is expected to proceed afterwards.
releaseFlush?.()

await completionPromise

expect(session.flushAccumulatedCost).toHaveBeenCalledTimes(1)
expect(session.complete).toHaveBeenCalledTimes(1)
})

// Checks that drainPendingProgressWrites() keeps waiting when another write is
// tracked while a drain is already in progress, and that the pending
// collection is empty once the drain resolves.
it('keeps draining when new progress writes arrive during drain', async () => {
  // Yields to the microtask queue several times so that every promise chain
  // hanging off an already-settled promise gets a chance to run. A single
  // `await Promise.resolve()` is too short: a drain that awaits only the first
  // snapshot of pending writes needs more turns than that to resolve, so a
  // one-turn wait would let the "still draining" assertion pass vacuously.
  // No timers are used, so this behaves the same under fake timers.
  const flushMicrotasks = async (turns = 10): Promise<void> => {
    for (let turn = 0; turn < turns; turn++) {
      await Promise.resolve()
    }
  }

  let releaseFirst: (() => void) | undefined
  let releaseSecond: (() => void) | undefined
  const firstPromise = new Promise<void>((resolve) => {
    releaseFirst = resolve
  })
  const secondPromise = new Promise<void>((resolve) => {
    releaseSecond = resolve
  })

  // Cast to any so the test can reach the session's private tracking members.
  const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any

  void session.trackProgressWrite(firstPromise)

  const drainPromise = session.drainPendingProgressWrites()

  let drained = false
  void drainPromise.then(() => {
    drained = true
  })

  await Promise.resolve()

  // Register a second write while the drain is in flight, then settle the first.
  void session.trackProgressWrite(secondPromise)
  releaseFirst?.()

  // Give a single-pass drain ample opportunity to (incorrectly) finish.
  await flushMicrotasks()
  expect(drained).toBe(false)

  releaseSecond?.()
  await drainPromise

  expect(session.pendingProgressWrites.size).toBe(0)
})

it('marks pause completion as terminal and prevents duplicate pause finalization', async () => {
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
session.completeExecutionWithFinalization = vi.fn().mockResolvedValue(undefined)
Expand Down
59 changes: 42 additions & 17 deletions apps/sim/lib/logs/execution/logging-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,9 @@ export class LoggingSession {
}

/**
 * Waits until no tracked progress writes remain pending.
 *
 * The wait is a loop rather than a single pass: writes registered while an
 * earlier batch is being awaited are not part of that batch's snapshot, so
 * the collection is re-checked after every batch settles.
 *
 * `Promise.allSettled` never rejects, so a failed write does not propagate
 * out of this method.
 *
 * NOTE(review): the loop only terminates if settled writes are removed from
 * `pendingProgressWrites` elsewhere (presumably by `trackProgressWrite`) —
 * confirm against that method.
 */
private async drainPendingProgressWrites(): Promise<void> {
  while (this.pendingProgressWrites.size > 0) {
    // Snapshot the current entries; later additions are picked up by the next iteration.
    await Promise.allSettled(Array.from(this.pendingProgressWrites))
  }
}

private async completeExecutionWithFinalization(params: {
Expand Down Expand Up @@ -330,7 +328,7 @@ export class LoggingSession {
}
}

void this.flushAccumulatedCost()
void this.trackProgressWrite(this.flushAccumulatedCost())
}

private async flushAccumulatedCost(): Promise<void> {
Expand Down Expand Up @@ -921,6 +919,7 @@ export class LoggingSession {
errorMessage: `Failed to store trace spans: ${errorMsg}`,
isError: false,
finalizationPath: 'fallback_completed',
finalOutput: params.finalOutput || {},
})
}
}
Expand All @@ -947,6 +946,9 @@ export class LoggingSession {
params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`,
isError: true,
finalizationPath: 'force_failed',
finalOutput: {
error: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`,
},
status: 'failed',
})
}
Expand Down Expand Up @@ -975,6 +977,7 @@ export class LoggingSession {
errorMessage: 'Execution was cancelled',
isError: false,
finalizationPath: 'cancelled',
finalOutput: { cancelled: true },
status: 'cancelled',
})
}
Expand All @@ -1001,6 +1004,7 @@ export class LoggingSession {
errorMessage: 'Execution paused but failed to store full trace spans',
isError: false,
finalizationPath: 'paused',
finalOutput: { paused: true },
status: 'pending',
})
}
Expand Down Expand Up @@ -1054,6 +1058,7 @@ export class LoggingSession {
errorMessage: string
isError: boolean
finalizationPath: ExecutionFinalizationPath
finalOutput?: Record<string, unknown>
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
}): Promise<void> {
if (this.completed || this.completing) {
Expand All @@ -1066,25 +1071,45 @@ export class LoggingSession {
)

try {
const costSummary = params.traceSpans?.length
? calculateCostSummary(params.traceSpans)
: {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
const hasAccumulatedCost =
this.costFlushed ||
this.accumulatedCost.total > BASE_EXECUTION_CHARGE ||
this.accumulatedCost.tokens.total > 0 ||
Object.keys(this.accumulatedCost.models).length > 0

const costSummary = hasAccumulatedCost
? {
totalCost: this.accumulatedCost.total,
totalInputCost: this.accumulatedCost.input,
totalOutputCost: this.accumulatedCost.output,
totalTokens: this.accumulatedCost.tokens.total,
totalPromptTokens: this.accumulatedCost.tokens.input,
totalCompletionTokens: this.accumulatedCost.tokens.output,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
modelCost: Math.max(0, this.accumulatedCost.total - BASE_EXECUTION_CHARGE),
models: this.accumulatedCost.models,
}
: params.traceSpans?.length
? calculateCostSummary(params.traceSpans)
: {
totalCost: BASE_EXECUTION_CHARGE,
totalInputCost: 0,
totalOutputCost: 0,
totalTokens: 0,
totalPromptTokens: 0,
totalCompletionTokens: 0,
baseExecutionCharge: BASE_EXECUTION_CHARGE,
modelCost: 0,
models: {},
}

const finalOutput = params.finalOutput || { _fallback: true, error: params.errorMessage }

await this.completeExecutionWithFinalization({
endedAt: params.endedAt || new Date().toISOString(),
totalDurationMs: params.totalDurationMs || 0,
costSummary,
finalOutput: { _fallback: true, error: params.errorMessage },
finalOutput,
traceSpans: [],
finalizationPath: params.finalizationPath,
completionFailure: params.errorMessage,
Expand Down
14 changes: 14 additions & 0 deletions apps/sim/lib/logs/execution/trace-span-count.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
 * Returns the total number of spans in a trace-span tree.
 *
 * Every array element counts as one span, whatever its type. Nested spans are
 * included when an element is a non-null object whose `children` property is
 * an array.
 *
 * @param traceSpans - Top-level spans; a missing or non-array value yields 0.
 * @returns The count of spans across all nesting levels.
 */
export function countTraceSpans(traceSpans?: unknown[]): number {
  if (!Array.isArray(traceSpans)) {
    return 0
  }

  let total = 0

  traceSpans.forEach((span) => {
    // Count the span itself first, then anything nested under it.
    total += 1

    const hasChildList =
      span && typeof span === 'object' && 'children' in span && Array.isArray(span.children)

    if (hasChildList) {
      total += countTraceSpans((span as { children: unknown[] }).children)
    }
  })

  return total
}
24 changes: 8 additions & 16 deletions apps/sim/lib/workflows/executor/execution-core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
expect(wasExecutionFinalizedByCore('engine failed', 'execution-a')).toBe(true)
})

it('falls back to error finalization when success finalization rejects', async () => {
it('does not replace a successful outcome when success finalization rejects', async () => {
executorExecuteMock.mockResolvedValue({
success: true,
status: 'completed',
Expand All @@ -626,22 +626,14 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {

safeCompleteMock.mockRejectedValue(new Error('completion failed'))

await expect(
executeWorkflowCore({
snapshot: createSnapshot() as any,
callbacks: {},
loggingSession: loggingSession as any,
})
).rejects.toThrow('completion failed')
const result = await executeWorkflowCore({
snapshot: createSnapshot() as any,
callbacks: {},
loggingSession: loggingSession as any,
})

expect(safeCompleteWithErrorMock).toHaveBeenCalledTimes(1)
expect(safeCompleteWithErrorMock).toHaveBeenCalledWith(
expect.objectContaining({
error: expect.objectContaining({
message: 'completion failed',
}),
})
)
expect(result).toMatchObject({ status: 'completed', success: true })
expect(safeCompleteWithErrorMock).not.toHaveBeenCalled()
})

it('does not replace a successful outcome when cancellation cleanup fails', async () => {
Expand Down
Loading