Skip to content
Prev Previous commit
Next Next commit
fix(executor): await subflow diagnostics callbacks
Ensure empty-subflow and subflow-error lifecycle callbacks participate in progress-write draining before terminal finalization while still swallowing callback failures.
  • Loading branch information
test committed Mar 13, 2026
commit c3f5d776513ef0780cbd96d9d6270f4d1b9f5591
24 changes: 12 additions & 12 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null
) {}

initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise<LoopScope> {
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
if (!loopConfig) {
throw new Error(`Loop config not found: ${loopId}`)
Expand All @@ -76,7 +76,7 @@ export class LoopOrchestrator {
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
Expand All @@ -99,7 +99,7 @@ export class LoopOrchestrator {
} catch (error) {
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
forEachItems: loopConfig.forEachItems,
})
scope.items = []
Expand All @@ -117,7 +117,7 @@ export class LoopOrchestrator {
)
if (sizeError) {
logger.error(sizeError, { loopId, collectionSize: items.length })
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
forEachItems: loopConfig.forEachItems,
collectionSize: items.length,
})
Expand Down Expand Up @@ -155,7 +155,7 @@ export class LoopOrchestrator {
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
Expand All @@ -182,14 +182,14 @@ export class LoopOrchestrator {
return scope
}

private addLoopErrorLog(
private async addLoopErrorLog(
ctx: ExecutionContext,
loopId: string,
loopType: string,
errorMessage: string,
inputData?: any
): void {
addSubflowErrorLog(
): Promise<void> {
await addSubflowErrorLog(
ctx,
loopId,
'loop',
Expand Down Expand Up @@ -604,7 +604,7 @@ export class LoopOrchestrator {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
Expand All @@ -614,7 +614,7 @@ export class LoopOrchestrator {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
Expand All @@ -628,7 +628,7 @@ export class LoopOrchestrator {
if (!scope.condition) {
logger.warn('No condition defined for while loop', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}

Expand All @@ -641,7 +641,7 @@ export class LoopOrchestrator {

if (!result) {
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
}

return result
Expand Down
8 changes: 4 additions & 4 deletions apps/sim/executor/orchestrators/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ export class NodeExecutionOrchestrator {

const loopId = node.metadata.loopId
if (loopId && !this.loopOrchestrator.getLoopScope(ctx, loopId)) {
this.loopOrchestrator.initializeLoopScope(ctx, loopId)
await this.loopOrchestrator.initializeLoopScope(ctx, loopId)
}

const parallelId = node.metadata.parallelId
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = parallelConfig?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}

if (node.metadata.isSentinel) {
Expand Down Expand Up @@ -158,7 +158,7 @@ export class NodeExecutionOrchestrator {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
if (parallelConfig) {
const nodesInParallel = parallelConfig.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
}

Expand Down Expand Up @@ -239,7 +239,7 @@ export class NodeExecutionOrchestrator {
if (!scope) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = parallelConfig?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion(
ctx,
Expand Down
142 changes: 142 additions & 0 deletions apps/sim/executor/orchestrators/parallel.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { DAG } from '@/executor/dag/builder'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { ExecutionContext } from '@/executor/types'

vi.mock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
}))

function createDag(): DAG {
return {
nodes: new Map(),
loopConfigs: new Map(),
parallelConfigs: new Map([
[
'parallel-1',
{
id: 'parallel-1',
nodes: ['task-1'],
distribution: [],
parallelType: 'collection',
},
],
]),
}
}

function createState(): BlockStateWriter {
return {
setBlockOutput: vi.fn(),
setBlockState: vi.fn(),
deleteBlockState: vi.fn(),
unmarkExecuted: vi.fn(),
}
}

function createContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
return {
workflowId: 'workflow-1',
workspaceId: 'workspace-1',
executionId: 'execution-1',
userId: 'user-1',
blockStates: new Map(),
executedBlocks: new Set(),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: {
router: new Map(),
condition: new Map(),
},
completedLoops: new Set(),
activeExecutionPath: new Set(),
workflow: {
version: '1',
blocks: [
{
id: 'parallel-1',
position: { x: 0, y: 0 },
config: { tool: '', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'parallel', name: 'Parallel 1' },
enabled: true,
},
],
connections: [],
loops: {},
parallels: {},
},
...overrides,
}
}

describe('ParallelOrchestrator', () => {
beforeEach(() => {
vi.clearAllMocks()
})

it('awaits empty-subflow lifecycle callbacks before returning the empty scope', async () => {
let releaseStart: (() => void) | undefined
const onBlockStart = vi.fn(
() =>
new Promise<void>((resolve) => {
releaseStart = resolve
})
)
const onBlockComplete = vi.fn()
const contextExtensions: ContextExtensions = {
onBlockStart,
onBlockComplete,
}
const orchestrator = new ParallelOrchestrator(
createDag(),
createState(),
null,
contextExtensions
)
const ctx = createContext()

const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1', 1)
await Promise.resolve()

expect(onBlockStart).toHaveBeenCalledTimes(1)
expect(onBlockComplete).not.toHaveBeenCalled()

releaseStart?.()
const scope = await initializePromise

expect(onBlockComplete).toHaveBeenCalledTimes(1)
expect(scope.isEmpty).toBe(true)
})

it('swallows helper callback failures on empty parallel paths', async () => {
const contextExtensions: ContextExtensions = {
onBlockStart: vi.fn().mockRejectedValue(new Error('start failed')),
onBlockComplete: vi.fn().mockRejectedValue(new Error('complete failed')),
}
const orchestrator = new ParallelOrchestrator(
createDag(),
createState(),
null,
contextExtensions
)

await expect(
orchestrator.initializeParallelScope(createContext(), 'parallel-1', 1)
).resolves.toMatchObject({
parallelId: 'parallel-1',
isEmpty: true,
})
})
})
16 changes: 8 additions & 8 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ export class ParallelOrchestrator {
private contextExtensions: ContextExtensions | null = null
) {}

initializeParallelScope(
async initializeParallelScope(
ctx: ExecutionContext,
parallelId: string,
terminalNodesCount = 1
): ParallelScope {
): Promise<ParallelScope> {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
if (!parallelConfig) {
throw new Error(`Parallel config not found: ${parallelId}`)
Expand All @@ -69,7 +69,7 @@ export class ParallelOrchestrator {
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
this.addParallelErrorLog(ctx, parallelId, errorMessage, {
await this.addParallelErrorLog(ctx, parallelId, errorMessage, {
distribution: parallelConfig.distribution,
})
this.setErrorScope(ctx, parallelId, errorMessage)
Expand All @@ -83,7 +83,7 @@ export class ParallelOrchestrator {
)
if (branchError) {
logger.error(branchError, { parallelId, branchCount })
this.addParallelErrorLog(ctx, parallelId, branchError, {
await this.addParallelErrorLog(ctx, parallelId, branchError, {
distribution: parallelConfig.distribution,
branchCount,
})
Expand All @@ -109,7 +109,7 @@ export class ParallelOrchestrator {

this.state.setBlockOutput(parallelId, { results: [] })

emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions)
await emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions)

logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
Expand Down Expand Up @@ -220,13 +220,13 @@ export class ParallelOrchestrator {
return { branchCount: items.length, items }
}

private addParallelErrorLog(
private async addParallelErrorLog(
ctx: ExecutionContext,
parallelId: string,
errorMessage: string,
inputData?: any
): void {
addSubflowErrorLog(
): Promise<void> {
await addSubflowErrorLog(
ctx,
parallelId,
'parallel',
Expand Down
Loading
Loading