Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix(deployments): use deployed state for API sync and async execs
  • Loading branch information
icecrasher321 committed Aug 5, 2025
commit 8e428ebaf1b5a2aab37e3db9ff5667be1eebcac5
21 changes: 7 additions & 14 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import {
createHttpResponseFromBlock,
updateWorkflowRunCounts,
Expand Down Expand Up @@ -111,20 +111,13 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any): P
runningExecutions.add(executionKey)
logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`)

// Load workflow data from normalized tables
logger.debug(`[${requestId}] Loading workflow ${workflowId} from normalized tables`)
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
// Load workflow data from deployed state for API executions
const deployedData = await loadDeployedWorkflowState(workflowId)
Comment thread
icecrasher321 marked this conversation as resolved.

if (!normalizedData) {
throw new Error(
`Workflow ${workflowId} has no normalized data available. Ensure the workflow is properly saved to normalized tables.`
)
}

// Use normalized data as primary source
const { blocks, edges, loops, parallels } = normalizedData
logger.info(`[${requestId}] Using normalized tables for workflow execution: ${workflowId}`)
logger.debug(`[${requestId}] Normalized data loaded:`, {
// Use deployed data as primary source for API executions
const { blocks, edges, loops, parallels } = deployedData
logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`)
logger.debug(`[${requestId}] Deployed data loaded:`, {
blocksCount: Object.keys(blocks || {}).length,
edgesCount: (edges || []).length,
loopsCount: Object.keys(loops || {}).length,
Expand Down
51 changes: 46 additions & 5 deletions apps/sim/lib/workflows/db-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
import { workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
import type { LoopConfig, WorkflowState } from '@/stores/workflows/workflow/types'
import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { SUBFLOW_TYPES } from '@/stores/workflows/workflow/types'

const logger = createLogger('WorkflowDBHelpers')
Expand All @@ -12,7 +12,49 @@ export interface NormalizedWorkflowData {
edges: any[]
loops: Record<string, any>
parallels: Record<string, any>
isFromNormalizedTables: true // Flag to indicate this came from new tables
isFromNormalizedTables: boolean // Flag to indicate source (true = normalized tables, false = deployed state)
}

/**
* Load deployed workflow state for execution
* Returns deployed state if available, otherwise throws error
*/
export async function loadDeployedWorkflowState(
workflowId: string
): Promise<NormalizedWorkflowData> {
try {
// First check if workflow is deployed and get deployed state
const [workflowResult] = await db
.select({
isDeployed: workflow.isDeployed,
deployedState: workflow.deployedState,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

if (!workflowResult) {
throw new Error(`Workflow ${workflowId} not found`)
}

if (!workflowResult.isDeployed || !workflowResult.deployedState) {
throw new Error(`Workflow ${workflowId} is not deployed or has no deployed state`)
}

const deployedState = workflowResult.deployedState as any
Comment thread
icecrasher321 marked this conversation as resolved.

// Convert deployed state to normalized format
return {
blocks: deployedState.blocks || {},
edges: deployedState.edges || [],
loops: deployedState.loops || {},
parallels: deployedState.parallels || {},
isFromNormalizedTables: false, // Flag to indicate this came from deployed state
}
} catch (error) {
logger.error(`Error loading deployed workflow state ${workflowId}:`, error)
throw error
}
}

/**
Expand Down Expand Up @@ -88,7 +130,6 @@ export async function loadWorkflowFromNormalizedTables(
const config = subflow.config || {}

if (subflow.type === SUBFLOW_TYPES.LOOP) {
const loopConfig = config as LoopConfig
loops[subflow.id] = {
id: subflow.id,
...config,
Expand Down Expand Up @@ -126,7 +167,7 @@ export async function saveWorkflowToNormalizedTables(
): Promise<{ success: boolean; jsonBlob?: any; error?: string }> {
try {
// Start a transaction
const result = await db.transaction(async (tx) => {
await db.transaction(async (tx) => {
// Clear existing data for this workflow
await Promise.all([
tx.delete(workflowBlocks).where(eq(workflowBlocks.workflowId, workflowId)),
Expand Down
14 changes: 4 additions & 10 deletions apps/sim/trigger/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment as environmentTable, userStats } from '@/db/schema'
Expand Down Expand Up @@ -60,16 +60,10 @@ export const workflowExecution = task({
)
}

// Load workflow data from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalizedData) {
logger.error(`[${requestId}] Workflow not found in normalized tables: ${workflowId}`)
throw new Error(`Workflow ${workflowId} data not found in normalized tables`)
}

logger.info(`[${requestId}] Workflow loaded successfully: ${workflowId}`)
// Load workflow data from deployed state (this task is only used for API executions right now)
const workflowData = await loadDeployedWorkflowState(workflowId)

const { blocks, edges, loops, parallels } = normalizedData
const { blocks, edges, loops, parallels } = workflowData

// Merge subblock states (server-safe version doesn't need workflowId)
const mergedStates = mergeSubblockState(blocks, {})
Expand Down