Skip to content

feat(tables): background import for large CSVs with live progress#4861

Open
TheodoreSpeaks wants to merge 3 commits into
stagingfrom
feat/1-million-table
Open

feat(tables): background import for large CSVs with live progress#4861
TheodoreSpeaks wants to merge 3 commits into
stagingfrom
feat/1-million-table

Conversation

@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator

Summary

  • Add async background CSV/TSV import for large files (~1M rows): client uploads direct-to-storage, a detached worker streams the file, infers/maps the schema, and bulk-inserts in committed batches — no request/ALB timeout.
  • Replace per-row user_table_rows count triggers with statement-level triggers (transition tables) so bulk insert/delete no longer serialize per row (migration 0222).
  • Fix missing final boundary on CSV upload by streaming multipart with busboy instead of request.formData().
  • Surface import progress via a reusable ProgressItem emcn component + a header indicator, driven by SSE events; uploading → processing → done/failed stages defined programmatically.
  • Emit import state from the GET table routes so the indicator survives refresh; auto-uniquify the imported table name on collision.
  • Implement copilot materialize_file operation: 'table' + fail-fast guard for unimplemented ops.

Type of Change

  • New feature

Testing

Tested manually: imported 10MB and ~1M-row CSVs from the list and in-table; verified upload→processing→done progress, refresh persistence, and failure handling. bun run lint clean; bun run check:api-validation:strict passes.

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel
Copy link
Copy Markdown

vercel Bot commented Jun 3, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
docs Ready Ready Preview, Comment Jun 3, 2026 7:55am

Request Review

@cursor
Copy link
Copy Markdown

cursor Bot commented Jun 3, 2026

PR Summary

High Risk
Touches bulk row insertion, DB triggers, and long-running detached workers on web pods; partial imports persist on failure and stale-import cleanup must stay aligned with progress heartbeats.

Overview
Adds background CSV/TSV import for large files: clients upload to workspace storage, kick off async routes, and a detached runTableImport worker streams the file, infers/maps schema, and bulk-inserts in committed batches (no full-request buffering, no per-row workflow triggers). Synchronous import routes now use busboy streaming multipart (readMultipart), createCsvParser, and a 10MB proxy cap; files at/above 8MB use the async path with direct upload.

DB/UI: New table import columns and lifecycle (importing / ready / failed); SSE import events and a header import progress tray (ProgressItem, hydration from list API). Migration 0224 replaces per-row user_table_rows count triggers with statement-level triggers for bulk performance. Cron stale-import cleanup marks stuck importing tables failed.

Also: copilot materialize_file table operation, shared parseFileRows, and API tests for async/sync import paths.

Reviewed by Cursor Bugbot for commit 9284acc. Bugbot is set up for automated code reviews on this repo. Configure here.

@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator Author

@greptile review

Comment thread apps/sim/lib/table/service.ts
Comment thread apps/sim/lib/table/import-runner.ts
Comment thread apps/sim/app/api/table/[tableId]/import-async/route.ts
Comment thread apps/sim/lib/table/import-runner.ts
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Greptile Summary

This PR adds async background CSV/TSV import for large files, routing them direct-to-storage while a detached worker streams, infers schema, and bulk-inserts in committed batches — avoiding request/ALB timeouts. It also replaces per-row row_count triggers with statement-level transition-table triggers, fixes the missing final boundary multipart bug via busboy, surfaces import progress through a new ProgressItem SSE component, and wires the materialize_file copilot tool's table operation.

  • lib/table/import-runner.ts: new detached worker; handles create, append, and replace modes with progress heartbeats and terminal ready/failed SSE events.
  • lib/core/utils/multipart.ts: new busboy-backed streaming parser replacing request.formData(); both sync import routes migrate to it with correct field-before-file ordering enforced on the client.
  • packages/db/migrations/0224_table_import_columns.sql: adds five import-state columns and upgrades row_count maintenance to AFTER-statement triggers using transition tables.

Confidence Score: 4/5

Safe to merge for create and replace async imports; the append async path produces rows with wrong sort positions until the one-line fix is applied.

The async import worker initialises inserted = 0 regardless of mode. For replace this is correct (all rows are deleted first), and for create the table is empty. But for append, existing rows already occupy positions 0..rowCount-1; the worker writes new rows at those same positions, leaving the table sort order broken after every async append import. The rest of the PR — statement-level triggers, multipart streaming rewrite, concurrent-import 409 guard, stale-import janitor heartbeat, and SSE progress surface — all look correct.

apps/sim/lib/table/import-runner.ts — the inserted initialisation for append mode.

Important Files Changed

Filename Overview
apps/sim/lib/table/import-runner.ts New background import worker: append-mode rows always start at position 0, colliding with existing row positions and breaking display order.
apps/sim/app/api/table/import-async/route.ts New route to kick off async import for a fresh table — creates placeholder table, starts detached worker. Logic and auth look correct.
apps/sim/app/api/table/[tableId]/import-async/route.ts New route to kick off async import into an existing table; includes 409 concurrent-import guard and workspace/archive checks.
apps/sim/lib/table/service.ts Adds import lifecycle service functions; updateImportProgress now bumps updatedAt for the stale-janitor heartbeat as documented.
packages/db/migrations/0224_table_import_columns.sql Adds import-state columns and replaces per-row triggers with statement-level transition-table triggers for bulk performance.
apps/sim/lib/core/utils/multipart.ts New busboy-based streaming multipart parser; correctly handles abort, limits, field ordering, and file-stream lifecycle.
apps/sim/app/api/table/import-csv/route.ts Rewrites sync CSV import to stream via busboy parser; cleanup on error is correct and table is deleted if row insertion fails.
apps/sim/app/api/table/[tableId]/import/route.ts Sync append/replace import similarly migrated to busboy streaming; field ordering and stream cleanup handled correctly.
apps/sim/app/api/cron/cleanup-stale-executions/route.ts Adds stale-import janitor: marks importing tables with no recent updatedAt heartbeat as failed using the shared stale threshold.
apps/sim/stores/table/import-tray/store.ts New Zustand store for import progress tray; upsert/dismiss/clearTerminal logic is straightforward and correct.
apps/sim/lib/copilot/tools/handlers/materialize-file.ts Adds operation: 'table' to materialize_file copilot tool using shared parseFileRows; fail-fast guard for unknown operations.
apps/sim/hooks/queries/tables.ts Adds useImportCsvAsync and useImportCsvIntoTableAsync hooks; correctly orders workspaceId before file in FormData for busboy field-before-file requirement.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Storage
    participant KickoffRoute as POST /import-async
    participant Worker as runTableImport (detached)
    participant DB
    participant SSE as SSE stream

    Client->>Storage: PUT file (direct-to-storage upload)
    Storage-->>Client: fileKey
    Client->>KickoffRoute: "POST { workspaceId, fileKey, fileName, mode }"
    KickoffRoute->>DB: markTableImporting(tableId, importId)
    KickoffRoute-->>Client: "200 { tableId, importId }"
    KickoffRoute--)Worker: runDetached("table-import", ...)

    Worker->>DB: getTableById(tableId)
    alt replace mode
        Worker->>DB: deleteAllTableRows(tableId)
    end
    Worker->>Storage: downloadFile(fileKey)
    Storage-->>Worker: Buffer

    loop for each batch (CSV_MAX_BATCH_SIZE rows)
        Worker->>DB: "bulkInsertImportBatch(startPosition=inserted)"
        Worker->>DB: updateImportProgress(rows)
        Worker->>SSE: appendTableEvent(importing, progress)
    end

    alt success
        Worker->>DB: markImportReady(tableId)
        Worker->>SSE: appendTableEvent(ready)
    else failure
        Worker->>DB: markImportFailed(tableId, error)
        Worker->>SSE: appendTableEvent(failed)
    end

    Client->>SSE: "EventSource /api/table/{tableId}/events/stream"
    SSE-->>Client: import progress ticks → ready / failed
Loading

Reviews (3): Last reviewed commit: "Merge remote-tracking branch 'origin/sta..." | Re-trigger Greptile

Comment thread apps/sim/lib/table/service.ts
Comment thread apps/sim/app/api/table/[tableId]/import-async/route.ts
Comment on lines +60 to +65
* for `create`, mapping onto the existing schema for `append`/`replace`), then bulk-inserts
* in committed batches — **no rollback**: committed batches persist even if a later batch
* fails. Progress and the terminal state are surfaced via the table-events SSE stream.
*/
export async function runTableImport(payload: TableImportPayload): Promise<void> {
const { importId, tableId, workspaceId, userId, fileKey, fileName, delimiter, mode } = payload
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Uploaded CSV file is never deleted after import

downloadFile({ key: fileKey, context: 'workspace' }) fetches the file from workspace storage, but there is no deleteFile call in either the success or the catch path. Every async import permanently retains the source CSV/TSV in the workspace storage bucket. Over many imports this becomes a non-trivial storage accumulation with no cleanup mechanism.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 3, 2026

Greptile Summary

This PR adds async background CSV/TSV import for large files (≥ 8 MB): the client uploads directly to storage, two new kickoff routes create a placeholder table (or mark an existing one as importing), and a detached worker streams the file, infers/maps the schema, and bulk-inserts in committed batches with live SSE progress. Synchronous imports are also improved by replacing request.formData() with a streaming busboy parser, and the per-row row_count triggers are replaced with statement-level transition-table triggers (migration 0222) to eliminate per-row lock contention on bulk operations.

  • Async import pipeline: new /api/table/import-async and /api/table/[tableId]/import-async kickoff routes, a runTableImport background worker, import-state columns on user_table_definitions, and a useImportTrayStore + ImportProgressMenu UI indicator driven by SSE events.
  • Streaming multipart parser: readMultipart (busboy) replaces request.formData() in the synchronous import routes, fixing the "missing final boundary" issue and enabling pre-file auth/permission work before the file body is consumed.
  • Statement-level triggers: 0222_stormy_surge.sql drops the per-row BEFORE INSERT / AFTER DELETE triggers and replaces them with AFTER … FOR EACH STATEMENT variants using REFERENCING NEW/OLD TABLE, cutting the lock-per-row cost from bulk inserts to a single UPDATE per statement.

Confidence Score: 3/5

Two correctness issues need attention before shipping: a potential silent data loss in replace-mode async imports, and a liveness-check gap that could cause legitimate long-running imports to be terminated by the cron cleaner.

In replace-mode async imports, deleteAllTableRows executes before downloadFile in the background worker. A storage error after the delete leaves the table permanently empty with no way to recover the original rows. Separately, updateImportProgress does not advance updatedAt, while the stale-import cron check uses updatedAt as its liveness signal and its inline comment states the column is bumped on each progress tick. For 1 M-row files the import finishes in minutes and the mismatch is unlikely to trigger; for very large files or slow storage it could cause a legitimate import to be terminated mid-run.

apps/sim/lib/table/import-runner.ts (replace-mode row deletion order) and apps/sim/lib/table/service.ts (updateImportProgress missing updatedAt)

Important Files Changed

Filename Overview
apps/sim/lib/table/import-runner.ts New background CSV import worker. Has a data-loss risk in replace mode: existing rows are deleted before the file download, so a download failure leaves the table empty.
apps/sim/lib/table/service.ts Adds import-state management functions. updateImportProgress omits updatedAt, contradicting the cron cleanup comment that uses updatedAt as the liveness signal.
apps/sim/app/api/table/import-async/route.ts New async kickoff route for large CSVs into a new table. Auth, permission, and name-collision handling look correct.
apps/sim/app/api/table/[tableId]/import-async/route.ts Async kickoff for importing into an existing table. Validates workspace ownership against the table record before proceeding.
packages/db/migrations/0222_stormy_surge.sql Replaces per-row INSERT/DELETE triggers with statement-level transition-table triggers to fix bulk-insert serialization. Logic looks correct; new columns added for async-import state.
apps/sim/lib/core/utils/multipart.ts New streaming multipart parser built on busboy. Error codes and abort-signal handling are thorough; resolves before file body is consumed so auth can run first.
apps/sim/app/api/cron/cleanup-stale-executions/route.ts Adds stale-import detection to the cron cleanup job. The comment claims updatedAt is bumped by progress ticks, but updateImportProgress does not update that column.
apps/sim/app/api/table/import-csv/route.ts Rewrites synchronous CSV import to use streaming busboy instead of request.formData(); properly destroys the stream in a finally block.
apps/sim/hooks/queries/tables.ts Adds two async-import mutation hooks; correctly orders FormData fields (text before file) for the streaming parser.
apps/sim/stores/table/import-tray/store.ts New Zustand store for tracking in-progress CSV imports in the header tray; clean merge semantics and correct terminal-entry handling.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Storage
    participant KickoffRoute as POST /import-async
    participant BG as runTableImport (detached)
    participant DB

    Client->>Storage: upload file (direct-to-storage)
    Storage-->>Client: fileKey
    Client->>KickoffRoute: "POST {workspaceId, fileKey, fileName}"
    KickoffRoute->>DB: createTable / markTableImporting
    KickoffRoute->>BG: runDetached(runTableImport)
    KickoffRoute-->>Client: "{tableId, importId}"

    BG->>DB: deleteAllTableRows (replace mode only)
    BG->>Storage: downloadFile(fileKey)
    Storage-->>BG: buffer
    BG->>DB: "appendTableEvent(importing, progress=0, total)"
    loop each batch (1000 rows)
        BG->>DB: bulkInsertImportBatch
        BG->>DB: "appendTableEvent(importing, progress=N)"
    end
    BG->>DB: markImportReady / markImportFailed
    BG->>DB: "appendTableEvent(ready | failed)"
    DB-->>Client: SSE stream to ImportProgressMenu
Loading

Reviews (2): Last reviewed commit: "feat(tables): background import for larg..." | Re-trigger Greptile

Comment on lines +73 to +76
if (mode === 'replace') await deleteAllTableRows(tableId)

const buffer = await downloadFile({ key: fileKey, context: 'workspace' })

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Replace-mode deletes rows before the download succeeds

For replace mode, deleteAllTableRows runs before downloadFile. If the storage download fails (network hiccup, key mismatch, storage outage), all original rows are permanently gone — markImportFailed will correctly set the status to failed, but there is no rollback for the deleted data. The user must re-upload from scratch, and any rows not preserved client-side are unrecoverable.

The safer sequence is: download the file first, resolve the schema/mapping from the sample, then delete the existing rows, then stream-insert. That way a download or schema-inference failure leaves the original data intact.

Comment thread apps/sim/lib/table/service.ts Outdated
# Conflicts:
#	apps/sim/app/api/table/[tableId]/import/route.ts
#	apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts
#	apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx
#	apps/sim/lib/table/events.ts
#	packages/db/migrations/meta/0222_snapshot.json
#	packages/db/migrations/meta/_journal.json
#	scripts/check-api-validation-contracts.ts
@TheodoreSpeaks
Copy link
Copy Markdown
Collaborator Author

Addressed Bugbot review + synced with staging:

  • Stale-import false positives (High): updateImportProgress now bumps updatedAt, so a live import keeps a heartbeat and the janitor won't fail it mid-run.
  • Overlapping imports: existing-table kickoff returns 409 if the table is already importing.
  • createColumns validation: worker rejects column names not present in the CSV headers (matches the sync route).
  • Empty CSV: marks the import failed ("CSV file has no data rows") instead of reporting a successful empty import.
  • Merged origin/staging and resolved conflicts (kept both the new import and usageLimitReached SSE handlers).
  • Migration regenerated via drizzle as 0224_table_import_columns (was a hand-written 0222 that collided with staging's 0222); the statement-level row-count triggers are appended after the generated column DDL since drizzle doesn't manage raw triggers.

bun run lint:check, bun run check:api-validation:strict, and tsc --noEmit all clean.

@greptile review

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 4 potential issues.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 9284acc. Configure here.

}

let createColumns: string[] | undefined
if (rawCreateColumns) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync import during background

High Severity

The synchronous multipart import never rejects tables with importStatus importing. A user can run an in-request append/replace while a detached async worker is inserting the same table, mixing position strategies and corrupting rows.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9284acc. Configure here.


const { file, workspaceId } = formValidation.data

const rawMode = fields.mode ?? 'append'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mode field after file ignored

Medium Severity

Streaming multipart only requires workspaceId before the file. mode is read immediately after parse; if it appears after the file part, it is missing and the import silently defaults to append instead of the intended replace.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9284acc. Configure here.

for (let i = 0; i < buffer.length; i++) {
if (buffer[i] === 0x0a) newlineCount++
}
const estimatedTotal = Math.max(0, newlineCount - 1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Async worker buffers whole file

High Severity

The background import worker loads the entire uploaded object into a single Buffer before parsing. Direct-to-storage uploads allow very large workspace files, so a big CSV can exhaust web-container memory and crash the pod mid-import.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9284acc. Configure here.


const parsed = await parseRequest(importTableAsyncContract, request, {})
if (!parsed.success) return parsed.response
const { workspaceId, fileKey, fileName } = parsed.data.body
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileKey lacks workspace check

Medium Severity

Async import kickoff accepts any fileKey string after workspace write auth and passes it to downloadFile without verifying the key is scoped to that workspace (e.g. workspace/{workspaceId}/…). A caller could import another workspace’s object into their table.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9284acc. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant