Skip to content

Commit 077e1d0

Browse files
feat(tables): background import for large CSVs with live progress
1 parent 919fa52 commit 077e1d0

46 files changed

Lines changed: 20750 additions & 353 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/sim/app/api/cron/cleanup-stale-executions/route.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { asyncJobs, db } from '@sim/db'
2-
import { workflowExecutionLogs } from '@sim/db/schema'
2+
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { toError } from '@sim/utils/errors'
55
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
@@ -110,6 +110,37 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
110110
})
111111
}
112112

113+
// Mark stale table imports as failed. Imports run detached on the web container and
114+
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
115+
// an `importing` table with no recent update has stalled (not merely slow). Rows are
116+
// left in place (no rollback); the user re-imports.
117+
let staleImportsMarkedFailed = 0
118+
try {
119+
const staleImports = await db
120+
.update(userTableDefinitions)
121+
.set({
122+
importStatus: 'failed',
123+
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
124+
updatedAt: new Date(),
125+
})
126+
.where(
127+
and(
128+
eq(userTableDefinitions.importStatus, 'importing'),
129+
lt(userTableDefinitions.updatedAt, staleThreshold)
130+
)
131+
)
132+
.returning({ id: userTableDefinitions.id })
133+
134+
staleImportsMarkedFailed = staleImports.length
135+
if (staleImportsMarkedFailed > 0) {
136+
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
137+
}
138+
} catch (error) {
139+
logger.error('Failed to clean up stale table imports:', {
140+
error: toError(error).message,
141+
})
142+
}
143+
113144
// Clean up stale pending jobs (never started, e.g., due to server crash before startJob())
114145
let stalePendingJobsMarkedFailed = 0
115146

@@ -179,6 +210,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
179210
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
180211
retentionHours: JOB_RETENTION_HOURS,
181212
},
213+
tableImports: {
214+
staleMarkedFailed: staleImportsMarkedFailed,
215+
},
182216
})
183217
} catch (error) {
184218
logger.error('Error in stale execution cleanup job:', error)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { hybridAuthMockFns } from '@sim/testing'
5+
import { NextRequest } from 'next/server'
6+
import { beforeEach, describe, expect, it, vi } from 'vitest'
7+
import type { TableDefinition } from '@/lib/table'
8+
9+
// Mock functions shared by the module stubs below. They are created inside
// vi.hoisted so they already exist when the vi.mock factories reference them.
const { mockCheckAccess, mockMarkTableImporting, mockRunTableImport } = vi.hoisted(() => ({
10+
mockCheckAccess: vi.fn(),
11+
mockMarkTableImporting: vi.fn(),
12+
mockRunTableImport: vi.fn(),
13+
}))
14+
15+
// Fixed ids so the tests can assert the exact importId returned by the route.
vi.mock('@sim/utils/id', () => ({
16+
generateId: vi.fn().mockReturnValue('import-id-xyz'),
17+
generateShortId: vi.fn().mockReturnValue('short-id'),
18+
}))
19+
vi.mock('@/lib/table/service', () => ({ markTableImporting: mockMarkTableImporting }))
20+
vi.mock('@/lib/table/import-runner', () => ({ runTableImport: mockRunTableImport }))
21+
// Stub runDetached to invoke the work callback immediately (promise not awaited),
// so the call to the mocked runTableImport is observable right after POST resolves.
vi.mock('@/lib/core/utils/background', () => ({
22+
runDetached: (_label: string, work: () => Promise<unknown>) => {
23+
void work()
24+
},
25+
}))
26+
// checkAccess is driven per test through mockCheckAccess; accessError is reduced to
// a JSON `{ error: 'denied' }` response that carries the status from the access result.
vi.mock('@/app/api/table/utils', async () => {
27+
const { NextResponse } = await import('next/server')
28+
return {
29+
checkAccess: mockCheckAccess,
30+
accessError: (result: { status: number }) =>
31+
NextResponse.json({ error: 'denied' }, { status: result.status }),
32+
}
33+
})
34+
35+
import { POST } from '@/app/api/table/[tableId]/import-async/route'
36+
37+
/**
 * Builds a TableDefinition fixture for the tests.
 *
 * Defaults describe an unarchived table `tbl_1` in `workspace-1` with a single
 * string column; `overrides` is spread last, so any field it sets wins.
 *
 * @param overrides - Fields to replace on the default fixture.
 * @returns The fixture with overrides applied.
 */
function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
38+
return {
39+
id: 'tbl_1',
40+
name: 'People',
41+
description: null,
42+
schema: { columns: [{ name: 'name', type: 'string' }] },
43+
metadata: null,
44+
rowCount: 0,
45+
maxRows: 1_000_000,
46+
workspaceId: 'workspace-1',
47+
createdBy: 'user-1',
48+
archivedAt: null,
49+
createdAt: new Date(),
50+
updatedAt: new Date(),
51+
...overrides,
52+
}
53+
}
54+
55+
/**
 * Sends a JSON POST to the import-async route handler under test.
 *
 * @param body - Value serialized with JSON.stringify as the request body.
 * @param tableId - Table id used both in the URL and in the route params (default `tbl_1`).
 * @returns Whatever the route's POST handler returns for this request.
 */
function makeRequest(body: unknown, tableId = 'tbl_1') {
56+
const req = new NextRequest(`http://localhost:3000/api/table/${tableId}/import-async`, {
57+
method: 'POST',
58+
headers: { 'content-type': 'application/json' },
59+
body: JSON.stringify(body),
60+
})
61+
// Route params are passed as a Promise, matching the RouteParams shape the handler expects.
return POST(req, { params: Promise.resolve({ tableId }) })
62+
}
63+
64+
// Baseline request body; its workspaceId matches the buildTable() default, and
// individual tests spread over it to vary one field at a time.
const validBody = {
65+
workspaceId: 'workspace-1',
66+
fileKey: 'workspace/123-data.csv',
67+
fileName: 'data.csv',
68+
mode: 'append',
69+
}
70+
71+
// Route-level tests for the async import endpoint: the happy path plus each
// early-return branch (auth, access, archived table, workspace mismatch, bad mode).
describe('POST /api/table/[tableId]/import-async', () => {
72+
// Reset call history and restore the "everything succeeds" defaults before each test.
beforeEach(() => {
73+
vi.clearAllMocks()
74+
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({
75+
success: true,
76+
userId: 'user-1',
77+
authType: 'session',
78+
})
79+
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
80+
mockMarkTableImporting.mockResolvedValue(undefined)
81+
mockRunTableImport.mockResolvedValue(undefined)
82+
})
83+
84+
it('marks the table importing and kicks off the worker with mode + mapping', async () => {
85+
const response = await makeRequest({
86+
...validBody,
87+
mode: 'replace',
88+
mapping: { Name: 'name' },
89+
createColumns: ['Extra'],
90+
})
91+
const data = await response.json()
92+
93+
expect(response.status).toBe(200)
94+
// 'import-id-xyz' is the value the mocked generateId returns.
expect(data.data).toEqual({ tableId: 'tbl_1', importId: 'import-id-xyz' })
95+
expect(mockMarkTableImporting).toHaveBeenCalledWith('tbl_1', 'import-id-xyz')
96+
expect(mockRunTableImport).toHaveBeenCalledWith(
97+
expect.objectContaining({
98+
tableId: 'tbl_1',
99+
mode: 'replace',
100+
// fileName is 'data.csv', so the route is expected to pick the comma delimiter.
delimiter: ',',
101+
mapping: { Name: 'name' },
102+
createColumns: ['Extra'],
103+
})
104+
)
105+
})
106+
107+
it('returns 401 when unauthenticated', async () => {
108+
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({ success: false })
109+
const response = await makeRequest(validBody)
110+
expect(response.status).toBe(401)
111+
expect(mockMarkTableImporting).not.toHaveBeenCalled()
112+
})
113+
114+
it('returns the access error status when access is denied', async () => {
115+
mockCheckAccess.mockResolvedValue({ ok: false, status: 403 })
116+
const response = await makeRequest(validBody)
117+
expect(response.status).toBe(403)
118+
expect(mockRunTableImport).not.toHaveBeenCalled()
119+
})
120+
121+
it('returns 400 when the target table is archived', async () => {
122+
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable({ archivedAt: new Date() }) })
123+
const response = await makeRequest(validBody)
124+
expect(response.status).toBe(400)
125+
expect(mockRunTableImport).not.toHaveBeenCalled()
126+
})
127+
128+
it('returns 400 on workspace mismatch', async () => {
129+
// The fixture table belongs to 'workspace-1', so 'other-ws' does not match.
const response = await makeRequest({ ...validBody, workspaceId: 'other-ws' })
130+
expect(response.status).toBe(400)
131+
})
132+
133+
it('returns 400 for an invalid mode', async () => {
134+
// NOTE(review): presumably rejected by the request contract validation — confirm.
const response = await makeRequest({ ...validBody, mode: 'bogus' })
135+
expect(response.status).toBe(400)
136+
})
137+
})
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { createLogger } from '@sim/logger'
2+
import { generateId } from '@sim/utils/id'
3+
import { type NextRequest, NextResponse } from 'next/server'
4+
import { importIntoTableAsyncContract } from '@/lib/api/contracts/tables'
5+
import { parseRequest } from '@/lib/api/server'
6+
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
7+
import { runDetached } from '@/lib/core/utils/background'
8+
import { generateRequestId } from '@/lib/core/utils/request'
9+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
10+
import { runTableImport } from '@/lib/table/import-runner'
11+
import { markTableImporting } from '@/lib/table/service'
12+
import { accessError, checkAccess } from '@/app/api/table/utils'
13+
14+
// Logger scoped to this route; used for the "import started" message in POST.
const logger = createLogger('TableImportIntoAsync')
15+
16+
// NOTE(review): these look like Next.js route segment config exports (Node.js
// runtime, no static caching) — confirm against the framework version in use.
export const runtime = 'nodejs'
17+
export const dynamic = 'force-dynamic'
18+
19+
/** Second argument of the route handler; `params` is a Promise resolving to the path params. */
interface RouteParams {
20+
params: Promise<{ tableId: string }>
21+
}
22+
23+
/**
 * POST handler that starts a background CSV/TSV import into an existing table.
 *
 * Steps, in order:
 * 1. Authenticate the caller; respond 401 when there is no authenticated user.
 * 2. Parse the request against `importIntoTableAsyncContract`; on failure return
 *    the response produced by `parseRequest`.
 * 3. Require 'write' access to the table; otherwise return `accessError(...)`.
 * 4. Respond 400 when the body's workspaceId differs from the table's, when the
 *    table is archived, or when the file extension is neither `csv` nor `tsv`.
 * 5. Generate an import id, await `markTableImporting`, then hand `runTableImport`
 *    to `runDetached`.
 *
 * @returns JSON `{ success: true, data: { tableId, importId } }` once the import
 *   has been handed off, or one of the error responses described above.
 */
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
24+
const requestId = generateRequestId()
25+
26+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
27+
if (!authResult.success || !authResult.userId) {
28+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
29+
}
30+
const userId = authResult.userId
31+
32+
const parsed = await parseRequest(importIntoTableAsyncContract, request, { params })
33+
if (!parsed.success) return parsed.response
34+
const { tableId } = parsed.data.params
35+
const { workspaceId, fileKey, fileName, mode, mapping, createColumns } = parsed.data.body
36+
37+
const access = await checkAccess(tableId, userId, 'write')
38+
if (!access.ok) return accessError(access, requestId, tableId)
39+
const { table } = access
40+
41+
// The workspace named in the body must be the one the table actually belongs to.
if (table.workspaceId !== workspaceId) {
42+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
43+
}
44+
if (table.archivedAt) {
45+
return NextResponse.json({ error: 'Cannot import into an archived table' }, { status: 400 })
46+
}
47+
48+
// The delimiter is derived purely from the file name's extension (case-insensitive).
// NOTE(review): for a name with no dot, split('.').pop() yields the whole name, so a
// file literally named "csv" or "tsv" passes this check — confirm that is acceptable.
const ext = fileName.split('.').pop()?.toLowerCase()
49+
if (ext !== 'csv' && ext !== 'tsv') {
50+
return NextResponse.json({ error: 'Only CSV and TSV files are supported' }, { status: 400 })
51+
}
52+
const delimiter = ext === 'tsv' ? '\t' : ','
53+
54+
// The table is marked as importing (and awaited) before the worker is started.
// NOTE(review): no check for an import already in progress is visible here —
// confirm markTableImporting guards against concurrent imports.
const importId = generateId()
55+
await markTableImporting(tableId, importId)
56+
57+
// runDetached is not awaited, so the response below is presumably sent while the
// import is still running — verify against runDetached's implementation.
runDetached('table-import', () =>
58+
runTableImport({
59+
importId,
60+
tableId,
61+
workspaceId,
62+
userId,
63+
fileKey,
64+
fileName,
65+
delimiter,
66+
mode,
67+
mapping,
68+
createColumns,
69+
})
70+
)
71+
72+
logger.info(`[${requestId}] Async CSV import into existing table started`, {
73+
tableId,
74+
importId,
75+
mode,
76+
fileName,
77+
})
78+
return NextResponse.json({ success: true, data: { tableId, importId } })
79+
})

apps/sim/app/api/table/[tableId]/import/route.test.ts

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ vi.mock('@/app/api/table/utils', async () => {
3131
const message = result.status === 404 ? 'Table not found' : 'Access denied'
3232
return NextResponse.json({ error: message }, { status: result.status })
3333
},
34+
csvProxyBodyCapResponse: () => null,
35+
multipartErrorResponse: (error: { code: string; message: string }) =>
36+
NextResponse.json(
37+
{ error: error.message },
38+
{ status: error.code === 'FILE_TOO_LARGE' ? 413 : 400 }
39+
),
3440
}
3541
})
3642

@@ -61,8 +67,8 @@ function createFormData(
6167
createColumns?: unknown
6268
}
6369
): FormData {
70+
// Text fields must precede the file part for the streaming parser.
6471
const form = new FormData()
65-
form.append('file', file)
6672
if (options?.workspaceId !== null) {
6773
form.append('workspaceId', options?.workspaceId ?? 'workspace-1')
6874
}
@@ -83,6 +89,7 @@ function createFormData(
8389
: JSON.stringify(options.createColumns)
8490
)
8591
}
92+
form.append('file', file)
8693
return form
8794
}
8895

@@ -110,9 +117,10 @@ function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
110117
}
111118

112119
async function callPost(form: FormData, { tableId }: { tableId: string } = { tableId: 'tbl_1' }) {
120+
// Building the request from a FormData body gives a real multipart stream and
121+
// boundary, exercising the streaming `readMultipart` parser end-to-end.
113122
const req = new NextRequest(`http://localhost:3000/api/table/${tableId}/import`, {
114123
method: 'POST',
115-
headers: { 'content-length': '1024' },
116124
body: form,
117125
})
118126
return POST(req, { params: Promise.resolve({ tableId }) })
@@ -183,22 +191,30 @@ describe('POST /api/table/[tableId]/import', () => {
183191
expect(data.error).toMatch(/archived/i)
184192
})
185193

186-
it('returns 413 for oversized CSV files before reading their contents', async () => {
187-
const file = createCsvFile('name,age\nAlice,30')
188-
Object.defineProperty(file, 'size', {
189-
value: 26 * 1024 * 1024,
190-
})
191-
const arrayBufferSpy = vi.spyOn(file, 'arrayBuffer')
192-
194+
it('returns 400 when the file part precedes the required fields', async () => {
195+
// Build a raw multipart body with the file BEFORE workspaceId.
196+
const boundary = '----orderboundary'
197+
const body = Buffer.concat([
198+
Buffer.from(
199+
`--${boundary}\r\nContent-Disposition: form-data; name="file"; filename="data.csv"\r\nContent-Type: text/csv\r\n\r\nname,age\nAlice,30\r\n`
200+
),
201+
Buffer.from(`--${boundary}\r\nContent-Disposition: form-data; name="workspaceId"\r\n\r\n`),
202+
Buffer.from('workspace-1\r\n'),
203+
Buffer.from(`--${boundary}--\r\n`),
204+
])
193205
const req = {
194-
formData: async () => createFormData(file),
206+
headers: new Headers({ 'content-type': `multipart/form-data; boundary=${boundary}` }),
207+
body: new ReadableStream<Uint8Array>({
208+
start(controller) {
209+
controller.enqueue(new Uint8Array(body))
210+
controller.close()
211+
},
212+
}),
213+
signal: undefined,
195214
} as unknown as NextRequest
196215

197216
const response = await POST(req, { params: Promise.resolve({ tableId: 'tbl_1' }) })
198-
expect(response.status).toBe(413)
199-
const data = await response.json()
200-
expect(data.error).toMatch(/CSV import file exceeds maximum size/)
201-
expect(arrayBufferSpy).not.toHaveBeenCalled()
217+
expect(response.status).toBe(400)
202218
expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled()
203219
expect(mockReplaceTableRowsWithTx).not.toHaveBeenCalled()
204220
})

0 commit comments

Comments
 (0)