From 67cc80c9d652fa22de9761f61600eb38313f6d32 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:17:32 -0700 Subject: [PATCH 1/5] improvement(knowledge): batch trigger dispatch, prune redundant DB roundtrips Connector sync was dispatching Trigger.dev document-processing jobs one HTTP roundtrip at a time. processDocumentsWithQueue now uses tasks.batchTrigger when Trigger.dev is available, collapsing N roundtrips to ceil(N/1000). Idempotency keys protect against duplicate runs on retry. Also trims DB roundtrips inside the sync loop: - Per-batch isConnectorDeleted + isKnowledgeBaseDeleted collapsed into a single checkSyncLiveness JOIN (one SELECT instead of two per batch). - Dropped redundant pre-upload isKnowledgeBaseDeleted checks from addDocument/updateDocument: the batch-boundary liveness check already catches pre-batch deletions and the in-tx FOR UPDATE is authoritative for races during the batch. - Removed dead processDocumentsWithTrigger helper (never called). --- .../lib/knowledge/connectors/sync-engine.ts | 53 ++++++---- apps/sim/lib/knowledge/documents/service.ts | 100 ++++++++---------- 2 files changed, 76 insertions(+), 77 deletions(-) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 7da40aebd1..e6ec9bae96 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -58,22 +58,33 @@ type DocOp = | { type: 'add'; extDoc: ExternalDocument } | { type: 'update'; existingId: string; extDoc: ExternalDocument } -async function isConnectorDeleted(connectorId: string): Promise { +/** + * Combined liveness check used between batches. One JOIN query checks both + * connector and knowledge base state in a single roundtrip. + */ +async function checkSyncLiveness( + connectorId: string, + knowledgeBaseId: string +): Promise<{ connectorDeleted: boolean; knowledgeBaseDeleted: boolean }> { const rows = await db - .select({ archivedAt: knowledgeConnector.archivedAt, deletedAt: knowledgeConnector.deletedAt }) + .select({ + connectorArchivedAt: knowledgeConnector.archivedAt, + connectorDeletedAt: knowledgeConnector.deletedAt, + kbDeletedAt: knowledgeBase.deletedAt, + }) .from(knowledgeConnector) - .where(eq(knowledgeConnector.id, connectorId)) + .innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId)) + .where(and(eq(knowledgeConnector.id, connectorId), eq(knowledgeBase.id, knowledgeBaseId))) .limit(1) - return rows.length === 0 || rows[0].archivedAt !== null || rows[0].deletedAt !== null -} -async function isKnowledgeBaseDeleted(knowledgeBaseId: string): Promise { - const rows = await db - .select({ deletedAt: knowledgeBase.deletedAt }) - .from(knowledgeBase) - .where(eq(knowledgeBase.id, knowledgeBaseId)) - .limit(1) - return rows.length === 0 || rows[0].deletedAt !== null + if (rows.length === 0) { + return { connectorDeleted: true, knowledgeBaseDeleted: true } + } + const row = rows[0] + return { + connectorDeleted: row.connectorArchivedAt !== null || row.connectorDeletedAt !== null, + knowledgeBaseDeleted: row.kbDeletedAt !== null, + } } async function isKnowledgeBaseActiveInTx( @@ -502,10 +513,11 @@ export async function executeSync( } for (let i = 0; i < pendingOps.length; i += SYNC_BATCH_SIZE) { - if (await isConnectorDeleted(connectorId)) { + const liveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId) + if (liveness.connectorDeleted) { throw new ConnectorDeletedException(connectorId) } - if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) { + if (liveness.knowledgeBaseDeleted) { throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } @@ -642,11 +654,12 @@ export async function executeSync( } } - // Check if connector was deleted before retrying stuck documents - if (await isConnectorDeleted(connectorId)) { + // Check if connector/KB were deleted before retrying stuck documents + const postBatchLiveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId) + if (postBatchLiveness.connectorDeleted) { throw new ConnectorDeletedException(connectorId) } - if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) { + if (postBatchLiveness.knowledgeBaseDeleted) { throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } @@ -881,9 +894,6 @@ async function addDocument( extDoc: ExternalDocument, sourceConfig?: Record ): Promise { - if (await isKnowledgeBaseDeleted(knowledgeBaseId)) { - throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) - } const documentId = generateId() const contentBuffer = Buffer.from(extDoc.content, 'utf-8') const safeTitle = sanitizeStorageTitle(extDoc.title) @@ -963,9 +973,6 @@ async function updateDocument( extDoc: ExternalDocument, sourceConfig?: Record ): Promise { - if (await isKnowledgeBaseDeleted(knowledgeBaseId)) { - throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) - } // Fetch old file URL before uploading replacement const existingRows = await db .select({ fileUrl: document.fileUrl }) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 204b517911..06e0cb9b7d 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -48,7 +48,6 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators' import { deleteFile } from '@/lib/uploads/core/storage-service' import { extractStorageKey } from '@/lib/uploads/utils/file-utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' -import type { DocumentProcessingPayload } from '@/background/knowledge-processing' import { calculateCost } from '@/providers/utils' const logger = createLogger('DocumentService') @@ -314,12 +313,16 @@ async function processDocumentTags( return result } +const TRIGGER_BATCH_SIZE = 1000 + export async function processDocumentsWithQueue( createdDocuments: DocumentData[], knowledgeBaseId: string, processingOptions: ProcessingOptions, requestId: string ): Promise { + if (createdDocuments.length === 0) return + const jobPayloads = createdDocuments.map((doc) => ({ knowledgeBaseId, documentId: doc.documentId, @@ -333,13 +336,52 @@ export async function processDocumentsWithQueue( requestId, })) + const useTrigger = isTriggerAvailable() logger.info( `[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`, - { - backend: isTriggerAvailable() ? 'trigger-dev' : 'direct', - } + { backend: useTrigger ? 'trigger-dev' : 'direct' } ) + if (useTrigger) { + /** + * Single batched dispatch per chunk of up to TRIGGER_BATCH_SIZE — collapses + * N HTTP roundtrips to ceil(N / TRIGGER_BATCH_SIZE). Idempotency keys allow + * safe re-dispatch on retry without duplicating runs. + */ + let dispatched = 0 + for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) { + const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE) + try { + await tasks.batchTrigger( + 'knowledge-process-document', + chunk.map((payload) => ({ + payload, + options: { + idempotencyKey: `doc-process-${payload.documentId}-${requestId}`, + tags: [ + `knowledgeBaseId:${payload.knowledgeBaseId}`, + `documentId:${payload.documentId}`, + ], + }, + })) + ) + dispatched += chunk.length + } catch (error) { + logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, { + error: getErrorMessage(error), + }) + if (dispatched === 0 && i + TRIGGER_BATCH_SIZE >= jobPayloads.length) { + throw new Error(`All ${jobPayloads.length} document processing dispatches failed`) + } + } + } + + logger.info( + `[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded` + ) + return + } + const results = await Promise.allSettled( jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload)) ) @@ -358,8 +400,6 @@ export async function processDocumentsWithQueue( if (failures.length === results.length) { throw new Error(`All ${failures.length} document processing dispatches failed`) } - - return } export async function processDocumentAsync( @@ -698,54 +738,6 @@ export function isTriggerAvailable(): boolean { return Boolean(env.TRIGGER_SECRET_KEY) && isTriggerDevEnabled } -async function processDocumentsWithTrigger( - documents: DocumentProcessingPayload[], - requestId: string -): Promise<{ success: boolean; message: string; batchIds?: string[] }> { - if (!isTriggerAvailable()) { - throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing') - } - - try { - logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`) - - const MAX_BATCH_SIZE = 1000 - const batchIds: string[] = [] - - for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) { - const chunk = documents.slice(i, i + MAX_BATCH_SIZE) - const batchResult = await tasks.batchTrigger( - 'knowledge-process-document', - chunk.map((doc) => ({ - payload: doc, - options: { - idempotencyKey: `doc-process-${doc.documentId}-${requestId}`, - tags: [`knowledgeBaseId:${doc.knowledgeBaseId}`, `documentId:${doc.documentId}`], - }, - })) - ) - batchIds.push(batchResult.batchId) - } - - logger.info( - `[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)` - ) - - return { - success: true, - message: `${documents.length} document processing jobs triggered`, - batchIds, - } - } catch (error) { - logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error) - - return { - success: false, - message: getErrorMessage(error, 'Failed to trigger background jobs'), - } - } -} - export async function createDocumentRecords( documents: Array<{ filename: string From b783c904e0ee824187ca1bb490895784acd8b9b6 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:24:54 -0700 Subject: [PATCH 2/5] refactor(knowledge): split dispatch helpers, drop dead trigger branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use the canonical DocumentProcessingPayload from the task module instead of the duplicate DocumentJobData interface in service.ts - Pass typeof processDocumentTask as a generic to tasks.batchTrigger so the payload shape is type-checked against the task definition - Inline TRIGGER_BATCH_SIZE provenance (Trigger.dev SDK 4.3.1+ doc'd cap, we're on 4.4.3) - Split direct vs trigger dispatch into dispatchInProcess and dispatchViaBatchTrigger; collapse the all-failed throw into a single check on the combined dispatched counter - Remove dispatchDocumentProcessingJob — its trigger branch is no longer reachable now that batchTrigger handles the trigger path, and the direct branch is inlined --- apps/sim/lib/knowledge/documents/service.ts | 177 ++++++++++---------- 1 file changed, 90 insertions(+), 87 deletions(-) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 06e0cb9b7d..7231001841 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -48,6 +48,10 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators' import { deleteFile } from '@/lib/uploads/core/storage-service' import { extractStorageKey } from '@/lib/uploads/utils/file-utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' +import type { + DocumentProcessingPayload, + processDocument as processDocumentTask, +} from '@/background/knowledge-processing' import { calculateCost } from '@/providers/utils' const logger = createLogger('DocumentService') @@ -101,35 +105,6 @@ export interface ProcessingOptions { lang?: string } -interface DocumentJobData { - knowledgeBaseId: string - documentId: string - docData: { - filename: string - fileUrl: string - fileSize: number - mimeType: string - } - processingOptions: ProcessingOptions - requestId: string -} - -async function dispatchDocumentProcessingJob(payload: DocumentJobData): Promise { - if (isTriggerAvailable()) { - await tasks.trigger('knowledge-process-document', payload, { - tags: [`knowledgeBaseId:${payload.knowledgeBaseId}`, `documentId:${payload.documentId}`], - }) - return - } - - await processDocumentAsync( - payload.knowledgeBaseId, - payload.documentId, - payload.docData, - payload.processingOptions - ) -} - interface DocumentTagData { tagName: string fieldType: string @@ -313,17 +288,20 @@ async function processDocumentTags( return result } +/** + * Trigger.dev's documented per-call cap for `tasks.batchTrigger` is 1,000 + * items on SDK 4.3.1+ (we're on 4.4.3). Payloads above this are chunked. + * https://trigger.dev/docs/triggering + */ const TRIGGER_BATCH_SIZE = 1000 -export async function processDocumentsWithQueue( - createdDocuments: DocumentData[], +function buildJobPayload( + doc: DocumentData, knowledgeBaseId: string, processingOptions: ProcessingOptions, requestId: string -): Promise { - if (createdDocuments.length === 0) return - - const jobPayloads = createdDocuments.map((doc) => ({ +): DocumentProcessingPayload { + return { knowledgeBaseId, documentId: doc.documentId, docData: { @@ -334,7 +312,28 @@ export async function processDocumentsWithQueue( }, processingOptions, requestId, - })) + } +} + +/** + * Dispatches document processing jobs. On Trigger.dev, collapses N runs into + * `ceil(N / TRIGGER_BATCH_SIZE)` HTTP calls via `tasks.batchTrigger`. Without + * Trigger.dev, falls back to in-process execution via `processDocumentAsync`. + * + * Throws only when every dispatch fails — partial failures are logged. Stuck + * docs left in 'pending' are reaped by the next sync's stuck-doc retry pass. + */ +export async function processDocumentsWithQueue( + createdDocuments: DocumentData[], + knowledgeBaseId: string, + processingOptions: ProcessingOptions, + requestId: string +): Promise { + if (createdDocuments.length === 0) return + + const jobPayloads = createdDocuments.map((doc) => + buildJobPayload(doc, knowledgeBaseId, processingOptions, requestId) + ) const useTrigger = isTriggerAvailable() logger.info( @@ -342,64 +341,68 @@ export async function processDocumentsWithQueue( { backend: useTrigger ? 'trigger-dev' : 'direct' } ) - if (useTrigger) { - /** - * Single batched dispatch per chunk of up to TRIGGER_BATCH_SIZE — collapses - * N HTTP roundtrips to ceil(N / TRIGGER_BATCH_SIZE). Idempotency keys allow - * safe re-dispatch on retry without duplicating runs. - */ - let dispatched = 0 - for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) { - const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE) - try { - await tasks.batchTrigger( - 'knowledge-process-document', - chunk.map((payload) => ({ - payload, - options: { - idempotencyKey: `doc-process-${payload.documentId}-${requestId}`, - tags: [ - `knowledgeBaseId:${payload.knowledgeBaseId}`, - `documentId:${payload.documentId}`, - ], - }, - })) - ) - dispatched += chunk.length - } catch (error) { - logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, { - error: getErrorMessage(error), - }) - if (dispatched === 0 && i + TRIGGER_BATCH_SIZE >= jobPayloads.length) { - throw new Error(`All ${jobPayloads.length} document processing dispatches failed`) - } - } - } - - logger.info( - `[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded` - ) - return - } + const dispatched = useTrigger + ? await dispatchViaBatchTrigger(jobPayloads, requestId) + : await dispatchInProcess(jobPayloads) - const results = await Promise.allSettled( - jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload)) + logger.info( + `[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded` ) - const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected') - if (failures.length > 0) { - logger.error(`[${requestId}] ${failures.length}/${results.length} document dispatches failed`, { - errors: failures.map((f) => getErrorMessage(f.reason)), - }) + if (dispatched === 0) { + throw new Error(`All ${jobPayloads.length} document processing dispatches failed`) } +} - logger.info( - `[${requestId}] Document dispatch complete: ${results.length - failures.length}/${results.length} succeeded` - ) +async function dispatchViaBatchTrigger( + jobPayloads: DocumentProcessingPayload[], + requestId: string +): Promise { + let dispatched = 0 + for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) { + const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE) + try { + await tasks.batchTrigger( + 'knowledge-process-document', + chunk.map((payload) => ({ + payload, + options: { + /** + * Scoped to (documentId, requestId) so HTTP-level retries inside a + * single dispatch don't double-enqueue, while legitimate re-dispatch + * (e.g. stuck-doc retry on a later sync) gets a fresh requestId and + * is allowed through. + */ + idempotencyKey: `doc-process-${payload.documentId}-${requestId}`, + tags: [ + `knowledgeBaseId:${payload.knowledgeBaseId}`, + `documentId:${payload.documentId}`, + ], + }, + })) + ) + dispatched += chunk.length + } catch (error) { + logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, { + error: getErrorMessage(error), + }) + } + } + return dispatched +} - if (failures.length === results.length) { - throw new Error(`All ${failures.length} document processing dispatches failed`) +async function dispatchInProcess(jobPayloads: DocumentProcessingPayload[]): Promise { + const results = await Promise.allSettled( + jobPayloads.map((p) => + processDocumentAsync(p.knowledgeBaseId, p.documentId, p.docData, p.processingOptions) + ) + ) + let dispatched = 0 + for (const r of results) { + if (r.status === 'fulfilled') dispatched++ + else logger.error('Document dispatch failed', { error: getErrorMessage(r.reason) }) } + return dispatched } export async function processDocumentAsync( From a2cf1834f5eec226b43cd62ab2497d3078f4b842 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:25:47 -0700 Subject: [PATCH 3/5] improvement(knowledge): log Trigger.dev batchIds for audit trail tasks.batchTrigger returns a batchId per call. Collecting and logging them after dispatch makes it possible to look up or cancel batches in the Trigger.dev dashboard when investigating stuck or missing documents. --- apps/sim/lib/knowledge/documents/service.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 7231001841..5f90f87ee6 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -359,10 +359,11 @@ async function dispatchViaBatchTrigger( requestId: string ): Promise { let dispatched = 0 + const batchIds: string[] = [] for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) { const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE) try { - await tasks.batchTrigger( + const result = await tasks.batchTrigger( 'knowledge-process-document', chunk.map((payload) => ({ payload, @@ -381,6 +382,7 @@ async function dispatchViaBatchTrigger( }, })) ) + batchIds.push(result.batchId) dispatched += chunk.length } catch (error) { logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, { @@ -388,6 +390,9 @@ async function dispatchViaBatchTrigger( }) } } + if (batchIds.length > 0) { + logger.info(`[${requestId}] Trigger.dev batches dispatched`, { batchIds }) + } return dispatched } From 3bc2515cbe9d881d223c494293fa2e1ce8c1229a Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:28:20 -0700 Subject: [PATCH 4/5] improvement(knowledge): thread requestId through direct dispatch logs Symmetry polish: dispatchInProcess now includes [requestId] in its error log so direct-mode failures are correlatable the same way trigger-mode failures already are. --- apps/sim/lib/knowledge/documents/service.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 5f90f87ee6..90667bc22d 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -343,7 +343,7 @@ export async function processDocumentsWithQueue( const dispatched = useTrigger ? await dispatchViaBatchTrigger(jobPayloads, requestId) - : await dispatchInProcess(jobPayloads) + : await dispatchInProcess(jobPayloads, requestId) logger.info( `[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded` @@ -396,7 +396,10 @@ async function dispatchViaBatchTrigger( return dispatched } -async function dispatchInProcess(jobPayloads: DocumentProcessingPayload[]): Promise { +async function dispatchInProcess( + jobPayloads: DocumentProcessingPayload[], + requestId: string +): Promise { const results = await Promise.allSettled( jobPayloads.map((p) => processDocumentAsync(p.knowledgeBaseId, p.documentId, p.docData, p.processingOptions) @@ -405,7 +408,8 @@ async function dispatchInProcess(jobPayloads: DocumentProcessingPayload[]): Prom let dispatched = 0 for (const r of results) { if (r.status === 'fulfilled') dispatched++ - else logger.error('Document dispatch failed', { error: getErrorMessage(r.reason) }) + else + logger.error(`[${requestId}] Document dispatch failed`, { error: getErrorMessage(r.reason) }) } return dispatched } From b98429db7a9fb226b4bdb87188b9afcbe27a7ed7 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 13:31:43 -0700 Subject: [PATCH 5/5] improvement(knowledge): trim verbose comments Tightens TSDoc on processDocumentsWithQueue, TRIGGER_BATCH_SIZE, checkSyncLiveness, and the idempotency-key inline comment. --- .../lib/knowledge/connectors/sync-engine.ts | 5 +--- apps/sim/lib/knowledge/documents/service.ts | 23 +++++-------------- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index e6ec9bae96..848bc3de77 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -58,10 +58,7 @@ type DocOp = | { type: 'add'; extDoc: ExternalDocument } | { type: 'update'; existingId: string; extDoc: ExternalDocument } -/** - * Combined liveness check used between batches. One JOIN query checks both - * connector and knowledge base state in a single roundtrip. - */ +/** Single-roundtrip liveness check used between batches. */ async function checkSyncLiveness( connectorId: string, knowledgeBaseId: string diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 90667bc22d..be71c27971 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -288,11 +288,7 @@ async function processDocumentTags( return result } -/** - * Trigger.dev's documented per-call cap for `tasks.batchTrigger` is 1,000 - * items on SDK 4.3.1+ (we're on 4.4.3). Payloads above this are chunked. - * https://trigger.dev/docs/triggering - */ +/** Per-call cap for `tasks.batchTrigger` on Trigger.dev SDK 4.3.1+. */ const TRIGGER_BATCH_SIZE = 1000 function buildJobPayload( @@ -316,12 +312,9 @@ function buildJobPayload( } /** - * Dispatches document processing jobs. On Trigger.dev, collapses N runs into - * `ceil(N / TRIGGER_BATCH_SIZE)` HTTP calls via `tasks.batchTrigger`. Without - * Trigger.dev, falls back to in-process execution via `processDocumentAsync`. - * - * Throws only when every dispatch fails — partial failures are logged. Stuck - * docs left in 'pending' are reaped by the next sync's stuck-doc retry pass. + * Dispatches document processing jobs via Trigger.dev's `batchTrigger` when + * available, or in-process otherwise. Throws only when every dispatch fails; + * partial failures are logged and recovered by the next sync's stuck-doc pass. */ export async function processDocumentsWithQueue( createdDocuments: DocumentData[], @@ -368,12 +361,8 @@ async function dispatchViaBatchTrigger( chunk.map((payload) => ({ payload, options: { - /** - * Scoped to (documentId, requestId) so HTTP-level retries inside a - * single dispatch don't double-enqueue, while legitimate re-dispatch - * (e.g. stuck-doc retry on a later sync) gets a fresh requestId and - * is allowed through. - */ + // Scoped to (documentId, requestId): blocks intra-dispatch retries + // from double-enqueuing; later syncs use a fresh requestId. idempotencyKey: `doc-process-${payload.documentId}-${requestId}`, tags: [ `knowledgeBaseId:${payload.knowledgeBaseId}`,