Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix(mothership): key resumes by orchestration id
  • Loading branch information
icecrasher321 committed Mar 25, 2026
commit 984387e35fb5b983571226389da6541e533dd18c
10 changes: 9 additions & 1 deletion apps/sim/lib/copilot/orchestrator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ export async function orchestrateCopilotStream(
runId,
messageId: typeof payloadMsgId === 'string' ? payloadMsgId : crypto.randomUUID(),
})
const continuationWorkerId = `sim-resume:${crypto.randomUUID()}`
let claimedToolCallIds: string[] = []
let claimedByWorkerId: string | null = null

Expand Down Expand Up @@ -198,7 +199,7 @@ export async function orchestrateCopilotStream(
for (;;) {
claimedToolCallIds = []
claimedByWorkerId = null
const resumeWorkerId = continuation.runId || context.runId || context.messageId
const resumeWorkerId = continuationWorkerId
const readyTools: ReadyContinuationTool[] = []
const localPendingPromises: Promise<unknown>[] = []
const missingToolCallIds: string[] = []
Expand All @@ -213,6 +214,7 @@ export async function orchestrateCopilotStream(
logger.info('Waiting for local async tool completion before retrying resume claim', {
toolCallId,
runId: continuation.runId,
workerId: resumeWorkerId,
})
continue
}
Expand All @@ -223,6 +225,7 @@ export async function orchestrateCopilotStream(
logger.warn('Async tool continuation is waiting on a claim held by another worker', {
toolCallId,
runId: continuation.runId,
workerId: resumeWorkerId,
claimedBy: durableRow.claimedBy,
})
continue
Expand Down Expand Up @@ -278,6 +281,7 @@ export async function orchestrateCopilotStream(
logger.info('Retrying async resume after some tool calls were not yet ready', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
workerId: resumeWorkerId,
retry: resumeRetries,
missingToolCallIds,
})
Expand All @@ -295,6 +299,7 @@ export async function orchestrateCopilotStream(
logger.info('Retrying async resume because no tool calls were ready yet', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
workerId: resumeWorkerId,
retry: resumeRetries,
})
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
Expand Down Expand Up @@ -323,6 +328,7 @@ export async function orchestrateCopilotStream(
logger.info('Releasing async tool claims after claim contention during resume', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
workerId: resumeWorkerId,
newlyClaimedToolCallIds,
claimFailures,
})
Expand All @@ -337,6 +343,7 @@ export async function orchestrateCopilotStream(
logger.info('Retrying async resume after claim contention', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
workerId: resumeWorkerId,
retry: resumeRetries,
claimFailures,
})
Expand All @@ -359,6 +366,7 @@ export async function orchestrateCopilotStream(
logger.info('Resuming async tool continuation', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
workerId: resumeWorkerId,
toolCallIds: readyTools.map((tool) => tool.toolCallId),
})

Expand Down