Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 70 additions & 45 deletions apps/sim/lib/webhooks/providers/airtable.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@sim/security/compare'
import { toError } from '@sim/utils/errors'
import { eq } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { validateAirtableId } from '@/lib/core/security/input-validation'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
Expand All @@ -10,6 +14,7 @@ import {
getProviderConfig,
} from '@/lib/webhooks/provider-subscription-utils'
import type {
AuthContext,
DeleteSubscriptionContext,
FormatInputContext,
SubscriptionContext,
Expand Down Expand Up @@ -44,22 +49,16 @@ interface AirtableTableChanges {
destroyedRecordIds?: string[]
}

/**
* Process Airtable payloads
*/
async function fetchAndProcessAirtablePayloads(
webhookData: Record<string, unknown>,
workflowData: Record<string, unknown>,
requestId: string // Original request ID from the ping, used for the final execution log
requestId: string
) {
// Logging handles all error logging
let currentCursor: number | null = null
let mightHaveMore = true
let payloadsFetched = 0
let apiCallCount = 0
// Use a Map to consolidate changes per record ID
const consolidatedChangesMap = new Map<string, AirtableChange>()
// Capture raw payloads from Airtable for exposure to workflows
const allPayloads = []
const localProviderConfig = {
...((webhookData.providerConfig as Record<string, unknown>) || {}),
Expand Down Expand Up @@ -179,7 +178,6 @@ async function fetchAndProcessAirtablePayloads(

while (mightHaveMore) {
apiCallCount++
// Safety break
if (apiCallCount > 10) {
mightHaveMore = false
break
Expand Down Expand Up @@ -217,7 +215,6 @@ async function fetchAndProcessAirtablePayloads(
error: errorMessage,
}
)
// Error logging handled by logging session
mightHaveMore = false
break
}
Expand All @@ -226,7 +223,6 @@ async function fetchAndProcessAirtablePayloads(

if (receivedPayloads.length > 0) {
payloadsFetched += receivedPayloads.length
// Keep the raw payloads for later exposure to the workflow
for (const p of receivedPayloads) {
allPayloads.push(p)
}
Expand All @@ -247,14 +243,11 @@ async function fetchAndProcessAirtablePayloads(
)) {
const existingChange = consolidatedChangesMap.get(recordId)
if (existingChange) {
// Record was created and possibly updated within the same batch
existingChange.changedFields = {
...existingChange.changedFields,
...(recordData.cellValuesByFieldId || {}),
}
// Keep changeType as 'created' if it started as created
} else {
// New creation
consolidatedChangesMap.set(recordId, {
tableId: tableId,
recordId: recordId,
Expand All @@ -265,7 +258,6 @@ async function fetchAndProcessAirtablePayloads(
}
}

// Handle updated records
if (tableChanges.changedRecordsById) {
const updatedCount = Object.keys(tableChanges.changedRecordsById).length
changeCount += updatedCount
Expand All @@ -277,16 +269,12 @@ async function fetchAndProcessAirtablePayloads(
const currentFields = recordData.current?.cellValuesByFieldId || {}

if (existingChange) {
// Existing record was updated again
existingChange.changedFields = {
...existingChange.changedFields,
...currentFields,
}
// Ensure type is 'updated' if it was previously 'created'
existingChange.changeType = 'updated'
// Do not update previousFields again
} else {
// First update for this record in the batch
const newChange: AirtableChange = {
tableId: tableId,
recordId: recordId,
Expand All @@ -300,7 +288,6 @@ async function fetchAndProcessAirtablePayloads(
}
}
}
// TODO: Handle deleted records (`destroyedRecordIds`) if needed
}
}
}
Expand All @@ -312,32 +299,29 @@ async function fetchAndProcessAirtablePayloads(
if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) {
currentCursor = nextCursor

// Follow exactly the old implementation - use awaited update instead of parallel
const updatedConfig = {
...localProviderConfig,
externalWebhookCursor: currentCursor,
}
try {
// Force a complete object update to ensure consistency in serverless env
await db
.update(webhook)
.set({
providerConfig: updatedConfig, // Use full object
providerConfig: updatedConfig,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookData.id as string))

localProviderConfig.externalWebhookCursor = currentCursor // Update local copy too
localProviderConfig.externalWebhookCursor = currentCursor
} catch (dbError: unknown) {
const err = dbError as Error
logger.error(`[${requestId}] Failed to persist Airtable cursor to DB`, {
webhookId: webhookData.id,
cursor: currentCursor,
error: err.message,
})
// Error logging handled by logging session
mightHaveMore = false
throw new Error('Failed to save Airtable cursor, stopping processing.') // Re-throw to break loop clearly
throw new Error('Failed to save Airtable cursor, stopping processing.')
}
} else if (!nextCursor || typeof nextCursor !== 'number') {
logger.warn(`[${requestId}] Invalid or missing cursor received, stopping poll`, {
Expand All @@ -347,34 +331,29 @@ async function fetchAndProcessAirtablePayloads(
})
mightHaveMore = false
} else if (nextCursor === currentCursor) {
mightHaveMore = false // Explicitly stop if cursor hasn't changed
mightHaveMore = false
}
} catch (fetchError: unknown) {
logger.error(
`[${requestId}] Network error calling Airtable GET /payloads (Call ${apiCallCount}) for webhook ${webhookData.id}`,
fetchError
)
// Error logging handled by logging session
mightHaveMore = false
break
}
}
// Convert map values to array for final processing
const finalConsolidatedChanges = Array.from(consolidatedChangesMap.values())
logger.info(
`[${requestId}] Consolidated ${finalConsolidatedChanges.length} Airtable changes across ${apiCallCount} API calls`
)

if (finalConsolidatedChanges.length > 0 || allPayloads.length > 0) {
try {
// Build input exposing raw payloads and consolidated changes
const latestPayload = allPayloads.length > 0 ? allPayloads[allPayloads.length - 1] : null
const input: Record<string, unknown> = {
payloads: allPayloads,
latestPayload,
// Consolidated, simplified changes for convenience
airtableChanges: finalConsolidatedChanges,
// Include webhook metadata for resolver fallbacks
webhook: {
data: {
provider: 'airtable',
Expand All @@ -384,9 +363,8 @@ async function fetchAndProcessAirtablePayloads(
},
}

// CRITICAL EXECUTION TRACE POINT
logger.info(
`[${requestId}] CRITICAL_TRACE: Beginning workflow execution with ${finalConsolidatedChanges.length} Airtable changes`,
`[${requestId}] Beginning workflow execution with ${finalConsolidatedChanges.length} Airtable changes`,
{
workflowId: workflowData.id,
recordCount: finalConsolidatedChanges.length,
Expand All @@ -395,8 +373,7 @@ async function fetchAndProcessAirtablePayloads(
}
)

// Return the processed input for the trigger.dev task to handle
logger.info(`[${requestId}] CRITICAL_TRACE: Airtable changes processed, returning input`, {
logger.info(`[${requestId}] Airtable changes processed, returning input`, {
workflowId: workflowData.id,
recordCount: finalConsolidatedChanges.length,
rawPayloadCount: allPayloads.length,
Expand All @@ -406,7 +383,7 @@ async function fetchAndProcessAirtablePayloads(
return input
} catch (processingError: unknown) {
const err = processingError as Error
logger.error(`[${requestId}] CRITICAL_TRACE: Error processing Airtable changes`, {
logger.error(`[${requestId}] Error processing Airtable changes`, {
workflowId: workflowData.id,
error: err.message,
stack: err.stack,
Expand All @@ -416,28 +393,71 @@ async function fetchAndProcessAirtablePayloads(
throw processingError
}
} else {
// DEBUG: Log when no changes are found
logger.info(`[${requestId}] TRACE: No Airtable changes to process`, {
logger.info(`[${requestId}] No Airtable changes to process`, {
workflowId: workflowData.id,
apiCallCount,
webhookId: webhookData.id,
})
}
} catch (error) {
// Catch any unexpected errors during the setup/polling logic itself
logger.error(
`[${requestId}] Unexpected error during asynchronous Airtable payload processing task`,
{
webhookId: webhookData.id,
workflowId: workflowData.id,
error: (error as Error).message,
error: toError(error).message,
}
)
// Error logging handled by logging session
}
}

export const airtableHandler: WebhookProviderHandler = {
verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) {
const macSecretBase64 = providerConfig.macSecretBase64 as string | undefined | null

if (!macSecretBase64) {
logger.warn(
`[${requestId}] Airtable webhook has no macSecretBase64 in providerConfig — skipping MAC verification. Re-create the webhook to enable signature verification.`
)
return null
}

const signature = request.headers.get('X-Airtable-Content-MAC')
if (!signature) {
logger.warn(`[${requestId}] Airtable webhook missing X-Airtable-Content-MAC header`)
return new NextResponse('Unauthorized - Missing Airtable MAC header', { status: 401 })
}

const EXPECTED_PREFIX = 'hmac-sha256='
if (!signature.startsWith(EXPECTED_PREFIX)) {
logger.warn(`[${requestId}] Airtable MAC signature has invalid format`)
return new NextResponse('Unauthorized - Invalid Airtable MAC signature format', {
status: 401,
})
}
const providedHex = signature.slice(EXPECTED_PREFIX.length)

try {
const secretBytes = Buffer.from(macSecretBase64, 'base64')
const computedHex = crypto
.createHmac('sha256', secretBytes)
.update(rawBody, 'utf8')
.digest('hex')

if (!safeCompare(computedHex, providedHex)) {
logger.warn(`[${requestId}] Airtable MAC signature verification failed`)
return new NextResponse('Unauthorized - Invalid Airtable MAC signature', { status: 401 })
}
} catch (error) {
logger.error(`[${requestId}] Error verifying Airtable MAC signature`, {
error: toError(error).message,
})
return new NextResponse('Unauthorized - Signature verification error', { status: 401 })
}

return null
},

async createSubscription({
webhook: webhookRecord,
workflow,
Expand Down Expand Up @@ -554,9 +574,14 @@ export const airtableHandler: WebhookProviderHandler = {
airtableWebhookId: responseBody.id,
}
)
return { providerConfigUpdates: { externalId: responseBody.id } }
return {
providerConfigUpdates: {
externalId: responseBody.id,
macSecretBase64: responseBody.macSecretBase64 ?? null,
},
}
} catch (error: unknown) {
const err = error as Error
const err = toError(error)
logger.error(
`[${requestId}] Exception during Airtable webhook creation for webhook ${webhookRecord.id}.`,
{
Expand Down Expand Up @@ -678,7 +703,7 @@ export const airtableHandler: WebhookProviderHandler = {
} catch (e: unknown) {
externalIdLookupFailed = true
logger.warn(`[${requestId}] Error attempting to resolve Airtable externalId`, {
error: (e as Error)?.message,
error: toError(e).message,
})
}
}
Expand Down Expand Up @@ -731,7 +756,7 @@ export const airtableHandler: WebhookProviderHandler = {
})
}
} catch (error: unknown) {
const err = error as Error
const err = toError(error)
logger.error(`[${requestId}] Error deleting Airtable webhook`, {
webhookId: webhookRecord.id,
error: err.message,
Expand Down
41 changes: 41 additions & 0 deletions apps/sim/lib/webhooks/providers/hubspot.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { safeCompare } from '@sim/security/compare'
import { toError } from '@sim/utils/errors'
import { NextResponse } from 'next/server'
import type {
AuthContext,
EventMatchContext,
FormatInputContext,
FormatInputResult,
Expand All @@ -9,6 +14,42 @@ import type {
const logger = createLogger('WebhookProvider:HubSpot')

export const hubspotHandler: WebhookProviderHandler = {
verifyAuth({ request, rawBody, requestId, providerConfig }: AuthContext) {
const clientSecret = providerConfig.clientSecret as string | undefined

if (!clientSecret) {
logger.warn(
`[${requestId}] HubSpot webhook missing clientSecret in providerConfig — rejecting request`
)
return new NextResponse('Unauthorized - Webhook secret not configured', { status: 401 })
}

const signature = request.headers.get('X-HubSpot-Signature')
if (!signature) {
logger.warn(`[${requestId}] HubSpot webhook missing X-HubSpot-Signature header`)
return new NextResponse('Unauthorized - Missing HubSpot signature', { status: 401 })
}

Comment thread
waleedlatif1 marked this conversation as resolved.
try {
const computedHash = crypto
.createHash('sha256')
.update(clientSecret + rawBody, 'utf8')
.digest('hex')

if (!safeCompare(computedHash, signature)) {
logger.warn(`[${requestId}] HubSpot signature verification failed`)
return new NextResponse('Unauthorized - Invalid HubSpot signature', { status: 401 })
}
} catch (error) {
logger.error(`[${requestId}] Error verifying HubSpot signature`, {
error: toError(error).message,
})
return new NextResponse('Unauthorized - Signature verification error', { status: 401 })
}

return null
},

async matchEvent({ webhook, workflow, body, requestId, providerConfig }: EventMatchContext) {
const triggerId = providerConfig.triggerId as string | undefined

Expand Down
Loading
Loading