Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions apps/sim/executor/execution/engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,27 @@
import { sleep } from '@sim/utils/helpers'
import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'

const { mockCancellationSubscribers } = vi.hoisted(() => ({
mockCancellationSubscribers: new Set<(event: { executionId: string }) => void>(),
}))

vi.mock('@/lib/execution/cancellation', () => ({
isExecutionCancelled: vi.fn(),
isRedisCancellationEnabled: vi.fn(),
getCancellationChannel: () => ({
publish: (event: { executionId: string }) => {
for (const handler of mockCancellationSubscribers) handler(event)
},
subscribe: (handler: (event: { executionId: string }) => void) => {
mockCancellationSubscribers.add(handler)
return () => {
mockCancellationSubscribers.delete(handler)
}
},
dispose: () => {
mockCancellationSubscribers.clear()
},
}),
}))

import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
Expand Down Expand Up @@ -115,6 +133,7 @@ function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator {
describe('ExecutionEngine', () => {
beforeEach(() => {
vi.clearAllMocks()
mockCancellationSubscribers.clear()
;(isExecutionCancelled as Mock).mockResolvedValue(false)
;(isRedisCancellationEnabled as Mock).mockReturnValue(false)
})
Expand Down Expand Up @@ -346,7 +365,93 @@ describe('ExecutionEngine', () => {
expect(result.status).toBe('cancelled')
})

it('should respect cancellation check interval', async () => {
it('wakes from a slow in-flight node when a pub/sub cancellation arrives', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(false)

const startNode = createMockNode('start', 'starter')
const slowNode = createMockNode('slow', 'function')
startNode.outgoingEdges.set('edge1', { target: 'slow' })

const dag = createMockDAG([startNode, slowNode])
const context = createMockContext({ executionId: 'pubsub-execution' })
const edgeManager = createMockEdgeManager((node) => (node.id === 'start' ? ['slow'] : []))
const nodeOrchestrator = createMockNodeOrchestrator(500)

const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const executionPromise = engine.run('start')

setTimeout(() => {
for (const handler of mockCancellationSubscribers) {
handler({ executionId: 'pubsub-execution' })
}
}, 5)

const startTime = Date.now()
const result = await executionPromise
const duration = Date.now() - startTime

expect(result.status).toBe('cancelled')
expect(duration).toBeLessThan(100)
})

it('ignores pub/sub events targeting other executions', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(false)

const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext({ executionId: 'execution-a' })
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()

const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)

for (const handler of mockCancellationSubscribers) {
handler({ executionId: 'execution-b' })
}

const result = await engine.run('start')
expect(result.status).toBeUndefined()
expect(result.success).toBe(true)
})

it('unsubscribes from the cancellation channel after run completes', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(false)

const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext({ executionId: 'cleanup-execution' })
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()

const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
expect(mockCancellationSubscribers.size).toBe(1)

await engine.run('start')

expect(mockCancellationSubscribers.size).toBe(0)
})

it('honours the durable backstop when cancelled before subscribing', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(true)

const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()

const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')

expect(result.status).toBe('cancelled')
expect(nodeOrchestrator.executionCount).toBe(0)
})

it('calls isExecutionCancelled once as the startup backstop check', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(false)

Expand All @@ -359,7 +464,7 @@ describe('ExecutionEngine', () => {
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await engine.run('start')

expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
expect((isExecutionCancelled as Mock).mock.calls.length).toBe(1)
})
})

Expand Down
129 changes: 58 additions & 71 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { createLogger, type Logger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import {
getCancellationChannel,
isExecutionCancelled,
isRedisCancellationEnabled,
} from '@/lib/execution/cancellation'
import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
Expand Down Expand Up @@ -31,11 +35,9 @@ export class ExecutionEngine {
private errorFlag = false
private stoppedEarlyFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
private abortPromise: Promise<void> | null = null
private abortResolve: (() => void) | null = null
private abortPromise!: Promise<void>
private abortResolve!: () => void
private cancellationUnsubscribe: (() => void) | null = null
private execLogger: Logger

constructor(
Expand All @@ -45,7 +47,6 @@ export class ExecutionEngine {
private nodeOrchestrator: NodeExecutionOrchestrator
) {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
this.execLogger = logger.withMetadata({
workflowId: this.context.workflowId,
workspaceId: this.context.workspaceId,
Expand All @@ -54,72 +55,64 @@ export class ExecutionEngine {
requestId: this.context.metadata.requestId,
})
this.initializeAbortHandler()
this.subscribeToCancellationChannel()
}

private subscribeToCancellationChannel(): void {
if (!this.context.executionId) return
const executionId = this.context.executionId
this.cancellationUnsubscribe = getCancellationChannel().subscribe((event) => {
if (event.executionId !== executionId) return
this.execLogger.info('Execution cancelled via pub/sub', { executionId })
this.signalCancelled()
})
}

/**
* Sets up a single abort promise that can be reused throughout execution.
* This avoids creating multiple event listeners and potential memory leaks.
*/
private initializeAbortHandler(): void {
this.abortPromise = new Promise<void>((resolve) => {
this.abortResolve = resolve
})

if (!this.context.abortSignal) return

if (this.context.abortSignal.aborted) {
this.cancelledFlag = true
this.abortPromise = Promise.resolve()
this.signalCancelled()
return
}

this.abortPromise = new Promise<void>((resolve) => {
this.abortResolve = resolve
})

this.context.abortSignal.addEventListener(
'abort',
() => {
this.cancelledFlag = true
this.abortResolve?.()
},
{ once: true }
)
this.context.abortSignal.addEventListener('abort', () => this.signalCancelled(), { once: true })
}

private async checkCancellation(): Promise<boolean> {
if (this.cancelledFlag) {
return true
}

if (this.useRedisCancellation) {
const now = Date.now()
if (now - this.lastCancellationCheck < this.CANCELLATION_CHECK_INTERVAL_MS) {
return false
}
this.lastCancellationCheck = now
private signalCancelled(): void {
if (this.cancelledFlag) return
this.cancelledFlag = true
this.abortResolve()
}

const cancelled = await isExecutionCancelled(this.context.executionId!)
if (cancelled) {
this.cancelledFlag = true
this.execLogger.info('Execution cancelled via Redis', {
executionId: this.context.executionId,
})
}
return cancelled
}
private checkCancellation(): boolean {
return this.cancelledFlag
}

if (this.context.abortSignal?.aborted) {
this.cancelledFlag = true
return true
/** Catches cancellations published before this engine subscribed (e.g. resume from snapshot). */
private async checkCancellationBackstop(): Promise<void> {
if (!this.context.executionId || !isRedisCancellationEnabled()) return
const cancelled = await isExecutionCancelled(this.context.executionId)
if (cancelled) {
this.execLogger.info('Execution already cancelled at engine start (Redis backstop)', {
executionId: this.context.executionId,
})
this.signalCancelled()
}

return false
}

async run(triggerBlockId?: string): Promise<ExecutionResult> {
const startTime = performance.now()
try {
this.initializeQueue(triggerBlockId)
await this.checkCancellationBackstop()

while (this.hasWork()) {
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
if (this.checkCancellation() || this.errorFlag || this.stoppedEarlyFlag) {
break
}
await this.processQueue()
Expand Down Expand Up @@ -194,6 +187,15 @@ export class ExecutionEngine {
attachExecutionResult(error, executionResult)
}
throw error
} finally {
this.cleanup()
}
}

private cleanup(): void {
if (this.cancellationUnsubscribe) {
this.cancellationUnsubscribe()
this.cancellationUnsubscribe = null
}
}

Expand Down Expand Up @@ -238,32 +240,17 @@ export class ExecutionEngine {

private async waitForAnyExecution(): Promise<void> {
if (this.executing.size > 0) {
const abortPromise = this.getAbortPromise()
if (abortPromise) {
await Promise.race([...this.executing, abortPromise])
} else {
await Promise.race(this.executing)
}
await Promise.race([...this.executing, this.abortPromise])
}
}

private async waitForAllExecutions(): Promise<void> {
const abortPromise = this.getAbortPromise()
if (abortPromise) {
await Promise.race([Promise.all(this.executing), abortPromise])
} else {
await Promise.all(this.executing)
await Promise.race([Promise.all(this.executing), this.abortPromise])
if (this.executing.size > 0) {
await Promise.allSettled(this.executing)
}
}

/**
* Returns the cached abort promise. This is safe to call multiple times
* as it reuses the same promise instance created during initialization.
*/
private getAbortPromise(): Promise<void> | null {
return this.abortPromise
}

private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
const prevLock = this.queueLock
let resolveLock: () => void
Expand Down Expand Up @@ -363,7 +350,7 @@ export class ExecutionEngine {

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if ((await this.checkCancellation()) || this.errorFlag) {
if (this.checkCancellation() || this.errorFlag) {
break
}
const nodeId = this.dequeue()
Expand Down
12 changes: 4 additions & 8 deletions apps/sim/executor/handlers/api/api-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ describe('ApiBlockHandler', () => {
body: { key: 'value' }, // Expect parsed body
_context: { workflowId: 'test-workflow-id' },
},
false, // skipPostProcess
mockContext // execution context
{ executionContext: mockContext }
)
expect(result).toEqual(expectedOutput)
})
Expand Down Expand Up @@ -177,8 +176,7 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: expectedParsedBody }),
false, // skipPostProcess
mockContext // execution context
{ executionContext: mockContext }
)
})

Expand All @@ -193,8 +191,7 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: 'This is plain text' }),
false, // skipPostProcess
mockContext // execution context
{ executionContext: mockContext }
)
})

Expand All @@ -209,8 +206,7 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: undefined }),
false, // skipPostProcess
mockContext // execution context
{ executionContext: mockContext }
)
})

Expand Down
3 changes: 1 addition & 2 deletions apps/sim/executor/handlers/api/api-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ export class ApiBlockHandler implements BlockHandler {
callChain: ctx.callChain,
},
},
false,
ctx
{ executionContext: ctx }
)

if (!result.success) {
Expand Down
Loading
Loading