Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apps/sim/executor/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ export const EDGE = {
DEFAULT: 'default',
} as const

export const SUBFLOW_CONTROL_EDGE_HANDLES = new Set<string>([
EDGE.LOOP_CONTINUE,
EDGE.LOOP_CONTINUE_ALT,
EDGE.LOOP_EXIT,
EDGE.PARALLEL_CONTINUE,
EDGE.PARALLEL_EXIT,
])

export const CONTROL_BACK_EDGE_HANDLES = new Set<string>([
EDGE.LOOP_CONTINUE,
EDGE.LOOP_CONTINUE_ALT,
EDGE.PARALLEL_CONTINUE,
])

export const LOOP = {
TYPE: {
FOR: 'for' as LoopType,
Expand Down
192 changes: 165 additions & 27 deletions apps/sim/executor/dag/construction/edges.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { beforeEach, describe, expect, it } from 'vitest'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import { buildBranchNodeId } from '@/executor/utils/subflow-utils'
import type { SerializedBlock, SerializedLoop, SerializedWorkflow } from '@/serializer/types'
import { EdgeConstructor } from './edges'

Expand Down Expand Up @@ -133,6 +134,124 @@ describe('EdgeConstructor', () => {
})
})

describe('nested subflow skip-at-start bypasses', () => {
it('wires a nested loop start exit to the next sibling inside a parallel branch', () => {
const parallelId = 'parallel-1'
const loopId = 'loop-1'
const afterId = 'after'
const loopStartId = `loop-${loopId}-sentinel-start`
const loopEndId = `loop-${loopId}-sentinel-end`
const afterTemplateId = buildBranchNodeId(afterId, 0)
const dag = createMockDAG([loopStartId, loopEndId, afterTemplateId])
dag.nodes.get(loopStartId)!.metadata = {
isSentinel: true,
sentinelType: 'start',
subflowId: loopId,
subflowType: 'loop',
}
dag.nodes.get(loopEndId)!.metadata = {
isSentinel: true,
sentinelType: 'end',
subflowId: loopId,
subflowType: 'loop',
}
dag.nodes.get(afterTemplateId)!.metadata = {
isParallelBranch: true,
subflowId: parallelId,
subflowType: 'parallel',
branchIndex: 0,
}
dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 1 })
dag.parallelConfigs.set(parallelId, {
id: parallelId,
nodes: [loopId, afterId],
count: 1,
})

const workflow = createMockWorkflow(
[createMockBlock(loopId, 'loop'), createMockBlock(afterId)],
[{ source: loopId, target: afterId }]
)

edgeConstructor.execute(
workflow,
dag,
new Set([loopId, afterId]),
new Set(),
new Set([loopId, afterId]),
new Map()
)

const loopStartTargets = Array.from(dag.nodes.get(loopStartId)!.outgoingEdges.values())
expect(loopStartTargets).toContainEqual({
target: loopEndId,
sourceHandle: 'loop_exit',
targetHandle: undefined,
})
expect(dag.nodes.get(loopEndId)!.incomingEdges).not.toContain(loopStartId)
})

it('wires a parallel start exit bypass to a downstream parallel sentinel start', () => {
const sourceParallelId = 'parallel-a'
const targetParallelId = 'parallel-b'
const sourceStartId = `parallel-${sourceParallelId}-sentinel-start`
const sourceEndId = `parallel-${sourceParallelId}-sentinel-end`
const targetStartId = `parallel-${targetParallelId}-sentinel-start`
const targetEndId = `parallel-${targetParallelId}-sentinel-end`
const dag = createMockDAG([sourceStartId, sourceEndId, targetStartId, targetEndId])
dag.parallelConfigs.set(sourceParallelId, { id: sourceParallelId, nodes: [], count: 1 })
dag.parallelConfigs.set(targetParallelId, { id: targetParallelId, nodes: [], count: 1 })

const workflow = createMockWorkflow(
[
createMockBlock(sourceParallelId, 'parallel'),
createMockBlock(targetParallelId, 'parallel'),
],
[{ source: sourceParallelId, target: targetParallelId }]
)

edgeConstructor.execute(workflow, dag, new Set(), new Set(), new Set(), new Map())

const sourceStartTargets = Array.from(dag.nodes.get(sourceStartId)!.outgoingEdges.values())
expect(sourceStartTargets).toContainEqual({
target: sourceEndId,
sourceHandle: 'parallel_exit',
targetHandle: undefined,
})
expect(dag.nodes.get(sourceEndId)!.incomingEdges).not.toContain(sourceStartId)
})

it('wires terminal top-level loop start exit through its own sentinel end', () => {
const loopId = 'loop-1'
const taskId = 'task-1'
const loopStartId = `loop-${loopId}-sentinel-start`
const loopEndId = `loop-${loopId}-sentinel-end`
const dag = createMockDAG([loopStartId, loopEndId, taskId])
dag.loopConfigs.set(loopId, { id: loopId, nodes: [taskId], iterations: 1 })
const workflow = createMockWorkflow(
[createMockBlock(loopId, 'loop'), createMockBlock(taskId)],
[]
)

edgeConstructor.execute(
workflow,
dag,
new Set(),
new Set([taskId]),
new Set([taskId]),
new Map()
)

const loopStartTargets = Array.from(dag.nodes.get(loopStartId)!.outgoingEdges.values())
expect(loopStartTargets).toContainEqual({
target: loopEndId,
sourceHandle: 'loop_exit',
targetHandle: undefined,
})
expect(dag.nodes.get(loopEndId)!.incomingEdges).not.toContain(loopStartId)
})
})

describe('Condition block edge wiring', () => {
it('should wire condition block edges with proper condition prefixes', () => {
const conditionId = 'condition-1'
Expand Down Expand Up @@ -441,8 +560,9 @@ describe('EdgeConstructor', () => {

// Sentinel start should have edge to node in loop (it's a start node - no incoming from loop)
const sentinelStartNode = dag.nodes.get(sentinelStartId)!
expect(sentinelStartNode.outgoingEdges.size).toBe(1)
const startEdge = Array.from(sentinelStartNode.outgoingEdges.values())[0]
const startEdge = Array.from(sentinelStartNode.outgoingEdges.values()).find(
(edge) => edge.target === nodeInLoopId
)
expect(startEdge.target).toBe(nodeInLoopId)

// Node in loop should have edge to sentinel end (it's a terminal node - no outgoing to loop)
Expand Down Expand Up @@ -500,7 +620,10 @@ describe('EdgeConstructor', () => {

// Sentinel start should have edges to both nodes (both are start nodes)
const sentinelStartNode = dag.nodes.get(sentinelStartId)!
expect(sentinelStartNode.outgoingEdges.size).toBe(2)
const bodyStartEdges = Array.from(sentinelStartNode.outgoingEdges.values()).filter(
(edge) => edge.target === node1Id || edge.target === node2Id
)
expect(bodyStartEdges).toHaveLength(2)

// Both nodes should have edges to sentinel end (both are terminal nodes)
const node1 = dag.nodes.get(node1Id)!
Expand Down Expand Up @@ -702,7 +825,7 @@ describe('EdgeConstructor', () => {

const loop1StartNode = dag.nodes.get(loop1SentinelStart)!
const earlyExitEdges = Array.from(loop1StartNode.outgoingEdges.values()).filter(
(e) => e.target === loop2SentinelStart && e.sourceHandle === 'loop_exit'
(e) => e.target === loop1SentinelEnd && e.sourceHandle === 'loop_exit'
)
expect(earlyExitEdges.length).toBeGreaterThan(0)
})
Expand Down Expand Up @@ -864,7 +987,7 @@ describe('EdgeConstructor', () => {

const loopStartNode = dag.nodes.get(loopSentinelStart)!
const earlyExitEdges = Array.from(loopStartNode.outgoingEdges.values()).filter(
(e) => e.target === parallelSentinelStart && e.sourceHandle === 'loop_exit'
(e) => e.target === loopSentinelEnd && e.sourceHandle === 'loop_exit'
)
expect(earlyExitEdges.length).toBeGreaterThan(0)
})
Expand Down Expand Up @@ -925,6 +1048,15 @@ describe('EdgeConstructor', () => {
expect(edgesToRegular.length).toBe(1)
expect(edgesToRegular[0].sourceHandle).toBe('parallel_exit')

const edgesToParallelStart = Array.from(parallelEndNode.outgoingEdges.values()).filter(
(e) => e.target === parallelSentinelStart
)
expect(edgesToParallelStart.length).toBe(1)
expect(edgesToParallelStart[0].sourceHandle).toBe('parallel_continue')

const parallelStartNode = dag.nodes.get(parallelSentinelStart)!
expect(parallelStartNode.incomingEdges.has(parallelSentinelEnd)).toBe(false)

const regularBlockNode = dag.nodes.get(regularBlockId)!
expect(regularBlockNode.incomingEdges.has(parallelSentinelEnd)).toBe(true)
})
Expand Down Expand Up @@ -1356,31 +1488,32 @@ describe('EdgeConstructor', () => {
// Set up sentinel metadata
dag.nodes.get(outerSentinelStart)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'start',
parallelId: outerParallelId,
subflowId: outerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(outerSentinelEnd)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'end',
parallelId: outerParallelId,
subflowId: outerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(innerSentinelStart)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'start',
parallelId: innerParallelId,
subflowId: innerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(innerSentinelEnd)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'end',
parallelId: innerParallelId,
subflowId: innerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(funcTemplate)!.metadata = {
isParallelBranch: true,
parallelId: innerParallelId,
subflowId: innerParallelId,
subflowType: 'parallel',
branchIndex: 0,
branchTotal: 1,
originalBlockId: functionId,
Expand Down Expand Up @@ -1478,28 +1611,31 @@ describe('EdgeConstructor', () => {
dag.nodes.get(loopSentinelStart)!.metadata = {
isSentinel: true,
sentinelType: 'start',
loopId,
subflowId: loopId,
subflowType: 'loop',
}
dag.nodes.get(loopSentinelEnd)!.metadata = {
isSentinel: true,
sentinelType: 'end',
loopId,
subflowId: loopId,
subflowType: 'loop',
}
dag.nodes.get(parallelSentinelStart)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'start',
parallelId: innerParallelId,
subflowId: innerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(parallelSentinelEnd)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'end',
parallelId: innerParallelId,
subflowId: innerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(funcTemplate)!.metadata = {
isParallelBranch: true,
parallelId: innerParallelId,
subflowId: innerParallelId,
subflowType: 'parallel',
branchIndex: 0,
branchTotal: 1,
originalBlockId: functionId,
Expand Down Expand Up @@ -1592,25 +1728,27 @@ describe('EdgeConstructor', () => {

dag.nodes.get(outerSentinelStart)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'start',
parallelId: outerParallelId,
subflowId: outerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(outerSentinelEnd)!.metadata = {
isSentinel: true,
isParallelSentinel: true,
sentinelType: 'end',
parallelId: outerParallelId,
subflowId: outerParallelId,
subflowType: 'parallel',
}
dag.nodes.get(innerSentinelStart)!.metadata = {
isSentinel: true,
sentinelType: 'start',
loopId: innerLoopId,
subflowId: innerLoopId,
subflowType: 'loop',
}
dag.nodes.get(innerSentinelEnd)!.metadata = {
isSentinel: true,
sentinelType: 'end',
loopId: innerLoopId,
subflowId: innerLoopId,
subflowType: 'loop',
}

const innerLoop: SerializedLoop = {
Expand Down
Loading
Loading