Skip to content
Merged
Prev Previous commit
Next Next commit
improvement(polling): fix correctness and efficiency across all polli…
…ng handlers (#4067)

* improvement(polling): fix correctness and efficiency across all polling handlers

- Gmail: paginate history API, add historyTypes filter, differentiate 403/429,
  fetch fresh historyId on fallback to break 404 retry loop
- Outlook: follow @odata.nextLink pagination, use fetchWithRetry for all Graph
  calls, fix $top alignment, skip folder filter on partial resolution failure,
  remove Content-Type from GET requests
- RSS: add conditional GET (ETag/If-None-Match), raise GUID cap to 500, fix 304
  ETag capture per RFC 9111, align GUID tracking with idempotency fallback key
- IMAP: single connection reuse, UIDVALIDITY tracking per mailbox, advance UID
  only on successful fetch, fix messageFlagsAdd range type, remove cross-mailbox
  legacy UID fallback
- Dispatch polling via trigger.dev task with per-provider concurrency key;
  fall back to synchronous Redis-locked polling for self-hosted

* fix(rss): align idempotency key GUID fallback with tracking/filter guard

* removed comments

* fix(imap): clear stale UID when UIDVALIDITY changes during state merge

* fix(rss): skip items with no identifiable GUID to avoid idempotency key collisions

* fix(schedules): convert dynamic import of getWorkflowById to static import

* fix(imap): preserve fresh UID after UIDVALIDITY reset in state merge

* improvement(polling): remove trigger.dev dispatch, use synchronous Redis-locked polling

* fix(polling): decouple outlook page size from total email cap so pagination works
  • Loading branch information
waleedlatif1 authored Apr 9, 2026
commit 8e222fa369a5879e08d385c8799a79292fca2e53
2 changes: 1 addition & 1 deletion apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { generateRequestId } from '@/lib/core/utils/request'
import { generateId } from '@/lib/core/utils/uuid'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import { getWorkflowById } from '@/lib/workflows/utils'
import {
executeJobInline,
executeScheduleJob,
Expand Down Expand Up @@ -115,7 +116,6 @@ export async function GET(request: NextRequest) {
}

try {
const { getWorkflowById } = await import('@/lib/workflows/utils')
const resolvedWorkflow = schedule.workflowId
? await getWorkflowById(schedule.workflowId)
: null
Expand Down
58 changes: 30 additions & 28 deletions apps/sim/app/api/webhooks/poll/[provider]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ export async function GET(
const { provider } = await params
const requestId = generateShortId()

const LOCK_KEY = `${provider}-polling-lock`
let lockValue: string | undefined

try {
const authError = verifyCronAuth(request, `${provider} webhook polling`)
if (authError) return authError
Expand All @@ -31,29 +28,38 @@ export async function GET(
return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 })
}

lockValue = requestId
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
if (!locked) {
return NextResponse.json(
{
success: true,
message: 'Polling already in progress – skipped',
requestId,
status: 'skip',
},
{ status: 202 }
)
}
const LOCK_KEY = `${provider}-polling-lock`
let lockValue: string | undefined

try {
lockValue = requestId
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
if (!locked) {
return NextResponse.json(
{
success: true,
message: 'Polling already in progress – skipped',
requestId,
status: 'skip',
},
{ status: 202 }
)
}

const results = await pollProvider(provider)
const results = await pollProvider(provider)

return NextResponse.json({
success: true,
message: `${provider} polling completed`,
requestId,
status: 'completed',
...results,
})
return NextResponse.json({
success: true,
message: `${provider} polling completed`,
requestId,
status: 'completed',
...results,
})
} finally {
if (lockValue) {
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
}
}
} catch (error) {
logger.error(`Error during ${provider} polling (${requestId}):`, error)
return NextResponse.json(
Expand All @@ -65,9 +71,5 @@ export async function GET(
},
{ status: 500 }
)
} finally {
if (lockValue) {
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
}
}
}
105 changes: 75 additions & 30 deletions apps/sim/lib/webhooks/polling/gmail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,44 +151,68 @@ async function fetchNewEmails(
let latestHistoryId = config.historyId

if (useHistoryApi) {
const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
const messageIds = new Set<string>()
let pageToken: string | undefined

const historyResponse = await fetch(historyUrl, {
headers: { Authorization: `Bearer ${accessToken}` },
})
do {
let historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}&historyTypes=messageAdded`
if (pageToken) {
historyUrl += `&pageToken=${pageToken}`
}

if (!historyResponse.ok) {
const errorData = await historyResponse.json()
logger.error(`[${requestId}] Gmail history API error:`, {
status: historyResponse.status,
statusText: historyResponse.statusText,
error: errorData,
const historyResponse = await fetch(historyUrl, {
headers: { Authorization: `Bearer ${accessToken}` },
})

logger.info(`[${requestId}] Falling back to search API after history API failure`)
return searchEmails(accessToken, config, requestId, logger)
}
if (!historyResponse.ok) {
const status = historyResponse.status
const errorData = await historyResponse.json().catch(() => ({}))
logger.error(`[${requestId}] Gmail history API error:`, {
status,
statusText: historyResponse.statusText,
error: errorData,
})

if (status === 403 || status === 429) {
throw new Error(
`Gmail API error ${status} — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
)
}

const historyData = await historyResponse.json()
logger.info(`[${requestId}] Falling back to search API after history API error ${status}`)
const searchResult = await searchEmails(accessToken, config, requestId, logger)
if (searchResult.emails.length === 0) {
const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger)
if (freshHistoryId) {
logger.info(
`[${requestId}] Fetched fresh historyId ${freshHistoryId} after invalid historyId (was: ${config.historyId})`
)
return { emails: [], latestHistoryId: freshHistoryId }
}
}
return searchResult
}

if (!historyData.history || !historyData.history.length) {
return { emails: [], latestHistoryId }
}
const historyData = await historyResponse.json()

if (historyData.historyId) {
latestHistoryId = historyData.historyId
}
if (historyData.historyId) {
latestHistoryId = historyData.historyId
}

const messageIds = new Set<string>()
for (const history of historyData.history) {
if (history.messagesAdded) {
for (const messageAdded of history.messagesAdded) {
messageIds.add(messageAdded.message.id)
if (historyData.history) {
for (const history of historyData.history) {
if (history.messagesAdded) {
for (const messageAdded of history.messagesAdded) {
messageIds.add(messageAdded.message.id)
}
}
}
}
}

if (messageIds.size === 0) {
pageToken = historyData.nextPageToken
} while (pageToken)

if (!messageIds.size) {
return { emails: [], latestHistoryId }
}

Expand Down Expand Up @@ -352,6 +376,29 @@ async function searchEmails(
}
}

async function getGmailProfileHistoryId(
accessToken: string,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string | null> {
try {
const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/profile', {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
logger.warn(
`[${requestId}] Failed to fetch Gmail profile for fresh historyId: ${response.status}`
)
return null
}
const profile = await response.json()
return (profile.historyId as string | undefined) ?? null
} catch (error) {
logger.warn(`[${requestId}] Error fetching Gmail profile:`, error)
return null
}
}

async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`

Expand Down Expand Up @@ -442,9 +489,7 @@ async function processEmails(
if (headers.date) {
try {
date = new Date(headers.date).toISOString()
} catch (_e) {
// Keep date as null if parsing fails
}
} catch (_e) {}
} else if (email.internalDate) {
date = new Date(Number.parseInt(email.internalDate)).toISOString()
}
Expand Down
Loading
Loading