diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index 7da40aebd1..848bc3de77 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -58,22 +58,30 @@ type DocOp = | { type: 'add'; extDoc: ExternalDocument } | { type: 'update'; existingId: string; extDoc: ExternalDocument } -async function isConnectorDeleted(connectorId: string): Promise { +/** Single-roundtrip liveness check used between batches. */ +async function checkSyncLiveness( + connectorId: string, + knowledgeBaseId: string +): Promise<{ connectorDeleted: boolean; knowledgeBaseDeleted: boolean }> { const rows = await db - .select({ archivedAt: knowledgeConnector.archivedAt, deletedAt: knowledgeConnector.deletedAt }) + .select({ + connectorArchivedAt: knowledgeConnector.archivedAt, + connectorDeletedAt: knowledgeConnector.deletedAt, + kbDeletedAt: knowledgeBase.deletedAt, + }) .from(knowledgeConnector) - .where(eq(knowledgeConnector.id, connectorId)) + .innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId)) + .where(and(eq(knowledgeConnector.id, connectorId), eq(knowledgeBase.id, knowledgeBaseId))) .limit(1) - return rows.length === 0 || rows[0].archivedAt !== null || rows[0].deletedAt !== null -} -async function isKnowledgeBaseDeleted(knowledgeBaseId: string): Promise { - const rows = await db - .select({ deletedAt: knowledgeBase.deletedAt }) - .from(knowledgeBase) - .where(eq(knowledgeBase.id, knowledgeBaseId)) - .limit(1) - return rows.length === 0 || rows[0].deletedAt !== null + if (rows.length === 0) { + return { connectorDeleted: true, knowledgeBaseDeleted: true } + } + const row = rows[0] + return { + connectorDeleted: row.connectorArchivedAt !== null || row.connectorDeletedAt !== null, + knowledgeBaseDeleted: row.kbDeletedAt !== null, + } } async function isKnowledgeBaseActiveInTx( @@ -502,10 +510,11 @@ export async function executeSync( } for (let i = 0; i < pendingOps.length; i += SYNC_BATCH_SIZE) { - if (await isConnectorDeleted(connectorId)) { + const liveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId) + if (liveness.connectorDeleted) { throw new ConnectorDeletedException(connectorId) } - if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) { + if (liveness.knowledgeBaseDeleted) { throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } @@ -642,11 +651,12 @@ export async function executeSync( } } - // Check if connector was deleted before retrying stuck documents - if (await isConnectorDeleted(connectorId)) { + // Check if connector/KB were deleted before retrying stuck documents + const postBatchLiveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId) + if (postBatchLiveness.connectorDeleted) { throw new ConnectorDeletedException(connectorId) } - if (await isKnowledgeBaseDeleted(connector.knowledgeBaseId)) { + if (postBatchLiveness.knowledgeBaseDeleted) { throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } @@ -881,9 +891,6 @@ async function addDocument( extDoc: ExternalDocument, sourceConfig?: Record ): Promise { - if (await isKnowledgeBaseDeleted(knowledgeBaseId)) { - throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) - } const documentId = generateId() const contentBuffer = Buffer.from(extDoc.content, 'utf-8') const safeTitle = sanitizeStorageTitle(extDoc.title) @@ -963,9 +970,6 @@ async function updateDocument( extDoc: ExternalDocument, sourceConfig?: Record ): Promise { - if (await isKnowledgeBaseDeleted(knowledgeBaseId)) { - throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) - } // Fetch old file URL before uploading replacement const existingRows = await db .select({ fileUrl: document.fileUrl }) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 204b517911..be71c27971 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -48,7 +48,10 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators' import { deleteFile } from '@/lib/uploads/core/storage-service' import { extractStorageKey } from '@/lib/uploads/utils/file-utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' -import type { DocumentProcessingPayload } from '@/background/knowledge-processing' +import type { + DocumentProcessingPayload, + processDocument as processDocumentTask, +} from '@/background/knowledge-processing' import { calculateCost } from '@/providers/utils' const logger = createLogger('DocumentService') @@ -102,35 +105,6 @@ export interface ProcessingOptions { lang?: string } -interface DocumentJobData { - knowledgeBaseId: string - documentId: string - docData: { - filename: string - fileUrl: string - fileSize: number - mimeType: string - } - processingOptions: ProcessingOptions - requestId: string -} - -async function dispatchDocumentProcessingJob(payload: DocumentJobData): Promise { - if (isTriggerAvailable()) { - await tasks.trigger('knowledge-process-document', payload, { - tags: [`knowledgeBaseId:${payload.knowledgeBaseId}`, `documentId:${payload.documentId}`], - }) - return - } - - await processDocumentAsync( - payload.knowledgeBaseId, - payload.documentId, - payload.docData, - payload.processingOptions - ) -} - interface DocumentTagData { tagName: string fieldType: string @@ -314,13 +288,16 @@ async function processDocumentTags( return result } -export async function processDocumentsWithQueue( - createdDocuments: DocumentData[], +/** Per-call cap for `tasks.batchTrigger` on Trigger.dev SDK 4.3.1+. */ +const TRIGGER_BATCH_SIZE = 1000 + +function buildJobPayload( + doc: DocumentData, knowledgeBaseId: string, processingOptions: ProcessingOptions, requestId: string -): Promise { - const jobPayloads = createdDocuments.map((doc) => ({ +): DocumentProcessingPayload { + return { knowledgeBaseId, documentId: doc.documentId, docData: { @@ -331,35 +308,99 @@ export async function processDocumentsWithQueue( }, processingOptions, requestId, - })) + } +} - logger.info( - `[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`, - { - backend: isTriggerAvailable() ? 'trigger-dev' : 'direct', - } +/** + * Dispatches document processing jobs via Trigger.dev's `batchTrigger` when + * available, or in-process otherwise. Throws only when every dispatch fails; + * partial failures are logged and recovered by the next sync's stuck-doc pass. + */ +export async function processDocumentsWithQueue( + createdDocuments: DocumentData[], + knowledgeBaseId: string, + processingOptions: ProcessingOptions, + requestId: string +): Promise { + if (createdDocuments.length === 0) return + + const jobPayloads = createdDocuments.map((doc) => + buildJobPayload(doc, knowledgeBaseId, processingOptions, requestId) ) - const results = await Promise.allSettled( - jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload)) + const useTrigger = isTriggerAvailable() + logger.info( + `[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`, + { backend: useTrigger ? 'trigger-dev' : 'direct' } ) - const failures = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected') - if (failures.length > 0) { - logger.error(`[${requestId}] ${failures.length}/${results.length} document dispatches failed`, { - errors: failures.map((f) => getErrorMessage(f.reason)), - }) - } + const dispatched = useTrigger + ? await dispatchViaBatchTrigger(jobPayloads, requestId) + : await dispatchInProcess(jobPayloads, requestId) logger.info( - `[${requestId}] Document dispatch complete: ${results.length - failures.length}/${results.length} succeeded` + `[${requestId}] Document dispatch complete: ${dispatched}/${jobPayloads.length} succeeded` ) - if (failures.length === results.length) { - throw new Error(`All ${failures.length} document processing dispatches failed`) + if (dispatched === 0) { + throw new Error(`All ${jobPayloads.length} document processing dispatches failed`) + } +} + +async function dispatchViaBatchTrigger( + jobPayloads: DocumentProcessingPayload[], + requestId: string +): Promise { + let dispatched = 0 + const batchIds: string[] = [] + for (let i = 0; i < jobPayloads.length; i += TRIGGER_BATCH_SIZE) { + const chunk = jobPayloads.slice(i, i + TRIGGER_BATCH_SIZE) + try { + const result = await tasks.batchTrigger( + 'knowledge-process-document', + chunk.map((payload) => ({ + payload, + options: { + // Scoped to (documentId, requestId): blocks intra-dispatch retries + // from double-enqueuing; later syncs use a fresh requestId. + idempotencyKey: `doc-process-${payload.documentId}-${requestId}`, + tags: [ + `knowledgeBaseId:${payload.knowledgeBaseId}`, + `documentId:${payload.documentId}`, + ], + }, + })) + ) + batchIds.push(result.batchId) + dispatched += chunk.length + } catch (error) { + logger.error(`[${requestId}] Failed to batchTrigger ${chunk.length} document jobs`, { + error: getErrorMessage(error), + }) + } + } + if (batchIds.length > 0) { + logger.info(`[${requestId}] Trigger.dev batches dispatched`, { batchIds }) } + return dispatched +} - return +async function dispatchInProcess( + jobPayloads: DocumentProcessingPayload[], + requestId: string +): Promise { + const results = await Promise.allSettled( + jobPayloads.map((p) => + processDocumentAsync(p.knowledgeBaseId, p.documentId, p.docData, p.processingOptions) + ) + ) + let dispatched = 0 + for (const r of results) { + if (r.status === 'fulfilled') dispatched++ + else + logger.error(`[${requestId}] Document dispatch failed`, { error: getErrorMessage(r.reason) }) + } + return dispatched } export async function processDocumentAsync( @@ -698,54 +739,6 @@ export function isTriggerAvailable(): boolean { return Boolean(env.TRIGGER_SECRET_KEY) && isTriggerDevEnabled } -async function processDocumentsWithTrigger( - documents: DocumentProcessingPayload[], - requestId: string -): Promise<{ success: boolean; message: string; batchIds?: string[] }> { - if (!isTriggerAvailable()) { - throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing') - } - - try { - logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`) - - const MAX_BATCH_SIZE = 1000 - const batchIds: string[] = [] - - for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) { - const chunk = documents.slice(i, i + MAX_BATCH_SIZE) - const batchResult = await tasks.batchTrigger( - 'knowledge-process-document', - chunk.map((doc) => ({ - payload: doc, - options: { - idempotencyKey: `doc-process-${doc.documentId}-${requestId}`, - tags: [`knowledgeBaseId:${doc.knowledgeBaseId}`, `documentId:${doc.documentId}`], - }, - })) - ) - batchIds.push(batchResult.batchId) - } - - logger.info( - `[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)` - ) - - return { - success: true, - message: `${documents.length} document processing jobs triggered`, - batchIds, - } - } catch (error) { - logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error) - - return { - success: false, - message: getErrorMessage(error, 'Failed to trigger background jobs'), - } - } -} - export async function createDocumentRecords( documents: Array<{ filename: string