diff --git a/apps/sim/lib/webhooks/providers/airtable.ts b/apps/sim/lib/webhooks/providers/airtable.ts index fd0463b4b9..add8683774 100644 --- a/apps/sim/lib/webhooks/providers/airtable.ts +++ b/apps/sim/lib/webhooks/providers/airtable.ts @@ -1,7 +1,11 @@ +import crypto from 'crypto' import { db } from '@sim/db' import { account, webhook } from '@sim/db/schema' import { createLogger } from '@sim/logger' +import { safeCompare } from '@sim/security/compare' +import { toError } from '@sim/utils/errors' import { eq } from 'drizzle-orm' +import { NextResponse } from 'next/server' import { validateAirtableId } from '@/lib/core/security/input-validation' import { getBaseUrl } from '@/lib/core/utils/urls' import { @@ -10,6 +14,7 @@ import { getProviderConfig, } from '@/lib/webhooks/provider-subscription-utils' import type { + AuthContext, DeleteSubscriptionContext, FormatInputContext, SubscriptionContext, @@ -44,22 +49,16 @@ interface AirtableTableChanges { destroyedRecordIds?: string[] } -/** - * Process Airtable payloads - */ async function fetchAndProcessAirtablePayloads( webhookData: Record, workflowData: Record, - requestId: string // Original request ID from the ping, used for the final execution log + requestId: string ) { - // Logging handles all error logging let currentCursor: number | null = null let mightHaveMore = true let payloadsFetched = 0 let apiCallCount = 0 - // Use a Map to consolidate changes per record ID const consolidatedChangesMap = new Map() - // Capture raw payloads from Airtable for exposure to workflows const allPayloads = [] const localProviderConfig = { ...((webhookData.providerConfig as Record) || {}), @@ -179,7 +178,6 @@ async function fetchAndProcessAirtablePayloads( while (mightHaveMore) { apiCallCount++ - // Safety break if (apiCallCount > 10) { mightHaveMore = false break @@ -217,7 +215,6 @@ async function fetchAndProcessAirtablePayloads( error: errorMessage, } ) - // Error logging handled by logging session mightHaveMore = false break } @@ -226,7 +223,6 @@ async function fetchAndProcessAirtablePayloads( if (receivedPayloads.length > 0) { payloadsFetched += receivedPayloads.length - // Keep the raw payloads for later exposure to the workflow for (const p of receivedPayloads) { allPayloads.push(p) } @@ -247,14 +243,11 @@ async function fetchAndProcessAirtablePayloads( )) { const existingChange = consolidatedChangesMap.get(recordId) if (existingChange) { - // Record was created and possibly updated within the same batch existingChange.changedFields = { ...existingChange.changedFields, ...(recordData.cellValuesByFieldId || {}), } - // Keep changeType as 'created' if it started as created } else { - // New creation consolidatedChangesMap.set(recordId, { tableId: tableId, recordId: recordId, @@ -265,7 +258,6 @@ async function fetchAndProcessAirtablePayloads( } } - // Handle updated records if (tableChanges.changedRecordsById) { const updatedCount = Object.keys(tableChanges.changedRecordsById).length changeCount += updatedCount @@ -277,16 +269,12 @@ async function fetchAndProcessAirtablePayloads( const currentFields = recordData.current?.cellValuesByFieldId || {} if (existingChange) { - // Existing record was updated again existingChange.changedFields = { ...existingChange.changedFields, ...currentFields, } - // Ensure type is 'updated' if it was previously 'created' existingChange.changeType = 'updated' - // Do not update previousFields again } else { - // First update for this record in the batch const newChange: AirtableChange = { tableId: tableId, recordId: recordId, @@ -300,7 +288,6 @@ async function fetchAndProcessAirtablePayloads( } } } - // TODO: Handle deleted records (`destroyedRecordIds`) if needed } } } @@ -312,22 +299,20 @@ async function fetchAndProcessAirtablePayloads( if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) { currentCursor = nextCursor - // Follow exactly the old implementation - use awaited update instead of parallel const updatedConfig = { ...localProviderConfig, externalWebhookCursor: currentCursor, } try { - // Force a complete object update to ensure consistency in serverless env await db .update(webhook) .set({ - providerConfig: updatedConfig, // Use full object + providerConfig: updatedConfig, updatedAt: new Date(), }) .where(eq(webhook.id, webhookData.id as string)) - localProviderConfig.externalWebhookCursor = currentCursor // Update local copy too + localProviderConfig.externalWebhookCursor = currentCursor } catch (dbError: unknown) { const err = dbError as Error logger.error(`[${requestId}] Failed to persist Airtable cursor to DB`, { @@ -335,9 +320,8 @@ async function fetchAndProcessAirtablePayloads( cursor: currentCursor, error: err.message, }) - // Error logging handled by logging session mightHaveMore = false - throw new Error('Failed to save Airtable cursor, stopping processing.') // Re-throw to break loop clearly + throw new Error('Failed to save Airtable cursor, stopping processing.') } } else if (!nextCursor || typeof nextCursor !== 'number') { logger.warn(`[${requestId}] Invalid or missing cursor received, stopping poll`, { @@ -347,19 +331,17 @@ async function fetchAndProcessAirtablePayloads( }) mightHaveMore = false } else if (nextCursor === currentCursor) { - mightHaveMore = false // Explicitly stop if cursor hasn't changed + mightHaveMore = false } } catch (fetchError: unknown) { logger.error( `[${requestId}] Network error calling Airtable GET /payloads (Call ${apiCallCount}) for webhook ${webhookData.id}`, fetchError ) - // Error logging handled by logging session mightHaveMore = false break } } - // Convert map values to array for final processing const finalConsolidatedChanges = Array.from(consolidatedChangesMap.values()) logger.info( `[${requestId}] Consolidated ${finalConsolidatedChanges.length} Airtable changes across ${apiCallCount} API calls` @@ -367,14 +349,11 @@ async function fetchAndProcessAirtablePayloads( if (finalConsolidatedChanges.length > 0 || allPayloads.length > 0) { try { - // Build input exposing raw payloads and consolidated changes const latestPayload = allPayloads.length > 0 ? allPayloads[allPayloads.length - 1] : null const input: Record = { payloads: allPayloads, latestPayload, - // Consolidated, simplified changes for convenience airtableChanges: finalConsolidatedChanges, - // Include webhook metadata for resolver fallbacks webhook: { data: { provider: 'airtable', @@ -384,9 +363,8 @@ async function fetchAndProcessAirtablePayloads( }, } - // CRITICAL EXECUTION TRACE POINT logger.info( - `[${requestId}] CRITICAL_TRACE: Beginning workflow execution with ${finalConsolidatedChanges.length} Airtable changes`, + `[${requestId}] Beginning workflow execution with ${finalConsolidatedChanges.length} Airtable changes`, { workflowId: workflowData.id, recordCount: finalConsolidatedChanges.length, @@ -395,8 +373,7 @@ async function fetchAndProcessAirtablePayloads( } ) - // Return the processed input for the trigger.dev task to handle - logger.info(`[${requestId}] CRITICAL_TRACE: Airtable changes processed, returning input`, { + logger.info(`[${requestId}] Airtable changes processed, returning input`, { workflowId: workflowData.id, recordCount: finalConsolidatedChanges.length, rawPayloadCount: allPayloads.length, @@ -406,7 +383,7 @@ async function fetchAndProcessAirtablePayloads( return input } catch (processingError: unknown) { const err = processingError as Error - logger.error(`[${requestId}] CRITICAL_TRACE: Error processing Airtable changes`, { + logger.error(`[${requestId}] Error processing Airtable changes`, { workflowId: workflowData.id, error: err.message, stack: err.stack, @@ -416,28 +393,71 @@ async function fetchAndProcessAirtablePayloads( throw processingError } } else { - // DEBUG: Log when no changes are found - logger.info(`[${requestId}] TRACE: No Airtable changes to process`, { + logger.info(`[${requestId}] No Airtable changes to process`, { workflowId: workflowData.id, apiCallCount, webhookId: webhookData.id, }) } } catch (error) { - // Catch any unexpected errors during the setup/polling logic itself logger.error( `[${requestId}] Unexpected error during asynchronous Airtable payload processing task`, { webhookId: webhookData.id, workflowId: workflowData.id, - error: (error as Error).message, + error: toError(error).message, } ) - // Error logging handled by logging session } } export const airtableHandler: WebhookProviderHandler = { + verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) { + const macSecretBase64 = providerConfig.macSecretBase64 as string | undefined | null + + if (!macSecretBase64) { + logger.warn( + `[${requestId}] Airtable webhook has no macSecretBase64 in providerConfig — skipping MAC verification. Re-create the webhook to enable signature verification.` + ) + return null + } + + const signature = request.headers.get('X-Airtable-Content-MAC') + if (!signature) { + logger.warn(`[${requestId}] Airtable webhook missing X-Airtable-Content-MAC header`) + return new NextResponse('Unauthorized - Missing Airtable MAC header', { status: 401 }) + } + + const EXPECTED_PREFIX = 'hmac-sha256=' + if (!signature.startsWith(EXPECTED_PREFIX)) { + logger.warn(`[${requestId}] Airtable MAC signature has invalid format`) + return new NextResponse('Unauthorized - Invalid Airtable MAC signature format', { + status: 401, + }) + } + const providedHex = signature.slice(EXPECTED_PREFIX.length) + + try { + const secretBytes = Buffer.from(macSecretBase64, 'base64') + const computedHex = crypto + .createHmac('sha256', secretBytes) + .update(rawBody, 'utf8') + .digest('hex') + + if (!safeCompare(computedHex, providedHex)) { + logger.warn(`[${requestId}] Airtable MAC signature verification failed`) + return new NextResponse('Unauthorized - Invalid Airtable MAC signature', { status: 401 }) + } + } catch (error) { + logger.error(`[${requestId}] Error verifying Airtable MAC signature`, { + error: toError(error).message, + }) + return new NextResponse('Unauthorized - Signature verification error', { status: 401 }) + } + + return null + }, + async createSubscription({ webhook: webhookRecord, workflow, @@ -554,9 +574,14 @@ export const airtableHandler: WebhookProviderHandler = { airtableWebhookId: responseBody.id, } ) - return { providerConfigUpdates: { externalId: responseBody.id } } + return { + providerConfigUpdates: { + externalId: responseBody.id, + macSecretBase64: responseBody.macSecretBase64 ?? null, + }, + } } catch (error: unknown) { - const err = error as Error + const err = toError(error) logger.error( `[${requestId}] Exception during Airtable webhook creation for webhook ${webhookRecord.id}.`, { @@ -678,7 +703,7 @@ export const airtableHandler: WebhookProviderHandler = { } catch (e: unknown) { externalIdLookupFailed = true logger.warn(`[${requestId}] Error attempting to resolve Airtable externalId`, { - error: (e as Error)?.message, + error: toError(e).message, }) } } @@ -731,7 +756,7 @@ export const airtableHandler: WebhookProviderHandler = { }) } } catch (error: unknown) { - const err = error as Error + const err = toError(error) logger.error(`[${requestId}] Error deleting Airtable webhook`, { webhookId: webhookRecord.id, error: err.message, diff --git a/apps/sim/lib/webhooks/providers/hubspot.ts b/apps/sim/lib/webhooks/providers/hubspot.ts index 2591ee4017..77163f1ac9 100644 --- a/apps/sim/lib/webhooks/providers/hubspot.ts +++ b/apps/sim/lib/webhooks/providers/hubspot.ts @@ -1,5 +1,10 @@ +import crypto from 'crypto' import { createLogger } from '@sim/logger' +import { safeCompare } from '@sim/security/compare' +import { toError } from '@sim/utils/errors' +import { NextResponse } from 'next/server' import type { + AuthContext, EventMatchContext, FormatInputContext, FormatInputResult, @@ -9,6 +14,42 @@ import type { const logger = createLogger('WebhookProvider:HubSpot') export const hubspotHandler: WebhookProviderHandler = { + verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) { + const clientSecret = providerConfig.clientSecret as string | undefined + + if (!clientSecret) { + logger.warn( + `[${requestId}] HubSpot webhook missing clientSecret in providerConfig — rejecting request` + ) + return new NextResponse('Unauthorized - Webhook secret not configured', { status: 401 }) + } + + const signature = request.headers.get('X-HubSpot-Signature') + if (!signature) { + logger.warn(`[${requestId}] HubSpot webhook missing X-HubSpot-Signature header`) + return new NextResponse('Unauthorized - Missing HubSpot signature', { status: 401 }) + } + + try { + const computedHash = crypto + .createHash('sha256') + .update(clientSecret + rawBody, 'utf8') + .digest('hex') + + if (!safeCompare(computedHash, signature)) { + logger.warn(`[${requestId}] HubSpot signature verification failed`) + return new NextResponse('Unauthorized - Invalid HubSpot signature', { status: 401 }) + } + } catch (error) { + logger.error(`[${requestId}] Error verifying HubSpot signature`, { + error: toError(error).message, + }) + return new NextResponse('Unauthorized - Signature verification error', { status: 401 }) + } + + return null + }, + async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) { const triggerId = providerConfig.triggerId as string | undefined diff --git a/apps/sim/lib/webhooks/providers/webflow.ts b/apps/sim/lib/webhooks/providers/webflow.ts index 7494ae3956..1edcb319e9 100644 --- a/apps/sim/lib/webhooks/providers/webflow.ts +++ b/apps/sim/lib/webhooks/providers/webflow.ts @@ -1,8 +1,14 @@ +import crypto from 'crypto' import { createLogger } from '@sim/logger' +import { safeCompare } from '@sim/security/compare' +import { toError } from '@sim/utils/errors' +import { NextResponse } from 'next/server' +import { env } from '@/lib/core/config/env' import { validateAlphanumericId } from '@/lib/core/security/input-validation' import { getBaseUrl } from '@/lib/core/utils/urls' import { getCredentialOwner, getProviderConfig } from '@/lib/webhooks/provider-subscription-utils' import type { + AuthContext, DeleteSubscriptionContext, EventFilterContext, FormatInputContext, @@ -16,6 +22,51 @@ import { getOAuthToken, refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/ const logger = createLogger('WebhookProvider:Webflow') export const webflowHandler: WebhookProviderHandler = { + verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) { + const secretKey = providerConfig.secretKey as string | undefined | null + + if (!secretKey) { + logger.warn( + `[${requestId}] Webflow webhook missing secretKey in providerConfig — rejecting request` + ) + return new NextResponse('Unauthorized - Webhook secret not configured', { status: 401 }) + } + + const signature = request.headers.get('x-webflow-signature') + const timestamp = request.headers.get('x-webflow-timestamp') + + if (!signature || !timestamp) { + logger.warn(`[${requestId}] Webflow webhook missing signature or timestamp headers`) + return new NextResponse('Unauthorized - Missing Webflow signature headers', { status: 401 }) + } + + // x-webflow-timestamp is Unix milliseconds — compare directly with Date.now() + const ts = Number.parseInt(timestamp, 10) + if (Number.isNaN(ts) || Date.now() - ts > 5 * 60 * 1000) { + logger.warn(`[${requestId}] Webflow webhook timestamp expired or invalid`) + return new NextResponse('Unauthorized - Webhook timestamp expired', { status: 401 }) + } + + try { + const computedHash = crypto + .createHmac('sha256', secretKey) + .update(`${timestamp}:${rawBody}`, 'utf8') + .digest('hex') + + if (!safeCompare(computedHash, signature)) { + logger.warn(`[${requestId}] Webflow signature verification failed`) + return new NextResponse('Unauthorized - Invalid Webflow signature', { status: 401 }) + } + } catch (error) { + logger.error(`[${requestId}] Error verifying Webflow signature`, { + error: toError(error).message, + }) + return new NextResponse('Unauthorized - Signature verification error', { status: 401 }) + } + + return null + }, + async createSubscription({ webhook: webhookRecord, workflow, @@ -132,9 +183,14 @@ export const webflowHandler: WebhookProviderHandler = { } ) - return { providerConfigUpdates: { externalId: responseBody.id || responseBody._id } } + return { + providerConfigUpdates: { + externalId: responseBody.id || responseBody._id, + secretKey: responseBody.secretKey ?? env.WEBFLOW_CLIENT_SECRET ?? null, + }, + } } catch (error: unknown) { - const err = error as Error + const err = toError(error) logger.error( `[${requestId}] Exception during Webflow webhook creation for webhook ${webhookRecord.id}.`, {