Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { asyncJobs, db } from '@sim/db'
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
import { tableJobs, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
Expand All @@ -8,12 +8,15 @@ import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { deleteFile } from '@/lib/uploads/core/storage-service'

const logger = createLogger('CleanupStaleExecutions')

const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
const MAX_INT32 = 2_147_483_647
/** Terminal table-jobs older than this are pruned; only the latest job per table is ever read. */
const TABLE_JOB_RETENTION_HOURS = 24

export const GET = withRouteHandler(async (request: NextRequest) => {
try {
Expand Down Expand Up @@ -110,33 +113,56 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
})
}

// Mark stale table imports as failed. Imports run detached on the web container and
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
// an `importing` table with no recent update has stalled (not merely slow). Rows are
// left in place (no rollback); the user re-imports.
// Mark stale table jobs (import or delete) as failed. Jobs run detached on the web container
// and are lost if the pod is killed mid-run. `updated_at` is bumped by progress updates, so a
// `running` job with no recent update has stalled (not merely slow). Committed work is left in
// place (no rollback); the user retries. Also prune long-settled terminal jobs so the table
// doesn't grow unbounded (the latest job per table is what list/detail reads surface).
let staleImportsMarkedFailed = 0
try {
const now = new Date()
const staleImports = await db
.update(userTableDefinitions)
.update(tableJobs)
.set({
importStatus: 'failed',
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
updatedAt: new Date(),
status: 'failed',
error: `Job terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
completedAt: now,
updatedAt: now,
})
.where(and(eq(tableJobs.status, 'running'), lt(tableJobs.updatedAt, staleThreshold)))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale janitor fails export jobs

Medium Severity

The stale cleanup now marks every table_jobs row with status='running' as failed, including new export jobs. Exports can run for a long time and only bump updated_at between paginated batches, so a slow or blocked batch can look “stalled” and be terminated even though the worker is still healthy.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit f8a2aee. Configure here.

.returning({ id: tableJobs.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table jobs as failed`)
}

const terminalRetention = new Date(Date.now() - TABLE_JOB_RETENTION_HOURS * 60 * 60 * 1000)
const pruned = await db
.delete(tableJobs)
.where(
and(
eq(userTableDefinitions.importStatus, 'importing'),
lt(userTableDefinitions.updatedAt, staleThreshold)
inArray(tableJobs.status, ['ready', 'failed', 'canceled']),
lt(tableJobs.updatedAt, terminalRetention)
)
)
.returning({ id: userTableDefinitions.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
.returning({ type: tableJobs.type, payload: tableJobs.payload })

// Pruned export jobs carry the generated file's storage key — delete the file with the job
// so the exports prefix doesn't accumulate. Best-effort: a miss just orphans one object.
for (const job of pruned) {
if (job.type !== 'export') continue
const resultKey = (job.payload as { resultKey?: string } | null)?.resultKey
if (!resultKey) continue
await deleteFile({ key: resultKey, context: 'workspace' }).catch((err) => {
logger.warn('Failed to delete pruned export file', {
resultKey,
error: toError(err).message,
})
})
}
} catch (error) {
logger.error('Failed to clean up stale table imports:', {
logger.error('Failed to clean up stale table jobs:', {
error: toError(error).message,
})
}
Expand Down
9 changes: 7 additions & 2 deletions apps/sim/app/api/table/[tableId]/columns/run/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { runWorkflowColumn } from '@/lib/table/workflow-columns'
import { accessError, checkAccess } from '@/app/api/table/utils'
import { accessError, checkAccess, tableFilterError } from '@/app/api/table/utils'

const logger = createLogger('TableRunColumnAPI')

Expand All @@ -25,16 +25,21 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const parsed = await parseRequest(runColumnContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, groupIds, runMode, rowIds, limit } = parsed.data.body
const { workspaceId, groupIds, runMode, rowIds, filter, limit } = parsed.data.body
const access = await checkAccess(tableId, auth.userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)

// Validate the filter up front (the dispatcher reuses it) so a bad field fails fast.
const filterError = tableFilterError(filter, access.table.schema.columns)
if (filterError) return filterError

const { dispatchId } = await runWorkflowColumn({
tableId,
workspaceId,
groupIds,
mode: runMode,
rowIds,
filter,
limit,
requestId,
})
Expand Down
197 changes: 197 additions & 0 deletions apps/sim/app/api/table/[tableId]/delete-async/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* @vitest-environment node
*/
import { hybridAuthMockFns } from '@sim/testing'
import { NextRequest, NextResponse } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { TableDefinition } from '@/lib/table'

const {
mockCheckAccess,
mockMarkTableJobRunning,
mockRunTableDelete,
mockTableFilterError,
mockTasksTrigger,
flags,
} = vi.hoisted(() => ({
mockCheckAccess: vi.fn(),
mockMarkTableJobRunning: vi.fn(),
mockRunTableDelete: vi.fn(),
mockTableFilterError: vi.fn(),
mockTasksTrigger: vi.fn(),
flags: { triggerDev: false },
}))

vi.mock('@sim/utils/id', () => ({
generateId: vi.fn().mockReturnValue('job-id-xyz'),
generateShortId: vi.fn().mockReturnValue('short-id'),
}))
vi.mock('@/lib/table/service', () => ({ markTableJobRunning: mockMarkTableJobRunning }))
vi.mock('@/lib/table/delete-runner', () => ({ runTableDelete: mockRunTableDelete }))
vi.mock('@/lib/core/config/feature-flags', () => ({
get isTriggerDevEnabled() {
return flags.triggerDev
},
}))
vi.mock('@/background/table-delete', () => ({ tableDeleteTask: { id: 'table-delete' } }))
vi.mock('@trigger.dev/sdk', () => ({
tasks: { trigger: mockTasksTrigger },
task: (config: unknown) => config,
}))
vi.mock('@/lib/core/utils/background', () => ({
runDetached: (_label: string, work: () => Promise<unknown>) => {
void work()
},
}))
vi.mock('@/app/api/table/utils', async () => {
const { NextResponse } = await import('next/server')
return {
checkAccess: mockCheckAccess,
accessError: (result: { status: number }) =>
NextResponse.json({ error: 'denied' }, { status: result.status }),
tableFilterError: mockTableFilterError,
}
})

import { POST } from '@/app/api/table/[tableId]/delete-async/route'

function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
return {
id: 'tbl_1',
name: 'People',
description: null,
schema: { columns: [{ name: 'status', type: 'string' }] },
metadata: null,
rowCount: 1000,
maxRows: 1_000_000,
workspaceId: 'workspace-1',
createdBy: 'user-1',
archivedAt: null,
createdAt: new Date(),
updatedAt: new Date(),
...overrides,
}
}

function makeRequest(body: unknown, tableId = 'tbl_1') {
const req = new NextRequest(`http://localhost:3000/api/table/${tableId}/delete-async`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
})
return POST(req, { params: Promise.resolve({ tableId }) })
}

const validBody = {
workspaceId: 'workspace-1',
filter: { status: 'archived' },
excludeRowIds: ['row_keep'],
}

describe('POST /api/table/[tableId]/delete-async', () => {
beforeEach(() => {
vi.clearAllMocks()
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({
success: true,
userId: 'user-1',
authType: 'session',
})
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
mockMarkTableJobRunning.mockResolvedValue(true)
mockRunTableDelete.mockResolvedValue(undefined)
mockTableFilterError.mockReturnValue(null)
mockTasksTrigger.mockResolvedValue({ id: 'run_1' })
flags.triggerDev = false
})

it('claims the job slot and kicks off the delete worker with filter + exclusions', async () => {
const response = await makeRequest(validBody)
const data = await response.json()

expect(response.status).toBe(200)
expect(data.data).toEqual({ tableId: 'tbl_1', jobId: 'job-id-xyz' })
expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', 'job-id-xyz', 'delete', {
filter: { status: 'archived' },
excludeRowIds: ['row_keep'],
cutoff: expect.any(String),
})
expect(mockRunTableDelete).toHaveBeenCalledWith(
expect.objectContaining({
jobId: 'job-id-xyz',
tableId: 'tbl_1',
workspaceId: 'workspace-1',
filter: { status: 'archived' },
excludeRowIds: ['row_keep'],
cutoff: expect.any(Date),
})
)
})

it('allows a whole-table delete with no filter', async () => {
const response = await makeRequest({ workspaceId: 'workspace-1' })
expect(response.status).toBe(200)
expect(mockRunTableDelete).toHaveBeenCalledWith(
expect.objectContaining({ filter: undefined, cutoff: expect.any(Date) })
)
})

it('returns 409 when a job is already in progress (claim lost)', async () => {
mockMarkTableJobRunning.mockResolvedValue(false)
const response = await makeRequest(validBody)
expect(response.status).toBe(409)
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 400 on an invalid filter without claiming the slot', async () => {
mockTableFilterError.mockReturnValue(NextResponse.json({ error: 'bad field' }, { status: 400 }))
const response = await makeRequest(validBody)
expect(response.status).toBe(400)
expect(mockMarkTableJobRunning).not.toHaveBeenCalled()
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 401 when unauthenticated', async () => {
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({ success: false })
const response = await makeRequest(validBody)
expect(response.status).toBe(401)
expect(mockMarkTableJobRunning).not.toHaveBeenCalled()
})

it('returns the access error status when access is denied', async () => {
mockCheckAccess.mockResolvedValue({ ok: false, status: 403 })
const response = await makeRequest(validBody)
expect(response.status).toBe(403)
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 400 when the table is archived', async () => {
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable({ archivedAt: new Date() }) })
const response = await makeRequest(validBody)
expect(response.status).toBe(400)
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 400 on workspace mismatch', async () => {
const response = await makeRequest({ ...validBody, workspaceId: 'other-ws' })
expect(response.status).toBe(400)
})

it('routes through trigger.dev (ISO cutoff, tagged) when the flag is on', async () => {
flags.triggerDev = true
const response = await makeRequest(validBody)

expect(response.status).toBe(200)
expect(mockRunTableDelete).not.toHaveBeenCalled()
expect(mockTasksTrigger).toHaveBeenCalledWith(
'table-delete',
expect.objectContaining({
jobId: 'job-id-xyz',
tableId: 'tbl_1',
filter: { status: 'archived' },
excludeRowIds: ['row_keep'],
cutoff: expect.any(String),
}),
{ tags: ['tableId:tbl_1', 'jobId:job-id-xyz'] }
)
})
})
Loading