feat(tables): background import for large CSVs with live progress #4861
TheodoreSpeaks wants to merge 3 commits into
PR Summary
High Risk
Overview DB/UI: New table import columns and lifecycle ( Also: copilot
Reviewed by Cursor Bugbot for commit 9284acc.
@greptile review
Greptile Summary
This PR adds async background CSV/TSV import for large files, routing them direct-to-storage while a detached worker streams, infers schema, and bulk-inserts in committed batches, avoiding request/ALB timeouts. It also replaces per-row
Confidence Score: 4/5
Safe to merge for create and replace async imports; the append async path produces rows with wrong sort positions until the one-line fix is applied. The async import worker initialises apps/sim/lib/table/import-runner.ts: the
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Client
participant Storage
participant KickoffRoute as POST /import-async
participant Worker as runTableImport (detached)
participant DB
participant SSE as SSE stream
Client->>Storage: PUT file (direct-to-storage upload)
Storage-->>Client: fileKey
Client->>KickoffRoute: "POST { workspaceId, fileKey, fileName, mode }"
KickoffRoute->>DB: markTableImporting(tableId, importId)
KickoffRoute-->>Client: "200 { tableId, importId }"
KickoffRoute--)Worker: runDetached("table-import", ...)
Worker->>DB: getTableById(tableId)
alt replace mode
Worker->>DB: deleteAllTableRows(tableId)
end
Worker->>Storage: downloadFile(fileKey)
Storage-->>Worker: Buffer
loop for each batch (CSV_MAX_BATCH_SIZE rows)
Worker->>DB: "bulkInsertImportBatch(startPosition=inserted)"
Worker->>DB: updateImportProgress(rows)
Worker->>SSE: appendTableEvent(importing, progress)
end
alt success
Worker->>DB: markImportReady(tableId)
Worker->>SSE: appendTableEvent(ready)
else failure
Worker->>DB: markImportFailed(tableId, error)
Worker->>SSE: appendTableEvent(failed)
end
Client->>SSE: "EventSource /api/table/{tableId}/events/stream"
SSE-->>Client: import progress ticks → ready / failed
Reviews (3): Last reviewed commit: "Merge remote-tracking branch 'origin/sta..."
 * for `create`, mapping onto the existing schema for `append`/`replace`), then bulk-inserts
 * in committed batches — **no rollback**: committed batches persist even if a later batch
 * fails. Progress and the terminal state are surfaced via the table-events SSE stream.
 */
export async function runTableImport(payload: TableImportPayload): Promise<void> {
  const { importId, tableId, workspaceId, userId, fileKey, fileName, delimiter, mode } = payload
Uploaded CSV file is never deleted after import
downloadFile({ key: fileKey, context: 'workspace' }) fetches the file from workspace storage, but there is no deleteFile call in either the success or the catch path. Every async import permanently retains the source CSV/TSV in the workspace storage bucket. Over many imports this becomes a non-trivial storage accumulation with no cleanup mechanism.
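One possible shape for the cleanup, as a minimal sketch: wrap the import work so the source object is removed in a `finally` block. The `ImportStorage` interface, its single-argument `downloadFile`/`deleteFile` signatures, and `withUploadCleanup` are hypothetical names for illustration; the PR's real storage helpers may look different.

```typescript
// Hypothetical storage interface; not the PR's actual helpers.
interface ImportStorage {
  downloadFile(key: string): Promise<Uint8Array>
  deleteFile(key: string): Promise<void>
}

/**
 * Runs `work` on the downloaded file and always attempts to delete the
 * source object afterwards, whether `work` resolved or threw.
 */
async function withUploadCleanup<T>(
  storage: ImportStorage,
  fileKey: string,
  work: (file: Uint8Array) => Promise<T>
): Promise<T> {
  try {
    const file = await storage.downloadFile(fileKey)
    return await work(file)
  } finally {
    // Best-effort: a failed delete must not mask the import's own outcome.
    await storage.deleteFile(fileKey).catch(() => {})
  }
}
```

Deleting on the failure path also removes the ability to retry from the same upload, so a real fix might keep failed uploads for a grace period instead.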
Greptile Summary
This PR adds async background CSV/TSV import for large files (≥ 8 MB): the client uploads directly to storage, two new kickoff routes create a placeholder table (or mark an existing one as
Confidence Score: 3/5
Two correctness issues need attention before shipping: a potential silent data loss in replace-mode async imports, and a liveness-check gap that could cause legitimate long-running imports to be terminated by the cron cleaner. In replace-mode async imports, apps/sim/lib/table/import-runner.ts (replace-mode row deletion order) and apps/sim/lib/table/service.ts (updateImportProgress missing updatedAt)
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Client
participant Storage
participant KickoffRoute as POST /import-async
participant BG as runTableImport (detached)
participant DB
Client->>Storage: upload file (direct-to-storage)
Storage-->>Client: fileKey
Client->>KickoffRoute: "POST {workspaceId, fileKey, fileName}"
KickoffRoute->>DB: createTable / markTableImporting
KickoffRoute->>BG: runDetached(runTableImport)
KickoffRoute-->>Client: "{tableId, importId}"
BG->>DB: deleteAllTableRows (replace mode only)
BG->>Storage: downloadFile(fileKey)
Storage-->>BG: buffer
BG->>DB: "appendTableEvent(importing, progress=0, total)"
loop each batch (1000 rows)
BG->>DB: bulkInsertImportBatch
BG->>DB: "appendTableEvent(importing, progress=N)"
end
BG->>DB: markImportReady / markImportFailed
BG->>DB: "appendTableEvent(ready | failed)"
DB-->>Client: SSE stream to ImportProgressMenu
Reviews (2): Last reviewed commit: "feat(tables): background import for larg..."
if (mode === 'replace') await deleteAllTableRows(tableId)

const buffer = await downloadFile({ key: fileKey, context: 'workspace' })
Replace-mode deletes rows before the download succeeds
For replace mode, deleteAllTableRows runs before downloadFile. If the storage download fails (network hiccup, key mismatch, storage outage), all original rows are permanently gone — markImportFailed will correctly set the status to failed, but there is no rollback for the deleted data. The user must re-upload from scratch, and any rows not preserved client-side are unrecoverable.
The safer sequence is: download the file first, resolve the schema/mapping from the sample, then delete the existing rows, then stream-insert. That way a download or schema-inference failure leaves the original data intact.
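The reordering described above can be sketched as follows. The `ImportSteps` interface and `runImportSafely` are hypothetical: the steps are injected as plain functions only so the ordering is visible and testable, and they stand in for whatever the worker actually calls.

```typescript
type AsyncImportMode = 'create' | 'append' | 'replace'

// Hypothetical step functions standing in for the worker's real calls.
interface ImportSteps {
  downloadFile(): Promise<Uint8Array>
  resolveMapping(file: Uint8Array): Promise<void>
  deleteAllTableRows(): Promise<void>
  insertRows(file: Uint8Array): Promise<void>
}

async function runImportSafely(mode: AsyncImportMode, steps: ImportSteps): Promise<void> {
  // 1. Everything that can fail without touching the table runs first.
  const file = await steps.downloadFile()
  await steps.resolveMapping(file)

  // 2. Only after the file and mapping are known to be usable, drop old rows.
  if (mode === 'replace') await steps.deleteAllTableRows()

  // 3. Insert. A failure here can still leave a partial table, because
  //    committed batches are not rolled back.
  await steps.insertRows(file)
}
```

This narrows the data-loss window to insert failures only; it does not make replace atomic.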
…/empty validation
# Conflicts:
#	apps/sim/app/api/table/[tableId]/import/route.ts
#	apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts
#	apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx
#	apps/sim/lib/table/events.ts
#	packages/db/migrations/meta/0222_snapshot.json
#	packages/db/migrations/meta/_journal.json
#	scripts/check-api-validation-contracts.ts
Addressed Bugbot review + synced with staging:
@greptile review
Cursor Bugbot has reviewed your changes and found 4 potential issues.
}

let createColumns: string[] | undefined
if (rawCreateColumns) {
Sync import during background
High Severity
The synchronous multipart import never rejects tables with importStatus importing. A user can run an in-request append/replace while a detached async worker is inserting the same table, mixing position strategies and corrupting rows.
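A guard along these lines could run at the top of the synchronous route. This is a sketch under assumptions: `TableLike`, `GuardResult`, `rejectIfImporting`, and the 409 status are illustrative, and only the `importing` status value is named in this comment (`ready` and `failed` are borrowed from the event names in the sequence diagram).

```typescript
// Assumed status values; only 'importing' is confirmed by the review comment.
type ImportStatus = 'importing' | 'ready' | 'failed' | null

interface TableLike {
  id: string
  importStatus: ImportStatus
}

type GuardResult = { ok: true } | { ok: false; status: 409; error: string }

/** Rejects a synchronous import while a background import owns the table. */
function rejectIfImporting(table: TableLike): GuardResult {
  if (table.importStatus === 'importing') {
    return {
      ok: false,
      status: 409,
      error: `Table ${table.id} already has an import in progress`,
    }
  }
  return { ok: true }
}
```

A read-then-write check like this still leaves a small race between the check and the first insert; closing it fully would likely need a conditional update in the database rather than an in-memory check.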
const { file, workspaceId } = formValidation.data

const rawMode = fields.mode ?? 'append'
Mode field after file ignored
Medium Severity
Streaming multipart only requires workspaceId before the file. mode is read immediately after parse; if it appears after the file part, it is missing and the import silently defaults to append instead of the intended replace.
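One way to avoid the silent default, sketched here as an assumption rather than the route's actual design, is to treat `mode` like `workspaceId`: required before the file part, with an explicit error when it is absent. `resolveModeBeforeFile` and `fieldsSeen` are hypothetical names.

```typescript
const SYNC_IMPORT_MODES = ['append', 'replace'] as const
type SyncImportMode = (typeof SYNC_IMPORT_MODES)[number]

/**
 * Intended to be called when the streaming parser reaches the file part.
 * `fieldsSeen` holds only the text fields that arrived before the file.
 */
function resolveModeBeforeFile(fieldsSeen: Record<string, string | undefined>): SyncImportMode {
  const raw = fieldsSeen.mode
  if (raw === undefined) {
    // Fail loudly instead of falling back to 'append'.
    throw new Error("Missing 'mode': it must be sent before the file part")
  }
  const match = SYNC_IMPORT_MODES.find((m) => m === raw)
  if (match === undefined) throw new Error(`Invalid mode: ${raw}`)
  return match
}
```

This changes behavior for callers that omit `mode` on purpose and rely on the `append` default, so it is a trade-off, not a drop-in fix.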
for (let i = 0; i < buffer.length; i++) {
  if (buffer[i] === 0x0a) newlineCount++
}
const estimatedTotal = Math.max(0, newlineCount - 1)
Async worker buffers whole file
High Severity
The background import worker loads the entire uploaded object into a single Buffer before parsing. Direct-to-storage uploads allow very large workspace files, so a big CSV can exhaust web-container memory and crash the pod mid-import.
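To illustrate the bounded-memory alternative, here is a minimal sketch that consumes the file as a stream of text chunks and yields fixed-size batches of lines. `lineBatches` is a hypothetical helper, it assumes the storage layer can hand back already-decoded chunks, and it is deliberately naive: it splits on `\n` only, so quoted fields containing newlines and `\r\n` endings would need a real CSV parser.

```typescript
/**
 * Yields batches of at most `batchSize` lines from a chunked text stream,
 * holding only the current batch and one partial line in memory.
 */
async function* lineBatches(
  chunks: AsyncIterable<string>,
  batchSize: number
): AsyncGenerator<string[]> {
  let carry = '' // partial line left over from the previous chunk
  let batch: string[] = []

  for await (const chunk of chunks) {
    const parts = (carry + chunk).split('\n')
    carry = parts.pop() ?? '' // the last piece may be an incomplete line
    for (const line of parts) {
      batch.push(line)
      if (batch.length === batchSize) {
        yield batch
        batch = []
      }
    }
  }

  if (carry !== '') batch.push(carry) // final line without a trailing newline
  if (batch.length > 0) yield batch
}
```

With streaming, the total row count is not known up front, so the progress estimate derived from counting newlines in the whole buffer would need another source, for example the object's byte size.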
const parsed = await parseRequest(importTableAsyncContract, request, {})
if (!parsed.success) return parsed.response
const { workspaceId, fileKey, fileName } = parsed.data.body
fileKey lacks workspace check
Medium Severity
Async import kickoff accepts any fileKey string after workspace write auth and passes it to downloadFile without verifying the key is scoped to that workspace (e.g. workspace/{workspaceId}/…). A caller could import another workspace’s object into their table.
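A scope check of this kind could run in the kickoff route before the worker is started. The sketch below assumes the key layout from the example in this comment (`workspace/{workspaceId}/…`); `isKeyInWorkspace` is a hypothetical helper, and the real key format should be confirmed against the upload code.

```typescript
/**
 * Returns true only when `fileKey` sits under the given workspace's prefix.
 * Assumed layout: workspace/{workspaceId}/<path>
 */
function isKeyInWorkspace(fileKey: string, workspaceId: string): boolean {
  const prefix = `workspace/${workspaceId}/`
  if (!fileKey.startsWith(prefix)) return false

  // Reject empty remainders and path-traversal segments after the prefix.
  const rest = fileKey.slice(prefix.length)
  if (rest.length === 0) return false
  return !rest.split('/').some((segment) => segment === '..' || segment === '.')
}
```

The trailing slash in the prefix matters: without it, a key for workspace `ws10` would pass a check for workspace `ws1`.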


Summary
- user_table_rows count triggers with statement-level triggers (transition tables) so bulk insert/delete no longer serialize per row (migration 0222).
- missing final boundary on CSV upload by streaming multipart with busboy instead of request.formData().
- ProgressItem emcn component + a header indicator, driven by SSE events; uploading → processing → done/failed stages defined programmatically.
- materialize_file operation: 'table' + fail-fast guard for unimplemented ops.
Type of Change
Testing
Tested manually: imported 10MB and ~1M-row CSVs from the list and in-table; verified upload→processing→done progress, refresh persistence, and failure handling.
bun run lint clean; bun run check:api-validation:strict passes.
Checklist