313 changes: 100 additions & 213 deletions apps/sim/lib/knowledge/documents/service.ts
@@ -748,126 +748,6 @@ async function processDocumentsWithTrigger(
}
}

interface NewDocumentRow {
id: string
knowledgeBaseId: string
filename: string
fileUrl: string
fileSize: number
mimeType: string
chunkCount: number
tokenCount: number
characterCount: number
processingStatus: 'pending'
enabled: boolean
uploadedAt: Date
tag1: string | null
tag2: string | null
tag3: string | null
tag4: string | null
tag5: string | null
tag6: string | null
tag7: string | null
number1: number | null
number2: number | null
number3: number | null
number4: number | null
number5: number | null
date1: Date | null
date2: Date | null
boolean1: boolean | null
boolean2: boolean | null
boolean3: boolean | null
}

/**
* Insert N document rows IF the parent knowledge base is still alive
* (`deleted_at IS NULL`) at the statement's MVCC snapshot. Returns the
* number of rows actually inserted.
*
* Knowledge bases are soft-deleted, so a normal FK can't catch a concurrent
* delete — the KB row physically remains. We do the existence check and the
* insert in a single statement via INSERT...SELECT...WHERE EXISTS, which
* Postgres evaluates atomically. No transaction or row lock required, no
* race window between check and insert.
*
* Returns 0 if the KB was soft-deleted; caller throws.
*/
async function insertDocumentsIfKbAlive(
rows: NewDocumentRow[],
knowledgeBaseId: string
): Promise<number> {
if (rows.length === 0) return 0

// jsonb_to_recordset declares the column types once, so we don't need to
// cast every parameter individually to keep Postgres' type inference happy
// when nullable columns end up all-NULL across the batch.
const jsonRows = rows.map((d) => ({
id: d.id,
knowledge_base_id: d.knowledgeBaseId,
filename: d.filename,
file_url: d.fileUrl,
file_size: d.fileSize,
mime_type: d.mimeType,
chunk_count: d.chunkCount,
token_count: d.tokenCount,
character_count: d.characterCount,
processing_status: d.processingStatus,
enabled: d.enabled,
uploaded_at: d.uploadedAt.toISOString(),
tag1: d.tag1,
tag2: d.tag2,
tag3: d.tag3,
tag4: d.tag4,
tag5: d.tag5,
tag6: d.tag6,
tag7: d.tag7,
number1: d.number1,
number2: d.number2,
number3: d.number3,
number4: d.number4,
number5: d.number5,
date1: d.date1?.toISOString() ?? null,
date2: d.date2?.toISOString() ?? null,
boolean1: d.boolean1,
boolean2: d.boolean2,
boolean3: d.boolean3,
}))

const result = await db.execute(sql`
INSERT INTO document (
id, knowledge_base_id, filename, file_url, file_size, mime_type,
chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
tag1, tag2, tag3, tag4, tag5, tag6, tag7,
number1, number2, number3, number4, number5,
date1, date2,
boolean1, boolean2, boolean3
)
SELECT
id, knowledge_base_id, filename, file_url, file_size, mime_type,
chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
tag1, tag2, tag3, tag4, tag5, tag6, tag7,
number1, number2, number3, number4, number5,
date1, date2,
boolean1, boolean2, boolean3
FROM jsonb_to_recordset(${JSON.stringify(jsonRows)}::jsonb) AS x(
id text, knowledge_base_id text, filename text, file_url text, file_size integer, mime_type text,
chunk_count integer, token_count integer, character_count integer, processing_status text, enabled boolean, uploaded_at timestamp,
tag1 text, tag2 text, tag3 text, tag4 text, tag5 text, tag6 text, tag7 text,
number1 double precision, number2 double precision, number3 double precision, number4 double precision, number5 double precision,
date1 timestamp, date2 timestamp,
boolean1 boolean, boolean2 boolean, boolean3 boolean
)
WHERE EXISTS (
SELECT 1 FROM knowledge_base
WHERE id = ${knowledgeBaseId} AND deleted_at IS NULL
)
RETURNING id
`)

return Array.from(result).length
}

export async function createDocumentRecords(
documents: Array<{
filename: string
@@ -886,102 +766,99 @@ export async function createDocumentRecords(
knowledgeBaseId: string,
requestId: string
): Promise<DocumentData[]> {
// Cheap upfront existence check so the common KB-not-found path fails fast
// before we burn CPU on tag processing. The atomic insert below is the
// race-safe guard against a concurrent KB soft-delete in the small window
// between this check and the insert.
const kb = await db
.select({ id: knowledgeBase.id })
.from(knowledgeBase)
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
.limit(1)
return await db.transaction(async (tx) => {
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)

if (kb.length === 0) {
throw new Error('Knowledge base not found')
}
const kb = await tx
.select({ id: knowledgeBase.id })
.from(knowledgeBase)
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
.limit(1)

const now = new Date()
const documentRecords: NewDocumentRow[] = []
const returnData: DocumentData[] = []
if (kb.length === 0) {
throw new Error('Knowledge base not found')
}

for (const docData of documents) {
const documentId = generateId()
const now = new Date()
const documentRecords = []
const returnData: DocumentData[] = []

let processedTags: Partial<ProcessedDocumentTags> = {}
for (const docData of documents) {
const documentId = generateId()

if (docData.documentTagsData) {
try {
const tagData = JSON.parse(docData.documentTagsData)
if (Array.isArray(tagData)) {
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
}
} catch (error) {
if (error instanceof SyntaxError) {
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
} else {
throw error
let processedTags: Partial<ProcessedDocumentTags> = {}

if (docData.documentTagsData) {
try {
const tagData = JSON.parse(docData.documentTagsData)
if (Array.isArray(tagData)) {
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
}
} catch (error) {
if (error instanceof SyntaxError) {
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
} else {
throw error
}
}
}
}

const newDocument = {
id: documentId,
knowledgeBaseId,
filename: docData.filename,
fileUrl: docData.fileUrl,
fileSize: docData.fileSize,
mimeType: docData.mimeType,
chunkCount: 0,
tokenCount: 0,
characterCount: 0,
processingStatus: 'pending' as const,
enabled: true,
uploadedAt: now,
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
number1: processedTags.number1 ?? null,
number2: processedTags.number2 ?? null,
number3: processedTags.number3 ?? null,
number4: processedTags.number4 ?? null,
number5: processedTags.number5 ?? null,
date1: processedTags.date1 ?? null,
date2: processedTags.date2 ?? null,
boolean1: processedTags.boolean1 ?? null,
boolean2: processedTags.boolean2 ?? null,
boolean3: processedTags.boolean3 ?? null,
const newDocument = {
id: documentId,
knowledgeBaseId,
filename: docData.filename,
fileUrl: docData.fileUrl,
fileSize: docData.fileSize,
mimeType: docData.mimeType,
chunkCount: 0,
tokenCount: 0,
characterCount: 0,
processingStatus: 'pending' as const,
enabled: true,
uploadedAt: now,
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
number1: processedTags.number1 ?? null,
number2: processedTags.number2 ?? null,
number3: processedTags.number3 ?? null,
number4: processedTags.number4 ?? null,
number5: processedTags.number5 ?? null,
date1: processedTags.date1 ?? null,
date2: processedTags.date2 ?? null,
boolean1: processedTags.boolean1 ?? null,
boolean2: processedTags.boolean2 ?? null,
boolean3: processedTags.boolean3 ?? null,
}

documentRecords.push(newDocument)
returnData.push({
documentId,
filename: docData.filename,
fileUrl: docData.fileUrl,
fileSize: docData.fileSize,
mimeType: docData.mimeType,
})
}

documentRecords.push(newDocument)
returnData.push({
documentId,
filename: docData.filename,
fileUrl: docData.fileUrl,
fileSize: docData.fileSize,
mimeType: docData.mimeType,
})
}
if (documentRecords.length > 0) {
await tx.insert(document).values(documentRecords)
logger.info(
`[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}`
)

if (documentRecords.length > 0) {
const insertedCount = await insertDocumentsIfKbAlive(documentRecords, knowledgeBaseId)
if (insertedCount === 0) {
throw new Error('Knowledge base not found')
await tx
.update(knowledgeBase)
.set({ updatedAt: now })
.where(eq(knowledgeBase.id, knowledgeBaseId))
}
logger.info(
`[${requestId}] Bulk created ${insertedCount} document records in knowledge base ${knowledgeBaseId}`
)

await db
.update(knowledgeBase)
.set({ updatedAt: now })
.where(eq(knowledgeBase.id, knowledgeBaseId))
}

return returnData
return returnData
})
}

export interface TagFilterCondition {
@@ -1420,7 +1297,7 @@ export async function createSingleDocument(
}
}

const newDocument: NewDocumentRow = {
const newDocument = {
Missing processingStatus in single document creation path

Low Severity

The createSingleDocument function no longer sets processingStatus: 'pending' in its newDocument object (lines 1300–1313), while the bulk createDocumentRecords path still explicitly includes processingStatus: 'pending' as const (line 816). The database column has a DEFAULT 'pending' so the insert succeeds, but the two code paths are now inconsistent — and the object returned to API callers from the single-document path will be missing the processingStatus property in the JSON response.
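If the explicit field were restored for symmetry, the single-document object would mirror the bulk path. A minimal sketch of that restoration (abbreviated; field names as in the diff above, exact placement hypothetical):

const newDocument = {
  id: documentId,
  knowledgeBaseId,
  filename: documentData.filename,
  fileUrl: documentData.fileUrl,
  fileSize: documentData.fileSize,
  mimeType: documentData.mimeType,
  chunkCount: 0,
  tokenCount: 0,
  characterCount: 0,
  // Restored explicitly so API callers see the property in the JSON
  // response instead of relying on the column-level DEFAULT 'pending'.
  processingStatus: 'pending' as const,
  enabled: true,
  uploadedAt: now,
  ...processedTags,
}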

Reviewed by Cursor Bugbot for commit 32d5dc4.

Collaborator Author
Let's do a full revert; I'll follow up after.

id: documentId,
knowledgeBaseId,
filename: documentData.filename,
Expand All @@ -1430,21 +1307,31 @@ export async function createSingleDocument(
chunkCount: 0,
tokenCount: 0,
characterCount: 0,
processingStatus: 'pending',
enabled: true,
uploadedAt: now,
...processedTags,
}
Comment on lines 1300 to 1313
Contributor
P1 Missing processingStatus in createSingleDocument

createDocumentRecords explicitly sets processingStatus: 'pending' as const, but createSingleDocument no longer does after this revert — ...processedTags only carries tag/number/date/boolean fields. If the document table column lacks a DEFAULT 'pending' at the DB level, every single-document insert will fail with a NOT NULL violation. Worth verifying the schema default exists, or restoring the explicit field here for symmetry.
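One way to verify that the schema default exists, as the comment suggests, is a one-off query against information_schema. A sketch, assuming the same db handle and sql tag already imported in this file:

// Reports e.g. 'pending'::text when the column-level default is present;
// a NULL column_default means createSingleDocument must set the field itself.
const res = await db.execute(sql`
  SELECT column_default
  FROM information_schema.columns
  WHERE table_name = 'document' AND column_name = 'processing_status'
`)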


const insertedCount = await insertDocumentsIfKbAlive([newDocument], knowledgeBaseId)
if (insertedCount === 0) {
throw new Error('Knowledge base not found')
}
await db.transaction(async (tx) => {
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)

await db
.update(knowledgeBase)
.set({ updatedAt: now })
.where(eq(knowledgeBase.id, knowledgeBaseId))
const kb = await tx
.select({ id: knowledgeBase.id })
.from(knowledgeBase)
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
.limit(1)

if (kb.length === 0) {
throw new Error('Knowledge base not found')
}

await tx.insert(document).values(newDocument)

await tx
.update(knowledgeBase)
.set({ updatedAt: now })
.where(eq(knowledgeBase.id, knowledgeBaseId))
})
logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`)

return newDocument as {
1 change: 0 additions & 1 deletion apps/sim/lib/workspaces/lifecycle.test.ts
@@ -55,7 +55,6 @@ describe('workspace lifecycle', () => {
})

const tx = {
execute: vi.fn().mockResolvedValue([]),
select: vi.fn().mockReturnValue({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]),
7 changes: 0 additions & 7 deletions apps/sim/lib/workspaces/lifecycle.ts
@@ -49,13 +49,6 @@ export async function archiveWorkspace(
.where(eq(workflowMcpServer.workspaceId, workspaceId))

await db.transaction(async (tx) => {
// Workspace archival is a rare admin/cleanup operation that touches every
// child table; on large workspaces it can exceed the 30s session default.
// Override per-tx with a generous ceiling — if it ever runs longer than
// this something is genuinely wrong.
await tx.execute(sql`SET LOCAL statement_timeout = '5min'`)
await tx.execute(sql`SET LOCAL lock_timeout = '30s'`)

await tx
.update(knowledgeBase)
.set({
8 changes: 0 additions & 8 deletions packages/db/db.ts
@@ -13,14 +13,6 @@ const postgresClient = postgres(connectionString, {
connect_timeout: 30,
max: 15,
onnotice: () => {},
// Server-side guards. lock_timeout cancels a query waiting on a row lock for
// >5s (e.g. another tx holding `SELECT ... FOR UPDATE`). statement_timeout
// cancels any query running >30s. Heavy paths that legitimately need longer
// (table service bulk JSONB rewrites) override per-tx with `SET LOCAL`.
connection: {
lock_timeout: 5_000,
statement_timeout: 30_000,
},
})

export const db = drizzle(postgresClient, { schema })