Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions apps/sim/app/api/workflows/[id]/execute/route.async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ const {
}))

vi.mock('@/lib/auth/hybrid', () => ({
AuthType: {
SESSION: 'session',
API_KEY: 'api_key',
INTERNAL_JWT: 'internal_jwt',
},
checkHybridAuth: mockCheckHybridAuth,
AuthType: {
SESSION: 'session',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ import { useQueryClient } from '@tanstack/react-query'
import { v4 as uuidv4 } from 'uuid'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { processStreamingBlockLogs } from '@/lib/tokenization'
import type {
BlockCompletedData,
BlockErrorData,
BlockStartedData,
} from '@/lib/workflows/executor/execution-events'
import {
extractTriggerMockPayload,
selectBestTrigger,
Expand All @@ -21,21 +16,14 @@ import {
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import {
markOutgoingEdgesFromOutput,
updateActiveBlockRefCount,
type BlockEventHandlerConfig,
createBlockEventHandlers,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
BlockLog,
BlockState,
ExecutionResult,
NormalizedBlockOutput,
StreamingExecution,
} from '@/executor/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { stripCloneSuffixes } from '@/executor/utils/subflow-utils'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
Expand Down Expand Up @@ -63,20 +51,6 @@ interface DebugValidationResult {
error?: string
}

interface BlockEventHandlerConfig {
workflowId?: string
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
accumulatedBlockStates: Map<string, BlockState>
executedBlockIds: Set<string>
consoleMode: 'update' | 'add'
includeStartConsoleEntry: boolean
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
}

const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'

function isRecord(value: unknown): value is Record<string, unknown> {
Expand Down Expand Up @@ -309,279 +283,15 @@ export function useWorkflowExecution() {
)

const buildBlockEventHandlers = useCallback(
(config: BlockEventHandlerConfig) => {
const {
workflowId,
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
consoleMode,
includeStartConsoleEntry,
onBlockCompleteCallback,
} = config

/** Returns true if this execution was cancelled or superseded by another run. */
const isStaleExecution = () =>
!!(
workflowId &&
executionIdRef.current &&
useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current
)

const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}

const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
if (!workflowId) return
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
}

const isContainerBlockType = (blockType?: string) => {
return blockType === 'loop' || blockType === 'parallel'
}

/** Extracts iteration and child-workflow fields shared across console entry call sites. */
const extractIterationFields = (
data: BlockStartedData | BlockCompletedData | BlockErrorData
) => ({
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
parentIterations: data.parentIterations,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
...('childWorkflowInstanceId' in data && {
childWorkflowInstanceId: data.childWorkflowInstanceId,
}),
})

const createBlockLogEntry = (
data: BlockCompletedData | BlockErrorData,
options: { success: boolean; output?: unknown; error?: string }
): BlockLog => ({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: options.output ?? {},
success: options.success,
error: options.error,
durationMs: data.durationMs,
startedAt: data.startedAt,
executionOrder: data.executionOrder,
endedAt: data.endedAt,
})

const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
if (!workflowId) return
addConsole({
input: data.input || {},
output,
success: true,
durationMs: data.durationMs,
startedAt: data.startedAt,
executionOrder: data.executionOrder,
endedAt: data.endedAt,
workflowId,
blockId: data.blockId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
...extractIterationFields(data),
})
}

const addConsoleErrorEntry = (data: BlockErrorData) => {
if (!workflowId) return
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt: data.startedAt,
executionOrder: data.executionOrder,
endedAt: data.endedAt,
workflowId,
blockId: data.blockId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
...extractIterationFields(data),
})
}

const updateConsoleEntry = (data: BlockCompletedData) => {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: data.output,
success: true,
durationMs: data.durationMs,
startedAt: data.startedAt,
endedAt: data.endedAt,
isRunning: false,
...extractIterationFields(data),
},
executionIdRef.current
)
}

const updateConsoleErrorEntry = (data: BlockErrorData) => {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt: data.startedAt,
endedAt: data.endedAt,
isRunning: false,
...extractIterationFields(data),
},
executionIdRef.current
)
}

const onBlockStarted = (data: BlockStartedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, true)

if (!includeStartConsoleEntry || !workflowId) return

const startedAt = new Date().toISOString()
addConsole({
input: {},
output: undefined,
success: undefined,
durationMs: undefined,
startedAt,
executionOrder: data.executionOrder,
endedAt: undefined,
workflowId,
blockId: data.blockId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
isRunning: true,
...extractIterationFields(data),
})
}

const onBlockCompleted = (data: BlockCompletedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})

// For nested containers, the SSE blockId may be a cloned ID (e.g. P1__obranch-0).
// Also record the original workflow-level ID so the canvas can highlight it.
if (isContainerBlockType(data.blockType)) {
const originalId = stripCloneSuffixes(data.blockId)
if (originalId !== data.blockId) {
executedBlockIds.add(originalId)
if (workflowId) setBlockRunStatus(workflowId, originalId, 'success')
}
}

if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
const output = data.output as Record<string, any> | undefined
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
if (!isEmptySubflow) return
}

accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))

if (consoleMode === 'update') {
updateConsoleEntry(data)
} else {
addConsoleEntry(data, data.output as NormalizedBlockOutput)
}

if (onBlockCompleteCallback) {
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
logger.error('Error in onBlockComplete callback:', error)
})
}
}

const onBlockError = (data: BlockErrorData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
markOutgoingEdges(data.blockId, { error: data.error })

executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: { error: data.error },
executed: true,
executionTime: data.durationMs || 0,
})

// For nested containers, also record the original workflow-level ID
if (isContainerBlockType(data.blockType)) {
const originalId = stripCloneSuffixes(data.blockId)
if (originalId !== data.blockId) {
executedBlockIds.add(originalId)
if (workflowId) setBlockRunStatus(workflowId, originalId, 'error')
}
}

accumulatedBlockLogs.push(
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
)

if (consoleMode === 'update') {
updateConsoleErrorEntry(data)
} else {
addConsoleErrorEntry(data)
}
}

const onBlockChildWorkflowStarted = (data: {
blockId: string
childWorkflowInstanceId: string
iterationCurrent?: number
iterationContainerId?: string
executionOrder?: number
}) => {
if (isStaleExecution()) return
updateConsole(
data.blockId,
{
childWorkflowInstanceId: data.childWorkflowInstanceId,
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
...(data.iterationContainerId !== undefined && {
iterationContainerId: data.iterationContainerId,
}),
...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }),
},
executionIdRef.current
)
}

return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
},
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
(config: BlockEventHandlerConfig) =>
createBlockEventHandlers(config, {
addConsole,
updateConsole,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
}),
[addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus]
)

/**
Expand Down
Loading
Loading