Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion apps/sim/app/api/workflows/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { workflow, workspaceMember } from '@/db/schema'

Expand All @@ -10,6 +11,7 @@ const logger = createLogger('WorkflowByIdAPI')
/**
* GET /api/workflows/[id]
* Fetch a single workflow by ID
* Uses hybrid approach: try normalized tables first, fallback to JSON blob
*/
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
Expand Down Expand Up @@ -69,10 +71,43 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}

// Try to load from normalized tables first
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)

const finalWorkflowData = { ...workflowData }

if (normalizedData) {
// Use normalized table data - reconstruct complete state object
// First get any existing state properties, then override with normalized data
const existingState =
workflowData.state && typeof workflowData.state === 'object' ? workflowData.state : {}

finalWorkflowData.state = {
// Default values for expected properties
deploymentStatuses: {},
hasActiveSchedule: false,
hasActiveWebhook: false,
// Preserve any existing state properties
...existingState,
// Override with normalized data (this takes precedence)
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
parallels: normalizedData.parallels,
lastSaved: Date.now(),
isDeployed: workflowData.isDeployed || false,
deployedAt: workflowData.deployedAt,
}
Comment thread
icecrasher321 marked this conversation as resolved.
logger.info(`[${requestId}] Loaded workflow ${workflowId} from normalized tables`)
} else {
// Fallback to JSON blob
logger.info(`[${requestId}] Using JSON blob for workflow ${workflowId}`)
}

const elapsed = Date.now() - startTime
logger.info(`[${requestId}] Successfully fetched workflow ${workflowId} in ${elapsed}ms`)

return NextResponse.json({ data: workflowData }, { status: 200 })
return NextResponse.json({ data: finalWorkflowData }, { status: 200 })
} catch (error: any) {
const elapsed = Date.now() - startTime
logger.error(`[${requestId}] Error fetching workflow ${workflowId} after ${elapsed}ms`, error)
Expand Down
35 changes: 32 additions & 3 deletions apps/sim/app/api/workflows/sync/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { workflow, workspace, workspaceMember } from '@/db/schema'

Expand Down Expand Up @@ -386,6 +387,34 @@ export async function POST(req: NextRequest) {
// Ensure the workflow has the correct workspaceId
const effectiveWorkspaceId = clientWorkflow.workspaceId || workspaceId

// Save to normalized tables for all workflows (hybrid approach)
const normalizedResult = await saveWorkflowToNormalizedTables(id, {
blocks: clientWorkflow.state.blocks || {},
edges: clientWorkflow.state.edges || [],
loops: clientWorkflow.state.loops || {},
parallels: clientWorkflow.state.parallels || {},
lastSaved: clientWorkflow.state.lastSaved,
isDeployed: clientWorkflow.state.isDeployed,
deployedAt: clientWorkflow.state.deployedAt,
deploymentStatuses: (clientWorkflow.state as any).deploymentStatuses || {},
hasActiveSchedule: (clientWorkflow.state as any).hasActiveSchedule,
hasActiveWebhook: (clientWorkflow.state as any).hasActiveWebhook,
})
Comment thread
icecrasher321 marked this conversation as resolved.

// Use the JSON blob from normalized save for compatibility, or fallback to original state
const stateToSave =
normalizedResult.success && normalizedResult.jsonBlob
? normalizedResult.jsonBlob
: clientWorkflow.state

if (normalizedResult.success) {
logger.info(`[${requestId}] Saved workflow ${id} to normalized tables`)
} else {
logger.warn(
`[${requestId}] Failed to save workflow ${id} to normalized tables: ${normalizedResult.error}`
)
}

if (!dbWorkflow) {
// New workflow - create (state is required by schema)
operations.push(
Expand All @@ -397,7 +426,7 @@ export async function POST(req: NextRequest) {
name: clientWorkflow.name,
description: clientWorkflow.description,
color: clientWorkflow.color,
state: clientWorkflow.state,
state: stateToSave,
marketplaceData: clientWorkflow.marketplaceData || null,
lastSynced: now,
createdAt: now,
Expand Down Expand Up @@ -451,8 +480,8 @@ export async function POST(req: NextRequest) {
}

// Always update state since we only sync the active workflow with valid state
if (JSON.stringify(dbWorkflow.state) !== JSON.stringify(clientWorkflow.state)) {
updateData.state = clientWorkflow.state
if (JSON.stringify(dbWorkflow.state) !== JSON.stringify(stateToSave)) {
updateData.state = stateToSave
needsUpdate = true
}

Expand Down
58 changes: 58 additions & 0 deletions apps/sim/db/migrations/0043_silent_the_anarchist.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
CREATE TABLE "workflow_blocks" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"type" text NOT NULL,
"name" text NOT NULL,
"position_x" integer NOT NULL,
"position_y" integer NOT NULL,
"enabled" boolean DEFAULT true NOT NULL,
"horizontal_handles" boolean DEFAULT true NOT NULL,
"is_wide" boolean DEFAULT false NOT NULL,
"height" integer DEFAULT 0 NOT NULL,
"sub_blocks" jsonb DEFAULT '{}' NOT NULL,
"outputs" jsonb DEFAULT '{}' NOT NULL,
"data" jsonb DEFAULT '{}',
Comment thread
icecrasher321 marked this conversation as resolved.
"parent_id" text,
"extent" text,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "workflow_edges" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"source_block_id" text NOT NULL,
"target_block_id" text NOT NULL,
Comment thread
icecrasher321 marked this conversation as resolved.
"source_handle" text,
"target_handle" text,
"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "workflow_subflows" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"type" text NOT NULL,
"config" jsonb DEFAULT '{}' NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "workflow_blocks" ADD CONSTRAINT "workflow_blocks_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_blocks" ADD CONSTRAINT "workflow_blocks_parent_id_workflow_blocks_id_fk" FOREIGN KEY ("parent_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_edges" ADD CONSTRAINT "workflow_edges_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_edges" ADD CONSTRAINT "workflow_edges_source_block_id_workflow_blocks_id_fk" FOREIGN KEY ("source_block_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_edges" ADD CONSTRAINT "workflow_edges_target_block_id_workflow_blocks_id_fk" FOREIGN KEY ("target_block_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_subflows" ADD CONSTRAINT "workflow_subflows_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "workflow_blocks_workflow_id_idx" ON "workflow_blocks" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_blocks_parent_id_idx" ON "workflow_blocks" USING btree ("parent_id");--> statement-breakpoint
CREATE INDEX "workflow_blocks_workflow_parent_idx" ON "workflow_blocks" USING btree ("workflow_id","parent_id");--> statement-breakpoint
CREATE INDEX "workflow_blocks_workflow_type_idx" ON "workflow_blocks" USING btree ("workflow_id","type");--> statement-breakpoint
CREATE INDEX "workflow_edges_workflow_id_idx" ON "workflow_edges" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_edges_source_block_idx" ON "workflow_edges" USING btree ("source_block_id");--> statement-breakpoint
CREATE INDEX "workflow_edges_target_block_idx" ON "workflow_edges" USING btree ("target_block_id");--> statement-breakpoint
CREATE INDEX "workflow_edges_workflow_source_idx" ON "workflow_edges" USING btree ("workflow_id","source_block_id");--> statement-breakpoint
CREATE INDEX "workflow_edges_workflow_target_idx" ON "workflow_edges" USING btree ("workflow_id","target_block_id");--> statement-breakpoint
CREATE INDEX "workflow_edges_source_block_fk_idx" ON "workflow_edges" USING btree ("source_block_id");--> statement-breakpoint
CREATE INDEX "workflow_edges_target_block_fk_idx" ON "workflow_edges" USING btree ("target_block_id");--> statement-breakpoint
Comment thread
icecrasher321 marked this conversation as resolved.
CREATE INDEX "workflow_subflows_workflow_id_idx" ON "workflow_subflows" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_subflows_workflow_type_idx" ON "workflow_subflows" USING btree ("workflow_id","type");
Loading