Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5028875
feat(transport): replace shared chat transport with mothership-stream…
Sg312 Mar 25, 2026
448ea02
improvement(contracts): regenerate contracts from go
Sg312 Mar 25, 2026
7c5547f
feat(tools): add tool catalog codegen from go tool contracts
Sg312 Mar 25, 2026
d5131ae
feat(tools): add tool-executor dispatch framework for sim side tool r…
Sg312 Mar 26, 2026
1157dfc
feat(orchestrator): rewrite tool dispatch with catalog-driven executo…
Sg312 Mar 26, 2026
997896d
feat(orchestrator): checkpoint resume flow
Sg312 Mar 26, 2026
f9c185b
refactor(copilot): consolidate orchestrator into request/ layer
Sg312 Mar 27, 2026
97d41e9
refactor(mothership): reorganize lib/copilot into structured subdirec…
Sg312 Mar 27, 2026
c4876ba
refactor(mothership): canonical transcript layer, dead code cleanup, …
Sg312 Mar 27, 2026
410dd9a
refactor(mothership): rebase onto latest staging
Sg312 Mar 27, 2026
741d856
refactor(mothership): rename request continue to lifecycle
Sg312 Mar 27, 2026
407d254
feat(trace): add initial version of request traces
Sg312 Mar 28, 2026
4b3b6ae
improvement(stream): batch stream from redis
Sg312 Mar 28, 2026
946751e
fix(resume): fix the resume checkpoint
Sg312 Apr 1, 2026
ba3bdd0
fix(resume): fix resume client tool
Sg312 Apr 1, 2026
e3f8663
fix(subagents): subagent resume should join on existing subagent text…
Sg312 Apr 1, 2026
e22fccd
improvement(reconnect): harden reconnect logic
Sg312 Apr 1, 2026
86207ee
fix(superagent): fix superagent integration tools
Sg312 Apr 2, 2026
83cf090
improvement(stream): improve stream perf
Sg312 Apr 3, 2026
54266b9
Rebase with origin dev
Sg312 Apr 3, 2026
d7bfe16
fix(tests): fix failing test
Sg312 Apr 3, 2026
8f61262
fix(build): fix type errors
Sg312 Apr 3, 2026
63e9dff
fix(build): fix build errors
Sg312 Apr 3, 2026
2548912
fix(build): fix type errors
Sg312 Apr 3, 2026
7cd4545
feat(mothership): add cli execution
Sg312 Apr 4, 2026
fb12805
fix(mothership): fix function execute tests
Sg312 Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(orchestrator): checkpoint resume flow
  • Loading branch information
Sg312 committed Apr 3, 2026
commit 997896d2fac92f045f257bc43649fcd268b9d1a3
143 changes: 115 additions & 28 deletions apps/sim/lib/copilot/orchestrator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@ import type {
} from '@/lib/copilot/orchestrator/types'
import { env } from '@/lib/core/config/env'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { buildToolCallSummaries, createStreamingContext, runStreamLoop } from './stream/core'
import {
buildToolCallSummaries,
CopilotBackendError,
createStreamingContext,
runStreamLoop,
} from './stream/core'

const logger = createLogger('CopilotOrchestrator')

const MAX_RESUME_ATTEMPTS = 3
const RESUME_BACKOFF_MS = [250, 500, 1000]

export interface OrchestrateStreamOptions extends OrchestratorOptions {
userId: string
workflowId?: string
workspaceId?: string
chatId?: string
executionId?: string
runId?: string
/** Go-side route to proxy to. Defaults to '/api/copilot'. */
goRoute?: string
}

Expand Down Expand Up @@ -59,9 +66,7 @@ export async function orchestrateCopilotStream(
decryptedEnvVars,
}
}
if (userTimezone) {
execContext.userTimezone = userTimezone
}
if (userTimezone) execContext.userTimezone = userTimezone
execContext.executionId = executionId
execContext.runId = runId
execContext.abortSignal = options.abortSignal
Expand All @@ -74,14 +79,24 @@ export async function orchestrateCopilotStream(
runId,
messageId: typeof payloadMsgId === 'string' ? payloadMsgId : crypto.randomUUID(),
})

try {
let route = goRoute
let payload = requestPayload
let payload: Record<string, unknown> = requestPayload

const callerOnEvent = options.onEvent

let resumeAttempt = 0

for (;;) {
context.streamComplete = false
const isResume = route === '/api/tools/resume'

if (isResume && isAborted(options, context)) {
cancelPendingTools(context)
context.awaitingAsyncContinuation = undefined
break
}

const loopOptions = {
...options,
Expand All @@ -97,30 +112,45 @@ export async function orchestrateCopilotStream(
},
}

await runStreamLoop(
`${SIM_AGENT_API_URL}${route}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
'X-Client-Version': SIM_AGENT_VERSION,
try {
await runStreamLoop(
`${SIM_AGENT_API_URL}${route}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
'X-Client-Version': SIM_AGENT_VERSION,
},
body: JSON.stringify(payload),
},
body: JSON.stringify(payload),
},
context,
execContext,
loopOptions
)

if (options.abortSignal?.aborted || context.wasAborted) {
for (const [toolCallId, toolCall] of context.toolCalls) {
if (toolCall.status === 'pending' || toolCall.status === 'executing') {
toolCall.status = 'cancelled'
toolCall.endTime = Date.now()
toolCall.error = 'Stopped by user'
}
context,
execContext,
loopOptions
)
resumeAttempt = 0
} catch (streamError) {
if (
isResume &&
isRetryableStreamError(streamError) &&
resumeAttempt < MAX_RESUME_ATTEMPTS - 1
) {
resumeAttempt++
const backoff = RESUME_BACKOFF_MS[resumeAttempt - 1] ?? 1000
logger.warn('Resume stream failed, retrying', {
attempt: resumeAttempt + 1,
maxAttempts: MAX_RESUME_ATTEMPTS,
backoffMs: backoff,
error: streamError instanceof Error ? streamError.message : String(streamError),
})
await sleepWithAbort(backoff, options.abortSignal)
continue
}
throw streamError
}

if (isAborted(options, context)) {
cancelPendingTools(context)
context.awaitingAsyncContinuation = undefined
break
}
Expand All @@ -136,6 +166,12 @@ export async function orchestrateCopilotStream(
await Promise.allSettled(context.pendingToolPromises.values())
}

if (isAborted(options, context)) {
cancelPendingTools(context)
context.awaitingAsyncContinuation = undefined
break
}

const results = continuation.pendingToolCallIds.map((toolCallId) => {
const tool = context.toolCalls.get(toolCallId)
return {
Expand Down Expand Up @@ -190,3 +226,54 @@ export async function orchestrateCopilotStream(
}
}
}

function isAborted(
options: OrchestrateStreamOptions,
context: ReturnType<typeof createStreamingContext>
): boolean {
return !!(options.abortSignal?.aborted || context.wasAborted)
}

function cancelPendingTools(context: ReturnType<typeof createStreamingContext>): void {
for (const [, toolCall] of context.toolCalls) {
if (toolCall.status === 'pending' || toolCall.status === 'executing') {
toolCall.status = 'cancelled'
toolCall.endTime = Date.now()
toolCall.error = 'Stopped by user'
}
}
}

function isRetryableStreamError(error: unknown): boolean {
if (error instanceof DOMException && error.name === 'AbortError') {
return false
}
if (error instanceof CopilotBackendError) {
return error.status !== undefined && error.status >= 500
}
if (error instanceof TypeError) {
return true
}
return false
}

function sleepWithAbort(ms: number, abortSignal?: AbortSignal): Promise<void> {
if (!abortSignal) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
if (abortSignal.aborted) {
return Promise.resolve()
}
return new Promise((resolve) => {
const timeoutId = setTimeout(() => {
abortSignal.removeEventListener('abort', onAbort)
resolve()
}, ms)
const onAbort = () => {
clearTimeout(timeoutId)
abortSignal.removeEventListener('abort', onAbort)
resolve()
}
abortSignal.addEventListener('abort', onAbort, { once: true })
})
}
10 changes: 5 additions & 5 deletions apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,16 +478,16 @@ export const sseHandlers: Record<string, StreamHandler> = {
if (d.usage) {
const u = asRecord(d.usage)
context.usage = {
prompt: (u.input_tokens as number) || 0,
completion: (u.output_tokens as number) || 0,
prompt: (context.usage?.prompt || 0) + ((u.input_tokens as number) || 0),
completion: (context.usage?.completion || 0) + ((u.output_tokens as number) || 0),
}
}
if (d.cost) {
const c = asRecord(d.cost)
context.cost = {
input: (c.input as number) || 0,
output: (c.output as number) || 0,
total: (c.total as number) || 0,
input: (context.cost?.input || 0) + ((c.input as number) || 0),
output: (context.cost?.output || 0) + ((c.output as number) || 0),
total: (context.cost?.total || 0) + ((c.total as number) || 0),
}
}
context.streamComplete = true
Expand Down
19 changes: 16 additions & 3 deletions apps/sim/lib/copilot/orchestrator/stream/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ import type {

const logger = createLogger('CopilotStreamCore')

export class CopilotBackendError extends Error {
status?: number
body?: string

constructor(message: string, options?: { status?: number; body?: string }) {
super(message)
this.name = 'CopilotBackendError'
this.status = options?.status
this.body = options?.body
}
}

/**
* Options for the shared stream processing loop.
*/
Expand Down Expand Up @@ -143,13 +155,14 @@ export async function runStreamLoop(
return
}

throw new Error(
`Copilot backend error (${response.status}): ${errorText || response.statusText}`
throw new CopilotBackendError(
`Copilot backend error (${response.status}): ${errorText || response.statusText}`,
{ status: response.status, body: errorText || response.statusText }
)
}

if (!response.body) {
throw new Error('Copilot backend response missing body')
throw new CopilotBackendError('Copilot backend response missing body')
}

const reader = response.body.getReader()
Expand Down