Skip to content

Commit a72a671

Browse files
author
test
committed
fix(webhooks): reuse preprocessing execution ids
Thread preprocessing execution identity into queued webhook execution so both phases share the same correlation and logs.
1 parent ef63e9b commit a72a671

File tree

4 files changed

+276
-16
lines changed

4 files changed

+276
-16
lines changed

apps/sim/app/api/webhooks/trigger/[path]/route.test.ts

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,21 @@ vi.mock('@/lib/webhooks/processor', () => ({
355355
return null
356356
}
357357
),
358-
checkWebhookPreprocessing: vi
359-
.fn()
360-
.mockResolvedValue({ error: null, actorUserId: 'test-user-id' }),
358+
checkWebhookPreprocessing: vi.fn().mockResolvedValue({
359+
error: null,
360+
actorUserId: 'test-user-id',
361+
executionId: 'preprocess-execution-id',
362+
correlation: {
363+
executionId: 'preprocess-execution-id',
364+
requestId: 'mock-request-id',
365+
source: 'webhook',
366+
workflowId: 'test-workflow-id',
367+
webhookId: 'generic-webhook-id',
368+
path: 'test-path',
369+
provider: 'generic',
370+
triggerType: 'webhook',
371+
},
372+
}),
361373
formatProviderErrorResponse: vi.fn().mockImplementation((_webhook, error, status) => {
362374
const { NextResponse } = require('next/server')
363375
return NextResponse.json({ error }, { status })
@@ -516,7 +528,22 @@ describe('Webhook Trigger API Route', () => {
516528
expect(call[1]).toEqual(expect.objectContaining({ id: 'test-workflow-id' }))
517529
expect(call[2]).toEqual(expect.objectContaining({ event: 'test', id: 'test-123' }))
518530
expect(call[4]).toEqual(
519-
expect.objectContaining({ requestId: 'mock-request-id', path: 'test-path' })
531+
expect.objectContaining({
532+
requestId: 'mock-request-id',
533+
path: 'test-path',
534+
actorUserId: 'test-user-id',
535+
executionId: 'preprocess-execution-id',
536+
correlation: {
537+
executionId: 'preprocess-execution-id',
538+
requestId: 'mock-request-id',
539+
source: 'webhook',
540+
workflowId: 'test-workflow-id',
541+
webhookId: 'generic-webhook-id',
542+
path: 'test-path',
543+
provider: 'generic',
544+
triggerType: 'webhook',
545+
},
546+
})
520547
)
521548
})
522549

apps/sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ export async function POST(
144144
requestId,
145145
path,
146146
actorUserId: preprocessResult.actorUserId,
147+
executionId: preprocessResult.executionId,
148+
correlation: preprocessResult.correlation,
147149
})
148150
responses.push(response)
149151
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
5+
import { createMockRequest } from '@sim/testing'
6+
import { beforeEach, describe, expect, it, vi } from 'vitest'
7+
8+
const {
9+
mockUuidV4,
10+
mockPreprocessExecution,
11+
mockEnqueue,
12+
mockGetJobQueue,
13+
mockShouldExecuteInline,
14+
} = vi.hoisted(() => ({
15+
mockUuidV4: vi.fn(),
16+
mockPreprocessExecution: vi.fn(),
17+
mockEnqueue: vi.fn(),
18+
mockGetJobQueue: vi.fn(),
19+
mockShouldExecuteInline: vi.fn(),
20+
}))
21+
22+
vi.mock('@sim/db', () => ({
23+
db: {},
24+
webhook: {},
25+
workflow: {},
26+
workflowDeploymentVersion: {},
27+
}))
28+
29+
vi.mock('@sim/db/schema', () => ({
30+
credentialSet: {},
31+
subscription: {},
32+
}))
33+
34+
vi.mock('@sim/logger', () => ({
35+
createLogger: vi.fn().mockReturnValue({
36+
info: vi.fn(),
37+
warn: vi.fn(),
38+
error: vi.fn(),
39+
debug: vi.fn(),
40+
}),
41+
}))
42+
43+
vi.mock('drizzle-orm', () => ({
44+
and: vi.fn(),
45+
eq: vi.fn(),
46+
isNull: vi.fn(),
47+
or: vi.fn(),
48+
}))
49+
50+
vi.mock('uuid', () => ({
51+
v4: mockUuidV4,
52+
}))
53+
54+
vi.mock('@/lib/billing/subscriptions/utils', () => ({
55+
checkEnterprisePlan: vi.fn().mockReturnValue(true),
56+
checkTeamPlan: vi.fn().mockReturnValue(true),
57+
}))
58+
59+
vi.mock('@/lib/core/async-jobs', () => ({
60+
getInlineJobQueue: vi.fn(),
61+
getJobQueue: mockGetJobQueue,
62+
shouldExecuteInline: mockShouldExecuteInline,
63+
}))
64+
65+
vi.mock('@/lib/core/config/feature-flags', () => ({
66+
isProd: false,
67+
}))
68+
69+
vi.mock('@/lib/core/security/encryption', () => ({
70+
safeCompare: vi.fn().mockReturnValue(true),
71+
}))
72+
73+
vi.mock('@/lib/environment/utils', () => ({
74+
getEffectiveDecryptedEnv: vi.fn().mockResolvedValue({}),
75+
}))
76+
77+
vi.mock('@/lib/execution/preprocessing', () => ({
78+
preprocessExecution: mockPreprocessExecution,
79+
}))
80+
81+
vi.mock('@/lib/webhooks/pending-verification', () => ({
82+
getPendingWebhookVerification: vi.fn(),
83+
matchesPendingWebhookVerificationProbe: vi.fn().mockReturnValue(false),
84+
requiresPendingWebhookVerification: vi.fn().mockReturnValue(false),
85+
}))
86+
87+
vi.mock('@/lib/webhooks/utils', () => ({
88+
convertSquareBracketsToTwiML: vi.fn((value: string) => value),
89+
}))
90+
91+
vi.mock('@/lib/webhooks/utils.server', () => ({
92+
handleSlackChallenge: vi.fn().mockReturnValue(null),
93+
handleWhatsAppVerification: vi.fn().mockResolvedValue(null),
94+
validateAttioSignature: vi.fn().mockReturnValue(true),
95+
validateCalcomSignature: vi.fn().mockReturnValue(true),
96+
validateCirclebackSignature: vi.fn().mockReturnValue(true),
97+
validateFirefliesSignature: vi.fn().mockReturnValue(true),
98+
validateGitHubSignature: vi.fn().mockReturnValue(true),
99+
validateJiraSignature: vi.fn().mockReturnValue(true),
100+
validateLinearSignature: vi.fn().mockReturnValue(true),
101+
validateMicrosoftTeamsSignature: vi.fn().mockReturnValue(true),
102+
validateTwilioSignature: vi.fn().mockResolvedValue(true),
103+
validateTypeformSignature: vi.fn().mockReturnValue(true),
104+
verifyProviderWebhook: vi.fn().mockReturnValue(null),
105+
}))
106+
107+
vi.mock('@/background/webhook-execution', () => ({
108+
executeWebhookJob: vi.fn().mockResolvedValue({ success: true }),
109+
}))
110+
111+
vi.mock('@/executor/utils/reference-validation', () => ({
112+
resolveEnvVarReferences: vi.fn((value: string) => value),
113+
}))
114+
115+
vi.mock('@/triggers/confluence/utils', () => ({
116+
isConfluencePayloadMatch: vi.fn().mockReturnValue(true),
117+
}))
118+
119+
vi.mock('@/triggers/constants', () => ({
120+
isPollingWebhookProvider: vi.fn((provider: string) => provider === 'gmail'),
121+
}))
122+
123+
vi.mock('@/triggers/github/utils', () => ({
124+
isGitHubEventMatch: vi.fn().mockReturnValue(true),
125+
}))
126+
127+
vi.mock('@/triggers/hubspot/utils', () => ({
128+
isHubSpotContactEventMatch: vi.fn().mockReturnValue(true),
129+
}))
130+
131+
vi.mock('@/triggers/jira/utils', () => ({
132+
isJiraEventMatch: vi.fn().mockReturnValue(true),
133+
}))
134+
135+
import { checkWebhookPreprocessing, queueWebhookExecution } from '@/lib/webhooks/processor'
136+
137+
describe('webhook processor execution identity', () => {
138+
beforeEach(() => {
139+
vi.clearAllMocks()
140+
mockPreprocessExecution.mockResolvedValue({
141+
success: true,
142+
actorUserId: 'actor-user-1',
143+
})
144+
mockEnqueue.mockResolvedValue('job-1')
145+
mockGetJobQueue.mockResolvedValue({ enqueue: mockEnqueue })
146+
mockShouldExecuteInline.mockReturnValue(false)
147+
mockUuidV4.mockReturnValue('generated-execution-id')
148+
})
149+
150+
it('reuses preprocessing execution identity when queueing a polling webhook', async () => {
151+
const preprocessingResult = await checkWebhookPreprocessing(
152+
{
153+
id: 'workflow-1',
154+
userId: 'owner-1',
155+
workspaceId: 'workspace-1',
156+
},
157+
{
158+
id: 'webhook-1',
159+
path: 'incoming/gmail',
160+
provider: 'gmail',
161+
},
162+
'request-1'
163+
)
164+
165+
expect(preprocessingResult).toMatchObject({
166+
error: null,
167+
actorUserId: 'actor-user-1',
168+
executionId: 'generated-execution-id',
169+
correlation: {
170+
executionId: 'generated-execution-id',
171+
requestId: 'request-1',
172+
source: 'webhook',
173+
workflowId: 'workflow-1',
174+
webhookId: 'webhook-1',
175+
path: 'incoming/gmail',
176+
provider: 'gmail',
177+
triggerType: 'webhook',
178+
},
179+
})
180+
181+
await queueWebhookExecution(
182+
{
183+
id: 'webhook-1',
184+
path: 'incoming/gmail',
185+
provider: 'gmail',
186+
providerConfig: {},
187+
blockId: 'block-1',
188+
},
189+
{
190+
id: 'workflow-1',
191+
workspaceId: 'workspace-1',
192+
},
193+
{ event: 'message.received' },
194+
createMockRequest('POST', { event: 'message.received' }) as any,
195+
{
196+
requestId: 'request-1',
197+
path: 'incoming/gmail',
198+
actorUserId: preprocessingResult.actorUserId,
199+
executionId: preprocessingResult.executionId,
200+
correlation: preprocessingResult.correlation,
201+
}
202+
)
203+
204+
expect(mockUuidV4).toHaveBeenCalledTimes(1)
205+
expect(mockEnqueue).toHaveBeenCalledWith(
206+
'webhook-execution',
207+
expect.objectContaining({
208+
executionId: 'generated-execution-id',
209+
requestId: 'request-1',
210+
correlation: preprocessingResult.correlation,
211+
}),
212+
expect.objectContaining({
213+
metadata: expect.objectContaining({
214+
correlation: preprocessingResult.correlation,
215+
}),
216+
})
217+
)
218+
})
219+
})

apps/sim/lib/webhooks/processor.ts

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server'
66
import { v4 as uuidv4 } from 'uuid'
77
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
88
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
9+
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
910
import { isProd } from '@/lib/core/config/feature-flags'
1011
import { safeCompare } from '@/lib/core/security/encryption'
1112
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
@@ -46,11 +47,15 @@ export interface WebhookProcessorOptions {
4647
path?: string
4748
webhookId?: string
4849
actorUserId?: string
50+
executionId?: string
51+
correlation?: AsyncExecutionCorrelation
4952
}
5053

5154
export interface WebhookPreprocessingResult {
5255
error: NextResponse | null
5356
actorUserId?: string
57+
executionId?: string
58+
correlation?: AsyncExecutionCorrelation
5459
}
5560

5661
function getExternalUrl(request: NextRequest): string {
@@ -977,7 +982,12 @@ export async function checkWebhookPreprocessing(
977982
}
978983
}
979984

980-
return { error: null, actorUserId: preprocessResult.actorUserId }
985+
return {
986+
error: null,
987+
actorUserId: preprocessResult.actorUserId,
988+
executionId,
989+
correlation,
990+
}
981991
} catch (preprocessError) {
982992
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)
983993

@@ -1204,17 +1214,19 @@ export async function queueWebhookExecution(
12041214
return NextResponse.json({ error: 'Unable to resolve billing account' }, { status: 500 })
12051215
}
12061216

1207-
const executionId = uuidv4()
1208-
const correlation = {
1209-
executionId,
1210-
requestId: options.requestId,
1211-
source: 'webhook' as const,
1212-
workflowId: foundWorkflow.id,
1213-
webhookId: foundWebhook.id,
1214-
path: options.path || foundWebhook.path,
1215-
provider: foundWebhook.provider,
1216-
triggerType: 'webhook',
1217-
}
1217+
const executionId = options.executionId ?? uuidv4()
1218+
const correlation =
1219+
options.correlation ??
1220+
({
1221+
executionId,
1222+
requestId: options.requestId,
1223+
source: 'webhook' as const,
1224+
workflowId: foundWorkflow.id,
1225+
webhookId: foundWebhook.id,
1226+
path: options.path || foundWebhook.path,
1227+
provider: foundWebhook.provider,
1228+
triggerType: 'webhook',
1229+
} satisfies AsyncExecutionCorrelation)
12181230

12191231
const payload = {
12201232
webhookId: foundWebhook.id,

0 commit comments

Comments
 (0)