Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions apps/sim/app/api/copilot/chat/abort/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1'
import { fetchGo } from '@/lib/copilot/request/go/fetch'
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http'
import { withCopilotSpan, withIncomingGoSpan } from '@/lib/copilot/request/otel'
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/request/session'
import {
abortActiveStream,
releasePendingChatStream,
waitForPendingChatStream,
} from '@/lib/copilot/request/session'
import { getMothershipBaseURL, getMothershipSourceEnvHeaders } from '@/lib/copilot/server/agent-url'
import { env } from '@/lib/core/config/env'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
Expand Down Expand Up @@ -139,11 +143,21 @@ export const POST = withRouteHandler((request: NextRequest) =>
}
)
if (!settled) {
  // The holder did not settle within the grace window even though the user
  // explicitly stopped it and abort markers are written on both sides
  // (local + Go). Rather than answering 409 and leaving the chat hostage to
  // a wedged handler, break its stream lock so the chat is usable again.
  //
  // Safety relies on the lock helpers' contract (implemented outside this
  // handler): release only deletes when the stored value still matches this
  // streamId, so a newer stream is never clobbered, and the old handler's
  // heartbeat extends the lock only while it still owns it, so it observes
  // the loss and stops heartbeating rather than re-asserting.
  await releasePendingChatStream(chatId, streamId)
  logger.warn('Stream did not settle after abort; force-released chat stream lock', {
    chatId,
    streamId,
  })
  // Record exactly one outcome for this branch. The earlier SettleTimeout /
  // 409 response returned before the release above could run, which made
  // the force-release path unreachable.
  rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.ForceReleased)
  return NextResponse.json({ aborted, settled: false, forceReleased: true })
}
rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.Settled)
return NextResponse.json({ aborted, settled: true })
Expand Down
3 changes: 3 additions & 0 deletions apps/sim/lib/api/contracts/copilot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,9 @@ export const copilotChatAbortContract = defineRouteContract({
schema: z.object({
aborted: z.boolean(),
settled: z.boolean().optional(),
// True when the stream did not settle within the grace window and the
// chat stream lock was force-broken so the chat is immediately usable.
forceReleased: z.boolean().optional(),
}),
},
})
Expand Down
20 changes: 20 additions & 0 deletions apps/sim/lib/copilot/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ export const SIM_AGENT_API_URL =
/** Upper bound for the copilot orchestration stream loop: 60 minutes. */
export const ORCHESTRATION_TIMEOUT_MS = 60 * 60 * 1_000

/**
 * Per-tool watchdog for a sim-executed copilot tool: 60 seconds.
 *
 * If a tool has neither resolved nor rejected by this deadline it is failed
 * with a timeout error. That lets the checkpoint loop resume Go with an error
 * result rather than leaving the chat (and its pending-stream lock) stuck
 * behind an await that never returns.
 */
export const TOOL_WATCHDOG_DEFAULT_MS = 60 * 1_000

/**
 * Per-tool watchdog for tool classes that legitimately run for a long time
 * (workflow executions, media/image generation, sandboxed code, deep
 * research).
 *
 * Those tools enforce their own inner budgets (plan execution timeouts,
 * sandbox timeouts), so this value is only a backstop for a genuine hang. It
 * is deliberately tied to ORCHESTRATION_TIMEOUT_MS so that it sits above every
 * inner budget and never cuts a legal run short.
 */
export const TOOL_WATCHDOG_LONG_RUNNING_MS = ORCHESTRATION_TIMEOUT_MS

/**
 * Additional slack (30 seconds) that the resume gate grants beyond the
 * watchdog of the slowest pending tool.
 */
export const TOOL_WATCHDOG_RESUME_GRACE_MS = 30 * 1_000

/** Upper bound for the client-side streaming response handler: 60 minutes. */
export const STREAM_TIMEOUT_MS = 60 * 60 * 1_000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export type BillingRouteOutcomeValue = (typeof BillingRouteOutcome)[BillingRoute
export const CopilotAbortOutcome = {
BadRequest: 'bad_request',
FallbackPersistFailed: 'fallback_persist_failed',
ForceReleased: 'force_released',
MissingMessageId: 'missing_message_id',
MissingStreamId: 'missing_stream_id',
NoChatId: 'no_chat_id',
Expand Down
4 changes: 4 additions & 0 deletions apps/sim/lib/copilot/generated/trace-attributes-v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ export const TraceAttr = {
CopilotVfsInputMediaTypeClaimed: 'copilot.vfs.input.media_type_claimed',
CopilotVfsInputMediaTypeDetected: 'copilot.vfs.input.media_type_detected',
CopilotVfsInputWidth: 'copilot.vfs.input.width',
CopilotVfsMaterializeFileCount: 'copilot.vfs.materialize.file_count',
CopilotVfsMaterializePhaseMs: 'copilot.vfs.materialize.phase_ms',
CopilotVfsMetadataFailed: 'copilot.vfs.metadata.failed',
CopilotVfsOutcome: 'copilot.vfs.outcome',
CopilotVfsOutputBytes: 'copilot.vfs.output.bytes',
Expand Down Expand Up @@ -877,6 +879,8 @@ export const TraceAttrValues: readonly TraceAttrValue[] = [
'copilot.vfs.input.media_type_claimed',
'copilot.vfs.input.media_type_detected',
'copilot.vfs.input.width',
'copilot.vfs.materialize.file_count',
'copilot.vfs.materialize.phase_ms',
'copilot.vfs.metadata.failed',
'copilot.vfs.outcome',
'copilot.vfs.output.bytes',
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/lib/copilot/generated/trace-spans-v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export const TraceSpan = {
CopilotToolsWriteCsvToTable: 'copilot.tools.write_csv_to_table',
CopilotToolsWriteOutputFile: 'copilot.tools.write_output_file',
CopilotToolsWriteOutputTable: 'copilot.tools.write_output_table',
CopilotVfsMaterialize: 'copilot.vfs.materialize',
CopilotVfsPrepareImage: 'copilot.vfs.prepare_image',
CopilotVfsReadFile: 'copilot.vfs.read_file',
GenAiAgentExecute: 'gen_ai.agent.execute',
Expand Down Expand Up @@ -140,6 +141,7 @@ export const TraceSpanValues: readonly TraceSpanValue[] = [
'copilot.tools.write_csv_to_table',
'copilot.tools.write_output_file',
'copilot.tools.write_output_table',
'copilot.vfs.materialize',
'copilot.vfs.prepare_image',
'copilot.vfs.read_file',
'gen_ai.agent.execute',
Expand Down
104 changes: 104 additions & 0 deletions apps/sim/lib/copilot/request/lifecycle/run.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@ import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/t

// Mock handles shared by the vi.mock() factories in this file. They are
// created through vi.hoisted() so they already exist when the (hoisted)
// vi.mock() factories that reference them are evaluated.
// mockToolWatchdogTimeoutMs defaults to a 60s watchdog for every tool name;
// every other handle is a bare vi.fn() that individual tests configure.
const {
mockCreateRunSegment,
mockForceFailHungToolCall,
mockGetEffectiveDecryptedEnv,
mockGetMothershipBaseURL,
mockGetMothershipSourceEnvHeaders,
mockPrepareExecutionContext,
mockRunStreamLoop,
mockToolWatchdogTimeoutMs,
mockUpdateRunStatus,
} = vi.hoisted(() => ({
mockCreateRunSegment: vi.fn(),
mockForceFailHungToolCall: vi.fn(),
mockGetEffectiveDecryptedEnv: vi.fn(),
mockGetMothershipBaseURL: vi.fn(),
mockGetMothershipSourceEnvHeaders: vi.fn(),
mockPrepareExecutionContext: vi.fn(),
mockRunStreamLoop: vi.fn(),
mockToolWatchdogTimeoutMs: vi.fn(() => 60_000),
mockUpdateRunStatus: vi.fn(),
}))

Expand Down Expand Up @@ -84,6 +88,8 @@ vi.mock('@/lib/copilot/request/tools/billing', () => ({

// Replace the tool executor module: executeToolAndReport becomes an inert
// stub, while forceFailHungToolCall and toolWatchdogTimeoutMs are routed to
// the hoisted mocks so tests can control the watchdog budget and observe
// force-fail calls.
vi.mock('@/lib/copilot/request/tools/executor', () => ({
executeToolAndReport: vi.fn(),
forceFailHungToolCall: mockForceFailHungToolCall,
toolWatchdogTimeoutMs: mockToolWatchdogTimeoutMs,
}))

import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1'
Expand Down Expand Up @@ -583,4 +589,102 @@ describe('runCopilotLifecycle', () => {
// Final attempt (2) is terminal → not flagged, so Go bills + surfaces it.
expect(bodies[3].willRetryOnStreamError).toBeUndefined()
})

it('force-fails a hung tool promise and resumes with an error result instead of wedging', async () => {
  const hungToolId = 'tool-hung'
  const requestedUrls: string[] = []
  const requestBodies: Record<string, unknown>[] = []

  // Records the URL and parsed JSON body of every stream-loop request.
  const captureRequest = (url: string, options: RequestInit): void => {
    requestedUrls.push(url)
    requestBodies.push(JSON.parse(String(options.body)))
  }

  // First leg: checkpoint on an async tool whose promise never settles —
  // claimed, marked executing, then hung.
  const checkpointOnHungTool = async (
    url: string,
    options: RequestInit,
    streamContext: StreamingContext
  ): Promise<void> => {
    captureRequest(url, options)
    streamContext.toolCalls.set(hungToolId, {
      id: hungToolId,
      name: 'read',
      status: 'executing',
    })
    streamContext.pendingToolPromises.set(hungToolId, new Promise(() => {}))
    streamContext.awaitingAsyncContinuation = {
      checkpointId: 'ckpt-1',
      pendingToolCallIds: [hungToolId],
    }
  }

  // Second leg: the resume request finishes normally once the error result
  // has been delivered.
  const finishAfterResume = async (
    url: string,
    options: RequestInit,
    streamContext: StreamingContext
  ): Promise<void> => {
    captureRequest(url, options)
    streamContext.accumulatedContent = 'The file read failed, but here is what I know.'
  }

  vi.useFakeTimers()
  try {
    const executionContext: ExecutionContext = {
      userId: 'user-1',
      workflowId: '',
      workspaceId: 'ws-1',
      chatId: 'chat-1',
      decryptedEnvVars: {},
    }

    // Stand-in for the real helper: move the tool call into a terminal error
    // state so the resume loop has an error result to serialize.
    mockForceFailHungToolCall.mockImplementation(
      async (toolCallId: string, streamContext: StreamingContext, message: string) => {
        const hungTool = streamContext.toolCalls.get(toolCallId)
        if (hungTool) {
          hungTool.status = MothershipStreamV1ToolOutcome.error
          hungTool.endTime = Date.now()
          hungTool.result = { success: false }
          hungTool.error = message
        }
      }
    )

    mockRunStreamLoop
      .mockImplementationOnce(checkpointOnHungTool)
      .mockImplementationOnce(finishAfterResume)

    const pendingRun = runCopilotLifecycle(
      { message: 'hello', messageId: 'stream-1' },
      {
        userId: 'user-1',
        workspaceId: 'ws-1',
        chatId: 'chat-1',
        executionId: 'exec-1',
        runId: 'run-1',
        executionContext,
      }
    )

    // The wait budget is the mocked 60s watchdog plus the 30s resume grace;
    // step the fake clock just beyond it.
    await vi.advanceTimersByTimeAsync(91_000)
    const outcome = await pendingRun

    expect(mockForceFailHungToolCall).toHaveBeenCalledWith(
      hungToolId,
      expect.anything(),
      expect.stringContaining('hung')
    )
    expect(requestedUrls[1]).toBe('http://mothership.test/api/tools/resume')
    expect(requestBodies[1].results).toEqual([
      expect.objectContaining({
        callId: hungToolId,
        name: 'read',
        success: false,
        data: { error: expect.stringContaining('hung') },
      }),
    ])
    expect(outcome.success).toBe(true)
  } finally {
    vi.useRealTimers()
  }
})
})
43 changes: 40 additions & 3 deletions apps/sim/lib/copilot/request/lifecycle/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { sleep } from '@sim/utils/helpers'
import { generateId } from '@sim/utils/id'
import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription'
import { createRunSegment, updateRunStatus } from '@/lib/copilot/async-runs/repository'
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { SIM_AGENT_VERSION, TOOL_WATCHDOG_RESUME_GRACE_MS } from '@/lib/copilot/constants'
import {
MothershipStreamV1EventType,
MothershipStreamV1RunKind,
Expand All @@ -24,7 +24,11 @@ import {
setTerminalToolCallState,
} from '@/lib/copilot/request/tool-call-state'
import { handleBillingLimitResponse } from '@/lib/copilot/request/tools/billing'
import { executeToolAndReport } from '@/lib/copilot/request/tools/executor'
import {
executeToolAndReport,
forceFailHungToolCall,
toolWatchdogTimeoutMs,
} from '@/lib/copilot/request/tools/executor'
import type { TraceCollector } from '@/lib/copilot/request/trace'
import { RequestTraceV1SpanStatus } from '@/lib/copilot/request/trace'
import type {
Expand Down Expand Up @@ -405,15 +409,48 @@ async function runCheckpointLoop(
if (!continuation) break

if (context.pendingToolPromises.size > 0) {
  // Bounded by the slowest pending tool's watchdog plus grace. The per-tool
  // watchdog is expected to settle each promise on its own; this gate is the
  // structural backstop so that no tool failure mode — known or unknown —
  // can park the checkpoint loop (and the chat's pending-stream lock)
  // forever.
  const waitBudgetMs =
    Array.from(context.pendingToolPromises.keys()).reduce(
      (max, toolCallId) =>
        Math.max(max, toolWatchdogTimeoutMs(context.toolCalls.get(toolCallId)?.name)),
      0
    ) + TOOL_WATCHDOG_RESUME_GRACE_MS
  const waitSpan = context.trace.startSpan('Wait for Tools', 'lifecycle.wait_tools', {
    checkpointId: continuation.checkpointId,
    pendingCount: context.pendingToolPromises.size,
    waitBudgetMs,
  })
  logger.info('Waiting for in-flight tool executions before resume', {
    checkpointId: continuation.checkpointId,
    pendingCount: context.pendingToolPromises.size,
    waitBudgetMs,
  })

  // The budget timer is created here (instead of via a fire-and-forget
  // sleep) so it can be cancelled once the race is decided. With the
  // long-running watchdog the budget is over an hour, and an uncancelled
  // timer would stay pending that long after every checkpoint whose tools
  // settled promptly.
  let cancelBudgetTimer = (): void => {}
  const budgetElapsed = new Promise<false>((resolve) => {
    const timer = setTimeout(() => resolve(false), waitBudgetMs)
    cancelBudgetTimer = () => clearTimeout(timer)
  })

  // This race is the ONLY wait on the pending promises. An unconditional
  // `await Promise.allSettled(...)` ahead of it would block on a hung
  // promise before the budget could ever apply.
  let settledInTime: boolean
  try {
    settledInTime = await Promise.race([
      Promise.allSettled(context.pendingToolPromises.values()).then(() => true),
      budgetElapsed,
    ])
  } finally {
    cancelBudgetTimer()
  }

  if (!settledInTime) {
    // Snapshot the ids first: entries are deleted from the map while
    // iterating below.
    const hungToolCallIds = Array.from(context.pendingToolPromises.keys())
    logger.error('Pending tool executions exceeded the resume wait budget; force-failing', {
      checkpointId: continuation.checkpointId,
      waitBudgetMs,
      hungToolCallIds,
    })
    for (const toolCallId of hungToolCallIds) {
      await forceFailHungToolCall(
        toolCallId,
        context,
        'Tool execution hung on the Sim executor and was abandoned so the conversation could continue.'
      )
      context.pendingToolPromises.delete(toolCallId)
    }
  }
  waitSpan.attributes = { ...waitSpan.attributes, settledInTime }
  context.trace.endSpan(waitSpan)
}

Expand Down
Loading
Loading