Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/provide
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
import { useCurrentWorkflowExecution, useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
Expand Down Expand Up @@ -114,7 +114,8 @@ export const ActionBar = memo(
)

const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const { isExecuting } = useCurrentWorkflowExecution()
const { getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
const edges = useWorkflowStore((state) => state.edges)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowI
import type { BlockLog, ExecutionResult } from '@/executor/types'
import { useChatStore } from '@/stores/chat/store'
import { getChatPosition } from '@/stores/chat/utils'
import { useExecutionStore } from '@/stores/execution'
import { useCurrentWorkflowExecution } from '@/stores/execution'
import { useOperationQueue } from '@/stores/operation-queue/store'
import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
Expand Down Expand Up @@ -256,7 +256,7 @@ export function Chat() {
const hasConsoleHydrated = useTerminalConsoleStore((state) => state._hasHydrated)
const entriesFromStore = useTerminalConsoleStore((state) => state.entries)
const entries = hasConsoleHydrated ? entriesFromStore : []
const { isExecuting } = useExecutionStore()
const { isExecuting } = useCurrentWorkflowExecution()
const { handleRunWorkflow, handleCancelExecution } = useWorkflowExecution()
const { data: session } = useSession()
const { addToQueue } = useOperationQueue()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { useCallback } from 'react'
import type { DiffStatus } from '@/lib/workflows/diff/types'
import { hasDiffStatus } from '@/lib/workflows/diff/types'
import { useExecutionStore } from '@/stores/execution'
import { useIsBlockActive } from '@/stores/execution'
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
import type { CurrentWorkflow } from '../../../hooks/use-current-workflow'
import type { WorkflowBlockProps } from '../types'
Expand Down Expand Up @@ -67,7 +67,7 @@ export function useBlockState(
const isDeletedBlock = !isShowingDiff && diffAnalysis?.deleted_blocks?.includes(blockId)

// Execution state
const isActiveBlock = useExecutionStore((state) => state.activeBlockIds.has(blockId))
const isActiveBlock = useIsBlockActive(blockId)
const isActive = data.isActive || isActiveBlock

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { X } from 'lucide-react'
import { BaseEdge, EdgeLabelRenderer, type EdgeProps, getSmoothStepPath } from 'reactflow'
import { useShallow } from 'zustand/react/shallow'
import type { EdgeDiffStatus } from '@/lib/workflows/diff/types'
import { useExecutionStore } from '@/stores/execution'
import { useLastRunEdges } from '@/stores/execution'
import { useWorkflowDiffStore } from '@/stores/workflow-diff'

/** Extended edge props with optional handle identifiers */
Expand Down Expand Up @@ -49,7 +49,7 @@ const WorkflowEdgeComponent = ({
isDiffReady: state.isDiffReady,
}))
)
const lastRunEdges = useExecutionStore((state) => state.lastRunEdges)
const lastRunEdges = useLastRunEdges()

const dataSourceHandle = (data as { sourceHandle?: string } | undefined)?.sourceHandle
const isErrorEdge = (sourceHandle ?? dataSourceHandle) === 'error'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { useBlockState } from '@/app/workspace/[workspaceId]/w/[workflowId]/comp
import type { WorkflowBlockProps } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/types'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import { getBlockRingStyles } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/block-ring-utils'
import { useExecutionStore } from '@/stores/execution'
import { useLastRunPath } from '@/stores/execution'
import { usePanelEditorStore, usePanelStore } from '@/stores/panel'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'

Expand Down Expand Up @@ -64,7 +64,7 @@ export function useBlockVisual({
)
const isEditorOpen = !isPreview && isThisBlockInEditor && activeTabIsEditor

const lastRunPath = useExecutionStore((state) => state.lastRunPath)
const lastRunPath = useLastRunPath()
const runPathStatus = isPreview ? undefined : lastRunPath.get(blockId)

const setCurrentBlockId = usePanelEditorStore((state) => state.setCurrentBlockId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
import { useExecutionStore } from '@/stores/execution'
import { useCurrentWorkflowExecution, useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment'
Expand Down Expand Up @@ -112,12 +112,9 @@ export function useWorkflowExecution() {
useTerminalConsoleStore()
const { getAllVariables } = useEnvironmentStore()
const { getVariablesByWorkflowId, variables } = useVariablesStore()
const { isExecuting, isDebugging, pendingBlocks, executor, debugContext } =
useCurrentWorkflowExecution()
const {
isExecuting,
isDebugging,
pendingBlocks,
executor,
debugContext,
setIsExecuting,
setIsDebugging,
setPendingBlocks,
Expand Down Expand Up @@ -158,13 +155,15 @@ export function useWorkflowExecution() {
* Resets all debug-related state
*/
const resetDebugState = useCallback(() => {
setIsExecuting(false)
setIsDebugging(false)
setDebugContext(null)
setExecutor(null)
setPendingBlocks([])
setActiveBlocks(new Set())
if (!activeWorkflowId) return
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setDebugContext(activeWorkflowId, null)
setExecutor(activeWorkflowId, null)
setPendingBlocks(activeWorkflowId, [])
setActiveBlocks(activeWorkflowId, new Set())
}, [
activeWorkflowId,
setIsExecuting,
setIsDebugging,
setDebugContext,
Expand Down Expand Up @@ -312,18 +311,20 @@ export function useWorkflowExecution() {
} = config

const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
if (isActive) {
activeBlocksSet.add(blockId)
} else {
activeBlocksSet.delete(blockId)
}
setActiveBlocks(new Set(activeBlocksSet))
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}

const markIncomingEdges = (blockId: string) => {
if (!workflowId) return
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
setEdgeRunStatus(workflowId, edge.id, 'success')
})
}

Expand Down Expand Up @@ -459,7 +460,7 @@ export function useWorkflowExecution() {

const onBlockCompleted = (data: BlockCompletedData) => {
updateActiveBlocks(data.blockId, false)
setBlockRunStatus(data.blockId, 'success')
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')

executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
Expand Down Expand Up @@ -489,7 +490,7 @@ export function useWorkflowExecution() {

const onBlockError = (data: BlockErrorData) => {
updateActiveBlocks(data.blockId, false)
setBlockRunStatus(data.blockId, 'error')
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')

executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
Expand Down Expand Up @@ -547,19 +548,20 @@ export function useWorkflowExecution() {
*/
const handleDebugSessionContinuation = useCallback(
(result: ExecutionResult) => {
if (!activeWorkflowId) return
logger.info('Debug step completed, next blocks pending', {
nextPendingBlocks: result.metadata?.pendingBlocks?.length || 0,
})

// Update debug context and pending blocks
if (result.metadata?.context) {
setDebugContext(result.metadata.context)
setDebugContext(activeWorkflowId, result.metadata.context)
}
if (result.metadata?.pendingBlocks) {
setPendingBlocks(result.metadata.pendingBlocks)
setPendingBlocks(activeWorkflowId, result.metadata.pendingBlocks)
}
},
[setDebugContext, setPendingBlocks]
[activeWorkflowId, setDebugContext, setPendingBlocks]
)

/**
Expand Down Expand Up @@ -663,11 +665,11 @@ export function useWorkflowExecution() {

// Reset execution result and set execution state
setExecutionResult(null)
setIsExecuting(true)
setIsExecuting(activeWorkflowId, true)

// Set debug mode only if explicitly requested
if (enableDebug) {
setIsDebugging(true)
setIsDebugging(activeWorkflowId, true)
}

// Determine if this is a chat execution
Expand Down Expand Up @@ -965,9 +967,9 @@ export function useWorkflowExecution() {
controller.close()
}
if (currentChatExecutionIdRef.current === executionId) {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
}
},
Expand All @@ -989,16 +991,16 @@ export function useWorkflowExecution() {
'manual'
)
if (result && 'metadata' in result && result.metadata?.isDebugSession) {
setDebugContext(result.metadata.context || null)
setDebugContext(activeWorkflowId, result.metadata.context || null)
if (result.metadata.pendingBlocks) {
setPendingBlocks(result.metadata.pendingBlocks)
setPendingBlocks(activeWorkflowId, result.metadata.pendingBlocks)
}
} else if (result && 'success' in result) {
setExecutionResult(result)
// Reset execution state after successful non-debug execution
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())

if (isChatExecution) {
if (!result.metadata) {
Expand Down Expand Up @@ -1179,7 +1181,7 @@ export function useWorkflowExecution() {
logger.error('No trigger blocks found for manual run', {
allBlockTypes: Object.values(filteredStates).map((b) => b.type),
})
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}

Expand All @@ -1195,7 +1197,7 @@ export function useWorkflowExecution() {
'Workflow Validation'
)
logger.error('Multiple API triggers found')
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}

Expand All @@ -1220,7 +1222,7 @@ export function useWorkflowExecution() {
'Workflow Validation'
)
logger.error('Trigger has no outgoing connections', { triggerName, startBlockId })
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}
}
Expand Down Expand Up @@ -1251,7 +1253,7 @@ export function useWorkflowExecution() {
'Workflow Validation'
)
logger.error('No startBlockId found after trigger search')
setIsExecuting(false)
if (activeWorkflowId) setIsExecuting(activeWorkflowId, false)
throw error
}

Expand Down Expand Up @@ -1457,8 +1459,10 @@ export function useWorkflowExecution() {
logger.info('Execution aborted by user')

// Reset execution state
setIsExecuting(false)
setActiveBlocks(new Set())
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}

// Return gracefully without error
return {
Expand Down Expand Up @@ -1533,9 +1537,11 @@ export function useWorkflowExecution() {
}

setExecutionResult(errorResult)
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
if (activeWorkflowId) {
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}

let notificationMessage = WORKFLOW_EXECUTION_FAILURE_MESSAGE
if (isRecord(error) && isRecord(error.request) && sanitizeMessage(error.request.url)) {
Expand Down Expand Up @@ -1706,21 +1712,21 @@ export function useWorkflowExecution() {
const handleCancelExecution = useCallback(() => {
logger.info('Workflow execution cancellation requested')

// Cancel the execution stream (server-side)
executionStream.cancel()
// Cancel the execution stream for this workflow (server-side)
executionStream.cancel(activeWorkflowId ?? undefined)

// Mark current chat execution as superseded so its cleanup won't affect new executions
currentChatExecutionIdRef.current = null

// Mark all running entries as canceled in the terminal
if (activeWorkflowId) {
cancelRunningEntries(activeWorkflowId)
}

// Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
// Reset execution state - this triggers chat stream cleanup via useEffect in chat.tsx
setIsExecuting(activeWorkflowId, false)
setIsDebugging(activeWorkflowId, false)
setActiveBlocks(activeWorkflowId, new Set())
}
Comment thread
waleedlatif1 marked this conversation as resolved.

// If in debug mode, also reset debug state
if (isDebugging) {
Expand Down Expand Up @@ -1833,7 +1839,7 @@ export function useWorkflowExecution() {
}
}

setIsExecuting(true)
setIsExecuting(workflowId, true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
Expand Down Expand Up @@ -1929,8 +1935,8 @@ export function useWorkflowExecution() {
logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
setIsExecuting(workflowId, false)
setActiveBlocks(workflowId, new Set())
}
},
[
Expand Down Expand Up @@ -1962,7 +1968,7 @@ export function useWorkflowExecution() {
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })

setExecutionResult(null)
setIsExecuting(true)
setIsExecuting(workflowId, true)

const executionId = uuidv4()
try {
Expand All @@ -1981,9 +1987,9 @@ export function useWorkflowExecution() {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
setIsExecuting(workflowId, false)
setIsDebugging(workflowId, false)
setActiveBlocks(workflowId, new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
Expand Down
Loading