Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5028875
feat(transport): replace shared chat transport with mothership-stream…
Sg312 Mar 25, 2026
448ea02
improvement(contracts): regenerate contracts from go
Sg312 Mar 25, 2026
7c5547f
feat(tools): add tool catalog codegen from go tool contracts
Sg312 Mar 25, 2026
d5131ae
feat(tools): add tool-executor dispatch framework for sim side tool r…
Sg312 Mar 26, 2026
1157dfc
feat(orchestrator): rewrite tool dispatch with catalog-driven executo…
Sg312 Mar 26, 2026
997896d
feat(orchestrator): checkpoint resume flow
Sg312 Mar 26, 2026
f9c185b
refactor(copilot): consolidate orchestrator into request/ layer
Sg312 Mar 27, 2026
97d41e9
refactor(mothership): reorganize lib/copilot into structured subdirec…
Sg312 Mar 27, 2026
c4876ba
refactor(mothership): canonical transcript layer, dead code cleanup, …
Sg312 Mar 27, 2026
410dd9a
refactor(mothership): rebase onto latest staging
Sg312 Mar 27, 2026
741d856
refactor(mothership): rename request continue to lifecycle
Sg312 Mar 27, 2026
407d254
feat(trace): add initial version of request traces
Sg312 Mar 28, 2026
4b3b6ae
improvement(stream): batch stream from redis
Sg312 Mar 28, 2026
946751e
fix(resume): fix the resume checkpoint
Sg312 Apr 1, 2026
ba3bdd0
fix(resume): fix resume client tool
Sg312 Apr 1, 2026
e3f8663
fix(subagents): subagent resume should join on existing subagent text…
Sg312 Apr 1, 2026
e22fccd
improvement(reconnect): harden reconnect logic
Sg312 Apr 1, 2026
86207ee
fix(superagent): fix superagent integration tools
Sg312 Apr 2, 2026
83cf090
improvement(stream): improve stream perf
Sg312 Apr 3, 2026
54266b9
Rebase with origin dev
Sg312 Apr 3, 2026
d7bfe16
fix(tests): fix failing test
Sg312 Apr 3, 2026
8f61262
fix(build): fix type errors
Sg312 Apr 3, 2026
63e9dff
fix(build): fix build errors
Sg312 Apr 3, 2026
2548912
fix(build): fix type errors
Sg312 Apr 3, 2026
7cd4545
feat(mothership): add cli execution
Sg312 Apr 4, 2026
fb12805
fix(mothership): fix function execute tests
Sg312 Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(transport): replace shared chat transport with mothership-stream…
… module
  • Loading branch information
Sg312 committed Apr 3, 2026
commit 5028875ae87596a591a87fc926eda160e2777a17
7 changes: 1 addition & 6 deletions apps/sim/app/api/copilot/chat/abort/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { NextResponse } from 'next/server'
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming'
import { abortActiveStream } from '@/lib/copilot/chat-streaming'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
import { env } from '@/lib/core/config/env'
Expand Down Expand Up @@ -55,10 +55,5 @@ export async function POST(request: Request) {
}

const aborted = await abortActiveStream(streamId)
if (chatId) {
await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch(
() => false
)
}
return NextResponse.json({ aborted })
}
123 changes: 30 additions & 93 deletions apps/sim/app/api/copilot/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ import { createRunSegment } from '@/lib/copilot/async-runs/repository'
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import {
acquirePendingChatStream,
createSSEStream,
releasePendingChatStream,
requestChatTitle,
SSE_RESPONSE_HEADERS,
} from '@/lib/copilot/chat-streaming'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { resolveActiveResourceContext } from '@/lib/copilot/process-contents'
import {
Expand Down Expand Up @@ -111,9 +108,6 @@ const ChatMessageSchema = z.object({
export async function POST(req: NextRequest) {
const tracker = createRequestTracker()
let actualChatId: string | undefined
let pendingChatStreamAcquired = false
let pendingChatStreamHandedOff = false
let pendingChatStreamID: string | undefined

try {
// Get session to access user information including name
Expand Down Expand Up @@ -183,9 +177,7 @@ export async function POST(req: NextRequest) {
const wf = await getWorkflowById(workflowId)
resolvedWorkspaceId = wf?.workspaceId ?? undefined
} catch {
logger
.withMetadata({ requestId: tracker.requestId, messageId: userMessageId })
.warn('Failed to resolve workspaceId from workflow')
logger.warn(`[${tracker.requestId}] Failed to resolve workspaceId from workflow`)
}

const userMessageIdToUse = userMessageId || crypto.randomUUID()
Expand All @@ -194,7 +186,7 @@ export async function POST(req: NextRequest) {
messageId: userMessageIdToUse,
})
try {
reqLogger.info('Received chat POST', {
logger.info(`[${tracker.requestId}] Received chat POST`, {
workflowId,
hasContexts: Array.isArray(normalizedContexts),
contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0,
Expand Down Expand Up @@ -245,7 +237,7 @@ export async function POST(req: NextRequest) {
actualChatId
)
agentContexts = processed
reqLogger.info('Contexts processed for request', {
logger.info(`[${tracker.requestId}] Contexts processed for request`, {
processedCount: agentContexts.length,
kinds: agentContexts.map((c) => c.type),
lengthPreview: agentContexts.map((c) => c.content?.length ?? 0),
Expand All @@ -255,12 +247,12 @@ export async function POST(req: NextRequest) {
normalizedContexts.length > 0 &&
agentContexts.length === 0
) {
reqLogger.warn(
'Contexts provided but none processed. Check executionId for logs contexts.'
logger.warn(
`[${tracker.requestId}] Contexts provided but none processed. Check executionId for logs contexts.`
)
}
} catch (e) {
reqLogger.error('Failed to process contexts', e)
logger.error(`[${tracker.requestId}] Failed to process contexts`, e)
}
}

Expand Down Expand Up @@ -289,7 +281,10 @@ export async function POST(req: NextRequest) {
if (result.status === 'fulfilled' && result.value) {
agentContexts.push(result.value)
} else if (result.status === 'rejected') {
reqLogger.error('Failed to resolve resource attachment', result.reason)
logger.error(
`[${tracker.requestId}] Failed to resolve resource attachment`,
result.reason
)
}
}
}
Expand Down Expand Up @@ -328,7 +323,7 @@ export async function POST(req: NextRequest) {
)

try {
reqLogger.info('About to call Sim Agent', {
logger.info(`[${tracker.requestId}] About to call Sim Agent`, {
hasContext: agentContexts.length > 0,
contextCount: agentContexts.length,
hasFileAttachments: Array.isArray(requestPayload.fileAttachments),
Expand All @@ -344,21 +339,6 @@ export async function POST(req: NextRequest) {
})
} catch {}

if (stream && actualChatId) {
const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
if (!acquired) {
return NextResponse.json(
{
error:
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
},
{ status: 409 }
)
}
pendingChatStreamAcquired = true
pendingChatStreamID = userMessageIdToUse
}

if (actualChatId) {
const userMsg = {
id: userMessageIdToUse,
Expand Down Expand Up @@ -405,7 +385,6 @@ export async function POST(req: NextRequest) {
titleProvider: provider,
requestId: tracker.requestId,
workspaceId: resolvedWorkspaceId,
pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream),
orchestrateOptions: {
userId: authenticatedUserId,
workflowId,
Expand Down Expand Up @@ -485,15 +464,14 @@ export async function POST(req: NextRequest) {
.where(eq(copilotChats.id, actualChatId))
}
} catch (error) {
reqLogger.error('Failed to persist chat messages', {
logger.error(`[${tracker.requestId}] Failed to persist chat messages`, {
chatId: actualChatId,
error: error instanceof Error ? error.message : 'Unknown error',
})
}
},
},
})
pendingChatStreamHandedOff = true

return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
}
Expand Down Expand Up @@ -530,7 +508,7 @@ export async function POST(req: NextRequest) {
provider: typeof requestPayload?.provider === 'string' ? requestPayload.provider : undefined,
}

reqLogger.info('Non-streaming response from orchestrator', {
logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, {
hasContent: !!responseData.content,
contentLength: responseData.content?.length || 0,
model: responseData.model,
Expand Down Expand Up @@ -569,8 +547,8 @@ export async function POST(req: NextRequest) {

// Start title generation in parallel if this is first message (non-streaming)
if (actualChatId && !currentChat.title && conversationHistory.length === 0) {
reqLogger.info('Starting title generation for non-streaming response')
requestChatTitle({ message, model: selectedModel, provider, messageId: userMessageIdToUse })
logger.info(`[${tracker.requestId}] Starting title generation for non-streaming response`)
requestChatTitle({ message, model: selectedModel, provider })
.then(async (title) => {
if (title) {
await db
Expand All @@ -580,11 +558,11 @@ export async function POST(req: NextRequest) {
updatedAt: new Date(),
})
.where(eq(copilotChats.id, actualChatId!))
reqLogger.info(`Generated and saved title: ${title}`)
logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
}
})
.catch((error) => {
reqLogger.error('Title generation failed', error)
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
})
}

Expand All @@ -598,7 +576,7 @@ export async function POST(req: NextRequest) {
.where(eq(copilotChats.id, actualChatId!))
}

reqLogger.info('Returning non-streaming response', {
logger.info(`[${tracker.requestId}] Returning non-streaming response`, {
duration: tracker.getDuration(),
chatId: actualChatId,
responseLength: responseData.content?.length || 0,
Expand All @@ -615,36 +593,24 @@ export async function POST(req: NextRequest) {
},
})
} catch (error) {
if (
actualChatId &&
pendingChatStreamAcquired &&
!pendingChatStreamHandedOff &&
pendingChatStreamID
) {
await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {})
}
const duration = tracker.getDuration()

if (error instanceof z.ZodError) {
logger
.withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined })
.error('Validation error', {
duration,
errors: error.errors,
})
logger.error(`[${tracker.requestId}] Validation error:`, {
duration,
errors: error.errors,
})
return NextResponse.json(
{ error: 'Invalid request data', details: error.errors },
{ status: 400 }
)
}

logger
.withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined })
.error('Error handling copilot chat', {
duration,
error: error instanceof Error ? error.message : 'Unknown error',
stack: error instanceof Error ? error.stack : undefined,
})
logger.error(`[${tracker.requestId}] Error handling copilot chat:`, {
duration,
error: error instanceof Error ? error.message : 'Unknown error',
stack: error instanceof Error ? error.stack : undefined,
})

return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Internal server error' },
Expand Down Expand Up @@ -673,32 +639,6 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}

let streamSnapshot: {
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
status: string
} | null = null

if (chat.conversationId) {
try {
const [meta, events] = await Promise.all([
getStreamMeta(chat.conversationId),
readStreamEvents(chat.conversationId, 0),
])
streamSnapshot = {
events: events || [],
status: meta?.status || 'unknown',
}
} catch (err) {
logger
.withMetadata({ messageId: chat.conversationId || undefined })
.warn('Failed to read stream snapshot for chat', {
chatId,
conversationId: chat.conversationId,
error: err instanceof Error ? err.message : String(err),
})
}
}

const transformedChat = {
id: chat.id,
title: chat.title,
Expand All @@ -707,16 +647,13 @@ export async function GET(req: NextRequest) {
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
planArtifact: chat.planArtifact || null,
config: chat.config || null,
conversationId: chat.conversationId || null,
activeStreamId: chat.conversationId || null,
resources: Array.isArray(chat.resources) ? chat.resources : [],
createdAt: chat.createdAt,
updatedAt: chat.updatedAt,
...(streamSnapshot ? { streamSnapshot } : {}),
}

logger
.withMetadata({ messageId: chat.conversationId || undefined })
.info(`Retrieved chat ${chatId}`)
logger.info(`Retrieved chat ${chatId}`)
return NextResponse.json({ success: true, chat: transformedChat })
}

Expand Down Expand Up @@ -778,7 +715,7 @@ export async function GET(req: NextRequest) {
chats: transformedChats,
})
} catch (error) {
logger.error('Error fetching copilot chats', error)
logger.error('Error fetching copilot chats:', error)
return createInternalServerErrorResponse('Failed to fetch chats')
}
}
52 changes: 34 additions & 18 deletions apps/sim/app/api/copilot/chat/stream/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,37 @@
import { NextRequest } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const { getStreamMeta, readStreamEvents, authenticateCopilotRequestSessionOnly } = vi.hoisted(
() => ({
getStreamMeta: vi.fn(),
readStreamEvents: vi.fn(),
authenticateCopilotRequestSessionOnly: vi.fn(),
})
)
const {
getLatestRunForStream,
readEnvelopes,
checkForReplayGap,
authenticateCopilotRequestSessionOnly,
} = vi.hoisted(() => ({
getLatestRunForStream: vi.fn(),
readEnvelopes: vi.fn(),
checkForReplayGap: vi.fn(),
authenticateCopilotRequestSessionOnly: vi.fn(),
}))

vi.mock('@/lib/copilot/async-runs/repository', () => ({
getLatestRunForStream,
}))

vi.mock('@/lib/copilot/orchestrator/stream/buffer', () => ({
getStreamMeta,
readStreamEvents,
vi.mock('@/lib/copilot/mothership-stream', () => ({
readEnvelopes,
checkForReplayGap,
encodeSSEEnvelope: (event: Record<string, unknown>) =>
new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`),
SSE_RESPONSE_HEADERS: {
'Content-Type': 'text/event-stream',
},
}))

vi.mock('@/lib/copilot/request-helpers', () => ({
authenticateCopilotRequestSessionOnly,
}))

import { GET } from '@/app/api/copilot/chat/stream/route'
import { GET } from './route'

describe('copilot chat stream replay route', () => {
beforeEach(() => {
Expand All @@ -31,29 +44,32 @@ describe('copilot chat stream replay route', () => {
userId: 'user-1',
isAuthenticated: true,
})
readStreamEvents.mockResolvedValue([])
readEnvelopes.mockResolvedValue([])
checkForReplayGap.mockResolvedValue(null)
})

it('stops replay polling when stream meta becomes cancelled', async () => {
getStreamMeta
it('stops replay polling when run becomes cancelled', async () => {
getLatestRunForStream
.mockResolvedValueOnce({
status: 'active',
userId: 'user-1',
executionId: 'exec-1',
id: 'run-1',
})
.mockResolvedValueOnce({
status: 'cancelled',
userId: 'user-1',
executionId: 'exec-1',
id: 'run-1',
})

const response = await GET(
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1')
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0')
)

const reader = response.body?.getReader()
expect(reader).toBeTruthy()

const first = await reader!.read()
expect(first.done).toBe(true)
expect(getStreamMeta).toHaveBeenCalledTimes(2)
expect(getLatestRunForStream).toHaveBeenCalledTimes(2)
})
})
Loading