Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(async): preserve execution correlation across queued runs
  • Loading branch information
PlaneInABottle authored and test committed Mar 13, 2026
commit b606219495e739198ead2e0216f71811807fb584
46 changes: 45 additions & 1 deletion apps/sim/app/api/schedules/execute/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ vi.mock('@sim/db', () => ({
},
}))

import { GET } from '@/app/api/schedules/execute/route'
vi.mock('uuid', () => ({
v4: vi.fn().mockReturnValue('schedule-execution-1'),
}))

import { GET } from './route'

const SINGLE_SCHEDULE = [
{
Expand Down Expand Up @@ -204,4 +208,44 @@ describe('Scheduled Workflow Execution API Route', () => {
const data = await response.json()
expect(data).toHaveProperty('executedCount', 2)
})

it('should enqueue preassigned correlation metadata for schedules', async () => {
mockDbReturning.mockReturnValue(SINGLE_SCHEDULE)

const response = await GET(createMockRequest())

expect(response.status).toBe(200)
expect(mockEnqueue).toHaveBeenCalledWith(
'schedule-execution',
expect.objectContaining({
scheduleId: 'schedule-1',
workflowId: 'workflow-1',
executionId: 'schedule-execution-1',
requestId: 'test-request-id',
correlation: {
executionId: 'schedule-execution-1',
requestId: 'test-request-id',
source: 'schedule',
workflowId: 'workflow-1',
scheduleId: 'schedule-1',
triggerType: 'schedule',
scheduledFor: '2025-01-01T00:00:00.000Z',
},
}),
{
metadata: {
workflowId: 'workflow-1',
correlation: {
executionId: 'schedule-execution-1',
requestId: 'test-request-id',
source: 'schedule',
workflowId: 'workflow-1',
scheduleId: 'schedule-1',
triggerType: 'schedule',
scheduledFor: '2025-01-01T00:00:00.000Z',
},
},
}
)
})
})
16 changes: 15 additions & 1 deletion apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
Expand Down Expand Up @@ -57,10 +58,23 @@ export async function GET(request: NextRequest) {

const queuePromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt
const executionId = uuidv4()
const correlation = {
executionId,
requestId,
source: 'schedule' as const,
workflowId: schedule.workflowId,
scheduleId: schedule.id,
triggerType: 'schedule',
scheduledFor: schedule.nextRunAt?.toISOString(),
}

const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
executionId,
requestId,
correlation,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
Expand All @@ -71,7 +85,7 @@ export async function GET(request: NextRequest) {

try {
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
metadata: { workflowId: schedule.workflowId },
metadata: { workflowId: schedule.workflowId, correlation },
})
logger.info(
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
Expand Down
62 changes: 44 additions & 18 deletions apps/sim/app/api/webhooks/trigger/[path]/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const {
processWebhookMock,
executeMock,
getWorkspaceBilledAccountUserIdMock,
queueWebhookExecutionMock,
} = vi.hoisted(() => ({
generateRequestHashMock: vi.fn().mockResolvedValue('test-hash-123'),
validateSlackSignatureMock: vi.fn().mockResolvedValue(true),
Expand All @@ -125,6 +126,10 @@ const {
.mockImplementation(async (workspaceId: string | null | undefined) =>
workspaceId ? 'test-user-id' : null
),
queueWebhookExecutionMock: vi.fn().mockImplementation(async () => {
const { NextResponse } = await import('next/server')
return NextResponse.json({ message: 'Webhook processed' })
}),
}))

vi.mock('@trigger.dev/sdk', () => ({
Expand Down Expand Up @@ -359,12 +364,7 @@ vi.mock('@/lib/webhooks/processor', () => ({
}),
shouldSkipWebhookEvent: vi.fn().mockReturnValue(false),
handlePreDeploymentVerification: vi.fn().mockReturnValue(null),
queueWebhookExecution: vi.fn().mockImplementation(async () => {
// Call processWebhookMock so tests can verify it was called
processWebhookMock()
const { NextResponse } = await import('next/server')
return NextResponse.json({ message: 'Webhook processed' })
}),
queueWebhookExecution: queueWebhookExecutionMock,
}))

vi.mock('drizzle-orm/postgres-js', () => ({
Expand Down Expand Up @@ -419,7 +419,7 @@ describe('Webhook Trigger API Route', () => {

const params = Promise.resolve({ path: 'non-existent-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(404)

Expand Down Expand Up @@ -494,6 +494,32 @@ describe('Webhook Trigger API Route', () => {
})

describe('Generic Webhook Authentication', () => {
it('passes correlation-bearing request context into webhook queueing', async () => {
testData.webhooks.push({
id: 'generic-webhook-id',
provider: 'generic',
path: 'test-path',
isActive: true,
providerConfig: { requireAuth: false },
workflowId: 'test-workflow-id',
})

const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req as any, { params })

expect(response.status).toBe(200)
expect(queueWebhookExecutionMock).toHaveBeenCalledOnce()
const call = queueWebhookExecutionMock.mock.calls[0]
expect(call[0]).toEqual(expect.objectContaining({ id: 'generic-webhook-id' }))
expect(call[1]).toEqual(expect.objectContaining({ id: 'test-workflow-id' }))
expect(call[2]).toEqual(expect.objectContaining({ event: 'test', id: 'test-123' }))
expect(call[4]).toEqual(
expect.objectContaining({ requestId: 'mock-request-id', path: 'test-path' })
)
})

it('should process generic webhook without authentication', async () => {
testData.webhooks.push({
id: 'generic-webhook-id',
Expand All @@ -514,7 +540,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(200)

Expand Down Expand Up @@ -544,7 +570,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'bearer.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(200)
})
Expand Down Expand Up @@ -575,7 +601,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'custom.header.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(200)
})
Expand Down Expand Up @@ -610,7 +636,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'case.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(200)
}
Expand Down Expand Up @@ -645,7 +671,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'custom.case.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(200)
}
Expand All @@ -668,7 +694,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'wrong.token.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(401)
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
Expand Down Expand Up @@ -696,7 +722,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'wrong.custom.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(401)
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
Expand All @@ -716,7 +742,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'no.auth.test' })
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(401)
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
Expand Down Expand Up @@ -744,7 +770,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'exclusivity.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(401)
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
Expand Down Expand Up @@ -772,7 +798,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'wrong.header.name.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(401)
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
Expand All @@ -797,7 +823,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'no.token.config.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })

const response = await POST(req, { params })
const response = await POST(req as any, { params })

expect(response.status).toBe(401)
expect(await response.text()).toContain(
Expand Down
Loading