Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(async): avoid pre-starting queued execution logs
Let executeWorkflowCore own normal-path logging start so queued workflow and schedule executions persist the richer deployment and environment metadata instead of an earlier placeholder start record.
  • Loading branch information
PlaneInABottle authored and test committed Mar 13, 2026
commit 7f2447ab56459c1db791c931aee0d967bc1e9ad2
130 changes: 119 additions & 11 deletions apps/sim/background/async-preprocessing-correlation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,28 @@

import { beforeEach, describe, expect, it, vi } from 'vitest'

const { mockPreprocessExecution, mockTask, mockDbUpdate } = vi.hoisted(() => ({
const {
mockPreprocessExecution,
mockTask,
mockDbUpdate,
mockExecuteWorkflowCore,
mockLoggingSession,
mockBlockExistsInDeployment,
mockLoadDeployedWorkflowState,
mockGetScheduleTimeValues,
mockGetSubBlockValue,
} = vi.hoisted(() => ({
mockPreprocessExecution: vi.fn(),
mockTask: vi.fn((config) => config),
mockDbUpdate: vi.fn(() => ({
set: vi.fn(() => ({ where: vi.fn().mockResolvedValue(undefined) })),
})),
mockExecuteWorkflowCore: vi.fn(),
mockLoggingSession: vi.fn(),
mockBlockExistsInDeployment: vi.fn(),
mockLoadDeployedWorkflowState: vi.fn(),
mockGetScheduleTimeValues: vi.fn(),
mockGetSubBlockValue: vi.fn(),
}))

vi.mock('@trigger.dev/sdk', () => ({ task: mockTask }))
Expand All @@ -30,11 +46,15 @@ vi.mock('@/lib/execution/preprocessing', () => ({
}))

vi.mock('@/lib/logs/execution/logging-session', () => ({
LoggingSession: vi.fn().mockImplementation(() => ({
safeStart: vi.fn().mockResolvedValue(true),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
markAsFailed: vi.fn().mockResolvedValue(undefined),
})),
LoggingSession: vi.fn().mockImplementation(() => {
const instance = {
safeStart: vi.fn().mockResolvedValue(true),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
markAsFailed: vi.fn().mockResolvedValue(undefined),
}
mockLoggingSession(instance)
return instance
}),
}))

vi.mock('@/lib/core/execution-limits', () => ({
Expand All @@ -52,7 +72,7 @@ vi.mock('@/lib/logs/execution/trace-spans/trace-spans', () => ({
}))

vi.mock('@/lib/workflows/executor/execution-core', () => ({
executeWorkflowCore: vi.fn(),
executeWorkflowCore: mockExecuteWorkflowCore,
wasExecutionFinalizedByCore: vi.fn().mockReturnValue(false),
}))

Expand All @@ -64,14 +84,14 @@ vi.mock('@/lib/workflows/executor/human-in-the-loop-manager', () => ({
}))

vi.mock('@/lib/workflows/persistence/utils', () => ({
blockExistsInDeployment: vi.fn(),
loadDeployedWorkflowState: vi.fn(),
blockExistsInDeployment: mockBlockExistsInDeployment,
loadDeployedWorkflowState: mockLoadDeployedWorkflowState,
}))

vi.mock('@/lib/workflows/schedules/utils', () => ({
calculateNextRunTime: vi.fn(),
getScheduleTimeValues: vi.fn(),
getSubBlockValue: vi.fn(),
getScheduleTimeValues: mockGetScheduleTimeValues,
getSubBlockValue: mockGetSubBlockValue,
}))

vi.mock('@/executor/execution/snapshot', () => ({
Expand All @@ -97,6 +117,94 @@ import { executeWorkflowJob } from './workflow-execution'
describe('async preprocessing correlation threading', () => {
beforeEach(() => {
vi.clearAllMocks()
mockLoadDeployedWorkflowState.mockResolvedValue({
blocks: {
'schedule-block': {
type: 'schedule',
},
},
edges: [],
loops: {},
parallels: {},
deploymentVersionId: 'deployment-1',
})
mockGetSubBlockValue.mockReturnValue('daily')
mockGetScheduleTimeValues.mockReturnValue({ timezone: 'UTC' })
})

it('does not pre-start workflow logging before core execution', async () => {
mockPreprocessExecution.mockResolvedValueOnce({
success: true,
actorUserId: 'actor-1',
workflowRecord: {
id: 'workflow-1',
userId: 'owner-1',
workspaceId: 'workspace-1',
variables: {},
},
executionTimeout: {},
})
mockExecuteWorkflowCore.mockResolvedValueOnce({
success: true,
status: 'success',
output: { ok: true },
metadata: { duration: 10, userId: 'actor-1' },
})

await executeWorkflowJob({
workflowId: 'workflow-1',
userId: 'user-1',
triggerType: 'api',
executionId: 'execution-1',
requestId: 'request-1',
})

const loggingSession = mockLoggingSession.mock.calls[0]?.[0]
expect(loggingSession).toBeDefined()
expect(loggingSession.safeStart).not.toHaveBeenCalled()
expect(mockExecuteWorkflowCore).toHaveBeenCalledWith(
expect.objectContaining({
loggingSession,
})
)
})

it('does not pre-start schedule logging before core execution', async () => {
mockPreprocessExecution.mockResolvedValueOnce({
success: true,
actorUserId: 'actor-2',
workflowRecord: {
id: 'workflow-1',
userId: 'owner-1',
workspaceId: 'workspace-1',
variables: {},
},
executionTimeout: {},
})
mockExecuteWorkflowCore.mockResolvedValueOnce({
success: true,
status: 'success',
output: { ok: true },
metadata: { duration: 12, userId: 'actor-2' },
})

await executeScheduleJob({
scheduleId: 'schedule-1',
workflowId: 'workflow-1',
executionId: 'execution-2',
requestId: 'request-2',
now: '2025-01-01T00:00:00.000Z',
scheduledFor: '2025-01-01T00:00:00.000Z',
})

const loggingSession = mockLoggingSession.mock.calls[0]?.[0]
expect(loggingSession).toBeDefined()
expect(loggingSession.safeStart).not.toHaveBeenCalled()
expect(mockExecuteWorkflowCore).toHaveBeenCalledWith(
expect.objectContaining({
loggingSession,
})
)
})

it('passes workflow correlation into preprocessing', async () => {
Expand Down
7 changes: 0 additions & 7 deletions apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}

await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId,
variables: {},
triggerData: { correlation },
})

logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`)

try {
Expand Down
7 changes: 0 additions & 7 deletions apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,6 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {

logger.info(`[${requestId}] Preprocessing passed. Using actor: ${actorUserId}`)

await loggingSession.safeStart({
userId: actorUserId,
workspaceId,
variables: {},
triggerData: { correlation },
})

const workflow = preprocessResult.workflowRecord!

const metadata: ExecutionMetadata = {
Expand Down