Skip to content

Commit 2b5e436

Browse files
authored
fix(snapshot): changed insert to upsert when concurrent identical child workflows are running (#3259)
* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running * fixed ci tests failing
1 parent e24c824 commit 2b5e436

2 files changed

Lines changed: 234 additions & 39 deletions

File tree

apps/sim/lib/logs/execution/snapshot/service.test.ts

Lines changed: 216 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,59 @@
1-
import { describe, expect, it } from 'vitest'
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { databaseMock, drizzleOrmMock, loggerMock } from '@sim/testing'
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
const { mockSchemaExports } = vi.hoisted(() => ({
8+
mockSchemaExports: {
9+
workflowExecutionSnapshots: {
10+
id: 'id',
11+
workflowId: 'workflow_id',
12+
stateHash: 'state_hash',
13+
stateData: 'state_data',
14+
createdAt: 'created_at',
15+
},
16+
workflowExecutionLogs: {
17+
id: 'id',
18+
stateSnapshotId: 'state_snapshot_id',
19+
},
20+
},
21+
}))
22+
23+
vi.mock('@sim/db', () => databaseMock)
24+
vi.mock('@sim/db/schema', () => mockSchemaExports)
25+
vi.mock('@sim/logger', () => loggerMock)
26+
vi.mock('drizzle-orm', () => drizzleOrmMock)
27+
vi.mock('uuid', () => ({ v4: vi.fn(() => 'generated-uuid-1') }))
28+
229
import { SnapshotService } from '@/lib/logs/execution/snapshot/service'
330
import type { WorkflowState } from '@/lib/logs/types'
431

32+
const mockState: WorkflowState = {
33+
blocks: {
34+
block1: {
35+
id: 'block1',
36+
name: 'Test Agent',
37+
type: 'agent',
38+
position: { x: 100, y: 200 },
39+
subBlocks: {},
40+
outputs: {},
41+
enabled: true,
42+
horizontalHandles: true,
43+
advancedMode: false,
44+
height: 0,
45+
},
46+
},
47+
edges: [{ id: 'edge1', source: 'block1', target: 'block2' }],
48+
loops: {},
49+
parallels: {},
50+
}
51+
552
describe('SnapshotService', () => {
53+
beforeEach(() => {
54+
vi.clearAllMocks()
55+
})
56+
657
describe('computeStateHash', () => {
758
it.concurrent('should generate consistent hashes for identical states', () => {
859
const service = new SnapshotService()
@@ -62,7 +113,7 @@ describe('SnapshotService', () => {
62113
blocks: {
63114
block1: {
64115
...baseState.blocks.block1,
65-
position: { x: 500, y: 600 }, // Different position
116+
position: { x: 500, y: 600 },
66117
},
67118
},
68119
}
@@ -140,7 +191,7 @@ describe('SnapshotService', () => {
140191
const state2: WorkflowState = {
141192
blocks: {},
142193
edges: [
143-
{ id: 'edge2', source: 'b', target: 'c' }, // Different order
194+
{ id: 'edge2', source: 'b', target: 'c' },
144195
{ id: 'edge1', source: 'a', target: 'b' },
145196
],
146197
loops: {},
@@ -219,7 +270,6 @@ describe('SnapshotService', () => {
219270
const hash = service.computeStateHash(complexState)
220271
expect(hash).toHaveLength(64)
221272

222-
// Should be consistent
223273
const hash2 = service.computeStateHash(complexState)
224274
expect(hash).toBe(hash2)
225275
})
@@ -335,4 +385,166 @@ describe('SnapshotService', () => {
335385
expect(hash1).toHaveLength(64)
336386
})
337387
})
388+
389+
describe('createSnapshotWithDeduplication', () => {
390+
it('should use upsert to insert a new snapshot', async () => {
391+
const service = new SnapshotService()
392+
const workflowId = 'wf-123'
393+
394+
const mockReturning = vi.fn().mockResolvedValue([
395+
{
396+
id: 'generated-uuid-1',
397+
workflowId,
398+
stateHash: 'abc123',
399+
stateData: mockState,
400+
createdAt: new Date('2026-02-19T00:00:00Z'),
401+
},
402+
])
403+
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
404+
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
405+
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
406+
databaseMock.db.insert = mockInsert
407+
408+
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
409+
410+
expect(mockInsert).toHaveBeenCalled()
411+
expect(mockValues).toHaveBeenCalledWith(
412+
expect.objectContaining({
413+
id: 'generated-uuid-1',
414+
workflowId,
415+
stateData: mockState,
416+
})
417+
)
418+
expect(mockOnConflictDoUpdate).toHaveBeenCalledWith(
419+
expect.objectContaining({
420+
set: expect.any(Object),
421+
})
422+
)
423+
expect(result.snapshot.id).toBe('generated-uuid-1')
424+
expect(result.isNew).toBe(true)
425+
})
426+
427+
it('should detect reused snapshot when returned id differs from generated id', async () => {
428+
const service = new SnapshotService()
429+
const workflowId = 'wf-123'
430+
431+
const mockReturning = vi.fn().mockResolvedValue([
432+
{
433+
id: 'existing-snapshot-id',
434+
workflowId,
435+
stateHash: 'abc123',
436+
stateData: mockState,
437+
createdAt: new Date('2026-02-19T00:00:00Z'),
438+
},
439+
])
440+
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
441+
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
442+
const mockInsert = vi.fn().mockReturnValue({ values: mockValues })
443+
databaseMock.db.insert = mockInsert
444+
445+
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
446+
447+
expect(result.snapshot.id).toBe('existing-snapshot-id')
448+
expect(result.isNew).toBe(false)
449+
})
450+
451+
it('should not throw on concurrent inserts with the same hash', async () => {
452+
const service = new SnapshotService()
453+
const workflowId = 'wf-123'
454+
455+
const mockReturningNew = vi.fn().mockResolvedValue([
456+
{
457+
id: 'generated-uuid-1',
458+
workflowId,
459+
stateHash: 'abc123',
460+
stateData: mockState,
461+
createdAt: new Date('2026-02-19T00:00:00Z'),
462+
},
463+
])
464+
const mockReturningExisting = vi.fn().mockResolvedValue([
465+
{
466+
id: 'existing-snapshot-id',
467+
workflowId,
468+
stateHash: 'abc123',
469+
stateData: mockState,
470+
createdAt: new Date('2026-02-19T00:00:00Z'),
471+
},
472+
])
473+
474+
let callCount = 0
475+
databaseMock.db.insert = vi.fn().mockImplementation(() => ({
476+
values: vi.fn().mockImplementation(() => ({
477+
onConflictDoUpdate: vi.fn().mockImplementation(() => ({
478+
returning: callCount++ === 0 ? mockReturningNew : mockReturningExisting,
479+
})),
480+
})),
481+
}))
482+
483+
const [result1, result2] = await Promise.all([
484+
service.createSnapshotWithDeduplication(workflowId, mockState),
485+
service.createSnapshotWithDeduplication(workflowId, mockState),
486+
])
487+
488+
expect(result1.snapshot.id).toBe('generated-uuid-1')
489+
expect(result1.isNew).toBe(true)
490+
expect(result2.snapshot.id).toBe('existing-snapshot-id')
491+
expect(result2.isNew).toBe(false)
492+
})
493+
494+
it('should pass state_data in the ON CONFLICT SET clause', async () => {
495+
const service = new SnapshotService()
496+
const workflowId = 'wf-123'
497+
498+
let capturedConflictConfig: Record<string, unknown> | undefined
499+
const mockReturning = vi.fn().mockResolvedValue([
500+
{
501+
id: 'generated-uuid-1',
502+
workflowId,
503+
stateHash: 'abc123',
504+
stateData: mockState,
505+
createdAt: new Date('2026-02-19T00:00:00Z'),
506+
},
507+
])
508+
509+
databaseMock.db.insert = vi.fn().mockReturnValue({
510+
values: vi.fn().mockReturnValue({
511+
onConflictDoUpdate: vi.fn().mockImplementation((config: Record<string, unknown>) => {
512+
capturedConflictConfig = config
513+
return { returning: mockReturning }
514+
}),
515+
}),
516+
})
517+
518+
await service.createSnapshotWithDeduplication(workflowId, mockState)
519+
520+
expect(capturedConflictConfig).toBeDefined()
521+
expect(capturedConflictConfig!.target).toBeDefined()
522+
expect(capturedConflictConfig!.set).toBeDefined()
523+
expect(capturedConflictConfig!.set).toHaveProperty('stateData')
524+
})
525+
526+
it('should always call insert, never a separate select for deduplication', async () => {
527+
const service = new SnapshotService()
528+
const workflowId = 'wf-123'
529+
530+
const mockReturning = vi.fn().mockResolvedValue([
531+
{
532+
id: 'generated-uuid-1',
533+
workflowId,
534+
stateHash: 'abc123',
535+
stateData: mockState,
536+
createdAt: new Date('2026-02-19T00:00:00Z'),
537+
},
538+
])
539+
const mockOnConflictDoUpdate = vi.fn().mockReturnValue({ returning: mockReturning })
540+
const mockValues = vi.fn().mockReturnValue({ onConflictDoUpdate: mockOnConflictDoUpdate })
541+
databaseMock.db.insert = vi.fn().mockReturnValue({ values: mockValues })
542+
databaseMock.db.select = vi.fn()
543+
544+
await service.createSnapshotWithDeduplication(workflowId, mockState)
545+
546+
expect(databaseMock.db.insert).toHaveBeenCalledTimes(1)
547+
expect(databaseMock.db.select).not.toHaveBeenCalled()
548+
})
549+
})
338550
})

apps/sim/lib/logs/execution/snapshot/service.ts

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { createHash } from 'crypto'
22
import { db } from '@sim/db'
33
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
44
import { createLogger } from '@sim/logger'
5-
import { and, eq, lt, notExists } from 'drizzle-orm'
5+
import { and, eq, lt, notExists, sql } from 'drizzle-orm'
66
import { v4 as uuidv4 } from 'uuid'
77
import type {
88
SnapshotService as ISnapshotService,
@@ -28,58 +28,41 @@ export class SnapshotService implements ISnapshotService {
2828
workflowId: string,
2929
state: WorkflowState
3030
): Promise<SnapshotCreationResult> {
31-
// Hash the position-less state for deduplication (functional equivalence)
3231
const stateHash = this.computeStateHash(state)
3332

34-
const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
35-
if (existingSnapshot) {
36-
let refreshedState: WorkflowState = existingSnapshot.stateData
37-
try {
38-
await db
39-
.update(workflowExecutionSnapshots)
40-
.set({ stateData: state })
41-
.where(eq(workflowExecutionSnapshots.id, existingSnapshot.id))
42-
refreshedState = state
43-
} catch (error) {
44-
logger.warn(
45-
`Failed to refresh snapshot stateData for ${existingSnapshot.id}, continuing with existing data`,
46-
error
47-
)
48-
}
49-
50-
logger.info(
51-
`Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
52-
)
53-
return {
54-
snapshot: { ...existingSnapshot, stateData: refreshedState },
55-
isNew: false,
56-
}
57-
}
58-
59-
// Store the FULL state (including positions) so we can recreate the exact workflow
60-
// Even though we hash without positions, we want to preserve the complete state
6133
const snapshotData: WorkflowExecutionSnapshotInsert = {
6234
id: uuidv4(),
6335
workflowId,
6436
stateHash,
6537
stateData: state,
6638
}
6739

68-
const [newSnapshot] = await db
40+
const [upsertedSnapshot] = await db
6941
.insert(workflowExecutionSnapshots)
7042
.values(snapshotData)
43+
.onConflictDoUpdate({
44+
target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash],
45+
set: {
46+
stateData: sql`excluded.state_data`,
47+
},
48+
})
7149
.returning()
7250

51+
const isNew = upsertedSnapshot.id === snapshotData.id
52+
7353
logger.info(
74-
`Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
54+
isNew
55+
? `Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
56+
: `Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
7557
)
58+
7659
return {
7760
snapshot: {
78-
...newSnapshot,
79-
stateData: newSnapshot.stateData as WorkflowState,
80-
createdAt: newSnapshot.createdAt.toISOString(),
61+
...upsertedSnapshot,
62+
stateData: upsertedSnapshot.stateData as WorkflowState,
63+
createdAt: upsertedSnapshot.createdAt.toISOString(),
8164
},
82-
isNew: true,
65+
isNew,
8366
}
8467
}
8568

0 commit comments

Comments
 (0)