diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index 52577a2334e..49675608309 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -748,126 +748,6 @@ async function processDocumentsWithTrigger( } } -interface NewDocumentRow { - id: string - knowledgeBaseId: string - filename: string - fileUrl: string - fileSize: number - mimeType: string - chunkCount: number - tokenCount: number - characterCount: number - processingStatus: 'pending' - enabled: boolean - uploadedAt: Date - tag1: string | null - tag2: string | null - tag3: string | null - tag4: string | null - tag5: string | null - tag6: string | null - tag7: string | null - number1: number | null - number2: number | null - number3: number | null - number4: number | null - number5: number | null - date1: Date | null - date2: Date | null - boolean1: boolean | null - boolean2: boolean | null - boolean3: boolean | null -} - -/** - * Insert N document rows IF the parent knowledge base is still alive - * (`deleted_at IS NULL`) at the statement's MVCC snapshot. Returns the - * number of rows actually inserted. - * - * Knowledge bases are soft-deleted, so a normal FK can't catch a concurrent - * delete — the KB row physically remains. We do the existence check and the - * insert in a single statement via INSERT...SELECT...WHERE EXISTS, which - * Postgres evaluates atomically. No transaction or row lock required, no - * race window between check and insert. - * - * Returns 0 if the KB was soft-deleted; caller throws. - */ -async function insertDocumentsIfKbAlive( - rows: NewDocumentRow[], - knowledgeBaseId: string -): Promise { - if (rows.length === 0) return 0 - - // jsonb_to_recordset declares the column types once, so we don't need to - // cast every parameter individually to keep Postgres' type inference happy - // when nullable columns end up all-NULL across the batch. - const jsonRows = rows.map((d) => ({ - id: d.id, - knowledge_base_id: d.knowledgeBaseId, - filename: d.filename, - file_url: d.fileUrl, - file_size: d.fileSize, - mime_type: d.mimeType, - chunk_count: d.chunkCount, - token_count: d.tokenCount, - character_count: d.characterCount, - processing_status: d.processingStatus, - enabled: d.enabled, - uploaded_at: d.uploadedAt.toISOString(), - tag1: d.tag1, - tag2: d.tag2, - tag3: d.tag3, - tag4: d.tag4, - tag5: d.tag5, - tag6: d.tag6, - tag7: d.tag7, - number1: d.number1, - number2: d.number2, - number3: d.number3, - number4: d.number4, - number5: d.number5, - date1: d.date1?.toISOString() ?? null, - date2: d.date2?.toISOString() ?? null, - boolean1: d.boolean1, - boolean2: d.boolean2, - boolean3: d.boolean3, - })) - - const result = await db.execute(sql` - INSERT INTO document ( - id, knowledge_base_id, filename, file_url, file_size, mime_type, - chunk_count, token_count, character_count, processing_status, enabled, uploaded_at, - tag1, tag2, tag3, tag4, tag5, tag6, tag7, - number1, number2, number3, number4, number5, - date1, date2, - boolean1, boolean2, boolean3 - ) - SELECT - id, knowledge_base_id, filename, file_url, file_size, mime_type, - chunk_count, token_count, character_count, processing_status, enabled, uploaded_at, - tag1, tag2, tag3, tag4, tag5, tag6, tag7, - number1, number2, number3, number4, number5, - date1, date2, - boolean1, boolean2, boolean3 - FROM jsonb_to_recordset(${JSON.stringify(jsonRows)}::jsonb) AS x( - id text, knowledge_base_id text, filename text, file_url text, file_size integer, mime_type text, - chunk_count integer, token_count integer, character_count integer, processing_status text, enabled boolean, uploaded_at timestamp, - tag1 text, tag2 text, tag3 text, tag4 text, tag5 text, tag6 text, tag7 text, - number1 double precision, number2 double precision, number3 double precision, number4 double precision, number5 double precision, - date1 timestamp, date2 timestamp, - boolean1 boolean, boolean2 boolean, boolean3 boolean - ) - WHERE EXISTS ( - SELECT 1 FROM knowledge_base - WHERE id = ${knowledgeBaseId} AND deleted_at IS NULL - ) - RETURNING id - `) - - return Array.from(result).length -} - export async function createDocumentRecords( documents: Array<{ filename: string @@ -886,102 +766,99 @@ export async function createDocumentRecords( knowledgeBaseId: string, requestId: string ): Promise { - // Cheap upfront existence check so the common KB-not-found path fails fast - // before we burn CPU on tag processing. The atomic insert below is the - // race-safe guard against a concurrent KB soft-delete in the small window - // between this check and the insert. - const kb = await db - .select({ id: knowledgeBase.id }) - .from(knowledgeBase) - .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) - .limit(1) + return await db.transaction(async (tx) => { + await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`) - if (kb.length === 0) { - throw new Error('Knowledge base not found') - } + const kb = await tx + .select({ id: knowledgeBase.id }) + .from(knowledgeBase) + .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) + .limit(1) - const now = new Date() - const documentRecords: NewDocumentRow[] = [] - const returnData: DocumentData[] = [] + if (kb.length === 0) { + throw new Error('Knowledge base not found') + } - for (const docData of documents) { - const documentId = generateId() + const now = new Date() + const documentRecords = [] + const returnData: DocumentData[] = [] - let processedTags: Partial = {} + for (const docData of documents) { + const documentId = generateId() - if (docData.documentTagsData) { - try { - const tagData = JSON.parse(docData.documentTagsData) - if (Array.isArray(tagData)) { - processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId) - } - } catch (error) { - if (error instanceof SyntaxError) { - logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error) - } else { - throw error + let processedTags: Partial = {} + + if (docData.documentTagsData) { + try { + const tagData = JSON.parse(docData.documentTagsData) + if (Array.isArray(tagData)) { + processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId) + } + } catch (error) { + if (error instanceof SyntaxError) { + logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error) + } else { + throw error + } } } - } - const newDocument = { - id: documentId, - knowledgeBaseId, - filename: docData.filename, - fileUrl: docData.fileUrl, - fileSize: docData.fileSize, - mimeType: docData.mimeType, - chunkCount: 0, - tokenCount: 0, - characterCount: 0, - processingStatus: 'pending' as const, - enabled: true, - uploadedAt: now, - tag1: processedTags.tag1 ?? docData.tag1 ?? null, - tag2: processedTags.tag2 ?? docData.tag2 ?? null, - tag3: processedTags.tag3 ?? docData.tag3 ?? null, - tag4: processedTags.tag4 ?? docData.tag4 ?? null, - tag5: processedTags.tag5 ?? docData.tag5 ?? null, - tag6: processedTags.tag6 ?? docData.tag6 ?? null, - tag7: processedTags.tag7 ?? docData.tag7 ?? null, - number1: processedTags.number1 ?? null, - number2: processedTags.number2 ?? null, - number3: processedTags.number3 ?? null, - number4: processedTags.number4 ?? null, - number5: processedTags.number5 ?? null, - date1: processedTags.date1 ?? null, - date2: processedTags.date2 ?? null, - boolean1: processedTags.boolean1 ?? null, - boolean2: processedTags.boolean2 ?? null, - boolean3: processedTags.boolean3 ?? null, + const newDocument = { + id: documentId, + knowledgeBaseId, + filename: docData.filename, + fileUrl: docData.fileUrl, + fileSize: docData.fileSize, + mimeType: docData.mimeType, + chunkCount: 0, + tokenCount: 0, + characterCount: 0, + processingStatus: 'pending' as const, + enabled: true, + uploadedAt: now, + tag1: processedTags.tag1 ?? docData.tag1 ?? null, + tag2: processedTags.tag2 ?? docData.tag2 ?? null, + tag3: processedTags.tag3 ?? docData.tag3 ?? null, + tag4: processedTags.tag4 ?? docData.tag4 ?? null, + tag5: processedTags.tag5 ?? docData.tag5 ?? null, + tag6: processedTags.tag6 ?? docData.tag6 ?? null, + tag7: processedTags.tag7 ?? docData.tag7 ?? null, + number1: processedTags.number1 ?? null, + number2: processedTags.number2 ?? null, + number3: processedTags.number3 ?? null, + number4: processedTags.number4 ?? null, + number5: processedTags.number5 ?? null, + date1: processedTags.date1 ?? null, + date2: processedTags.date2 ?? null, + boolean1: processedTags.boolean1 ?? null, + boolean2: processedTags.boolean2 ?? null, + boolean3: processedTags.boolean3 ?? null, + } + + documentRecords.push(newDocument) + returnData.push({ + documentId, + filename: docData.filename, + fileUrl: docData.fileUrl, + fileSize: docData.fileSize, + mimeType: docData.mimeType, + }) } - documentRecords.push(newDocument) - returnData.push({ - documentId, - filename: docData.filename, - fileUrl: docData.fileUrl, - fileSize: docData.fileSize, - mimeType: docData.mimeType, - }) - } + if (documentRecords.length > 0) { + await tx.insert(document).values(documentRecords) + logger.info( + `[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}` + ) - if (documentRecords.length > 0) { - const insertedCount = await insertDocumentsIfKbAlive(documentRecords, knowledgeBaseId) - if (insertedCount === 0) { - throw new Error('Knowledge base not found') + await tx + .update(knowledgeBase) + .set({ updatedAt: now }) + .where(eq(knowledgeBase.id, knowledgeBaseId)) } - logger.info( - `[${requestId}] Bulk created ${insertedCount} document records in knowledge base ${knowledgeBaseId}` - ) - - await db - .update(knowledgeBase) - .set({ updatedAt: now }) - .where(eq(knowledgeBase.id, knowledgeBaseId)) - } - return returnData + return returnData + }) } export interface TagFilterCondition { @@ -1420,7 +1297,7 @@ export async function createSingleDocument( } } - const newDocument: NewDocumentRow = { + const newDocument = { id: documentId, knowledgeBaseId, filename: documentData.filename, @@ -1430,21 +1307,31 @@ export async function createSingleDocument( chunkCount: 0, tokenCount: 0, characterCount: 0, - processingStatus: 'pending', enabled: true, uploadedAt: now, ...processedTags, } - const insertedCount = await insertDocumentsIfKbAlive([newDocument], knowledgeBaseId) - if (insertedCount === 0) { - throw new Error('Knowledge base not found') - } + await db.transaction(async (tx) => { + await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`) - await db - .update(knowledgeBase) - .set({ updatedAt: now }) - .where(eq(knowledgeBase.id, knowledgeBaseId)) + const kb = await tx + .select({ id: knowledgeBase.id }) + .from(knowledgeBase) + .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) + .limit(1) + + if (kb.length === 0) { + throw new Error('Knowledge base not found') + } + + await tx.insert(document).values(newDocument) + + await tx + .update(knowledgeBase) + .set({ updatedAt: now }) + .where(eq(knowledgeBase.id, knowledgeBaseId)) + }) logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`) return newDocument as { diff --git a/apps/sim/lib/workspaces/lifecycle.test.ts b/apps/sim/lib/workspaces/lifecycle.test.ts index d165472830a..070b9c4ff25 100644 --- a/apps/sim/lib/workspaces/lifecycle.test.ts +++ b/apps/sim/lib/workspaces/lifecycle.test.ts @@ -55,7 +55,6 @@ describe('workspace lifecycle', () => { }) const tx = { - execute: vi.fn().mockResolvedValue([]), select: vi.fn().mockReturnValue({ from: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]), diff --git a/apps/sim/lib/workspaces/lifecycle.ts b/apps/sim/lib/workspaces/lifecycle.ts index 975529d27dc..b0a2b0d6161 100644 --- a/apps/sim/lib/workspaces/lifecycle.ts +++ b/apps/sim/lib/workspaces/lifecycle.ts @@ -49,13 +49,6 @@ export async function archiveWorkspace( .where(eq(workflowMcpServer.workspaceId, workspaceId)) await db.transaction(async (tx) => { - // Workspace archival is a rare admin/cleanup operation that touches every - // child table; on large workspaces it can exceed the 30s session default. - // Override per-tx with a generous ceiling — if it ever runs longer than - // this something is genuinely wrong. - await tx.execute(sql`SET LOCAL statement_timeout = '5min'`) - await tx.execute(sql`SET LOCAL lock_timeout = '30s'`) - await tx .update(knowledgeBase) .set({ diff --git a/packages/db/db.ts b/packages/db/db.ts index e56eaa004f0..9e5597fb57b 100644 --- a/packages/db/db.ts +++ b/packages/db/db.ts @@ -13,14 +13,6 @@ const postgresClient = postgres(connectionString, { connect_timeout: 30, max: 15, onnotice: () => {}, - // Server-side guards. lock_timeout cancels a query waiting on a row lock for - // >5s (e.g. another tx holding `SELECT ... FOR UPDATE`). statement_timeout - // cancels any query running >30s. Heavy paths that legitimately need longer - // (table service bulk JSONB rewrites) override per-tx with `SET LOCAL`. - connection: { - lock_timeout: 5_000, - statement_timeout: 30_000, - }, }) export const db = drizzle(postgresClient, { schema })