Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix resolver claim
  • Loading branch information
icecrasher321 committed Apr 8, 2026
commit c4ee839e42d20072006bc8405377d059f42c81a7
141 changes: 137 additions & 4 deletions apps/sim/executor/variables/resolvers/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import type { ResolutionContext } from './reference'

vi.mock('@sim/logger', () => loggerMock)

interface BlockDef {
id: string
name: string
}

/**
* Creates a minimal workflow for testing.
*/
Expand All @@ -18,7 +23,8 @@ function createTestWorkflow(
distribution?: any
parallelType?: 'count' | 'collection'
}
> = {}
> = {},
blockDefs: BlockDef[] = []
) {
const normalizedParallels: Record<
string,
Expand All @@ -37,9 +43,18 @@ function createTestWorkflow(
parallelType: parallel.parallelType,
}
}
const blocks = blockDefs.map((b) => ({
id: b.id,
position: { x: 0, y: 0 },
config: { tool: 'test', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'function', name: b.name },
enabled: true,
}))
return {
version: '1.0',
blocks: [],
blocks,
connections: [],
loops: {},
parallels: normalizedParallels,
Expand All @@ -63,13 +78,16 @@ function createParallelScope(items: any[]) {
*/
function createTestContext(
currentNodeId: string,
parallelExecutions?: Map<string, any>
parallelExecutions?: Map<string, any>,
blockOutputs?: Record<string, any>
): ResolutionContext {
return {
executionContext: {
parallelExecutions: parallelExecutions ?? new Map(),
},
executionState: {},
executionState: {
getBlockOutput: (id: string) => blockOutputs?.[id],
},
currentNodeId,
} as ResolutionContext
}
Expand Down Expand Up @@ -383,4 +401,119 @@ describe('ParallelResolver', () => {
expect(resolver.resolve('<parallel.currentItem>', createTestContext('block-3₍1₎'))).toBe('p4')
})
})

describe('named parallel references', () => {
it.concurrent('should resolve result from anywhere after parallel completes', () => {
const workflow = createTestWorkflow(
{ 'parallel-1': { nodes: ['block-1'], distribution: ['a', 'b'] } },
[{ id: 'parallel-1', name: 'Parallel 1' }]
)
const resolver = new ParallelResolver(workflow)
const results = [[{ response: 'a' }], [{ response: 'b' }]]
const ctx = createTestContext('block-outside', new Map(), {
'parallel-1': { results },
})

expect(resolver.resolve('<parallel1.result>', ctx)).toEqual(results)
expect(resolver.resolve('<parallel1.results>', ctx)).toEqual(results)
})

it.concurrent('should resolve result with nested path', () => {
const workflow = createTestWorkflow(
{ 'parallel-1': { nodes: ['block-1'], distribution: ['a', 'b'] } },
[{ id: 'parallel-1', name: 'Parallel 1' }]
)
const resolver = new ParallelResolver(workflow)
const results = [[{ response: 'a' }], [{ response: 'b' }]]
const ctx = createTestContext('block-outside', new Map(), {
'parallel-1': { results },
})

expect(resolver.resolve('<parallel1.result.0>', ctx)).toEqual([{ response: 'a' }])
expect(resolver.resolve('<parallel1.result.1.0.response>', ctx)).toBe('b')
})

it.concurrent('should resolve result with empty currentNodeId', () => {
const workflow = createTestWorkflow(
{ 'parallel-1': { nodes: ['block-1'], distribution: ['a', 'b'] } },
[{ id: 'parallel-1', name: 'Parallel 1' }]
)
const resolver = new ParallelResolver(workflow)
const results = [[{ output: 'x' }], [{ output: 'y' }]]
const ctx = createTestContext('', new Map(), {
'parallel-1': { results },
})

expect(resolver.resolve('<parallel1.results>', ctx)).toEqual(results)
})

it.concurrent('should return undefined when no output stored yet', () => {
const workflow = createTestWorkflow(
{ 'parallel-1': { nodes: ['block-1'], distribution: ['a'] } },
[{ id: 'parallel-1', name: 'Parallel 1' }]
)
const resolver = new ParallelResolver(workflow)
const ctx = createTestContext('block-outside', new Map())

expect(resolver.resolve('<parallel1.results>', ctx)).toBeUndefined()
})

it.concurrent('should resolve iteration properties via named reference', () => {
const workflow = createTestWorkflow(
{
'parallel-1': {
nodes: ['block-1'],
distribution: ['x', 'y', 'z'],
parallelType: 'collection',
},
},
[{ id: 'parallel-1', name: 'Parallel 1' }]
)
const resolver = new ParallelResolver(workflow)
const ctx = createTestContext('block-1₍1₎')

expect(resolver.resolve('<parallel1.index>', ctx)).toBe(1)
expect(resolver.resolve('<parallel1.currentItem>', ctx)).toBe('y')
expect(resolver.resolve('<parallel1.items>', ctx)).toEqual(['x', 'y', 'z'])
})

it.concurrent('should throw InvalidFieldError for unknown property on named ref', () => {
const workflow = createTestWorkflow(
{
'parallel-1': {
nodes: ['block-1'],
distribution: ['a'],
parallelType: 'collection',
},
},
[{ id: 'parallel-1', name: 'Parallel 1' }]
)
const resolver = new ParallelResolver(workflow)
const ctx = createTestContext('block-1₍0₎')

expect(() => resolver.resolve('<parallel1.unknownProp>', ctx)).toThrow(InvalidFieldError)
})

it.concurrent('should not resolve named ref when no matching block exists', () => {
const workflow = createTestWorkflow({ 'parallel-1': { nodes: ['block-1'] } }, [
{ id: 'parallel-1', name: 'Parallel 1' },
])
const resolver = new ParallelResolver(workflow)
expect(resolver.canResolve('<parallel99.index>')).toBe(false)
})

it.concurrent('should resolve generic parallel results from inside a branch', () => {
const workflow = createTestWorkflow({
'parallel-1': { nodes: ['block-1'], distribution: ['a', 'b'] },
})
const resolver = new ParallelResolver(workflow)
const results = [[{ response: 'a' }], [{ response: 'b' }]]
const ctx = createTestContext('block-1₍0₎', new Map(), {
'parallel-1': { results },
})

expect(resolver.resolve('<parallel.results>', ctx)).toEqual(results)
expect(resolver.resolve('<parallel.result>', ctx)).toEqual(results)
})
})
})
25 changes: 24 additions & 1 deletion apps/sim/executor/variables/resolvers/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export class ParallelResolver implements Resolver {
}
}

private static OUTPUT_PROPERTIES = new Set(['result', 'results'])
private static KNOWN_PROPERTIES = new Set(['index', 'currentItem', 'items'])

canResolve(reference: string): boolean {
Expand Down Expand Up @@ -73,6 +74,10 @@ export class ParallelResolver implements Resolver {
)
}

if (rest.length > 0 && ParallelResolver.OUTPUT_PROPERTIES.has(rest[0])) {
return this.resolveOutput(targetParallelId, rest.slice(1), context)
}

// Look up config using the original (non-cloned) ID
const originalParallelId = stripOuterBranchSuffix(targetParallelId)
const parallelConfig = this.workflow.parallels?.[originalParallelId]
Expand Down Expand Up @@ -116,7 +121,9 @@ export class ParallelResolver implements Resolver {

if (!ParallelResolver.KNOWN_PROPERTIES.has(property)) {
const isCollection = parallelConfig.parallelType === 'collection'
const availableFields = isCollection ? ['index', 'currentItem', 'items'] : ['index']
const availableFields = isCollection
? ['index', 'currentItem', 'items', 'result']
: ['index', 'result']
throw new InvalidFieldError(firstPart, property, availableFields)
}

Expand Down Expand Up @@ -216,6 +223,22 @@ export class ParallelResolver implements Resolver {
return undefined
}

private resolveOutput(
parallelId: string,
pathParts: string[],
context: ResolutionContext
): unknown {
const output = context.executionState.getBlockOutput(parallelId)
if (!output || typeof output !== 'object') {
return undefined
}
const value = (output as Record<string, unknown>).results
if (pathParts.length > 0) {
return navigatePath(value, pathParts)
}
return value
}

private getDistributionItems(parallelConfig: SerializedParallel): unknown[] {
const rawItems = parallelConfig.distribution ?? []

Expand Down
Loading