-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Revert "improvement(db): add session statement/lock timeouts; simplif… #4599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -748,126 +748,6 @@ async function processDocumentsWithTrigger( | |
| } | ||
| } | ||
|
|
||
| interface NewDocumentRow { | ||
| id: string | ||
| knowledgeBaseId: string | ||
| filename: string | ||
| fileUrl: string | ||
| fileSize: number | ||
| mimeType: string | ||
| chunkCount: number | ||
| tokenCount: number | ||
| characterCount: number | ||
| processingStatus: 'pending' | ||
| enabled: boolean | ||
| uploadedAt: Date | ||
| tag1: string | null | ||
| tag2: string | null | ||
| tag3: string | null | ||
| tag4: string | null | ||
| tag5: string | null | ||
| tag6: string | null | ||
| tag7: string | null | ||
| number1: number | null | ||
| number2: number | null | ||
| number3: number | null | ||
| number4: number | null | ||
| number5: number | null | ||
| date1: Date | null | ||
| date2: Date | null | ||
| boolean1: boolean | null | ||
| boolean2: boolean | null | ||
| boolean3: boolean | null | ||
| } | ||
|
|
||
| /** | ||
| * Insert N document rows IF the parent knowledge base is still alive | ||
| * (`deleted_at IS NULL`) at the statement's MVCC snapshot. Returns the | ||
| * number of rows actually inserted. | ||
| * | ||
| * Knowledge bases are soft-deleted, so a normal FK can't catch a concurrent | ||
| * delete — the KB row physically remains. We do the existence check and the | ||
| * insert in a single statement via INSERT...SELECT...WHERE EXISTS, which | ||
| * Postgres evaluates atomically. No transaction or row lock required, no | ||
| * race window between check and insert. | ||
| * | ||
| * Returns 0 if the KB was soft-deleted; caller throws. | ||
| */ | ||
| async function insertDocumentsIfKbAlive( | ||
| rows: NewDocumentRow[], | ||
| knowledgeBaseId: string | ||
| ): Promise<number> { | ||
| if (rows.length === 0) return 0 | ||
|
|
||
| // jsonb_to_recordset declares the column types once, so we don't need to | ||
| // cast every parameter individually to keep Postgres' type inference happy | ||
| // when nullable columns end up all-NULL across the batch. | ||
| const jsonRows = rows.map((d) => ({ | ||
| id: d.id, | ||
| knowledge_base_id: d.knowledgeBaseId, | ||
| filename: d.filename, | ||
| file_url: d.fileUrl, | ||
| file_size: d.fileSize, | ||
| mime_type: d.mimeType, | ||
| chunk_count: d.chunkCount, | ||
| token_count: d.tokenCount, | ||
| character_count: d.characterCount, | ||
| processing_status: d.processingStatus, | ||
| enabled: d.enabled, | ||
| uploaded_at: d.uploadedAt.toISOString(), | ||
| tag1: d.tag1, | ||
| tag2: d.tag2, | ||
| tag3: d.tag3, | ||
| tag4: d.tag4, | ||
| tag5: d.tag5, | ||
| tag6: d.tag6, | ||
| tag7: d.tag7, | ||
| number1: d.number1, | ||
| number2: d.number2, | ||
| number3: d.number3, | ||
| number4: d.number4, | ||
| number5: d.number5, | ||
| date1: d.date1?.toISOString() ?? null, | ||
| date2: d.date2?.toISOString() ?? null, | ||
| boolean1: d.boolean1, | ||
| boolean2: d.boolean2, | ||
| boolean3: d.boolean3, | ||
| })) | ||
|
|
||
| const result = await db.execute(sql` | ||
| INSERT INTO document ( | ||
| id, knowledge_base_id, filename, file_url, file_size, mime_type, | ||
| chunk_count, token_count, character_count, processing_status, enabled, uploaded_at, | ||
| tag1, tag2, tag3, tag4, tag5, tag6, tag7, | ||
| number1, number2, number3, number4, number5, | ||
| date1, date2, | ||
| boolean1, boolean2, boolean3 | ||
| ) | ||
| SELECT | ||
| id, knowledge_base_id, filename, file_url, file_size, mime_type, | ||
| chunk_count, token_count, character_count, processing_status, enabled, uploaded_at, | ||
| tag1, tag2, tag3, tag4, tag5, tag6, tag7, | ||
| number1, number2, number3, number4, number5, | ||
| date1, date2, | ||
| boolean1, boolean2, boolean3 | ||
| FROM jsonb_to_recordset(${JSON.stringify(jsonRows)}::jsonb) AS x( | ||
| id text, knowledge_base_id text, filename text, file_url text, file_size integer, mime_type text, | ||
| chunk_count integer, token_count integer, character_count integer, processing_status text, enabled boolean, uploaded_at timestamp, | ||
| tag1 text, tag2 text, tag3 text, tag4 text, tag5 text, tag6 text, tag7 text, | ||
| number1 double precision, number2 double precision, number3 double precision, number4 double precision, number5 double precision, | ||
| date1 timestamp, date2 timestamp, | ||
| boolean1 boolean, boolean2 boolean, boolean3 boolean | ||
| ) | ||
| WHERE EXISTS ( | ||
| SELECT 1 FROM knowledge_base | ||
| WHERE id = ${knowledgeBaseId} AND deleted_at IS NULL | ||
| ) | ||
| RETURNING id | ||
| `) | ||
|
|
||
| return Array.from(result).length | ||
| } | ||
|
|
||
| export async function createDocumentRecords( | ||
| documents: Array<{ | ||
| filename: string | ||
|
|
@@ -886,102 +766,99 @@ export async function createDocumentRecords( | |
| knowledgeBaseId: string, | ||
| requestId: string | ||
| ): Promise<DocumentData[]> { | ||
| // Cheap upfront existence check so the common KB-not-found path fails fast | ||
| // before we burn CPU on tag processing. The atomic insert below is the | ||
| // race-safe guard against a concurrent KB soft-delete in the small window | ||
| // between this check and the insert. | ||
| const kb = await db | ||
| .select({ id: knowledgeBase.id }) | ||
| .from(knowledgeBase) | ||
| .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) | ||
| .limit(1) | ||
| return await db.transaction(async (tx) => { | ||
| await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`) | ||
|
|
||
| if (kb.length === 0) { | ||
| throw new Error('Knowledge base not found') | ||
| } | ||
| const kb = await tx | ||
| .select({ id: knowledgeBase.id }) | ||
| .from(knowledgeBase) | ||
| .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) | ||
| .limit(1) | ||
|
|
||
| const now = new Date() | ||
| const documentRecords: NewDocumentRow[] = [] | ||
| const returnData: DocumentData[] = [] | ||
| if (kb.length === 0) { | ||
| throw new Error('Knowledge base not found') | ||
| } | ||
|
|
||
| for (const docData of documents) { | ||
| const documentId = generateId() | ||
| const now = new Date() | ||
| const documentRecords = [] | ||
| const returnData: DocumentData[] = [] | ||
|
|
||
| let processedTags: Partial<ProcessedDocumentTags> = {} | ||
| for (const docData of documents) { | ||
| const documentId = generateId() | ||
|
|
||
| if (docData.documentTagsData) { | ||
| try { | ||
| const tagData = JSON.parse(docData.documentTagsData) | ||
| if (Array.isArray(tagData)) { | ||
| processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId) | ||
| } | ||
| } catch (error) { | ||
| if (error instanceof SyntaxError) { | ||
| logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error) | ||
| } else { | ||
| throw error | ||
| let processedTags: Partial<ProcessedDocumentTags> = {} | ||
|
|
||
| if (docData.documentTagsData) { | ||
| try { | ||
| const tagData = JSON.parse(docData.documentTagsData) | ||
| if (Array.isArray(tagData)) { | ||
| processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId) | ||
| } | ||
| } catch (error) { | ||
| if (error instanceof SyntaxError) { | ||
| logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error) | ||
| } else { | ||
| throw error | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const newDocument = { | ||
| id: documentId, | ||
| knowledgeBaseId, | ||
| filename: docData.filename, | ||
| fileUrl: docData.fileUrl, | ||
| fileSize: docData.fileSize, | ||
| mimeType: docData.mimeType, | ||
| chunkCount: 0, | ||
| tokenCount: 0, | ||
| characterCount: 0, | ||
| processingStatus: 'pending' as const, | ||
| enabled: true, | ||
| uploadedAt: now, | ||
| tag1: processedTags.tag1 ?? docData.tag1 ?? null, | ||
| tag2: processedTags.tag2 ?? docData.tag2 ?? null, | ||
| tag3: processedTags.tag3 ?? docData.tag3 ?? null, | ||
| tag4: processedTags.tag4 ?? docData.tag4 ?? null, | ||
| tag5: processedTags.tag5 ?? docData.tag5 ?? null, | ||
| tag6: processedTags.tag6 ?? docData.tag6 ?? null, | ||
| tag7: processedTags.tag7 ?? docData.tag7 ?? null, | ||
| number1: processedTags.number1 ?? null, | ||
| number2: processedTags.number2 ?? null, | ||
| number3: processedTags.number3 ?? null, | ||
| number4: processedTags.number4 ?? null, | ||
| number5: processedTags.number5 ?? null, | ||
| date1: processedTags.date1 ?? null, | ||
| date2: processedTags.date2 ?? null, | ||
| boolean1: processedTags.boolean1 ?? null, | ||
| boolean2: processedTags.boolean2 ?? null, | ||
| boolean3: processedTags.boolean3 ?? null, | ||
| const newDocument = { | ||
| id: documentId, | ||
| knowledgeBaseId, | ||
| filename: docData.filename, | ||
| fileUrl: docData.fileUrl, | ||
| fileSize: docData.fileSize, | ||
| mimeType: docData.mimeType, | ||
| chunkCount: 0, | ||
| tokenCount: 0, | ||
| characterCount: 0, | ||
| processingStatus: 'pending' as const, | ||
| enabled: true, | ||
| uploadedAt: now, | ||
| tag1: processedTags.tag1 ?? docData.tag1 ?? null, | ||
| tag2: processedTags.tag2 ?? docData.tag2 ?? null, | ||
| tag3: processedTags.tag3 ?? docData.tag3 ?? null, | ||
| tag4: processedTags.tag4 ?? docData.tag4 ?? null, | ||
| tag5: processedTags.tag5 ?? docData.tag5 ?? null, | ||
| tag6: processedTags.tag6 ?? docData.tag6 ?? null, | ||
| tag7: processedTags.tag7 ?? docData.tag7 ?? null, | ||
| number1: processedTags.number1 ?? null, | ||
| number2: processedTags.number2 ?? null, | ||
| number3: processedTags.number3 ?? null, | ||
| number4: processedTags.number4 ?? null, | ||
| number5: processedTags.number5 ?? null, | ||
| date1: processedTags.date1 ?? null, | ||
| date2: processedTags.date2 ?? null, | ||
| boolean1: processedTags.boolean1 ?? null, | ||
| boolean2: processedTags.boolean2 ?? null, | ||
| boolean3: processedTags.boolean3 ?? null, | ||
| } | ||
|
|
||
| documentRecords.push(newDocument) | ||
| returnData.push({ | ||
| documentId, | ||
| filename: docData.filename, | ||
| fileUrl: docData.fileUrl, | ||
| fileSize: docData.fileSize, | ||
| mimeType: docData.mimeType, | ||
| }) | ||
| } | ||
|
|
||
| documentRecords.push(newDocument) | ||
| returnData.push({ | ||
| documentId, | ||
| filename: docData.filename, | ||
| fileUrl: docData.fileUrl, | ||
| fileSize: docData.fileSize, | ||
| mimeType: docData.mimeType, | ||
| }) | ||
| } | ||
| if (documentRecords.length > 0) { | ||
| await tx.insert(document).values(documentRecords) | ||
| logger.info( | ||
| `[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}` | ||
| ) | ||
|
|
||
| if (documentRecords.length > 0) { | ||
| const insertedCount = await insertDocumentsIfKbAlive(documentRecords, knowledgeBaseId) | ||
| if (insertedCount === 0) { | ||
| throw new Error('Knowledge base not found') | ||
| await tx | ||
| .update(knowledgeBase) | ||
| .set({ updatedAt: now }) | ||
| .where(eq(knowledgeBase.id, knowledgeBaseId)) | ||
| } | ||
| logger.info( | ||
| `[${requestId}] Bulk created ${insertedCount} document records in knowledge base ${knowledgeBaseId}` | ||
| ) | ||
|
|
||
| await db | ||
| .update(knowledgeBase) | ||
| .set({ updatedAt: now }) | ||
| .where(eq(knowledgeBase.id, knowledgeBaseId)) | ||
| } | ||
|
|
||
| return returnData | ||
| return returnData | ||
| }) | ||
| } | ||
|
|
||
| export interface TagFilterCondition { | ||
|
|
@@ -1420,7 +1297,7 @@ export async function createSingleDocument( | |
| } | ||
| } | ||
|
|
||
| const newDocument: NewDocumentRow = { | ||
| const newDocument = { | ||
| id: documentId, | ||
| knowledgeBaseId, | ||
| filename: documentData.filename, | ||
|
|
@@ -1430,21 +1307,31 @@ export async function createSingleDocument( | |
| chunkCount: 0, | ||
| tokenCount: 0, | ||
| characterCount: 0, | ||
| processingStatus: 'pending', | ||
| enabled: true, | ||
| uploadedAt: now, | ||
| ...processedTags, | ||
| } | ||
|
Comment on lines 1300 to 1313
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| const insertedCount = await insertDocumentsIfKbAlive([newDocument], knowledgeBaseId) | ||
| if (insertedCount === 0) { | ||
| throw new Error('Knowledge base not found') | ||
| } | ||
| await db.transaction(async (tx) => { | ||
| await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`) | ||
|
|
||
| await db | ||
| .update(knowledgeBase) | ||
| .set({ updatedAt: now }) | ||
| .where(eq(knowledgeBase.id, knowledgeBaseId)) | ||
| const kb = await tx | ||
| .select({ id: knowledgeBase.id }) | ||
| .from(knowledgeBase) | ||
| .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) | ||
| .limit(1) | ||
|
|
||
| if (kb.length === 0) { | ||
| throw new Error('Knowledge base not found') | ||
| } | ||
|
|
||
| await tx.insert(document).values(newDocument) | ||
|
|
||
| await tx | ||
| .update(knowledgeBase) | ||
| .set({ updatedAt: now }) | ||
| .where(eq(knowledgeBase.id, knowledgeBaseId)) | ||
| }) | ||
| logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`) | ||
|
|
||
| return newDocument as { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing `processingStatus` in single document creation path — Low Severity
The `createSingleDocument` function no longer sets `processingStatus: 'pending'` in its `newDocument` object (lines 1300–1313), while the bulk `createDocumentRecords` path still explicitly includes `processingStatus: 'pending' as const` (line 816). The database column has a `DEFAULT 'pending'`, so the insert succeeds, but the two code paths are now inconsistent — and the object returned to API callers from the single-document path will be missing the `processingStatus` property in the JSON response.
Additional Locations (1)
apps/sim/lib/knowledge/documents/service.ts#L805-L836 — Reviewed by Cursor Bugbot for commit 32d5dc4. Configure here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do a full revert; I'll follow up after.