Skip to content

Commit 209ca5f

Browse files
fix(large-refs): cleanup based on table read (#4716)
* fix(large-refs): cleanup based on table read * address comments * address comments * bubble up storage ref errors * cleanup code * do not attempt blob deletion for infra outage * cleanup dup helper
1 parent 786c6f0 commit 209ca5f

18 files changed

Lines changed: 19392 additions & 135 deletions

File tree

apps/sim/app/api/workflows/[id]/execute/response-block.test.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,16 @@ import { EXECUTION_RESOURCE_LIMIT_CODE } from '@/lib/execution/resource-errors'
1515
import type { ExecutionResult } from '@/lib/workflows/types'
1616
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
1717

18-
const { mockDownloadFile, mockUploadFile, uploadedFiles } = vi.hoisted(() => ({
18+
const {
19+
mockAddLargeValueReference,
20+
mockDownloadFile,
21+
mockRegisterLargeValueOwner,
22+
mockUploadFile,
23+
uploadedFiles,
24+
} = vi.hoisted(() => ({
25+
mockAddLargeValueReference: vi.fn(),
1926
mockDownloadFile: vi.fn(),
27+
mockRegisterLargeValueOwner: vi.fn(),
2028
mockUploadFile: vi.fn(),
2129
uploadedFiles: new Map<string, Buffer>(),
2230
}))
@@ -35,6 +43,11 @@ vi.mock('@/lib/uploads', () => ({
3543
},
3644
}))
3745

46+
vi.mock('@/lib/execution/payloads/large-value-metadata', () => ({
47+
addLargeValueReference: mockAddLargeValueReference,
48+
registerLargeValueOwner: mockRegisterLargeValueOwner,
49+
}))
50+
3851
function buildExecutionResult(overrides: Partial<ExecutionResult> = {}): ExecutionResult {
3952
return {
4053
success: true,
@@ -66,6 +79,8 @@ describe('Response block gating by auth type', () => {
6679
vi.clearAllMocks()
6780
clearLargeValueCacheForTests()
6881
uploadedFiles.clear()
82+
mockAddLargeValueReference.mockResolvedValue(undefined)
83+
mockRegisterLargeValueOwner.mockResolvedValue(true)
6984
mockUploadFile.mockImplementation(async ({ customKey, file }) => {
7085
uploadedFiles.set(customKey, file)
7186
return { key: customKey }
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
interface CleanupRow {
8+
id: string
9+
files: unknown
10+
}
11+
12+
interface CapturedBatchDeleteOptions {
13+
selectChunk: (chunkIds: string[], limit: number) => Promise<unknown>
14+
onBatch?: (rows: CleanupRow[]) => Promise<void>
15+
batchSize?: number
16+
maxBatches?: number
17+
totalRowLimit?: number
18+
}
19+
20+
const {
21+
mockAnd,
22+
mockBatchDeleteByWorkspaceAndTimestamp,
23+
mockChunkedBatchDelete,
24+
mockDeleteFileMetadata,
25+
mockDeleteFiles,
26+
mockEq,
27+
mockExecute,
28+
mockFrom,
29+
mockInArray,
30+
mockIsNull,
31+
mockLeftJoin,
32+
mockLimit,
33+
mockLt,
34+
mockMarkLargeValuesDeleted,
35+
mockNotInArray,
36+
mockOr,
37+
mockOrderBy,
38+
mockPruneLargeValueMetadata,
39+
mockSelect,
40+
mockTask,
41+
mockWhere,
42+
} = vi.hoisted(() => {
43+
const mockLimit = vi.fn(async () => [])
44+
const mockOrderBy = vi.fn(() => ({ limit: mockLimit }))
45+
const mockWhere = vi.fn(() => ({ limit: mockLimit, orderBy: mockOrderBy }))
46+
const mockLeftJoin = vi.fn(() => ({ where: mockWhere }))
47+
const mockFrom = vi.fn(() => ({ leftJoin: mockLeftJoin, where: mockWhere }))
48+
const mockSelect = vi.fn(() => ({ from: mockFrom }))
49+
50+
return {
51+
mockAnd: vi.fn((...args: unknown[]) => ({ op: 'and', args })),
52+
mockBatchDeleteByWorkspaceAndTimestamp: vi.fn(async () => ({
53+
table: 'job',
54+
deleted: 0,
55+
failed: 0,
56+
})),
57+
mockChunkedBatchDelete: vi.fn(),
58+
mockDeleteFileMetadata: vi.fn(async () => true),
59+
mockDeleteFiles: vi.fn(async () => ({ deleted: 2, failed: [] })),
60+
mockEq: vi.fn((...args: unknown[]) => ({ op: 'eq', args })),
61+
mockExecute: vi.fn(),
62+
mockFrom,
63+
mockInArray: vi.fn((...args: unknown[]) => ({ op: 'inArray', args })),
64+
mockIsNull: vi.fn((...args: unknown[]) => ({ op: 'isNull', args })),
65+
mockLeftJoin,
66+
mockLimit,
67+
mockLt: vi.fn((...args: unknown[]) => ({ op: 'lt', args })),
68+
mockMarkLargeValuesDeleted: vi.fn(async () => undefined),
69+
mockNotInArray: vi.fn((...args: unknown[]) => ({ op: 'notInArray', args })),
70+
mockOr: vi.fn((...args: unknown[]) => ({ op: 'or', args })),
71+
mockOrderBy,
72+
mockPruneLargeValueMetadata: vi.fn(async () => ({
73+
referencesDeleted: 0,
74+
dependenciesDeleted: 0,
75+
tombstonesDeleted: 0,
76+
})),
77+
mockSelect,
78+
mockTask: vi.fn((config: unknown) => config),
79+
mockWhere,
80+
}
81+
})
82+
83+
vi.mock('@sim/db', () => ({
84+
db: {
85+
execute: mockExecute,
86+
select: mockSelect,
87+
},
88+
}))
89+
90+
vi.mock('@sim/db/schema', () => ({
91+
executionLargeValueDependencies: {
92+
childKey: 'executionLargeValueDependencies.childKey',
93+
parentKey: 'executionLargeValueDependencies.parentKey',
94+
workspaceId: 'executionLargeValueDependencies.workspaceId',
95+
},
96+
executionLargeValueReferences: {
97+
executionId: 'executionLargeValueReferences.executionId',
98+
key: 'executionLargeValueReferences.key',
99+
source: 'executionLargeValueReferences.source',
100+
},
101+
executionLargeValues: {
102+
createdAt: 'executionLargeValues.createdAt',
103+
deletedAt: 'executionLargeValues.deletedAt',
104+
key: 'executionLargeValues.key',
105+
workspaceId: 'executionLargeValues.workspaceId',
106+
},
107+
jobExecutionLogs: {
108+
startedAt: 'jobExecutionLogs.startedAt',
109+
workspaceId: 'jobExecutionLogs.workspaceId',
110+
},
111+
pausedExecutions: {
112+
executionId: 'pausedExecutions.executionId',
113+
status: 'pausedExecutions.status',
114+
},
115+
workspaceFiles: {
116+
context: 'workspaceFiles.context',
117+
deletedAt: 'workspaceFiles.deletedAt',
118+
key: 'workspaceFiles.key',
119+
uploadedAt: 'workspaceFiles.uploadedAt',
120+
workspaceId: 'workspaceFiles.workspaceId',
121+
},
122+
workflowExecutionLogs: {
123+
executionData: 'workflowExecutionLogs.executionData',
124+
executionId: 'workflowExecutionLogs.executionId',
125+
files: 'workflowExecutionLogs.files',
126+
id: 'workflowExecutionLogs.id',
127+
startedAt: 'workflowExecutionLogs.startedAt',
128+
workspaceId: 'workflowExecutionLogs.workspaceId',
129+
},
130+
}))
131+
132+
vi.mock('@sim/logger', () => ({
133+
createLogger: vi.fn(() => ({
134+
error: vi.fn(),
135+
info: vi.fn(),
136+
warn: vi.fn(),
137+
})),
138+
}))
139+
140+
vi.mock('@trigger.dev/sdk', () => ({ task: mockTask }))
141+
142+
vi.mock('drizzle-orm', () => ({
143+
and: mockAnd,
144+
asc: vi.fn((column: unknown) => ({ op: 'asc', column })),
145+
eq: mockEq,
146+
inArray: mockInArray,
147+
isNull: mockIsNull,
148+
lt: mockLt,
149+
notInArray: mockNotInArray,
150+
or: mockOr,
151+
sql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })),
152+
}))
153+
154+
vi.mock('@/lib/cleanup/batch-delete', () => ({
155+
batchDeleteByWorkspaceAndTimestamp: mockBatchDeleteByWorkspaceAndTimestamp,
156+
chunkArray: (items: string[], size: number) => {
157+
const chunks: string[][] = []
158+
for (let index = 0; index < items.length; index += size) {
159+
chunks.push(items.slice(index, index + size))
160+
}
161+
return chunks
162+
},
163+
chunkedBatchDelete: mockChunkedBatchDelete,
164+
}))
165+
166+
vi.mock('@/lib/execution/payloads/large-value-metadata', () => ({
167+
LIVE_PAUSED_REFERENCE_STATUSES: ['paused', 'partially_resumed', 'cancelling'],
168+
markLargeValuesDeleted: mockMarkLargeValuesDeleted,
169+
pruneLargeValueMetadata: mockPruneLargeValueMetadata,
170+
unreferencedLargeValuePredicate: vi.fn(() => ({ op: 'unreferencedLargeValuePredicate' })),
171+
}))
172+
173+
vi.mock('@/lib/logs/execution/snapshot/service', () => ({
174+
snapshotService: {
175+
cleanupOrphanedSnapshots: vi.fn(async () => 0),
176+
},
177+
}))
178+
179+
vi.mock('@/lib/uploads', () => ({
180+
isUsingCloudStorage: vi.fn(() => true),
181+
StorageService: {
182+
deleteFiles: mockDeleteFiles,
183+
},
184+
}))
185+
186+
vi.mock('@/lib/uploads/server/metadata', () => ({
187+
deleteFileMetadata: mockDeleteFileMetadata,
188+
}))
189+
190+
import { cleanupLogsTask, runCleanupLogs } from '@/background/cleanup-logs'
191+
192+
describe('cleanup logs worker', () => {
193+
beforeEach(() => {
194+
vi.clearAllMocks()
195+
mockChunkedBatchDelete.mockImplementation(async (options: CapturedBatchDeleteOptions) => {
196+
await options.selectChunk(['workspace-1'], 500)
197+
await options.onBatch?.([
198+
{
199+
id: 'log-1',
200+
files: [
201+
{ key: 'execution-file-a' },
202+
{ key: 'execution-file-a' },
203+
{ key: 'execution-file-b' },
204+
],
205+
},
206+
])
207+
return { table: 'workflow_execution_logs', deleted: 1, failed: 0 }
208+
})
209+
})
210+
211+
it('cleans logs without selecting execution_data or scanning refs', async () => {
212+
await runCleanupLogs({
213+
label: 'free/1',
214+
plan: 'free',
215+
retentionHours: 720,
216+
workspaceIds: ['workspace-1'],
217+
})
218+
219+
expect(mockChunkedBatchDelete).toHaveBeenCalledWith(
220+
expect.objectContaining({
221+
batchSize: 500,
222+
maxBatches: 50,
223+
totalRowLimit: 25_000,
224+
})
225+
)
226+
expect(mockSelect).toHaveBeenCalledWith({
227+
id: 'workflowExecutionLogs.id',
228+
files: 'workflowExecutionLogs.files',
229+
})
230+
expect(mockExecute).not.toHaveBeenCalled()
231+
expect(mockDeleteFiles).toHaveBeenCalledWith(
232+
['execution-file-a', 'execution-file-b'],
233+
'execution'
234+
)
235+
expect(mockDeleteFileMetadata).toHaveBeenCalledTimes(2)
236+
expect(mockPruneLargeValueMetadata).toHaveBeenCalledWith(
237+
expect.objectContaining({ workspaceIds: ['workspace-1'] })
238+
)
239+
expect(mockBatchDeleteByWorkspaceAndTimestamp).toHaveBeenCalledOnce()
240+
})
241+
242+
it('does not count large values as deleted when deleted_at marking fails', async () => {
243+
const largeValueKey =
244+
'execution/workspace-1/workflow-1/execution-1/large-value-lv_abcdefghijkl.json'
245+
mockLimit.mockResolvedValueOnce([]).mockResolvedValueOnce([{ key: largeValueKey }])
246+
mockDeleteFiles
247+
.mockResolvedValueOnce({ deleted: 2, failed: [] })
248+
.mockResolvedValueOnce({ deleted: 1, failed: [] })
249+
mockMarkLargeValuesDeleted.mockRejectedValueOnce(new Error('db unavailable'))
250+
251+
await runCleanupLogs({
252+
label: 'free/1',
253+
plan: 'free',
254+
retentionHours: 720,
255+
workspaceIds: ['workspace-1'],
256+
})
257+
258+
expect(mockMarkLargeValuesDeleted).toHaveBeenCalledWith([largeValueKey])
259+
expect(mockDeleteFileMetadata).toHaveBeenCalledTimes(2)
260+
})
261+
262+
it('cleans legacy large values from file metadata without selecting execution_data', async () => {
263+
const legacyKey =
264+
'execution/workspace-1/workflow-1/execution-1/large-value-lv_abcdefghijkl.json'
265+
mockLimit
266+
.mockResolvedValueOnce([])
267+
.mockResolvedValueOnce([])
268+
.mockResolvedValueOnce([{ key: legacyKey }])
269+
mockDeleteFiles
270+
.mockResolvedValueOnce({ deleted: 2, failed: [] })
271+
.mockResolvedValueOnce({ deleted: 1, failed: [] })
272+
273+
await runCleanupLogs({
274+
label: 'free/1',
275+
plan: 'free',
276+
retentionHours: 720,
277+
workspaceIds: ['workspace-1'],
278+
})
279+
280+
expect(mockSelect).toHaveBeenCalledWith({
281+
id: 'workflowExecutionLogs.id',
282+
files: 'workflowExecutionLogs.files',
283+
})
284+
expect(mockSelect).not.toHaveBeenCalledWith(
285+
expect.objectContaining({ executionData: expect.anything() })
286+
)
287+
const legacyWhereArgs = mockAnd.mock.calls
288+
.flat()
289+
.filter((arg): arg is { strings: string[] } => {
290+
return (
291+
typeof arg === 'object' &&
292+
arg !== null &&
293+
Array.isArray((arg as { strings?: unknown }).strings)
294+
)
295+
})
296+
.map((arg) => arg.strings.join(' '))
297+
.join(' ')
298+
expect(legacyWhereArgs).toContain('FROM ')
299+
expect(legacyWhereArgs).toContain("ref.source = 'execution_log'")
300+
expect(legacyWhereArgs).toContain("ref.source = 'paused_snapshot'")
301+
expect(legacyWhereArgs).toContain('dependency.child_key')
302+
expect(mockDeleteFiles).toHaveBeenLastCalledWith([legacyKey], 'execution')
303+
expect(mockDeleteFileMetadata).toHaveBeenCalledWith(legacyKey)
304+
})
305+
306+
it('caps Trigger.dev concurrency for log cleanup tasks', () => {
307+
expect(cleanupLogsTask).toMatchObject({
308+
queue: { concurrencyLimit: 2 },
309+
})
310+
})
311+
})

0 commit comments

Comments
 (0)