Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { asyncJobs, db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
Expand Down Expand Up @@ -110,6 +110,37 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
})
}

// Mark stale table imports as failed. Imports run detached on the web container and
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
// an `importing` table with no recent update has stalled (not merely slow). Rows are
// left in place (no rollback); the user re-imports.
let staleImportsMarkedFailed = 0
try {
const staleImports = await db
.update(userTableDefinitions)
.set({
importStatus: 'failed',
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
updatedAt: new Date(),
})
.where(
and(
eq(userTableDefinitions.importStatus, 'importing'),
lt(userTableDefinitions.updatedAt, staleThreshold)
)
)
.returning({ id: userTableDefinitions.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
}
} catch (error) {
logger.error('Failed to clean up stale table imports:', {
error: toError(error).message,
})
}

// Clean up stale pending jobs (never started, e.g., due to server crash before startJob())
let stalePendingJobsMarkedFailed = 0

Expand Down Expand Up @@ -179,6 +210,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
retentionHours: JOB_RETENTION_HOURS,
},
tableImports: {
staleMarkedFailed: staleImportsMarkedFailed,
},
})
} catch (error) {
logger.error('Error in stale execution cleanup job:', error)
Expand Down
137 changes: 137 additions & 0 deletions apps/sim/app/api/table/[tableId]/import-async/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* @vitest-environment node
*/
import { hybridAuthMockFns } from '@sim/testing'
import { NextRequest } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { TableDefinition } from '@/lib/table'

// Mock handles are created through `vi.hoisted` so they already exist when the
// `vi.mock` factories below (which Vitest hoists above the imports) reference them.
const { mockCheckAccess, mockMarkTableImporting, mockRunTableImport } = vi.hoisted(() => ({
mockCheckAccess: vi.fn(),
mockMarkTableImporting: vi.fn(),
mockRunTableImport: vi.fn(),
}))

// Fixed ids so the tests can assert the exact `importId` the route returns.
vi.mock('@sim/utils/id', () => ({
generateId: vi.fn().mockReturnValue('import-id-xyz'),
generateShortId: vi.fn().mockReturnValue('short-id'),
}))
// Replace the table service and the import worker with the spies declared above.
vi.mock('@/lib/table/service', () => ({ markTableImporting: mockMarkTableImporting }))
vi.mock('@/lib/table/import-runner', () => ({ runTableImport: mockRunTableImport }))
// `runDetached` is stubbed to invoke the work immediately (without awaiting it), so the
// worker spy has been called by the time the route handler returns.
vi.mock('@/lib/core/utils/background', () => ({
runDetached: (_label: string, work: () => Promise<unknown>) => {
void work()
},
}))
// Access helpers: `checkAccess` is driven per test; `accessError` echoes the status
// carried by the access result in a JSON `{ error: 'denied' }` response.
vi.mock('@/app/api/table/utils', async () => {
const { NextResponse } = await import('next/server')
return {
checkAccess: mockCheckAccess,
accessError: (result: { status: number }) =>
NextResponse.json({ error: 'denied' }, { status: result.status }),
}
})

import { POST } from '@/app/api/table/[tableId]/import-async/route'

/**
 * Test fixture factory for a table definition.
 *
 * Produces an active (non-archived), empty table owned by `user-1` in
 * `workspace-1`; any field supplied in `overrides` wins over the default.
 *
 * @param overrides - Fields to replace on the default fixture.
 * @returns A complete `TableDefinition` suitable for mocking `checkAccess`.
 */
function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
  const defaults: TableDefinition = {
    id: 'tbl_1',
    name: 'People',
    description: null,
    schema: { columns: [{ name: 'name', type: 'string' }] },
    metadata: null,
    rowCount: 0,
    maxRows: 1_000_000,
    workspaceId: 'workspace-1',
    createdBy: 'user-1',
    archivedAt: null,
    createdAt: new Date(),
    updatedAt: new Date(),
  }

  // Later spread wins, so caller-provided fields replace the defaults.
  return { ...defaults, ...overrides }
}

/**
 * Sends a JSON POST to the import-async route handler under test.
 *
 * @param body - Payload serialized as the JSON request body.
 * @param tableId - Table id used both in the URL and in the route params.
 * @returns The handler's response promise.
 */
function makeRequest(body: unknown, tableId = 'tbl_1') {
  const url = `http://localhost:3000/api/table/${tableId}/import-async`
  const init = {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(body),
  }
  // The handler receives its dynamic segment as a promise.
  const context = { params: Promise.resolve({ tableId }) }

  return POST(new NextRequest(url, init), context)
}

// Baseline request payload accepted by the route; individual tests spread it and
// override single fields to reach each branch. `workspaceId` matches the
// `buildTable` default so the workspace check passes unless a test changes it.
const validBody = {
workspaceId: 'workspace-1',
fileKey: 'workspace/123-data.csv',
fileName: 'data.csv',
mode: 'append',
}

/**
 * Route-level tests for POST /api/table/[tableId]/import-async.
 *
 * Every test starts from an authenticated session with write access to an
 * active table; each case then overrides one input or mock to reach a single
 * branch of the handler. Rejection cases also assert that no import was
 * started, so a regression that rejects *after* kicking off work is caught.
 */
describe('POST /api/table/[tableId]/import-async', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({
      success: true,
      userId: 'user-1',
      authType: 'session',
    })
    mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
    mockMarkTableImporting.mockResolvedValue(undefined)
    mockRunTableImport.mockResolvedValue(undefined)
  })

  it('marks the table importing and kicks off the worker with mode + mapping', async () => {
    const response = await makeRequest({
      ...validBody,
      mode: 'replace',
      mapping: { Name: 'name' },
      createColumns: ['Extra'],
    })
    const data = await response.json()

    expect(response.status).toBe(200)
    expect(data.data).toEqual({ tableId: 'tbl_1', importId: 'import-id-xyz' })
    expect(mockMarkTableImporting).toHaveBeenCalledWith('tbl_1', 'import-id-xyz')
    expect(mockRunTableImport).toHaveBeenCalledWith(
      expect.objectContaining({
        tableId: 'tbl_1',
        mode: 'replace',
        delimiter: ',',
        mapping: { Name: 'name' },
        createColumns: ['Extra'],
      })
    )
  })

  it('returns 401 when unauthenticated', async () => {
    hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({ success: false })
    const response = await makeRequest(validBody)
    expect(response.status).toBe(401)
    expect(mockMarkTableImporting).not.toHaveBeenCalled()
  })

  it('returns the access error status when access is denied', async () => {
    mockCheckAccess.mockResolvedValue({ ok: false, status: 403 })
    const response = await makeRequest(validBody)
    expect(response.status).toBe(403)
    expect(mockRunTableImport).not.toHaveBeenCalled()
  })

  it('returns 400 when the target table is archived', async () => {
    mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable({ archivedAt: new Date() }) })
    const response = await makeRequest(validBody)
    expect(response.status).toBe(400)
    expect(mockRunTableImport).not.toHaveBeenCalled()
  })

  it('returns 409 when an import is already in progress', async () => {
    // Spread onto the fixture rather than passing through `buildTable` overrides so the
    // test does not depend on how `importStatus` is declared on `TableDefinition`.
    mockCheckAccess.mockResolvedValue({
      ok: true,
      table: { ...buildTable(), importStatus: 'importing' },
    })
    const response = await makeRequest(validBody)
    expect(response.status).toBe(409)
    expect(mockMarkTableImporting).not.toHaveBeenCalled()
    expect(mockRunTableImport).not.toHaveBeenCalled()
  })

  it('returns 400 on workspace mismatch', async () => {
    const response = await makeRequest({ ...validBody, workspaceId: 'other-ws' })
    expect(response.status).toBe(400)
    expect(mockMarkTableImporting).not.toHaveBeenCalled()
    expect(mockRunTableImport).not.toHaveBeenCalled()
  })

  it('returns 400 for an invalid mode', async () => {
    const response = await makeRequest({ ...validBody, mode: 'bogus' })
    expect(response.status).toBe(400)
    expect(mockMarkTableImporting).not.toHaveBeenCalled()
    expect(mockRunTableImport).not.toHaveBeenCalled()
  })

  it('returns 400 for an unsupported file extension', async () => {
    const response = await makeRequest({ ...validBody, fileName: 'data.xlsx' })
    expect(response.status).toBe(400)
    expect(mockMarkTableImporting).not.toHaveBeenCalled()
    expect(mockRunTableImport).not.toHaveBeenCalled()
  })
})
86 changes: 86 additions & 0 deletions apps/sim/app/api/table/[tableId]/import-async/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { createLogger } from '@sim/logger'
import { generateId } from '@sim/utils/id'
import { type NextRequest, NextResponse } from 'next/server'
import { importIntoTableAsyncContract } from '@/lib/api/contracts/tables'
import { parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { runDetached } from '@/lib/core/utils/background'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { runTableImport } from '@/lib/table/import-runner'
import { markTableImporting } from '@/lib/table/service'
import { accessError, checkAccess } from '@/app/api/table/utils'

// Module-scoped logger; entries from this route are tagged `TableImportIntoAsync`.
const logger = createLogger('TableImportIntoAsync')

// Next.js route segment config: run on the Node.js runtime and never statically
// cache/prerender this handler.
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

/**
 * Context argument passed to the handler. `params` is a promise that resolves
 * to the dynamic `[tableId]` path segment.
 */
interface RouteParams {
params: Promise<{ tableId: string }>
}

/**
 * POST /api/table/[tableId]/import-async
 *
 * Starts a background CSV/TSV import into an existing table and returns
 * immediately with the generated `importId`; the load itself runs detached.
 *
 * Responses:
 * - 401 when the caller is not authenticated.
 * - The contract parser's own response when params/body fail validation.
 * - The `accessError` response when write access to the table is denied.
 * - 400 when the body's workspace does not match the table's workspace, the
 *   table is archived, or the file is not `.csv` / `.tsv`.
 * - 409 when the table already has an import in progress.
 * - 200 `{ success: true, data: { tableId, importId } }` once the table has been
 *   marked as importing and the worker has been handed off.
 */
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
  const requestId = generateRequestId()

  const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
  if (!authResult.success || !authResult.userId) {
    return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
  }
  const userId = authResult.userId

  const parsed = await parseRequest(importIntoTableAsyncContract, request, { params })
  if (!parsed.success) return parsed.response
  const { tableId } = parsed.data.params
  const { workspaceId, fileKey, fileName, mode, mapping, createColumns } = parsed.data.body

  const access = await checkAccess(tableId, userId, 'write')
  if (!access.ok) return accessError(access, requestId, tableId)
  const { table } = access

  if (table.workspaceId !== workspaceId) {
    return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
  }
  if (table.archivedAt) {
    return NextResponse.json({ error: 'Cannot import into an archived table' }, { status: 400 })
  }
  // Reject overlapping imports: a second worker would insert at colliding row positions.
  // NOTE(review): this check and `markTableImporting` below are separate steps, so two
  // concurrent requests could both pass it unless `markTableImporting` guards the
  // transition itself — confirm against its implementation.
  if (table.importStatus === 'importing') {
    return NextResponse.json(
      { error: 'An import is already in progress for this table' },
      { status: 409 }
    )
  }

  // Only text after the last dot counts as the extension. A name with no dot at all
  // (e.g. a file literally called "csv") has no extension and must be rejected, which
  // `split('.').pop()` would not do because it returns the whole name in that case.
  const dotIndex = fileName.lastIndexOf('.')
  const ext = dotIndex === -1 ? '' : fileName.slice(dotIndex + 1).toLowerCase()
  if (ext !== 'csv' && ext !== 'tsv') {
    return NextResponse.json({ error: 'Only CSV and TSV files are supported' }, { status: 400 })
  }
  const delimiter = ext === 'tsv' ? '\t' : ','

  // Mark the table before handing off so the status is visible as soon as we respond.
  const importId = generateId()
  await markTableImporting(tableId, importId)

  // Not awaited by the request: the import continues after the response is sent.
  runDetached('table-import', () =>
    runTableImport({
      importId,
      tableId,
      workspaceId,
      userId,
      fileKey,
      fileName,
      delimiter,
      mode,
      mapping,
      createColumns,
    })
  )

  logger.info(`[${requestId}] Async CSV import into existing table started`, {
    tableId,
    importId,
    mode,
    fileName,
  })
  return NextResponse.json({ success: true, data: { tableId, importId } })
})
44 changes: 30 additions & 14 deletions apps/sim/app/api/table/[tableId]/import/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ vi.mock('@/app/api/table/utils', async () => {
const message = result.status === 404 ? 'Table not found' : 'Access denied'
return NextResponse.json({ error: message }, { status: result.status })
},
csvProxyBodyCapResponse: () => null,
multipartErrorResponse: (error: { code: string; message: string }) =>
NextResponse.json(
{ error: error.message },
{ status: error.code === 'FILE_TOO_LARGE' ? 413 : 400 }
),
}
})

Expand Down Expand Up @@ -64,8 +70,8 @@ function createFormData(
createColumns?: unknown
}
): FormData {
// Text fields must precede the file part for the streaming parser.
const form = new FormData()
form.append('file', file)
if (options?.workspaceId !== null) {
form.append('workspaceId', options?.workspaceId ?? 'workspace-1')
}
Expand All @@ -86,6 +92,7 @@ function createFormData(
: JSON.stringify(options.createColumns)
)
}
form.append('file', file)
return form
}

Expand Down Expand Up @@ -113,9 +120,10 @@ function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
}

async function callPost(form: FormData, { tableId }: { tableId: string } = { tableId: 'tbl_1' }) {
// Building the request from a FormData body gives a real multipart stream and
// boundary, exercising the streaming `readMultipart` parser end-to-end.
const req = new NextRequest(`http://localhost:3000/api/table/${tableId}/import`, {
method: 'POST',
headers: { 'content-length': '1024' },
body: form,
})
return POST(req, { params: Promise.resolve({ tableId }) })
Expand Down Expand Up @@ -186,22 +194,30 @@ describe('POST /api/table/[tableId]/import', () => {
expect(data.error).toMatch(/archived/i)
})

it('returns 413 for oversized CSV files before reading their contents', async () => {
const file = createCsvFile('name,age\nAlice,30')
Object.defineProperty(file, 'size', {
value: 26 * 1024 * 1024,
})
const arrayBufferSpy = vi.spyOn(file, 'arrayBuffer')

it('returns 400 when the file part precedes the required fields', async () => {
// Build a raw multipart body with the file BEFORE workspaceId.
const boundary = '----orderboundary'
const body = Buffer.concat([
Buffer.from(
`--${boundary}\r\nContent-Disposition: form-data; name="file"; filename="data.csv"\r\nContent-Type: text/csv\r\n\r\nname,age\nAlice,30\r\n`
),
Buffer.from(`--${boundary}\r\nContent-Disposition: form-data; name="workspaceId"\r\n\r\n`),
Buffer.from('workspace-1\r\n'),
Buffer.from(`--${boundary}--\r\n`),
])
const req = {
formData: async () => createFormData(file),
headers: new Headers({ 'content-type': `multipart/form-data; boundary=${boundary}` }),
body: new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array(body))
controller.close()
},
}),
signal: undefined,
} as unknown as NextRequest

const response = await POST(req, { params: Promise.resolve({ tableId: 'tbl_1' }) })
expect(response.status).toBe(413)
const data = await response.json()
expect(data.error).toMatch(/CSV import file exceeds maximum size/)
expect(arrayBufferSpy).not.toHaveBeenCalled()
expect(response.status).toBe(400)
expect(mockBatchInsertRowsWithTx).not.toHaveBeenCalled()
expect(mockReplaceTableRowsWithTx).not.toHaveBeenCalled()
})
Expand Down
Loading
Loading