Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(logs): add durable execution diagnostics foundation (#3564)
* fix(logs): persist execution diagnostics markers

Store last-started and last-completed block markers with finalization metadata so later read surfaces can explain how a run ended without reconstructing executor state.

* fix(executor): preserve durable diagnostics ordering

Await only the persistence needed to keep diagnostics durable before terminal completion while keeping callback failures from changing execution behavior.

* fix(logs): preserve fallback diagnostics semantics

Keep successful fallback output and accumulated cost intact while tightening progress-write draining and deduplicating trace span counting for diagnostics helpers.

* fix(api): restore async execute route test mock

Add the missing AuthType export to the hybrid auth mock so the async execution route test exercises the 202 queueing path instead of crashing with a 500 in CI.

* fix(executor): align async block error handling

* fix(logs): tighten marker ordering scope

Allow same-millisecond marker writes to replace prior markers and drop the unused diagnostics read helper so this PR stays focused on persistence rather than unread foundation code.

* fix(logs): remove unused finalization type guard

Drop the unused  helper so this PR only ships the persistence-side status types it actually uses.

* fix(executor): await subflow diagnostics callbacks

Ensure empty-subflow and subflow-error lifecycle callbacks participate in progress-write draining before terminal finalization while still swallowing callback failures.

---------

Co-authored-by: test <test@example.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
  • Loading branch information
3 people authored Mar 18, 2026
commit 67478bbc80382b636f4221d2f5dd754827465105
86 changes: 51 additions & 35 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class BlockExecutor {
if (!isSentinel) {
blockLog = this.createBlockLog(ctx, node.id, block, node)
ctx.blockLogs.push(blockLog)
this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
}

const startTime = performance.now()
Expand Down Expand Up @@ -105,7 +105,7 @@ export class BlockExecutor {
}
} catch (error) {
cleanupSelfReference?.()
return this.handleBlockError(
return await this.handleBlockError(
error,
ctx,
node,
Expand Down Expand Up @@ -179,7 +179,7 @@ export class BlockExecutor {
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
block,
})
this.callOnBlockComplete(
await this.callOnBlockComplete(
ctx,
node,
block,
Expand All @@ -195,7 +195,7 @@ export class BlockExecutor {

return normalizedOutput
} catch (error) {
return this.handleBlockError(
return await this.handleBlockError(
error,
ctx,
node,
Expand Down Expand Up @@ -226,7 +226,7 @@ export class BlockExecutor {
return this.blockHandlers.find((h) => h.canHandle(block))
}

private handleBlockError(
private async handleBlockError(
error: unknown,
ctx: ExecutionContext,
node: DAGNode,
Expand All @@ -236,7 +236,7 @@ export class BlockExecutor {
resolvedInputs: Record<string, any>,
isSentinel: boolean,
phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
): Promise<NormalizedBlockOutput> {
const duration = performance.now() - startTime
const errorMessage = normalizeError(error)
const hasResolvedInputs =
Expand Down Expand Up @@ -287,7 +287,7 @@ export class BlockExecutor {
? error.childWorkflowInstanceId
: undefined
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
this.callOnBlockComplete(
await this.callOnBlockComplete(
ctx,
node,
block,
Expand Down Expand Up @@ -439,31 +439,39 @@ export class BlockExecutor {
return redactApiKeys(result)
}

private callOnBlockStart(
private async callOnBlockStart(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
executionOrder: number
): void {
): Promise<void> {
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

const iterationContext = getIterationContext(ctx, node?.metadata)

if (this.contextExtensions.onBlockStart) {
this.contextExtensions.onBlockStart(
blockId,
blockName,
blockType,
executionOrder,
iterationContext,
ctx.childWorkflowContext
)
try {
await this.contextExtensions.onBlockStart(
blockId,
blockName,
blockType,
executionOrder,
iterationContext,
ctx.childWorkflowContext
)
} catch (error) {
logger.warn('Block start callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}

private callOnBlockComplete(
private async callOnBlockComplete(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
Expand All @@ -474,30 +482,38 @@ export class BlockExecutor {
executionOrder: number,
endedAt: string,
childWorkflowInstanceId?: string
): void {
): Promise<void> {
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

const iterationContext = getIterationContext(ctx, node?.metadata)

if (this.contextExtensions.onBlockComplete) {
this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext,
ctx.childWorkflowContext
)
try {
await this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext,
ctx.childWorkflowContext
)
} catch (error) {
logger.warn('Block completion callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}

Expand Down
65 changes: 36 additions & 29 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null
) {}

initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise<LoopScope> {
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
if (!loopConfig) {
throw new Error(`Loop config not found: ${loopId}`)
Expand All @@ -76,7 +76,7 @@ export class LoopOrchestrator {
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
Expand All @@ -99,7 +99,7 @@ export class LoopOrchestrator {
} catch (error) {
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
forEachItems: loopConfig.forEachItems,
})
scope.items = []
Expand All @@ -117,7 +117,7 @@ export class LoopOrchestrator {
)
if (sizeError) {
logger.error(sizeError, { loopId, collectionSize: items.length })
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
forEachItems: loopConfig.forEachItems,
collectionSize: items.length,
})
Expand Down Expand Up @@ -155,7 +155,7 @@ export class LoopOrchestrator {
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
Expand All @@ -182,14 +182,14 @@ export class LoopOrchestrator {
return scope
}

private addLoopErrorLog(
private async addLoopErrorLog(
ctx: ExecutionContext,
loopId: string,
loopType: string,
errorMessage: string,
inputData?: any
): void {
addSubflowErrorLog(
): Promise<void> {
await addSubflowErrorLog(
ctx,
loopId,
'loop',
Expand Down Expand Up @@ -238,7 +238,7 @@ export class LoopOrchestrator {
}
if (isCancelled) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
return await this.createExitResult(ctx, loopId, scope)
}

const iterationResults: NormalizedBlockOutput[] = []
Expand All @@ -253,7 +253,7 @@ export class LoopOrchestrator {
scope.currentIterationOutputs.clear()

if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) {
return this.createExitResult(ctx, loopId, scope)
return await this.createExitResult(ctx, loopId, scope)
}

scope.iteration++
Expand All @@ -269,11 +269,11 @@ export class LoopOrchestrator {
}
}

private createExitResult(
private async createExitResult(
ctx: ExecutionContext,
loopId: string,
scope: LoopScope
): LoopContinuationResult {
): Promise<LoopContinuationResult> {
const results = scope.allIterationOutputs
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
Expand All @@ -282,19 +282,26 @@ export class LoopOrchestrator {
const now = new Date().toISOString()
const iterationContext = buildContainerIterationContext(ctx, loopId)

this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
try {
await this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Loop completion callback failed', {
loopId,
error: error instanceof Error ? error.message : String(error),
})
}
}

return {
Expand Down Expand Up @@ -597,7 +604,7 @@ export class LoopOrchestrator {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
Expand All @@ -607,7 +614,7 @@ export class LoopOrchestrator {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
Expand All @@ -621,7 +628,7 @@ export class LoopOrchestrator {
if (!scope.condition) {
logger.warn('No condition defined for while loop', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}

Expand All @@ -634,7 +641,7 @@ export class LoopOrchestrator {

if (!result) {
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
}

return result
Expand Down
Loading
Loading