diff --git a/.agents/skills/memory-load-check/SKILL.md b/.agents/skills/memory-load-check/SKILL.md index 5a46fce78ca..340f6b9757c 100644 --- a/.agents/skills/memory-load-check/SKILL.md +++ b/.agents/skills/memory-load-check/SKILL.md @@ -49,10 +49,35 @@ Read these when doing a deeper pass: - cap downloads and parsed output separately - preserve partial results when a later item exceeds the cap - never read untrusted response bodies without a byte cap +- KB connector file downloads in `apps/sim/connectors/utils.ts` + - `CONNECTOR_MAX_FILE_BYTES`: shared per-file cap (aligned with the manual KB upload limit) + - `readBodyWithLimit`: stream a download body to a Buffer with a hard byte cap (null on overflow) + - `stubOrSkipBySize`: listing-time skip when the reported size exceeds the cap + - `markSkipped` / `sizeLimitSkipReason`: surface oversized files as failed (skipped) KB rows + - `ConnectorFileTooLargeError`: thrown mid-download when the listing under-reported size - Large workflow value payloads - prefer durable references/manifests over inlining large arrays or files - materialize refs only behind an explicit byte budget +## KB Connector File Size Handling + +The connector size pattern in `apps/sim/connectors/utils.ts` (`CONNECTOR_MAX_FILE_BYTES` + `readBodyWithLimit` + `stubOrSkipBySize`/`markSkipped`) exists for one risk: a knowledge-base connector downloading **arbitrary, user-controlled file bytes** that the source does not hard-cap. Apply it by that risk, not by the connector's name. + +Use the pattern when the connector downloads file content via a stream/`download_url` where the user controls the size: +- file-storage connectors: Dropbox, OneDrive, SharePoint, Google Drive, S3, GitHub, GitLab, Azure DevOps +- any connector that fetches a file via a download URL even if it is not a "storage" service (e.g. the Zoom transcript `.vtt`) + +For those, require all three: +- stream the body with `readBodyWithLimit(resp, CONNECTOR_MAX_FILE_BYTES)` — never raw `response.text()`/`response.arrayBuffer()` +- skip oversize at listing (`stubOrSkipBySize` with the reported size) and again at fetch time (overflow -> `markSkipped`), since the listing size can be missing or under-reported +- never drop/truncate silently — oversized files become content-less failed rows carrying `skippedReason`, so they stay visible in the KB UI instead of vanishing from the index + +Skip the pattern when the source already bounds the payload: +- pure API/structured-data connectors (Jira, Linear, Notion, Confluence, Sentry, Slack, Zendesk, Gmail, ...) — paginated JSON/text; apply normal pagination + concurrency bounds instead of a per-file byte cap +- native-document connectors capped by the platform (Google Docs ~50 MB, Google Sheets via `MAX_ROWS`, Evernote ~25 MB/note) — a 100 MB cap can never fire, and wrapping a `response.json()`/Thrift parse in `readBodyWithLimit` is cargo-culting + +Litmus test: "Can a user make this one fetch arbitrarily large, with nothing upstream stopping it?" Yes -> use the pattern. No (platform hard-cap, or already paginated) -> a per-file byte cap adds noise, not safety. Borderline: a user-configured/self-hosted endpoint with no platform cap (e.g. Obsidian) — bound it only if the content is genuinely unbounded. + ## Review Workflow 1. Identify every changed data source: @@ -96,6 +121,7 @@ Read these when doing a deeper pass: - fetches all pages from an external API before processing - reads an entire file, HTTP response, or stream without a max byte budget - checks size only after `Buffer.concat`, `arrayBuffer`, `text`, `JSON.parse`, or parse expansion +- a KB connector silently drops or truncates an oversized file instead of recording it as a failed (skipped) row - chunks only after loading the complete dataset - paginates with unbounded/deep `OFFSET` on a mutable or large table - creates one queue job per row without batching or a queue-level concurrency key diff --git a/apps/sim/connectors/azure-devops/azure-devops.ts b/apps/sim/connectors/azure-devops/azure-devops.ts index 6f9eae611c7..7351709a90d 100644 --- a/apps/sim/connectors/azure-devops/azure-devops.ts +++ b/apps/sim/connectors/azure-devops/azure-devops.ts @@ -3,7 +3,15 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { azureDevopsConnectorMeta } from '@/connectors/azure-devops/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { htmlToPlainText, joinTagArray, parseTagDate, readBodyWithLimit } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + htmlToPlainText, + joinTagArray, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, +} from '@/connectors/utils' const logger = createLogger('AzureDevOpsConnector') @@ -30,7 +38,7 @@ const FILE_BATCH_SIZE = 100 * and aborts (returning null) the moment the cap is exceeded. Larger files are * skipped without being fully buffered. */ -const MAX_FILE_SIZE = 10 * 1024 * 1024 +const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES /** Bytes sniffed for a NUL byte when detecting binary files (matches git's heuristic). */ const BINARY_SNIFF_BYTES = 8000 /** @@ -1090,7 +1098,27 @@ async function getFileDocument( const buffer = await readBodyWithLimit(contentResponse, MAX_FILE_SIZE) if (buffer === null) { logger.info('Skipping oversized Azure DevOps file', { path }) - return null + const skippedTitle = path.split('/').filter(Boolean).pop() || path + return markSkipped( + { + externalId, + title: skippedTitle, + content: '', + mimeType: 'text/plain', + sourceUrl: buildFileSourceUrl(repo?.webUrl, branch, path), + contentHash: buildFileContentHash(repoId, item.objectId), + metadata: { + kind: 'file', + organization, + project, + repository: repo?.name ?? '', + repositoryId: repoId, + branch, + path, + }, + }, + sizeLimitSkipReason(MAX_FILE_SIZE) + ) } if (isBinaryBuffer(buffer)) { logger.info('Skipping binary Azure DevOps file', { path }) diff --git a/apps/sim/connectors/dropbox/dropbox.ts b/apps/sim/connectors/dropbox/dropbox.ts index 4b25be79485..9133b884245 100644 --- a/apps/sim/connectors/dropbox/dropbox.ts +++ b/apps/sim/connectors/dropbox/dropbox.ts @@ -3,7 +3,16 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { dropboxConnectorMeta } from '@/connectors/dropbox/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + ConnectorFileTooLargeError, + htmlToPlainText, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, + stubOrSkipBySize, +} from '@/connectors/utils' const logger = createLogger('DropboxConnector') @@ -23,7 +32,7 @@ const SUPPORTED_EXTENSIONS = new Set([ '.tsv', ]) -const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB +const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES interface DropboxFileEntry { '.tag': 'file' | 'folder' | 'deleted' @@ -44,16 +53,18 @@ interface DropboxListFolderResponse { has_more: boolean } -function isSupportedFile(entry: DropboxFileEntry): boolean { - if (entry['.tag'] !== 'file') return false - if (entry.is_downloadable === false) return false - if (entry.size && entry.size > MAX_FILE_SIZE) return false - - const name = entry.name.toLowerCase() - const dotIndex = name.lastIndexOf('.') +function hasSupportedExtension(name: string): boolean { + const lower = name.toLowerCase() + const dotIndex = lower.lastIndexOf('.') if (dotIndex === -1) return false + return SUPPORTED_EXTENSIONS.has(lower.slice(dotIndex)) +} - return SUPPORTED_EXTENSIONS.has(name.slice(dotIndex)) +/** A downloadable file with a supported extension, regardless of size. */ +function isDownloadableFile(entry: DropboxFileEntry): boolean { + return ( + entry['.tag'] === 'file' && entry.is_downloadable !== false && hasSupportedExtension(entry.name) + ) } async function downloadFileContent(accessToken: string, filePath: string): Promise { @@ -69,7 +80,15 @@ async function downloadFileContent(accessToken: string, filePath: string): Promi throw new Error(`Failed to download file ${filePath}: ${response.status}`) } - const text = await response.text() + // Stream with a hard byte cap so a file whose listing metadata under-reported + // (or omitted) its size can never be fully buffered into memory. Oversize raises + // so getDocument can surface it as a skipped (failed) row rather than dropping it. + const buffer = await readBodyWithLimit(response, MAX_FILE_SIZE) + if (!buffer) { + throw new ConnectorFileTooLargeError(MAX_FILE_SIZE) + } + + const text = buffer.toString('utf8') if (filePath.endsWith('.html') || filePath.endsWith('.htm')) { return htmlToPlainText(text) @@ -162,12 +181,16 @@ export const dropboxConnector: ConnectorConfig = { data = await response.json() } - const supportedFiles = data.entries.filter(isSupportedFile) + // Keep oversized files and surface them as skipped (failed) documents instead + // of dropping them silently at listing time. + const candidateFiles = data.entries.filter(isDownloadableFile) const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0 const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0 - let documents = supportedFiles.map(fileToStub) + let documents = candidateFiles.map((entry) => + stubOrSkipBySize(fileToStub(entry), entry.size, MAX_FILE_SIZE) + ) if (maxFiles > 0) { const remaining = maxFiles - previouslyFetched @@ -210,12 +233,24 @@ export const dropboxConnector: ConnectorConfig = { const entry = (await response.json()) as DropboxFileEntry - if (!isSupportedFile(entry)) return null + if (!isDownloadableFile(entry)) return null + + const stub = fileToStub(entry) + if (entry.size && entry.size > MAX_FILE_SIZE) { + return markSkipped(stub, sizeLimitSkipReason(MAX_FILE_SIZE)) + } - const content = await downloadFileContent(accessToken, entry.path_lower) + let content: string + try { + content = await downloadFileContent(accessToken, entry.path_lower) + } catch (error) { + if (error instanceof ConnectorFileTooLargeError) { + return markSkipped(stub, sizeLimitSkipReason(error.limitBytes)) + } + throw error + } if (!content.trim()) return null - const stub = fileToStub(entry) return { ...stub, content, contentDeferred: false } } catch (error) { logger.warn(`Failed to fetch document ${externalId}`, { diff --git a/apps/sim/connectors/github/github.ts b/apps/sim/connectors/github/github.ts index e90b51a9327..237b421a74c 100644 --- a/apps/sim/connectors/github/github.ts +++ b/apps/sim/connectors/github/github.ts @@ -3,14 +3,20 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { githubConnectorMeta } from '@/connectors/github/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + markSkipped, + parseTagDate, + sizeLimitSkipReason, + stubOrSkipBySize, +} from '@/connectors/utils' const logger = createLogger('GitHubConnector') const GITHUB_API_URL = 'https://api.github.com' const BATCH_SIZE = 30 const GIT_SHA_PREFIX = 'git-sha:' -const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB +const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES const BINARY_SNIFF_BYTES = 8000 /** @@ -197,11 +203,11 @@ export const githubConnector: ConnectorConfig = { } else { const tree = await fetchTree(accessToken, owner, repo, branch) - // Filter by path prefix, extensions, and size + // Filter by path prefix and extensions. Oversized files are kept here and + // surfaced as skipped (failed) documents at stub time so they stay visible. const filtered = tree.filter((item) => { if (pathPrefix && !item.path.startsWith(pathPrefix)) return false if (!matchesExtension(item.path, extSet)) return false - if (typeof item.size === 'number' && item.size > MAX_FILE_SIZE) return false return true }) @@ -223,7 +229,9 @@ export const githubConnector: ConnectorConfig = { batchSize: batch.length, }) - const documents = batch.map((item) => treeItemToStub(owner, repo, branch, item)) + const documents = batch.map((item) => + stubOrSkipBySize(treeItemToStub(owner, repo, branch, item), item.size, MAX_FILE_SIZE) + ) const nextOffset = offset + BATCH_SIZE const hasMore = nextOffset < capped.length @@ -281,7 +289,24 @@ export const githubConnector: ConnectorConfig = { size, limit: MAX_FILE_SIZE, }) - return null + return markSkipped( + { + externalId, + title: path.split('/').pop() || path, + content: '', + mimeType: 'text/plain', + sourceUrl: `https://github.com/${owner}/${repo}/blob/${branch.split('/').map(encodeURIComponent).join('/')}/${path.split('/').map(encodeURIComponent).join('/')}`, + contentHash: `${GIT_SHA_PREFIX}${data.sha as string}`, + metadata: { + path, + sha: data.sha as string, + size, + branch, + repository: `${owner}/${repo}`, + }, + }, + sizeLimitSkipReason(MAX_FILE_SIZE) + ) } const rawContent = (data.content as string) || '' diff --git a/apps/sim/connectors/gitlab/gitlab.ts b/apps/sim/connectors/gitlab/gitlab.ts index ed22010ad68..18247214006 100644 --- a/apps/sim/connectors/gitlab/gitlab.ts +++ b/apps/sim/connectors/gitlab/gitlab.ts @@ -6,14 +6,21 @@ import { secureFetchWithRetry } from '@/lib/knowledge/documents/secure-fetch.ser import { VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { gitlabConnectorMeta } from '@/connectors/gitlab/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { computeContentHash, joinTagArray, parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + computeContentHash, + joinTagArray, + markSkipped, + parseTagDate, + sizeLimitSkipReason, +} from '@/connectors/utils' const logger = createLogger('GitLabConnector') const DEFAULT_HOST = 'gitlab.com' const PAGE_SIZE = 100 /** Max repository file size to index. Larger blobs are skipped. */ -const MAX_FILE_SIZE = 10 * 1024 * 1024 +const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES /** Bytes sniffed for NUL when detecting binary files (matches git's heuristic). */ const BINARY_SNIFF_BYTES = 8000 @@ -324,9 +331,25 @@ function fileToDocument( const blobSha = file.blob_id?.trim() if (!blobSha) return null + const title = path.split('/').pop() || path + const skippedForSize = (size: number): ExternalDocument => { + logger.info('Skipping oversized GitLab file', { path, size }) + return markSkipped( + { + externalId: `${FILE_PREFIX}${path}`, + title, + content: '', + mimeType: 'text/plain', + sourceUrl: buildFileSourceUrl(apiBase, encodedProject, host, projectPath, ref, path), + contentHash: buildFileContentHash(encodedProject, path, blobSha), + metadata: { contentType: 'file', title, path, size }, + }, + sizeLimitSkipReason(MAX_FILE_SIZE) + ) + } + if (typeof file.size === 'number' && file.size > MAX_FILE_SIZE) { - logger.info('Skipping oversized GitLab file', { path, size: file.size }) - return null + return skippedForSize(file.size) } const raw = typeof file.content === 'string' ? file.content : '' @@ -336,12 +359,10 @@ function fileToDocument( return null } if (buffer.byteLength > MAX_FILE_SIZE) { - logger.info('Skipping oversized GitLab file', { path, size: buffer.byteLength }) - return null + return skippedForSize(buffer.byteLength) } const content = buffer.toString('utf8') - const title = path.split('/').pop() || path const body = composeBody(title, content) if (!body.trim()) return null diff --git a/apps/sim/connectors/google-drive/google-drive.ts b/apps/sim/connectors/google-drive/google-drive.ts index 775bebe142e..14d98bc413d 100644 --- a/apps/sim/connectors/google-drive/google-drive.ts +++ b/apps/sim/connectors/google-drive/google-drive.ts @@ -3,7 +3,16 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { googleDriveConnectorMeta } from '@/connectors/google-drive/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + ConnectorFileTooLargeError, + htmlToPlainText, + joinTagArray, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, +} from '@/connectors/utils' const logger = createLogger('GoogleDriveConnector') @@ -22,7 +31,9 @@ const SUPPORTED_TEXT_MIME_TYPES = [ 'application/xml', ] -const MAX_EXPORT_SIZE = 10 * 1024 * 1024 // 10 MB (Google export limit) +// Google Drive's `files.export` API rejects exports over 10 MB (exportSizeLimitExceeded), +// so this is a hard external limit for Google Workspace docs — not the connector cap. +const MAX_EXPORT_SIZE = 10 * 1024 * 1024 function isGoogleWorkspaceFile(mimeType: string): boolean { return mimeType in GOOGLE_WORKSPACE_MIME_TYPES @@ -50,10 +61,22 @@ async function exportGoogleWorkspaceFile( }) if (!response.ok) { + // Google rejects exports over its 10 MB limit with a 403 exportSizeLimitExceeded + // before streaming any bytes — surface that as an oversize skip, not a hard error. + if (response.status === 403) { + const body = await response.text().catch(() => '') + if (body.includes('exportSizeLimitExceeded')) { + throw new ConnectorFileTooLargeError(MAX_EXPORT_SIZE) + } + } throw new Error(`Failed to export file ${fileId}: ${response.status}`) } - return response.text() + const buffer = await readBodyWithLimit(response, MAX_EXPORT_SIZE) + if (!buffer) { + throw new ConnectorFileTooLargeError(MAX_EXPORT_SIZE) + } + return buffer.toString('utf8') } async function downloadTextFile(accessToken: string, fileId: string): Promise { @@ -68,15 +91,14 @@ async function downloadTextFile(accessToken: string, fileId: string): Promise MAX_EXPORT_SIZE) { - logger.warn(`File exceeds ${MAX_EXPORT_SIZE} bytes, truncating`) - const buf = Buffer.from(text, 'utf8') - let end = MAX_EXPORT_SIZE - while (end > 0 && (buf[end] & 0xc0) === 0x80) end-- - return buf.subarray(0, end).toString('utf8') + // Stream with a hard byte cap so a file with missing/under-reported listing + // size metadata is never fully buffered into memory. Oversized files raise + // DriveFileTooLargeError so getDocument can surface them as skipped (failed) rows. + const buffer = await readBodyWithLimit(response, CONNECTOR_MAX_FILE_BYTES) + if (!buffer) { + throw new ConnectorFileTooLargeError(CONNECTOR_MAX_FILE_BYTES) } - return text + return buffer.toString('utf8') } async function fetchFileContent( @@ -287,6 +309,10 @@ export const googleDriveConnector: ConnectorConfig = { const stub = fileToStub(file) return { ...stub, content, contentDeferred: false } } catch (error) { + if (error instanceof ConnectorFileTooLargeError) { + logger.info('Skipping oversized Google Drive file', { fileId: file.id, name: file.name }) + return markSkipped(fileToStub(file), sizeLimitSkipReason(error.limitBytes)) + } logger.warn(`Failed to fetch content for file: ${file.name} (${file.id})`, { error: toError(error).message, }) diff --git a/apps/sim/connectors/onedrive/onedrive.ts b/apps/sim/connectors/onedrive/onedrive.ts index 63dfb0d2faa..4396b4fa850 100644 --- a/apps/sim/connectors/onedrive/onedrive.ts +++ b/apps/sim/connectors/onedrive/onedrive.ts @@ -3,7 +3,16 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { onedriveConnectorMeta } from '@/connectors/onedrive/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + ConnectorFileTooLargeError, + htmlToPlainText, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, + stubOrSkipBySize, +} from '@/connectors/utils' const logger = createLogger('OneDriveConnector') @@ -22,7 +31,7 @@ const SUPPORTED_EXTENSIONS = new Set([ '.tsv', ]) -const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB +const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES const GRAPH_BASE_URL = 'https://graph.microsoft.com/v1.0' @@ -71,11 +80,11 @@ async function downloadFileContent(accessToken: string, fileId: string): Promise throw new Error(`Failed to download file ${fileId}: ${response.status}`) } - const text = await response.text() - if (Buffer.byteLength(text, 'utf8') > MAX_FILE_SIZE) { - return Buffer.from(text, 'utf8').subarray(0, MAX_FILE_SIZE).toString('utf8') + const buffer = await readBodyWithLimit(response, MAX_FILE_SIZE) + if (!buffer) { + throw new ConnectorFileTooLargeError(MAX_FILE_SIZE) } - return text + return buffer.toString('utf8') } /** @@ -197,15 +206,16 @@ export const onedriveConnector: ConnectorConfig = { } } - const textFiles = items.filter( - (item) => - item.file && isSupportedTextFile(item.name) && (!item.size || item.size <= MAX_FILE_SIZE) - ) + // Keep oversized files and surface them as skipped (failed) documents instead + // of filtering them out silently. + const supportedFiles = items.filter((item) => item.file && isSupportedTextFile(item.name)) const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0 const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0 - let documents = textFiles.map(fileToStub) + let documents = supportedFiles.map((item) => + stubOrSkipBySize(fileToStub(item), item.size, MAX_FILE_SIZE) + ) if (maxFiles > 0) { const remaining = maxFiles - previouslyFetched @@ -275,6 +285,10 @@ export const onedriveConnector: ConnectorConfig = { const stub = fileToStub(item) return { ...stub, content, contentDeferred: false } } catch (error) { + if (error instanceof ConnectorFileTooLargeError) { + logger.info('Skipping oversized OneDrive file', { fileId: item.id, name: item.name }) + return markSkipped(fileToStub(item), sizeLimitSkipReason(error.limitBytes)) + } logger.warn(`Failed to fetch content for file: ${item.name} (${item.id})`, { error: toError(error).message, }) diff --git a/apps/sim/connectors/s3/s3.ts b/apps/sim/connectors/s3/s3.ts index 42e2766752b..c2f48a868ad 100644 --- a/apps/sim/connectors/s3/s3.ts +++ b/apps/sim/connectors/s3/s3.ts @@ -6,13 +6,20 @@ import { secureFetchWithRetry } from '@/lib/knowledge/documents/secure-fetch.ser import { VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { s3ConnectorMeta } from '@/connectors/s3/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { parseTagDate, readBodyWithLimit } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, + stubOrSkipBySize, +} from '@/connectors/utils' import { encodeS3PathComponent, getSignatureKey } from '@/tools/s3/utils' const logger = createLogger('S3Connector') /** Maximum object size to sync. Larger objects are skipped during listing. */ -const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB +const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES /** Number of objects requested per ListObjectsV2 page (S3 caps at 1000). */ const LIST_MAX_KEYS = 1000 @@ -510,9 +517,8 @@ export const s3Connector: ConnectorConfig = { ) let documents = objects - .filter((entry) => isSupportedKey(entry.key, allowedExtensions)) - .filter((entry) => entry.size > 0 && entry.size <= MAX_FILE_SIZE) - .map((entry) => objectToStub(ctx, entry)) + .filter((entry) => isSupportedKey(entry.key, allowedExtensions) && entry.size > 0) + .map((entry) => stubOrSkipBySize(objectToStub(ctx, entry), entry.size, MAX_FILE_SIZE)) let slicedSome = false if (maxObjects > 0) { @@ -563,13 +569,24 @@ export const s3Connector: ConnectorConfig = { if (declaredLength > MAX_FILE_SIZE) { logger.warn('Skipping oversized S3 object', { key, size: declaredLength }) - return null + return markSkipped( + objectToStub(ctx, { key, etag, lastModified, size: declaredLength }), + sizeLimitSkipReason(MAX_FILE_SIZE) + ) } const body = await readBodyWithLimit(response, MAX_FILE_SIZE) if (body === null) { logger.warn('Skipping oversized S3 object (size cap exceeded while streaming)', { key }) - return null + return markSkipped( + objectToStub(ctx, { + key, + etag, + lastModified, + size: Number.isFinite(declaredLength) ? declaredLength : 0, + }), + sizeLimitSkipReason(MAX_FILE_SIZE) + ) } const content = body.toString('utf-8') if (!content.trim()) return null diff --git a/apps/sim/connectors/sharepoint/sharepoint.ts b/apps/sim/connectors/sharepoint/sharepoint.ts index 04b64b9a90e..cea98f4c731 100644 --- a/apps/sim/connectors/sharepoint/sharepoint.ts +++ b/apps/sim/connectors/sharepoint/sharepoint.ts @@ -3,7 +3,16 @@ import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import { sharepointConnectorMeta } from '@/connectors/sharepoint/meta' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { htmlToPlainText, parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + ConnectorFileTooLargeError, + htmlToPlainText, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, + stubOrSkipBySize, +} from '@/connectors/utils' const logger = createLogger('SharePointConnector') @@ -24,7 +33,7 @@ const SUPPORTED_TEXT_EXTENSIONS = new Set([ '.tsv', ]) -const MAX_DOWNLOAD_SIZE = 10 * 1024 * 1024 // 10 MB +const MAX_DOWNLOAD_SIZE = CONNECTOR_MAX_FILE_BYTES /** Microsoft Graph drive item shape (subset of fields we use). */ interface DriveItem { @@ -133,15 +142,14 @@ async function downloadFileContent( throw new Error(`Failed to download file "${fileName}" (${itemId}): ${response.status}`) } - const text = await response.text() - if (Buffer.byteLength(text, 'utf8') > MAX_DOWNLOAD_SIZE) { - logger.warn(`File "${fileName}" exceeds ${MAX_DOWNLOAD_SIZE} bytes, truncating`) - const buf = Buffer.from(text, 'utf8') - let end = MAX_DOWNLOAD_SIZE - while (end > 0 && (buf[end] & 0xc0) === 0x80) end-- - return buf.subarray(0, end).toString('utf8') + // Stream with a hard byte cap so a file with missing/under-reported listing + // size metadata is never fully buffered into memory. Oversized files are + // skipped (returned empty) rather than indexed as truncated partial content. + const buffer = await readBodyWithLimit(response, MAX_DOWNLOAD_SIZE) + if (!buffer) { + throw new ConnectorFileTooLargeError(MAX_DOWNLOAD_SIZE) } - return text + return buffer.toString('utf8') } /** @@ -341,11 +349,8 @@ export const sharepointConnector: ConnectorConfig = { for (const item of data.value) { if (item.folder) { subfolders.push(item.id) - } else if ( - item.file && - isSupportedTextFile(item.name) && - (!item.size || item.size <= MAX_DOWNLOAD_SIZE) - ) { + } else if (item.file && isSupportedTextFile(item.name)) { + // Keep oversized files; they are surfaced as skipped (failed) docs below. files.push(item) } } @@ -357,7 +362,7 @@ export const sharepointConnector: ConnectorConfig = { const previouslyFetched = totalFetched for (const file of files) { if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break - documents.push(itemToStub(file, siteName)) + documents.push(stubOrSkipBySize(itemToStub(file, siteName), file.size, MAX_DOWNLOAD_SIZE)) } totalFetched += documents.length @@ -443,6 +448,13 @@ export const sharepointConnector: ConnectorConfig = { const stub = itemToStub(item, siteName ?? siteUrl) return { ...stub, content, contentDeferred: false } } catch (error) { + if (error instanceof ConnectorFileTooLargeError) { + logger.info('Skipping oversized SharePoint file', { fileId: item.id, name: item.name }) + return markSkipped( + itemToStub(item, siteName ?? siteUrl), + sizeLimitSkipReason(error.limitBytes) + ) + } logger.warn(`Failed to fetch content for file: ${item.name} (${item.id})`, { error: toError(error).message, }) diff --git a/apps/sim/connectors/types.ts b/apps/sim/connectors/types.ts index f25359d63fd..c012a7ae56f 100644 --- a/apps/sim/connectors/types.ts +++ b/apps/sim/connectors/types.ts @@ -28,6 +28,13 @@ export interface ExternalDocument { contentHash: string /** When true, content is empty and will be fetched via getDocument for new/changed docs only */ contentDeferred?: boolean + /** + * When set, the document was intentionally not indexed (e.g. it exceeds the + * connector's size limit). The sync engine records it as a `failed` document + * carrying this reason so it is visible in the knowledge base UI instead of + * being silently dropped. + */ + skippedReason?: string /** Additional source-specific metadata */ metadata?: Record } diff --git a/apps/sim/connectors/utils.test.ts b/apps/sim/connectors/utils.test.ts index 956f796cbd1..ff6017652f8 100644 --- a/apps/sim/connectors/utils.test.ts +++ b/apps/sim/connectors/utils.test.ts @@ -2,6 +2,7 @@ * @vitest-environment node */ import { describe, expect, it, vi } from 'vitest' +import type { ExternalDocument } from '@/connectors/types' vi.mock('@/components/icons', () => ({ JiraIcon: () => null, @@ -59,6 +60,12 @@ import { rootlyConnector } from '@/connectors/rootly/rootly' import { s3Connector } from '@/connectors/s3/s3' import { sentryConnector } from '@/connectors/sentry/sentry' import { typeformConnector } from '@/connectors/typeform/typeform' +import { + ConnectorFileTooLargeError, + markSkipped, + readBodyWithLimit, + sizeLimitSkipReason, +} from '@/connectors/utils' import { xConnector } from '@/connectors/x/x' import { youtubeConnector } from '@/connectors/youtube/youtube' @@ -1134,3 +1141,102 @@ describe('Rootly mapTags', () => { expect(result).toEqual({}) }) }) + +function streamResponse(chunks: Uint8Array[], onCancel?: () => void): Response { + let index = 0 + const stream = new ReadableStream({ + pull(controller) { + if (index < chunks.length) { + controller.enqueue(chunks[index++]) + } else { + controller.close() + } + }, + cancel() { + onCancel?.() + }, + }) + return new Response(stream) +} + +describe('readBodyWithLimit', () => { + it('returns the full buffer when the streamed body is within the cap', async () => { + const chunk = new Uint8Array(1024).fill(65) + const result = await readBodyWithLimit(streamResponse([chunk, chunk]), 4096) + expect(result).not.toBeNull() + expect(result?.byteLength).toBe(2048) + }) + + it('returns the buffer when the body is exactly at the cap', async () => { + const chunk = new Uint8Array(1024).fill(65) + const result = await readBodyWithLimit(streamResponse([chunk, chunk]), 2048) + expect(result?.byteLength).toBe(2048) + }) + + it('returns null and cancels the stream once the cap is exceeded', async () => { + const onCancel = vi.fn() + const chunk = new Uint8Array(1024).fill(65) + // Cap is 2048; the third 1KB chunk pushes the total to 3072 and trips the cap, + // so the remaining body is never buffered into memory. + const result = await readBodyWithLimit(streamResponse([chunk, chunk, chunk], onCancel), 2048) + expect(result).toBeNull() + expect(onCancel).toHaveBeenCalled() + }) + + it('enforces the cap on bodyless responses via the arrayBuffer fallback', async () => { + // double-cast-allowed: minimal response stub exercising the no-stream branch + const oversized = { + body: null, + arrayBuffer: async () => new Uint8Array(5000).buffer, + } as unknown as Response + expect(await readBodyWithLimit(oversized, 4096)).toBeNull() + + // double-cast-allowed: minimal response stub exercising the no-stream branch + const within = { + body: null, + arrayBuffer: async () => new Uint8Array(100).buffer, + } as unknown as Response + expect((await readBodyWithLimit(within, 4096))?.byteLength).toBe(100) + }) +}) + +describe('markSkipped', () => { + const stub: ExternalDocument = { + externalId: 'file-1', + title: 'big.csv', + content: 'should be cleared', + contentDeferred: true, + mimeType: 'text/csv', + sourceUrl: 'https://example.com/big.csv', + contentHash: 'hash-1', + metadata: { fileSize: 20_000_000, path: '/big.csv' }, + } + + it('clears content and flags the stub as skipped while preserving identity', () => { + const skipped = markSkipped(stub, sizeLimitSkipReason(10 * 1024 * 1024)) + expect(skipped.content).toBe('') + expect(skipped.contentDeferred).toBe(false) + expect(skipped.skippedReason).toBe('File exceeds the 10MB size limit and was not indexed') + // Identity/metadata preserved so change detection + tags still work. + expect(skipped.externalId).toBe('file-1') + expect(skipped.contentHash).toBe('hash-1') + expect(skipped.sourceUrl).toBe('https://example.com/big.csv') + expect(skipped.metadata).toEqual({ fileSize: 20_000_000, path: '/big.csv' }) + }) + + it('does not mutate the original stub', () => { + markSkipped(stub, 'too big') + expect(stub.content).toBe('should be cleared') + expect(stub.skippedReason).toBeUndefined() + }) +}) + +describe('ConnectorFileTooLargeError', () => { + it('carries the limit and is catchable by type', () => { + const error = new ConnectorFileTooLargeError(100 * 1024 * 1024) + expect(error).toBeInstanceOf(Error) + expect(error).toBeInstanceOf(ConnectorFileTooLargeError) + expect(error.limitBytes).toBe(100 * 1024 * 1024) + expect(error.message).toContain('100MB') + }) +}) diff --git a/apps/sim/connectors/utils.ts b/apps/sim/connectors/utils.ts index d30bc67017f..31015a7260b 100644 --- a/apps/sim/connectors/utils.ts +++ b/apps/sim/connectors/utils.ts @@ -1,4 +1,18 @@ import type { SecureFetchResponse } from '@/lib/core/security/input-validation.server' +import { MAX_FILE_SIZE as KB_DOCUMENT_MAX_BYTES } from '@/lib/uploads/utils/validation' +import type { ExternalDocument } from '@/connectors/types' + +/** + * Per-file size cap for knowledge base connector syncs. Aligned with the limit for + * manually uploaded KB documents (`MAX_FILE_SIZE` in `uploads/validation`) so a + * connector indexes the same files a user could add by hand — rather than the much + * lower proxy-derived 10 MB number that previously (and arbitrarily) applied here. + * + * Connector downloads are streamed against this cap via `readBodyWithLimit`, and + * files above it are surfaced as skipped (failed) documents instead of being dropped + * silently, so raising the limit stays memory-safe and visible. + */ +export const CONNECTOR_MAX_FILE_BYTES = KB_DOCUMENT_MAX_BYTES /** * Strips HTML tags from content and decodes common HTML entities. @@ -113,3 +127,48 @@ export async function readBodyWithLimit( } return Buffer.concat(chunks) } + +/** + * Marks a listed document stub as intentionally skipped — for example because it + * exceeds the connector's size limit. The sync engine records these as `failed` + * documents carrying `skippedReason`, so oversized files stay visible in the + * knowledge base UI instead of vanishing from the index silently. Reuses the + * connector's own stub so the externalId, contentHash, sourceUrl, and metadata + * (including fileSize) are preserved. + */ +export function markSkipped(stub: ExternalDocument, reason: string): ExternalDocument { + return { ...stub, content: '', contentDeferred: false, skippedReason: reason } +} + +/** Human-readable size-limit skip reason, e.g. "File exceeds the 10MB size limit". */ +export function sizeLimitSkipReason(maxBytes: number): string { + return `File exceeds the ${Math.round(maxBytes / (1024 * 1024))}MB size limit and was not indexed` +} + +/** + * Returns the listing stub as-is, or a skipped marker when its size exceeds the cap. + * Lets each connector express the listing-time size decision once instead of + * repeating the `size > max ? markSkipped(...) : stub` ternary (and building the stub + * twice). A missing/zero size is treated as within the cap (oversize is then caught + * at fetch time via `ConnectorFileTooLargeError`). + */ +export function stubOrSkipBySize( + stub: ExternalDocument, + size: number | undefined, + maxBytes: number +): ExternalDocument { + return size && size > maxBytes ? markSkipped(stub, sizeLimitSkipReason(maxBytes)) : stub +} + +/** + * Raised by a connector when a file exceeds its size cap mid-download — i.e. the + * listing did not report a size, so the limit is only discovered while streaming. + * `getDocument` catches it and returns a `markSkipped` document so the file surfaces + * as a failed row instead of being dropped silently. + */ +export class ConnectorFileTooLargeError extends Error { + constructor(readonly limitBytes: number) { + super(`File exceeds the ${Math.round(limitBytes / (1024 * 1024))}MB size limit`) + this.name = 'ConnectorFileTooLargeError' + } +} diff --git a/apps/sim/connectors/zoom/zoom.ts b/apps/sim/connectors/zoom/zoom.ts index 68dd44761c5..0ddaea6d9e3 100644 --- a/apps/sim/connectors/zoom/zoom.ts +++ b/apps/sim/connectors/zoom/zoom.ts @@ -2,7 +2,14 @@ import { createLogger } from '@sim/logger' import { getErrorMessage, toError } from '@sim/utils/errors' import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils' import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types' -import { parseTagDate } from '@/connectors/utils' +import { + CONNECTOR_MAX_FILE_BYTES, + markSkipped, + parseTagDate, + readBodyWithLimit, + sizeLimitSkipReason, + stubOrSkipBySize, +} from '@/connectors/utils' import { zoomConnectorMeta } from '@/connectors/zoom/meta' const logger = createLogger('ZoomConnector') @@ -208,6 +215,7 @@ function recordingToStub( duration: recording.duration, meetingDate: recording.start_time, topic: recording.topic, + fileSize: transcriptFile.file_size, }, } } @@ -309,7 +317,13 @@ export const zoomConnector: ConnectorConfig = { if (!meeting.uuid) continue const transcript = findTranscriptFile(meeting.recording_files) if (!transcript) continue - allDocuments.push(recordingToStub(meeting, transcript)) + allDocuments.push( + stubOrSkipBySize( + recordingToStub(meeting, transcript), + transcript.file_size, + CONNECTOR_MAX_FILE_BYTES + ) + ) } const prevFetched = (syncContext?.totalDocsFetched as number) ?? 0 @@ -386,7 +400,15 @@ export const zoomConnector: ConnectorConfig = { return null } - const vttText = await vttResponse.text() + const vttBuffer = await readBodyWithLimit(vttResponse, CONNECTOR_MAX_FILE_BYTES) + if (!vttBuffer) { + return markSkipped( + recordingToStub(recording, transcript), + sizeLimitSkipReason(CONNECTOR_MAX_FILE_BYTES) + ) + } + + const vttText = vttBuffer.toString('utf8') const transcriptText = parseVtt(vttText).trim() if (!transcriptText) return null diff --git a/apps/sim/executor/variables/resolver.test.ts b/apps/sim/executor/variables/resolver.test.ts index 84b3bf92d6a..82fe5a85ed9 100644 --- a/apps/sim/executor/variables/resolver.test.ts +++ b/apps/sim/executor/variables/resolver.test.ts @@ -1,7 +1,7 @@ /** * @vitest-environment node */ -import { describe, expect, it, vi } from 'vitest' +import { beforeEach, describe, expect, it, vi } from 'vitest' import { LARGE_ARRAY_MANIFEST_VERSION, type LargeArrayManifest, @@ -13,6 +13,13 @@ import { VariableResolver } from '@/executor/variables/resolver' import { navigatePathAsync } from '@/executor/variables/resolvers/reference-async.server' import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types' +const { mockStoreLargeValue } = vi.hoisted(() => ({ mockStoreLargeValue: vi.fn() })) + +vi.mock('@/lib/execution/payloads/store', () => ({ + storeLargeValue: mockStoreLargeValue, + materializeLargeValueRef: vi.fn(), +})) + function createBlock(id: string, name: string, type: string, params = {}): SerializedBlock { return { id, @@ -1170,3 +1177,142 @@ describe('VariableResolver function block inputs', () => { expect(result.contextVariables).toEqual({ __blockRef_0: 'hello world' }) }) }) + +describe('VariableResolver function context overflow offload', () => { + const REF_KEY = 'execution/workspace-1/workflow-1/execution-1/large-value-lv_ABCDEFGHIJKL.json' + + function createOffloadEnv(language: string, producerOutput: Record) { + const { block, ctx } = createResolver(language) + const producer = createBlock('producer', 'Producer', BlockType.API) + const state = new ExecutionState() + state.setBlockOutput('producer', producerOutput) + const workflow: SerializedWorkflow = { + version: '1', + blocks: [producer, block], + connections: [], + loops: {}, + parallels: {}, + } + const resolver = new VariableResolver(workflow, {}, state) + const durableCtx = { + ...ctx, + blockStates: state.getBlockStates(), + workspaceId: 'workspace-1', + workflowId: 'workflow-1', + executionId: 'execution-1', + largeValueKeys: [] as string[], + } as ExecutionContext + return { block, resolver, durableCtx } + } + + beforeEach(() => { + mockStoreLargeValue.mockReset() + mockStoreLargeValue.mockResolvedValue({ + __simLargeValueRef: true, + version: 1, + id: 'lv_ABCDEFGHIJKL', + kind: 'string', + size: 4 * 1024 * 1024, + key: REF_KEY, + executionId: 'execution-1', + }) + }) + + it('offloads an oversized inline value to a lazily-read large-value ref', async () => { + const big = 'x'.repeat(4 * 1024 * 1024) + const { block, resolver, durableCtx } = createOffloadEnv('javascript', { result: big }) + + const result = await resolver.resolveInputsForFunctionBlock( + durableCtx, + 'function', + { code: 'return ' }, + block + ) + + expect(mockStoreLargeValue).toHaveBeenCalledTimes(1) + expect(result.resolvedInputs.code).toBe( + 'return (await sim.values.read(globalThis["__blockRef_0"]))' + ) + expect(result.contextVariables.__blockRef_0).toMatchObject({ + __simLargeValueRef: true, + id: 'lv_ABCDEFGHIJKL', + }) + // The bulky value must not be inlined into either the request data or display source. + expect(result.displayInputs.code.length).toBeLessThan(1024) + // The route must be authorized to materialize the ref it is about to receive. + expect(durableCtx.largeValueKeys).toContain(REF_KEY) + }) + + it('offloads only the values that overflow the budget when several are merged', async () => { + // Each value's inline footprint (data + display ~= 2x) is ~4 MB. The first fits the + // ~6 MB budget and stays inline; the second overflows and is offloaded. + const half = 'y'.repeat(2 * 1024 * 1024) + const { block, resolver, durableCtx } = createOffloadEnv('javascript', { + first: half, + second: half, + }) + + const result = await resolver.resolveInputsForFunctionBlock( + durableCtx, + 'function', + { code: 'return [, ]' }, + block + ) + + // First value fits the budget and stays inline; the second overflows and is offloaded. + expect(mockStoreLargeValue).toHaveBeenCalledTimes(1) + expect(result.resolvedInputs.code).toBe( + 'return [globalThis["__blockRef_0"], (await sim.values.read(globalThis["__blockRef_1"]))]' + ) + expect(result.contextVariables.__blockRef_0).toBe(half) + expect(result.contextVariables.__blockRef_1).toMatchObject({ __simLargeValueRef: true }) + }) + + it('keeps small inline values inline without offloading', async () => { + const { block, resolver, durableCtx } = createOffloadEnv('javascript', { + result: 'hello world', + }) + + const result = await resolver.resolveInputsForFunctionBlock( + durableCtx, + 'function', + { code: 'return ' }, + block + ) + + expect(mockStoreLargeValue).not.toHaveBeenCalled() + expect(result.resolvedInputs.code).toBe('return globalThis["__blockRef_0"]') + expect(result.contextVariables).toEqual({ __blockRef_0: 'hello world' }) + }) + + it('does not offload when the execution context cannot persist durably', async () => { + const big = 'x'.repeat(4 * 1024 * 1024) + const { block, resolver, durableCtx } = createOffloadEnv('javascript', { result: big }) + durableCtx.executionId = undefined + + const result = await resolver.resolveInputsForFunctionBlock( + durableCtx, + 'function', + { code: 'return ' }, + block + ) + + expect(mockStoreLargeValue).not.toHaveBeenCalled() + expect(result.resolvedInputs.code).toBe('return globalThis["__blockRef_0"]') + }) + + it('does not offload for non-JavaScript runtimes that lack the read broker', async () => { + const big = 'x'.repeat(4 * 1024 * 1024) + const { block, resolver, durableCtx } = createOffloadEnv('python', { result: big }) + + const result = await resolver.resolveInputsForFunctionBlock( + durableCtx, + 'function', + { code: 'return ' }, + block + ) + + expect(mockStoreLargeValue).not.toHaveBeenCalled() + expect(result.resolvedInputs.code).toBe('return globals()["__blockRef_0"]') + }) +}) diff --git a/apps/sim/executor/variables/resolver.ts b/apps/sim/executor/variables/resolver.ts index 22739fc5bd1..2f330f23b05 100644 --- a/apps/sim/executor/variables/resolver.ts +++ b/apps/sim/executor/variables/resolver.ts @@ -1,11 +1,13 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { isUserFileWithMetadata } from '@/lib/core/utils/user-file' +import { mergeLargeValueKeys } from '@/lib/execution/payloads/access-keys' import { isLargeArrayManifest } from '@/lib/execution/payloads/large-array-manifest-metadata' import { containsLargeValueRef, getLargeValueMaterializationError, isLargeValueRef, + type LargeValueRef, } from '@/lib/execution/payloads/large-value-ref' import { isLikelyReferenceSegment } from '@/lib/workflows/sanitization/references' import { BlockType, parseReferencePath, REFERENCE } from '@/executor/constants' @@ -32,6 +34,38 @@ export const FUNCTION_BLOCK_DISPLAY_CODE_KEY = '_runtimeDisplayCode' const logger = createLogger('VariableResolver') +/** + * Combined inline budget (data + display source) for a function block's resolved + * block-output context values. Internal routes cap request bodies at ~10 MB, and a + * resolved block reference is serialized into the function request both as data + * (`contextVariables`) and as a literal in the display source, so it costs roughly + * twice its size. Keeping the inline footprint under this budget leaves headroom for + * the code, params, and environment variables in the same body. Values that would + * overflow the budget are offloaded to durable large-value refs and lazily re-read in + * the sandbox via the `sim.values.read` broker. + */ +const FUNCTION_CONTEXT_INLINE_BUDGET_BYTES = 6 * 1024 * 1024 + +interface FunctionContextOffloadState { + inlineFootprintRemaining: number +} + +function createFunctionContextOffloadState(): FunctionContextOffloadState { + return { inlineFootprintRemaining: FUNCTION_CONTEXT_INLINE_BUDGET_BYTES } +} + +function measureJson(value: unknown): { json: string; size: number } | null { + try { + const json = JSON.stringify(value) + if (json === undefined) { + return null + } + return { json, size: Buffer.byteLength(json, 'utf8') } + } catch { + return null + } +} + function getNestedLargeValueMaterializationError(): Error { return new Error( 'This execution value contains nested large values. Reference the nested field directly so it can be lazy-loaded.' @@ -129,6 +163,7 @@ export class VariableResolver { const contextVariables: Record = {} const resolved: Record = {} const display: Record = {} + const offloadState = createFunctionContextOffloadState() if (!params) { return { resolvedInputs: resolved, displayInputs: display, contextVariables } @@ -143,7 +178,8 @@ export class VariableResolver { value, undefined, block, - contextVariables + contextVariables, + offloadState ) resolved[key] = code.resolvedCode display[key] = code.displayCode @@ -158,7 +194,8 @@ export class VariableResolver { item.content, undefined, block, - contextVariables + contextVariables, + offloadState ) resolvedItems.push({ ...item, @@ -339,7 +376,8 @@ export class VariableResolver { template: string, loopScope: LoopScope | undefined, block: SerializedBlock, - contextVarAccumulator: Record + contextVarAccumulator: Record, + offloadState: FunctionContextOffloadState = createFunctionContextOffloadState() ): Promise<{ resolvedCode: string; displayCode: string }> { const resolutionContext: ResolutionContext = { executionContext: ctx, @@ -388,8 +426,12 @@ export class VariableResolver { // Block output: store in contextVarAccumulator and replace the reference // with language-specific runtime access to that stored value. const varName = `__blockRef_${Object.keys(contextVarAccumulator).length}` - contextVarAccumulator[varName] = effectiveValue let replacement: string + // `storedValue` is placed in contextVarAccumulator and `displayValue` is + // rendered into the display source. They diverge only when an oversized + // inline value is offloaded to a ref (both then carry the small ref). + let storedValue: unknown = effectiveValue + let displayValue: unknown = effectiveValue if (isLargeValueRef(effectiveValue)) { const lazyReplacement = this.formatLazyLargeValueReference( varName, @@ -415,16 +457,40 @@ export class VariableResolver { } else if (containsLargeValueRef(effectiveValue)) { throw getNestedLargeValueMaterializationError() } else { - replacement = this.formatContextVariableReference( - varName, + const offloadedRef = await this.maybeOffloadInlineFunctionContextValue( + ctx, + effectiveValue, language, template, - index, - effectiveValue + offloadState ) + if (offloadedRef) { + storedValue = offloadedRef + displayValue = offloadedRef + // maybeOffload only returns a ref when the JS runtime helpers are usable — + // the same guard formatLazyLargeValueReference needs — so it is non-null here. + replacement = + this.formatLazyLargeValueReference(varName, language, template, index) ?? + this.formatContextVariableReference( + varName, + language, + template, + index, + effectiveValue + ) + } else { + replacement = this.formatContextVariableReference( + varName, + language, + template, + index, + effectiveValue + ) + } } + contextVarAccumulator[varName] = storedValue displayResult += this.formatDisplayValueForCodeContext( - effectiveValue, + displayValue, language, template, index @@ -577,6 +643,68 @@ export class VariableResolver { } } + /** + * Offloads an inline function block-output value to a durable large-value ref when + * keeping it inline would push the function execution request body past its budget. + * + * A few medium values merged in one function block (for example two fetched images) + * can exceed the ~10 MB internal-route body cap even though no single value crosses + * the per-value large-value threshold. Offloading the overflowing values lets the + * function runtime lazily re-read them via the `sim.values.read` broker instead of + * inlining their bytes into the request. + * + * Returns the stored reference when offloaded, or `null` to keep the value inline. + */ + private async maybeOffloadInlineFunctionContextValue( + ctx: ExecutionContext, + value: unknown, + language: string | undefined, + template: string, + offloadState: FunctionContextOffloadState + ): Promise { + // Lazy re-reading is only available in the JavaScript isolated-vm runtime; other + // runtimes have no broker to materialize a ref, so the value must stay inline. + if (!this.canUseJavaScriptRuntimeHelpers(language, template)) { + return null + } + if (!ctx.workspaceId || !ctx.workflowId || !ctx.executionId) { + return null + } + + const measured = measureJson(value) + if (measured === null) { + return null + } + + // Inline values are serialized into both the request data and the display source. + const footprint = measured.size * 2 + if (footprint <= offloadState.inlineFootprintRemaining) { + offloadState.inlineFootprintRemaining -= footprint + return null + } + + try { + const { storeLargeValue } = await import('@/lib/execution/payloads/store') + const ref = await storeLargeValue(value, measured.json, measured.size, { + workspaceId: ctx.workspaceId, + workflowId: ctx.workflowId, + executionId: ctx.executionId, + userId: ctx.userId, + requireDurable: true, + }) + // Authorize the function route to materialize the ref it is about to receive. + if (ref.key) { + mergeLargeValueKeys(ctx, [ref.key]) + } + return ref + } catch (error) { + logger.warn('Failed to offload oversized function context value; keeping inline', { + error: toError(error).message, + }) + return null + } + } + private formatLazyLargeValueReference( varName: string, language: string | undefined, diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.test.ts b/apps/sim/lib/knowledge/connectors/sync-engine.test.ts index b0de83c82be..56baf974ef9 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.test.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.test.ts @@ -179,3 +179,111 @@ describe('resolveTagMapping', () => { expect(result).toBeUndefined() }) }) + +describe('classifyExternalDoc', () => { + const base = { content: 'hello', contentDeferred: false, contentHash: 'h1' } + + it('records a new skipped file as a failed row', async () => { + const { classifyExternalDoc } = await import('@/lib/knowledge/connectors/sync-engine') + expect( + classifyExternalDoc({ ...base, content: '', skippedReason: 'too big' }, undefined) + ).toEqual({ type: 'skip' }) + }) + + it('keeps an already-indexed file as-is when it becomes skipped (last-known-good)', async () => { + const { classifyExternalDoc } = await import('@/lib/knowledge/connectors/sync-engine') + expect( + classifyExternalDoc( + { ...base, content: '', skippedReason: 'too big' }, + { + id: 'doc-1', + contentHash: 'old', + } + ) + ).toEqual({ type: 'unchanged' }) + }) + + it('drops empty non-deferred content', async () => { + const { classifyExternalDoc } = await import('@/lib/knowledge/connectors/sync-engine') + expect(classifyExternalDoc({ ...base, content: ' ' }, undefined)).toEqual({ type: 'drop' }) + }) + + it('adds new content and deferred stubs', async () => { + const { classifyExternalDoc } = await import('@/lib/knowledge/connectors/sync-engine') + expect(classifyExternalDoc(base, undefined)).toEqual({ type: 'add' }) + expect(classifyExternalDoc({ ...base, content: '', contentDeferred: true }, undefined)).toEqual( + { type: 'add' } + ) + }) + + it('updates when the content hash changed and is unchanged otherwise', async () => { + const { classifyExternalDoc } = await import('@/lib/knowledge/connectors/sync-engine') + expect(classifyExternalDoc(base, { id: 'doc-1', contentHash: 'old' })).toEqual({ + type: 'update', + existingId: 'doc-1', + }) + expect(classifyExternalDoc(base, { id: 'doc-1', contentHash: 'h1' })).toEqual({ + type: 'unchanged', + }) + }) +}) + +describe('chunkOpsByByteBudget', () => { + const MB = 1024 * 1024 + const addOp = (sizeBytes?: number) => ({ + type: 'add' as const, + extDoc: { + externalId: `e-${Math.random()}`, + title: 'f', + content: 'x', + contentHash: 'h', + mimeType: 'text/plain', + ...(sizeBytes != null ? { metadata: { fileSize: sizeBytes } } : {}), + }, + }) + const skipOp = (sizeBytes: number) => ({ + type: 'skip' as const, + extDoc: { + externalId: `s-${Math.random()}`, + title: 'f', + content: '', + contentHash: 'h', + mimeType: 'text/plain', + skippedReason: 'too big', + metadata: { fileSize: sizeBytes }, + }, + }) + + it('batches small ops up to the count cap', async () => { + const { chunkOpsByByteBudget } = await import('@/lib/knowledge/connectors/sync-engine') + const chunks = chunkOpsByByteBudget( + Array.from({ length: 7 }, () => addOp(1024)), + 64 * MB, + 5 + ) + expect(chunks.map((c) => c.length)).toEqual([5, 2]) + }) + + it('isolates a file larger than the budget into its own chunk', async () => { + const { chunkOpsByByteBudget } = await import('@/lib/knowledge/connectors/sync-engine') + const chunks = chunkOpsByByteBudget([addOp(100 * MB), addOp(1024)], 64 * MB, 5) + expect(chunks.map((c) => c.length)).toEqual([1, 1]) + }) + + it('caps summed bytes per chunk for medium files', async () => { + const { chunkOpsByByteBudget } = await import('@/lib/knowledge/connectors/sync-engine') + // 40 + 40 = 80 MB exceeds the 64 MB budget, so they split. + const chunks = chunkOpsByByteBudget([addOp(40 * MB), addOp(40 * MB)], 64 * MB, 5) + expect(chunks.map((c) => c.length)).toEqual([1, 1]) + }) + + it('treats skip ops as zero bytes so they do not consume the budget', async () => { + const { chunkOpsByByteBudget } = await import('@/lib/knowledge/connectors/sync-engine') + const chunks = chunkOpsByByteBudget( + [skipOp(100 * MB), skipOp(100 * MB), addOp(1024)], + 64 * MB, + 5 + ) + expect(chunks).toHaveLength(1) + }) +}) diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts index f74f21c6794..e7de809ac30 100644 --- a/apps/sim/lib/knowledge/connectors/sync-engine.ts +++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts @@ -10,7 +10,7 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { randomInt } from '@sim/utils/random' -import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm' +import { and, eq, gt, inArray, isNotNull, isNull, lt, ne, or, sql } from 'drizzle-orm' import { decryptApiKey } from '@/lib/api-key/crypto' import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' import type { DocumentData } from '@/lib/knowledge/documents/service' @@ -43,6 +43,16 @@ class ConnectorDeletedException extends Error { } const SYNC_BATCH_SIZE = 5 +/** Estimated source bytes for a doc whose listing did not report a size. */ +const DEFAULT_OP_SIZE_BYTES = 4 * 1024 * 1024 +/** + * Max summed source bytes hydrated/uploaded concurrently within a batch. Each + * in-flight file materializes as a content string plus an upload buffer, so this + * bounds peak worker memory: a few large files near the per-file cap are processed + * in smaller sub-chunks instead of all at once, while small files still process up + * to SYNC_BATCH_SIZE at a time. + */ +const CONTENT_INFLIGHT_BUDGET_BYTES = 64 * 1024 * 1024 const MAX_PAGES = 500 const MAX_SAFE_TITLE_LENGTH = 200 const STALE_PROCESSING_MINUTES = 45 @@ -58,6 +68,82 @@ type KnowledgeBaseLockingTx = Pick type DocOp = | { type: 'add'; extDoc: ExternalDocument } | { type: 'update'; existingId: string; extDoc: ExternalDocument } + | { type: 'skip'; extDoc: ExternalDocument } + +type DocClassification = + | { type: 'add' } + | { type: 'update'; existingId: string } + | { type: 'skip' } + | { type: 'unchanged' } + | { type: 'drop' } + +/** + * Decides what a listed external document becomes during reconciliation. + * + * - `skip`: connector flagged it (e.g. too large) and it is not already indexed — + * record a visible `failed` document instead of dropping it silently. A file that + * is already indexed is kept as-is (last-known-good) rather than downgraded. + * - `drop`: empty, non-deferred content that cannot be indexed. + * - `add` / `update` / `unchanged`: normal content reconciliation by content hash. + */ +export function classifyExternalDoc( + extDoc: Pick, + existing: { id: string; contentHash: string | null } | undefined +): DocClassification { + if (extDoc.skippedReason) { + return existing ? { type: 'unchanged' } : { type: 'skip' } + } + if (!extDoc.content.trim() && !extDoc.contentDeferred) { + return { type: 'drop' } + } + if (!existing) { + return { type: 'add' } + } + if (existing.contentHash !== extDoc.contentHash) { + return { type: 'update', existingId: existing.id } + } + return { type: 'unchanged' } +} + +/** Estimated source bytes for a pending op, taken from its listing metadata. */ +function estimateOpSizeBytes(op: DocOp): number { + // Skip ops load no content (just a row insert), so they do not count against the + // in-flight content budget. + if (op.type === 'skip') return 0 + const size = op.extDoc.metadata?.fileSize ?? op.extDoc.metadata?.size + return typeof size === 'number' && Number.isFinite(size) && size > 0 + ? size + : DEFAULT_OP_SIZE_BYTES +} + +/** + * Splits content ops into sub-chunks bounded by both a count (maxCount) and a summed + * byte budget, so large files are hydrated/uploaded a few at a time. A single op + * larger than the budget still forms its own chunk (always >= 1 op per chunk). + */ +export function chunkOpsByByteBudget( + ops: DocOp[], + budgetBytes: number, + maxCount: number +): DocOp[][] { + const chunks: DocOp[][] = [] + let current: DocOp[] = [] + let currentBytes = 0 + for (const op of ops) { + const bytes = estimateOpSizeBytes(op) + if (current.length > 0 && (current.length >= maxCount || currentBytes + bytes > budgetBytes)) { + chunks.push(current) + current = [] + currentBytes = 0 + } + current.push(op) + currentBytes += bytes + } + if (current.length > 0) { + chunks.push(current) + } + return chunks +} /** Single-roundtrip liveness check used between batches. */ async function checkSyncLiveness( @@ -532,25 +618,34 @@ export async function executeSync( continue } - if (!extDoc.content.trim() && !extDoc.contentDeferred) { - logger.info(`Skipping empty document: ${extDoc.title}`, { - externalId: extDoc.externalId, - }) - continue - } - const existing = existingByExternalId.get(extDoc.externalId) - - if (!existing) { - pendingOps.push({ type: 'add', extDoc }) - } else if (existing.contentHash !== extDoc.contentHash) { - pendingOps.push({ type: 'update', existingId: existing.id, extDoc }) - } else { - result.docsUnchanged++ + const classification = classifyExternalDoc(extDoc, existing) + + switch (classification.type) { + case 'skip': + pendingOps.push({ type: 'skip', extDoc }) + break + case 'drop': + logger.info(`Skipping empty document: ${extDoc.title}`, { + externalId: extDoc.externalId, + }) + break + case 'add': + pendingOps.push({ type: 'add', extDoc }) + break + case 'update': + pendingOps.push({ type: 'update', existingId: classification.existingId, extDoc }) + break + case 'unchanged': + result.docsUnchanged++ + break } } - for (let i = 0; i < pendingOps.length; i += SYNC_BATCH_SIZE) { + // Batch by both count and summed content bytes so a few large files near the + // per-file cap never hydrate/upload together and exhaust the worker heap. + const batches = chunkOpsByByteBudget(pendingOps, CONTENT_INFLIGHT_BUDGET_BYTES, SYNC_BATCH_SIZE) + for (const rawBatch of batches) { const liveness = await checkSyncLiveness(connectorId, connector.knowledgeBaseId) if (liveness.connectorDeleted) { throw new ConnectorDeletedException(connectorId) @@ -559,10 +654,16 @@ export async function executeSync( throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`) } - const rawBatch = pendingOps.slice(i, i + SYNC_BATCH_SIZE) + // Oversized/skipped docs become visible `failed` rows (never silent). They are + // flagged either at listing time (skip ops here) or discovered only at fetch + // time during hydration below; both are collected and persisted after hydration. + const skipExtDocs: ExternalDocument[] = rawBatch + .filter((op) => op.type === 'skip') + .map((op) => op.extDoc) - const deferredOps = rawBatch.filter((op) => op.extDoc.contentDeferred) - const readyOps = rawBatch.filter((op) => !op.extDoc.contentDeferred) + const contentOps = rawBatch.filter((op) => op.type !== 'skip') + const deferredOps = contentOps.filter((op) => op.extDoc.contentDeferred) + const readyOps = contentOps.filter((op) => !op.extDoc.contentDeferred) if (deferredOps.length > 0) { if (connectorConfig.auth.mode === 'oauth') { @@ -577,6 +678,20 @@ export async function executeSync( op.extDoc.externalId, syncContext ) + // A connector may only learn a file is too large at fetch time (its + // listing has no size). Surface that as a failed row for new files; keep + // already-indexed files as last-known-good rather than downgrading them. + if (fullDoc?.skippedReason) { + if (op.type === 'add') { + skipExtDocs.push({ + ...op.extDoc, + skippedReason: fullDoc.skippedReason, + contentHash: fullDoc.contentHash ?? op.extDoc.contentHash, + metadata: { ...op.extDoc.metadata, ...fullDoc.metadata }, + }) + } + return null + } if (!fullDoc?.content.trim()) return null const hydratedHash = fullDoc.contentHash ?? op.extDoc.contentHash if ( @@ -615,6 +730,27 @@ export async function executeSync( } } + // Record all skipped (oversized) docs in this batch in one bulk insert. + if (skipExtDocs.length > 0) { + try { + const recorded = await skipDocuments( + connector.knowledgeBaseId, + connectorId, + connector.connectorType, + skipExtDocs, + sourceConfig + ) + result.docsFailed += recorded + } catch (error) { + result.docsFailed += skipExtDocs.length + logger.error('Failed to record skipped documents', { + connectorId, + count: skipExtDocs.length, + error: toError(error).message, + }) + } + } + const batch = readyOps const settled = await Promise.allSettled( @@ -734,6 +870,9 @@ export async function executeSync( lt(document.uploadedAt, syncStartedAt), gt(document.uploadedAt, retryCutoff), eq(document.userExcluded, false), + // Skipped (oversized) docs are recorded as content-less failed rows with no + // storage key; they cannot be reprocessed, so exclude them from retry. + isNotNull(document.storageKey), isNull(document.archivedAt), isNull(document.deletedAt) ) @@ -941,6 +1080,82 @@ function kbOwnershipMetadata( : undefined } +/** Builds a content-less `failed` document row for a skipped (e.g. oversized) file. */ +function buildSkippedDocumentRow( + knowledgeBaseId: string, + connectorId: string, + connectorType: string, + extDoc: ExternalDocument, + sourceConfig?: Record +) { + const reason = extDoc.skippedReason ?? 'Document was skipped during sync' + const tagValues = extDoc.metadata + ? resolveTagMapping(connectorType, extDoc.metadata, sourceConfig) + : undefined + // Connectors put the source size under either `fileSize` or `size`; accept both + // so the skipped failed row shows the real size instead of 0. + const rawSize = extDoc.metadata?.fileSize ?? extDoc.metadata?.size + const fileSize = + typeof rawSize === 'number' && Number.isFinite(rawSize) ? Math.max(0, Math.trunc(rawSize)) : 0 + + return { + id: generateId(), + knowledgeBaseId, + filename: extDoc.title, + fileUrl: '', + storageKey: null, + fileSize, + mimeType: 'text/plain', + processingStatus: 'failed', + processingError: reason, + enabled: true, + connectorId, + externalId: extDoc.externalId, + contentHash: extDoc.contentHash, + sourceUrl: extDoc.sourceUrl ?? null, + ...tagValues, + uploadedAt: new Date(), + } +} + +/** + * Records source files that were intentionally not indexed (e.g. they exceed the + * connector's size limit) as content-less `failed` documents in a single bulk insert. + * This keeps the files visible in the knowledge base UI — with `processingError` + * explaining why — instead of silently dropping them. The rows have no storage key, + * so they are excluded from the stuck-document retry sweep (nothing to reprocess). + * + * Only called for files not already indexed; previously-indexed files that later + * exceed the limit are kept as-is (last-known-good) by `classifyExternalDoc`. + * + * Returns the number of rows recorded. + */ +async function skipDocuments( + knowledgeBaseId: string, + connectorId: string, + connectorType: string, + extDocs: ExternalDocument[], + sourceConfig?: Record +): Promise { + if (extDocs.length === 0) { + return 0 + } + const rows = extDocs.map((extDoc) => + buildSkippedDocumentRow(knowledgeBaseId, connectorId, connectorType, extDoc, sourceConfig) + ) + + await db.transaction(async (tx) => { + const isActive = await isKnowledgeBaseActiveInTx(tx, knowledgeBaseId) + if (!isActive) { + throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`) + } + + await tx.insert(document).values(rows) + }) + + return rows.length +} + /** * Upload content to storage as a .txt file, create a document record, * and trigger processing via the existing pipeline.