Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(knowledge): add upsert document operation (#3644)
* feat(knowledge): add upsert document operation to Knowledge block

Add a "Create or Update" (upsert) document capability that finds an
existing document by ID or filename, deletes it if found, then creates
a new document and queues re-processing. Includes new tool, API route,
block wiring, and typed interfaces.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(knowledge): address review comments on upsert document

- Reorder create-then-delete to prevent data loss if creation fails
- Move Zod validation before workflow authorization for validated input
- Fix btoa stack overflow for large content using loop-based encoding

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(knowledge): guard against empty createDocumentRecords result

Add safety check before accessing firstDocument to prevent TypeError
and data loss if createDocumentRecords unexpectedly returns empty.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(knowledge): prevent documentId fallthrough and use byte-count limit

- Use if/else so filename lookup only runs when no documentId is provided,
  preventing stale IDs from silently replacing unrelated documents
- Check utf8 byte length instead of character count for 1MB size limit,
  correctly handling multi-byte characters (CJK, emoji)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(knowledge): rollback on delete failure, deduplicate sub-block IDs

- Add compensating rollback: if deleteDocument throws after create
  succeeds, clean up the new record to prevent orphaned pending docs
- Merge duplicate name/content sub-blocks into single entries with
  array conditions, matching the documentTags pattern

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* lint

* lint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
  • Loading branch information
waleedlatif1 and claude authored Mar 18, 2026
commit 5f89c7140caed7ce5d84327a20ec3ea57791ae1b
248 changes: 248 additions & 0 deletions apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { document } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import {
createDocumentRecords,
deleteDocument,
getProcessingConfig,
processDocumentsWithQueue,
} from '@/lib/knowledge/documents/service'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'

const logger = createLogger('DocumentUpsertAPI')

/** Chunking / processing configuration accepted for a single upsert request. */
const ProcessingOptionsSchema = z.object({
  chunkSize: z.number().min(100).max(4000),
  minCharactersPerChunk: z.number().min(1).max(2000),
  recipe: z.string(),
  lang: z.string(),
  chunkOverlap: z.number().min(0).max(500),
})

/**
 * Request body for the document upsert endpoint.
 *
 * `documentId` is optional: when present the existing document is matched by
 * id; when absent the handler falls back to matching by `filename`.
 */
const UpsertDocumentSchema = z.object({
  documentId: z.string().optional(),
  filename: z.string().min(1, 'Filename is required'),
  fileUrl: z.string().min(1, 'File URL is required'),
  fileSize: z.number().min(1, 'File size must be greater than 0'),
  mimeType: z.string().min(1, 'MIME type is required'),
  documentTagsData: z.string().optional(),
  processingOptions: ProcessingOptionsSchema,
  workflowId: z.string().optional(),
})

export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = randomUUID().slice(0, 8)
const { id: knowledgeBaseId } = await params

try {
const body = await req.json()

logger.info(`[${requestId}] Knowledge base document upsert request`, {
knowledgeBaseId,
hasDocumentId: !!body.documentId,
filename: body.filename,
})

const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
logger.warn(`[${requestId}] Authentication failed: ${auth.error || 'Unauthorized'}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId

const validatedData = UpsertDocumentSchema.parse(body)

if (validatedData.workflowId) {
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId: validatedData.workflowId,
userId,
action: 'write',
})
if (!authorization.allowed) {
return NextResponse.json(
{ error: authorization.message || 'Access denied' },
{ status: authorization.status }
)
}
}

const accessCheck = await checkKnowledgeBaseWriteAccess(knowledgeBaseId, userId)

if (!accessCheck.hasAccess) {
if ('notFound' in accessCheck && accessCheck.notFound) {
logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`)
return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 })
}
logger.warn(
`[${requestId}] User ${userId} attempted to upsert document in unauthorized knowledge base ${knowledgeBaseId}`
)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}

let existingDocumentId: string | null = null
let isUpdate = false

if (validatedData.documentId) {
const existingDoc = await db
.select({ id: document.id })
.from(document)
.where(
and(
eq(document.id, validatedData.documentId),
eq(document.knowledgeBaseId, knowledgeBaseId),
isNull(document.deletedAt)
)
)
.limit(1)

if (existingDoc.length > 0) {
existingDocumentId = existingDoc[0].id
}
} else {
const docsByFilename = await db
.select({ id: document.id })
.from(document)
.where(
and(
eq(document.filename, validatedData.filename),
eq(document.knowledgeBaseId, knowledgeBaseId),
isNull(document.deletedAt)
)
)
.limit(1)

if (docsByFilename.length > 0) {
existingDocumentId = docsByFilename[0].id
}
}

if (existingDocumentId) {
isUpdate = true
logger.info(
`[${requestId}] Found existing document ${existingDocumentId}, creating replacement before deleting old`
)
}

const createdDocuments = await createDocumentRecords(
[
{
filename: validatedData.filename,
fileUrl: validatedData.fileUrl,
fileSize: validatedData.fileSize,
mimeType: validatedData.mimeType,
...(validatedData.documentTagsData && {
documentTagsData: validatedData.documentTagsData,
}),
},
],
knowledgeBaseId,
requestId
)

const firstDocument = createdDocuments[0]
if (!firstDocument) {
logger.error(`[${requestId}] createDocumentRecords returned empty array unexpectedly`)
return NextResponse.json({ error: 'Failed to create document record' }, { status: 500 })
}

if (existingDocumentId) {
try {
await deleteDocument(existingDocumentId, requestId)
} catch (deleteError) {
logger.error(
`[${requestId}] Failed to delete old document ${existingDocumentId}, rolling back new record`,
deleteError
)
await deleteDocument(firstDocument.documentId, requestId).catch(() => {})
return NextResponse.json({ error: 'Failed to replace existing document' }, { status: 500 })
}
}

Comment on lines +138 to +165
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Silent rollback failure leaves orphaned records

The create-then-delete rollback silently swallows errors:

await deleteDocument(firstDocument.documentId, requestId).catch(() => {})

If the rollback itself fails (e.g. transient DB error), both the old document and the newly created document record will exist in the knowledge base simultaneously. The caller receives a 500, but neither record is cleaned up, leading to duplicate documents that are invisible to normal user flows but still consume storage and can surface in search results.

Since the whole operation is logically atomic (replace), wrapping createDocumentRecords and deleteDocument in a database transaction would be the safest fix. If a transaction isn't feasible here (e.g. the service layer doesn't expose transaction contexts), at minimum the rollback failure should be logged at error level with enough context to trigger manual cleanup:

await deleteDocument(firstDocument.documentId, requestId).catch((rollbackError) => {
  logger.error(
    `[${requestId}] Rollback failed — orphaned document ${firstDocument.documentId} may exist`,
    rollbackError
  )
})

processDocumentsWithQueue(
createdDocuments,
knowledgeBaseId,
validatedData.processingOptions,
requestId
).catch((error: unknown) => {
logger.error(`[${requestId}] Critical error in document processing pipeline:`, error)
})

try {
const { PlatformEvents } = await import('@/lib/core/telemetry')
PlatformEvents.knowledgeBaseDocumentsUploaded({
knowledgeBaseId,
documentsCount: 1,
uploadType: 'single',
chunkSize: validatedData.processingOptions.chunkSize,
recipe: validatedData.processingOptions.recipe,
})
} catch (_e) {
// Silently fail
}

recordAudit({
workspaceId: accessCheck.knowledgeBase?.workspaceId ?? null,
actorId: userId,
actorName: auth.userName,
actorEmail: auth.userEmail,
action: isUpdate ? AuditAction.DOCUMENT_UPDATED : AuditAction.DOCUMENT_UPLOADED,
resourceType: AuditResourceType.DOCUMENT,
resourceId: knowledgeBaseId,
resourceName: validatedData.filename,
description: isUpdate
? `Upserted (replaced) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"`
: `Upserted (created) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"`,
metadata: {
fileName: validatedData.filename,
previousDocumentId: existingDocumentId,
isUpdate,
},
request: req,
})

return NextResponse.json({
success: true,
data: {
documentsCreated: [
{
documentId: firstDocument.documentId,
filename: firstDocument.filename,
status: 'pending',
},
],
isUpdate,
previousDocumentId: existingDocumentId,
processingMethod: 'background',
processingConfig: {
maxConcurrentDocuments: getProcessingConfig().maxConcurrentDocuments,
batchSize: getProcessingConfig().batchSize,
},
},
})
} catch (error) {
if (error instanceof z.ZodError) {
logger.warn(`[${requestId}] Invalid upsert request data`, { errors: error.errors })
return NextResponse.json(
{ error: 'Invalid request data', details: error.errors },
{ status: 400 }
)
}

logger.error(`[${requestId}] Error upserting document`, error)

const errorMessage = error instanceof Error ? error.message : 'Failed to upsert document'
const isStorageLimitError =
errorMessage.includes('Storage limit exceeded') || errorMessage.includes('storage limit')
const isMissingKnowledgeBase = errorMessage === 'Knowledge base not found'

return NextResponse.json(
{ error: errorMessage },
{ status: isMissingKnowledgeBase ? 404 : isStorageLimitError ? 413 : 500 }
)
}
}
25 changes: 21 additions & 4 deletions apps/sim/blocks/blocks/knowledge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const KnowledgeBlock: BlockConfig = {
{ label: 'List Documents', id: 'list_documents' },
{ label: 'Get Document', id: 'get_document' },
{ label: 'Create Document', id: 'create_document' },
{ label: 'Upsert Document', id: 'upsert_document' },
{ label: 'Delete Document', id: 'delete_document' },
{ label: 'List Chunks', id: 'list_chunks' },
{ label: 'Upload Chunk', id: 'upload_chunk' },
Expand Down Expand Up @@ -175,14 +176,14 @@ export const KnowledgeBlock: BlockConfig = {
condition: { field: 'operation', value: 'upload_chunk' },
},

// --- Create Document ---
// --- Create Document / Upsert Document ---
{
id: 'name',
title: 'Document Name',
type: 'short-input',
placeholder: 'Enter document name',
required: true,
condition: { field: 'operation', value: 'create_document' },
condition: { field: 'operation', value: ['create_document', 'upsert_document'] },
},
{
id: 'content',
Expand All @@ -191,14 +192,21 @@ export const KnowledgeBlock: BlockConfig = {
placeholder: 'Enter the document content',
rows: 6,
required: true,
condition: { field: 'operation', value: 'create_document' },
condition: { field: 'operation', value: ['create_document', 'upsert_document'] },
},
{
id: 'upsertDocumentId',
title: 'Document ID (Optional)',
type: 'short-input',
placeholder: 'Enter existing document ID to update (or leave empty to match by name)',
condition: { field: 'operation', value: 'upsert_document' },
},
{
id: 'documentTags',
title: 'Document Tags',
type: 'document-tag-entry',
dependsOn: ['knowledgeBaseSelector'],
condition: { field: 'operation', value: 'create_document' },
condition: { field: 'operation', value: ['create_document', 'upsert_document'] },
},

// --- Update Chunk / Delete Chunk ---
Expand Down Expand Up @@ -264,6 +272,7 @@ export const KnowledgeBlock: BlockConfig = {
'knowledge_search',
'knowledge_upload_chunk',
'knowledge_create_document',
'knowledge_upsert_document',
'knowledge_list_tags',
'knowledge_list_documents',
'knowledge_get_document',
Expand All @@ -284,6 +293,8 @@ export const KnowledgeBlock: BlockConfig = {
return 'knowledge_upload_chunk'
case 'create_document':
return 'knowledge_create_document'
case 'upsert_document':
return 'knowledge_upsert_document'
case 'list_tags':
return 'knowledge_list_tags'
case 'list_documents':
Expand Down Expand Up @@ -355,6 +366,11 @@ export const KnowledgeBlock: BlockConfig = {
if (params.chunkEnabledFilter) params.enabled = params.chunkEnabledFilter
}

// Map upsert sub-block field to tool param
if (params.operation === 'upsert_document' && params.upsertDocumentId) {
params.documentId = String(params.upsertDocumentId).trim()
}

// Convert enabled dropdown string to boolean for update_chunk
if (params.operation === 'update_chunk' && typeof params.enabled === 'string') {
params.enabled = params.enabled === 'true'
Expand Down Expand Up @@ -382,6 +398,7 @@ export const KnowledgeBlock: BlockConfig = {
documentTags: { type: 'string', description: 'Document tags' },
chunkSearch: { type: 'string', description: 'Search filter for chunks' },
chunkEnabledFilter: { type: 'string', description: 'Filter chunks by enabled status' },
upsertDocumentId: { type: 'string', description: 'Document ID for upsert operation' },
connectorId: { type: 'string', description: 'Connector identifier' },
},
outputs: {
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/tools/knowledge/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { knowledgeSearchTool } from '@/tools/knowledge/search'
import { knowledgeTriggerSyncTool } from '@/tools/knowledge/trigger_sync'
import { knowledgeUpdateChunkTool } from '@/tools/knowledge/update_chunk'
import { knowledgeUploadChunkTool } from '@/tools/knowledge/upload_chunk'
import { knowledgeUpsertDocumentTool } from '@/tools/knowledge/upsert_document'

export {
knowledgeSearchTool,
Expand All @@ -26,4 +27,5 @@ export {
knowledgeListConnectorsTool,
knowledgeGetConnectorTool,
knowledgeTriggerSyncTool,
knowledgeUpsertDocumentTool,
}
30 changes: 30 additions & 0 deletions apps/sim/tools/knowledge/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,33 @@ export interface KnowledgeTriggerSyncResponse {
}
error?: string
}

/**
 * Input parameters for the knowledge upsert-document tool.
 */
export interface KnowledgeUpsertDocumentParams {
  /** Target knowledge base id. */
  knowledgeBaseId: string
  /** Document name; used as a fallback match key when no documentId is given. */
  name: string
  /** Raw document content to store. */
  content: string
  /** Existing document id to replace; when omitted, matching is by name. */
  documentId?: string
  /** Tag key/value pairs attached to the document — values are opaque here; confirm shape against the API route. */
  documentTags?: Record<string, unknown>
  /** Execution context; workflowId enables workflow-scoped authorization. */
  _context?: { workflowId?: string }
}

/**
 * Result payload describing the document produced by an upsert.
 */
export interface KnowledgeUpsertDocumentResult {
  /** Id of the newly created (replacement) document. */
  documentId: string
  /** Display name of the document. */
  documentName: string
  /** Document type — presumably a MIME type or kind label; verify against the service layer. */
  type: string
  /** Whether the document is enabled for search/processing. */
  enabled: boolean
  /** True when an existing document was found and replaced. */
  isUpdate: boolean
  /** Id of the replaced document, or null when this was a plain create. */
  previousDocumentId: string | null
  /** Creation timestamp (string-encoded; format not shown here — likely ISO 8601, confirm). */
  createdAt: string
  /** Last-update timestamp (same encoding as createdAt). */
  updatedAt: string
}

/**
 * Tool response envelope for the upsert-document operation.
 */
export interface KnowledgeUpsertDocumentResponse {
  /** Overall success flag for the tool call. */
  success: boolean
  output: {
    /** Detailed result for the upserted document. */
    data: KnowledgeUpsertDocumentResult
    /** Human-readable summary message. */
    message: string
    /** Id of the upserted document, duplicated at the top level for convenience. */
    documentId: string
  }
  /** Error description when success is false. */
  error?: string
}
Loading
Loading