diff --git a/apps/sim/app/api/billing/update-cost/route.ts b/apps/sim/app/api/billing/update-cost/route.ts index 65899d55572..743ddb17011 100644 --- a/apps/sim/app/api/billing/update-cost/route.ts +++ b/apps/sim/app/api/billing/update-cost/route.ts @@ -4,7 +4,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { recordUsage } from '@/lib/billing/core/usage-log' import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing' -import { checkInternalApiKey } from '@/lib/copilot/utils' +import { checkInternalApiKey } from '@/lib/copilot/request/http' import { isBillingEnabled } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' diff --git a/apps/sim/app/api/copilot/api-keys/validate/route.ts b/apps/sim/app/api/copilot/api-keys/validate/route.ts index 77521f3b3ed..1c1df540132 100644 --- a/apps/sim/app/api/copilot/api-keys/validate/route.ts +++ b/apps/sim/app/api/copilot/api-keys/validate/route.ts @@ -1,8 +1,11 @@ +import { db } from '@sim/db' +import { user } from '@sim/db/schema' import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' -import { checkInternalApiKey } from '@/lib/copilot/utils' +import { checkInternalApiKey } from '@/lib/copilot/request/http' const logger = createLogger('CopilotApiKeysValidate') @@ -34,6 +37,12 @@ export async function POST(req: NextRequest) { const { userId } = validationResult.data + const [existingUser] = await db.select().from(user).where(eq(user.id, userId)).limit(1) + if (!existingUser) { + logger.warn('[API VALIDATION] userId does not exist', { userId }) + return NextResponse.json({ error: 'User not found' }, { status: 403 }) + } + logger.info('[API VALIDATION] Validating usage limit', { userId }) const { isExceeded, currentUsage, limit } = await checkServerSideUsageLimits(userId) diff --git a/apps/sim/app/api/copilot/chat/abort/route.ts b/apps/sim/app/api/copilot/chat/abort/route.ts index 33fe68c8d88..5cd333f0731 100644 --- a/apps/sim/app/api/copilot/chat/abort/route.ts +++ b/apps/sim/app/api/copilot/chat/abort/route.ts @@ -1,10 +1,12 @@ +import { createLogger } from '@sim/logger' import { NextResponse } from 'next/server' import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' -import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' -import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' +import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http' +import { abortActiveStream } from '@/lib/copilot/request/session/abort' import { env } from '@/lib/core/config/env' +const logger = createLogger('CopilotChatAbortAPI') const GO_EXPLICIT_ABORT_TIMEOUT_MS = 3000 export async function POST(request: Request) { @@ -15,7 +17,12 @@ export async function POST(request: Request) { return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) } - const body = await request.json().catch(() => ({})) + const body = await request.json().catch((err) => { + logger.warn('Abort request body parse failed; continuing with empty object', { + error: err instanceof Error ? err.message : String(err), + }) + return {} + }) const streamId = typeof body.streamId === 'string' ? body.streamId : '' let chatId = typeof body.chatId === 'string' ? body.chatId : '' @@ -24,7 +31,13 @@ export async function POST(request: Request) { } if (!chatId) { - const run = await getLatestRunForStream(streamId, authenticatedUserId).catch(() => null) + const run = await getLatestRunForStream(streamId, authenticatedUserId).catch((err) => { + logger.warn('getLatestRunForStream failed while resolving chatId for abort', { + streamId, + error: err instanceof Error ? err.message : String(err), + }) + return null + }) if (run?.chatId) { chatId = run.chatId } @@ -50,15 +63,13 @@ export async function POST(request: Request) { if (!response.ok) { throw new Error(`Explicit abort marker request failed: ${response.status}`) } - } catch { - // best effort: local abort should still proceed even if Go marker fails + } catch (err) { + logger.warn('Explicit abort marker request failed; proceeding with local abort', { + streamId, + error: err instanceof Error ? err.message : String(err), + }) } const aborted = await abortActiveStream(streamId) - if (chatId) { - await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch( - () => false - ) - } return NextResponse.json({ aborted }) } diff --git a/apps/sim/app/api/copilot/chat/delete/route.test.ts b/apps/sim/app/api/copilot/chat/delete/route.test.ts index c53c2a11256..0493b3ffe89 100644 --- a/apps/sim/app/api/copilot/chat/delete/route.test.ts +++ b/apps/sim/app/api/copilot/chat/delete/route.test.ts @@ -36,11 +36,11 @@ vi.mock('drizzle-orm', () => ({ eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), })) -vi.mock('@/lib/copilot/chat-lifecycle', () => ({ +vi.mock('@/lib/copilot/chat/lifecycle', () => ({ getAccessibleCopilotChat: mockGetAccessibleCopilotChat, })) -vi.mock('@/lib/copilot/task-events', () => ({ +vi.mock('@/lib/copilot/tasks', () => ({ taskPubSub: { publishStatusChanged: vi.fn() }, })) diff --git a/apps/sim/app/api/copilot/chat/delete/route.ts b/apps/sim/app/api/copilot/chat/delete/route.ts index 652f732e676..1742d9e7e55 100644 --- a/apps/sim/app/api/copilot/chat/delete/route.ts +++ b/apps/sim/app/api/copilot/chat/delete/route.ts @@ -5,8 +5,8 @@ import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' -import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle' -import { taskPubSub } from '@/lib/copilot/task-events' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { taskPubSub } from '@/lib/copilot/tasks' const logger = createLogger('DeleteChatAPI') diff --git a/apps/sim/app/api/copilot/chat/queries.ts b/apps/sim/app/api/copilot/chat/queries.ts new file mode 100644 index 00000000000..6f1f548a09f --- /dev/null +++ b/apps/sim/app/api/copilot/chat/queries.ts @@ -0,0 +1,119 @@ +import { db } from '@sim/db' +import { copilotChats } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, desc, eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { + authenticateCopilotRequestSessionOnly, + createBadRequestResponse, + createInternalServerErrorResponse, + createUnauthorizedResponse, +} from '@/lib/copilot/request/http' +import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' +import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('CopilotChatAPI') + +function transformChat(chat: { + id: string + title: string | null + model: string | null + messages: unknown + planArtifact?: unknown + config?: unknown + conversationId?: string | null + resources?: unknown + createdAt: Date | null + updatedAt: Date | null +}) { + return { + id: chat.id, + title: chat.title, + model: chat.model, + messages: Array.isArray(chat.messages) ? chat.messages : [], + messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0, + planArtifact: chat.planArtifact || null, + config: chat.config || null, + ...('conversationId' in chat ? { activeStreamId: chat.conversationId || null } : {}), + ...('resources' in chat + ? { resources: Array.isArray(chat.resources) ? chat.resources : [] } + : {}), + createdAt: chat.createdAt, + updatedAt: chat.updatedAt, + } +} + +export async function GET(req: NextRequest) { + try { + const { searchParams } = new URL(req.url) + const workflowId = searchParams.get('workflowId') + const workspaceId = searchParams.get('workspaceId') + const chatId = searchParams.get('chatId') + + const { userId: authenticatedUserId, isAuthenticated } = + await authenticateCopilotRequestSessionOnly() + if (!isAuthenticated || !authenticatedUserId) { + return createUnauthorizedResponse() + } + + if (chatId) { + const chat = await getAccessibleCopilotChat(chatId, authenticatedUserId) + if (!chat) { + return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) + } + + logger.info(`Retrieved chat ${chatId}`) + return NextResponse.json({ success: true, chat: transformChat(chat) }) + } + + if (!workflowId && !workspaceId) { + return createBadRequestResponse('workflowId, workspaceId, or chatId is required') + } + + if (workspaceId) { + await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId) + } + + if (workflowId) { + const authorization = await authorizeWorkflowByWorkspacePermission({ + workflowId, + userId: authenticatedUserId, + action: 'read', + }) + if (!authorization.allowed) { + return createUnauthorizedResponse() + } + } + + const scopeFilter = workflowId + ? eq(copilotChats.workflowId, workflowId) + : eq(copilotChats.workspaceId, workspaceId!) + + const chats = await db + .select({ + id: copilotChats.id, + title: copilotChats.title, + model: copilotChats.model, + messages: copilotChats.messages, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + createdAt: copilotChats.createdAt, + updatedAt: copilotChats.updatedAt, + }) + .from(copilotChats) + .where(and(eq(copilotChats.userId, authenticatedUserId), scopeFilter)) + .orderBy(desc(copilotChats.updatedAt)) + + const scope = workflowId ? `workflow ${workflowId}` : `workspace ${workspaceId}` + logger.info(`Retrieved ${chats.length} chats for ${scope}`) + + return NextResponse.json({ + success: true, + chats: chats.map(transformChat), + }) + } catch (error) { + logger.error('Error fetching copilot chats:', error) + return createInternalServerErrorResponse('Failed to fetch chats') + } +} diff --git a/apps/sim/app/api/copilot/chat/rename/route.ts b/apps/sim/app/api/copilot/chat/rename/route.ts new file mode 100644 index 00000000000..7587f577411 --- /dev/null +++ b/apps/sim/app/api/copilot/chat/rename/route.ts @@ -0,0 +1,65 @@ +import { db } from '@sim/db' +import { copilotChats } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { getSession } from '@/lib/auth' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { taskPubSub } from '@/lib/copilot/tasks' + +const logger = createLogger('RenameChatAPI') + +const RenameChatSchema = z.object({ + chatId: z.string().min(1), + title: z.string().min(1).max(200), +}) + +export async function PATCH(request: NextRequest) { + try { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ success: false, error: 'Unauthorized' }, { status: 401 }) + } + + const body = await request.json() + const { chatId, title } = RenameChatSchema.parse(body) + + const chat = await getAccessibleCopilotChat(chatId, session.user.id) + if (!chat) { + return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) + } + + const now = new Date() + const [updated] = await db + .update(copilotChats) + .set({ title, updatedAt: now, lastSeenAt: now }) + .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id))) + .returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId }) + + if (!updated) { + return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) + } + + logger.info('Chat renamed', { chatId, title }) + + if (updated.workspaceId) { + taskPubSub?.publishStatusChanged({ + workspaceId: updated.workspaceId, + chatId, + type: 'renamed', + }) + } + + return NextResponse.json({ success: true }) + } catch (error) { + if (error instanceof z.ZodError) { + return NextResponse.json( + { success: false, error: 'Invalid request data', details: error.errors }, + { status: 400 } + ) + } + logger.error('Error renaming chat:', error) + return NextResponse.json({ success: false, error: 'Failed to rename chat' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/copilot/chat/resources/route.ts b/apps/sim/app/api/copilot/chat/resources/route.ts index 8d528150218..0d80be8e77b 100644 --- a/apps/sim/app/api/copilot/chat/resources/route.ts +++ b/apps/sim/app/api/copilot/chat/resources/route.ts @@ -10,8 +10,8 @@ import { createInternalServerErrorResponse, createNotFoundResponse, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' -import type { ChatResource, ResourceType } from '@/lib/copilot/resources' +} from '@/lib/copilot/request/http' +import type { ChatResource, ResourceType } from '@/lib/copilot/resources/persistence' const logger = createLogger('CopilotChatResourcesAPI') diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 21f83737066..332665f8dc5 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -1,47 +1,45 @@ import { db } from '@sim/db' import { copilotChats } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, desc, eq, sql } from 'drizzle-orm' +import { eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' -import { createRunSegment } from '@/lib/copilot/async-runs/repository' -import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' -import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' +import { type ChatLoadResult, resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' +import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload' import { - acquirePendingChatStream, - createSSEStream, - releasePendingChatStream, - requestChatTitle, - SSE_RESPONSE_HEADERS, -} from '@/lib/copilot/chat-streaming' -import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' -import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' -import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' -import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' -import { resolveActiveResourceContext } from '@/lib/copilot/process-contents' + buildPersistedAssistantMessage, + buildPersistedUserMessage, +} from '@/lib/copilot/chat/persisted-message' +import { + processContextsServer, + resolveActiveResourceContext, +} from '@/lib/copilot/chat/process-contents' +import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants' import { - authenticateCopilotRequestSessionOnly, createBadRequestResponse, - createInternalServerErrorResponse, createRequestTracker, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' -import { generateId } from '@/lib/core/utils/uuid' -import { captureServerEvent } from '@/lib/posthog/server' +} from '@/lib/copilot/request/http' +import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/request/lifecycle/start' import { - authorizeWorkflowByWorkspacePermission, - resolveWorkflowIdForUser, -} from '@/lib/workflows/utils' -import { - assertActiveWorkspaceAccess, - getUserEntityPermissions, -} from '@/lib/workspaces/permissions/utils' + acquirePendingChatStream, + getPendingChatStreamId, + releasePendingChatStream, +} from '@/lib/copilot/request/session' +import type { OrchestratorResult } from '@/lib/copilot/request/types' +import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils' +import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import type { ChatContext } from '@/stores/panel' export const maxDuration = 3600 const logger = createLogger('CopilotChatAPI') +// --------------------------------------------------------------------------- +// Schemas +// --------------------------------------------------------------------------- + const FileAttachmentSchema = z.object({ id: z.string(), key: z.string(), @@ -68,7 +66,6 @@ const ChatMessageSchema = z.object({ mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'), prefetch: z.boolean().optional(), createNewChat: z.boolean().optional().default(false), - stream: z.boolean().optional().default(true), implicitFeedback: z.string().optional(), fileAttachments: z.array(FileAttachmentSchema).optional(), resourceAttachments: z.array(ResourceAttachmentSchema).optional(), @@ -108,27 +105,25 @@ const ChatMessageSchema = z.object({ userTimezone: z.string().optional(), }) -/** - * POST /api/copilot/chat - * Send messages to sim agent and handle chat persistence - */ +// --------------------------------------------------------------------------- +// POST /api/copilot/chat +// --------------------------------------------------------------------------- + export async function POST(req: NextRequest) { const tracker = createRequestTracker() let actualChatId: string | undefined - let pendingChatStreamAcquired = false - let pendingChatStreamHandedOff = false - let pendingChatStreamID: string | undefined + let chatStreamLockAcquired = false + let userMessageIdToUse = '' try { - // Get session to access user information including name + // 1. Auth const session = await getSession() - if (!session?.user?.id) { return createUnauthorizedResponse() } - const authenticatedUserId = session.user.id + // 2. Parse & validate const body = await req.json() const { message, @@ -141,7 +136,6 @@ export async function POST(req: NextRequest) { mode, prefetch, createNewChat, - stream, implicitFeedback, fileAttachments, resourceAttachments, @@ -155,17 +149,12 @@ export async function POST(req: NextRequest) { ? contexts.map((ctx) => { if (ctx.kind !== 'blocks') return ctx if (Array.isArray(ctx.blockIds) && ctx.blockIds.length > 0) return ctx - if (ctx.blockId) { - return { - ...ctx, - blockIds: [ctx.blockId], - } - } + if (ctx.blockId) return { ...ctx, blockIds: [ctx.blockId] } return ctx }) : contexts - // Copilot route always requires a workflow scope + // 3. Resolve workflow & workspace const resolved = await resolveWorkflowIdForUser( authenticatedUserId, providedWorkflowId, @@ -177,63 +166,28 @@ export async function POST(req: NextRequest) { 'No workflows found. Create a workflow first or provide a valid workflowId.' ) } - const workflowId = resolved.workflowId - const workflowResolvedName = resolved.workflowName + const { workflowId, workflowName: workflowResolvedName } = resolved - // Resolve workspace from workflow so it can be sent as implicit context to the copilot. let resolvedWorkspaceId: string | undefined try { - const { getWorkflowById } = await import('@/lib/workflows/utils') const wf = await getWorkflowById(workflowId) resolvedWorkspaceId = wf?.workspaceId ?? undefined } catch { - logger - .withMetadata({ requestId: tracker.requestId, messageId: userMessageId }) - .warn('Failed to resolve workspaceId from workflow') + logger.warn(`[${tracker.requestId}] Failed to resolve workspaceId from workflow`) } - captureServerEvent( - authenticatedUserId, - 'copilot_chat_sent', - { - workflow_id: workflowId, - workspace_id: resolvedWorkspaceId ?? '', - has_file_attachments: Array.isArray(fileAttachments) && fileAttachments.length > 0, - has_contexts: Array.isArray(contexts) && contexts.length > 0, - mode, - }, - { - groups: resolvedWorkspaceId ? { workspace: resolvedWorkspaceId } : undefined, - setOnce: { first_copilot_use_at: new Date().toISOString() }, - } - ) + userMessageIdToUse = userMessageId || crypto.randomUUID() + const selectedModel = model || 'claude-opus-4-6' - const userMessageIdToUse = userMessageId || generateId() - const reqLogger = logger.withMetadata({ - requestId: tracker.requestId, - messageId: userMessageIdToUse, + logger.info(`[${tracker.requestId}] Received chat POST`, { + workflowId, + contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0, }) - try { - reqLogger.info('Received chat POST', { - workflowId, - hasContexts: Array.isArray(normalizedContexts), - contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0, - contextsPreview: Array.isArray(normalizedContexts) - ? normalizedContexts.map((c: any) => ({ - kind: c?.kind, - chatId: c?.chatId, - workflowId: c?.workflowId, - executionId: (c as any)?.executionId, - label: c?.label, - })) - : undefined, - }) - } catch {} - let currentChat: any = null - let conversationHistory: any[] = [] + // 4. Resolve or create chat + let currentChat: ChatLoadResult['chat'] = null + let conversationHistory: unknown[] = [] actualChatId = chatId - const selectedModel = model || 'claude-opus-4-6' if (chatId || createNewChat) { const chatResult = await resolveOrCreateChat({ @@ -253,37 +207,48 @@ export async function POST(req: NextRequest) { } } + if (actualChatId) { + chatStreamLockAcquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse) + if (!chatStreamLockAcquired) { + const activeStreamId = await getPendingChatStreamId(actualChatId) + return NextResponse.json( + { + error: 'A response is already in progress for this chat.', + ...(activeStreamId ? { activeStreamId } : {}), + }, + { status: 409 } + ) + } + } + + // 5. Process contexts let agentContexts: Array<{ type: string; content: string }> = [] + if (Array.isArray(normalizedContexts) && normalizedContexts.length > 0) { try { - const { processContextsServer } = await import('@/lib/copilot/process-contents') const processed = await processContextsServer( - normalizedContexts as any, + normalizedContexts as ChatContext[], authenticatedUserId, message, resolvedWorkspaceId, actualChatId ) agentContexts = processed - reqLogger.info('Contexts processed for request', { + logger.info(`[${tracker.requestId}] Contexts processed`, { processedCount: agentContexts.length, kinds: agentContexts.map((c) => c.type), - lengthPreview: agentContexts.map((c) => c.content?.length ?? 0), }) - if ( - Array.isArray(normalizedContexts) && - normalizedContexts.length > 0 && - agentContexts.length === 0 - ) { - reqLogger.warn( - 'Contexts provided but none processed. Check executionId for logs contexts.' + if (agentContexts.length === 0) { + logger.warn( + `[${tracker.requestId}] Contexts provided but none processed. Check executionId for logs contexts.` ) } } catch (e) { - reqLogger.error('Failed to process contexts', e) + logger.error(`[${tracker.requestId}] Failed to process contexts`, e) } } + // 5b. Process resource attachments if ( Array.isArray(resourceAttachments) && resourceAttachments.length > 0 && @@ -299,26 +264,30 @@ export async function POST(req: NextRequest) { actualChatId ) if (!ctx) return null - return { - ...ctx, - tag: r.active ? '@active_tab' : '@open_tab', - } + return { ...ctx, tag: r.active ? '@active_tab' : '@open_tab' } }) ) for (const result of results) { if (result.status === 'fulfilled' && result.value) { agentContexts.push(result.value) } else if (result.status === 'rejected') { - reqLogger.error('Failed to resolve resource attachment', result.reason) + logger.error( + `[${tracker.requestId}] Failed to resolve resource attachment`, + result.reason + ) } } } - const effectiveMode = mode === 'agent' ? 'build' : mode - + // 6. Build copilot request payload const userPermission = resolvedWorkspaceId ? await getUserEntityPermissions(authenticatedUserId, 'workspace', resolvedWorkspaceId).catch( - () => null + (err) => { + logger.warn('Failed to load user permissions', { + error: err instanceof Error ? err.message : String(err), + }) + return null + } ) : null @@ -342,55 +311,24 @@ export async function POST(req: NextRequest) { userPermission: userPermission ?? undefined, userTimezone, }, - { - selectedModel, - } + { selectedModel } ) - try { - reqLogger.info('About to call Sim Agent', { - hasContext: agentContexts.length > 0, - contextCount: agentContexts.length, - hasFileAttachments: Array.isArray(requestPayload.fileAttachments), - messageLength: message.length, - mode: effectiveMode, - hasTools: Array.isArray(requestPayload.tools), - toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0, - hasBaseTools: Array.isArray(requestPayload.baseTools), - baseToolCount: Array.isArray(requestPayload.baseTools) - ? requestPayload.baseTools.length - : 0, - hasCredentials: !!requestPayload.credentials, - }) - } catch {} - - if (stream && actualChatId) { - const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse) - if (!acquired) { - return NextResponse.json( - { - error: - 'A response is already in progress for this chat. Wait for it to finish or use Stop.', - }, - { status: 409 } - ) - } - pendingChatStreamAcquired = true - pendingChatStreamID = userMessageIdToUse - } + logger.info(`[${tracker.requestId}] About to call Sim Agent`, { + contextCount: agentContexts.length, + hasFileAttachments: Array.isArray(requestPayload.fileAttachments), + messageLength: message.length, + mode, + }) + // 7. Persist user message if (actualChatId) { - const userMsg = { + const userMsg = buildPersistedUserMessage({ id: userMessageIdToUse, - role: 'user' as const, content: message, - timestamp: new Date().toISOString(), - ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), - ...(Array.isArray(normalizedContexts) && - normalizedContexts.length > 0 && { - contexts: normalizedContexts, - }), - } + fileAttachments, + contexts: normalizedContexts, + }) const [updated] = await db .update(copilotChats) @@ -403,268 +341,66 @@ export async function POST(req: NextRequest) { .returning({ messages: copilotChats.messages }) if (updated) { - const freshMessages: any[] = Array.isArray(updated.messages) ? updated.messages : [] - conversationHistory = freshMessages.filter((m: any) => m.id !== userMessageIdToUse) + const freshMessages: Record[] = Array.isArray(updated.messages) + ? updated.messages + : [] + conversationHistory = freshMessages.filter( + (m: Record) => m.id !== userMessageIdToUse + ) } } - if (stream) { - const executionId = generateId() - const runId = generateId() - const sseStream = createSSEStream({ - requestPayload, - userId: authenticatedUserId, - streamId: userMessageIdToUse, - executionId, - runId, - chatId: actualChatId, - currentChat, - isNewChat: conversationHistory.length === 0, - message, - titleModel: selectedModel, - titleProvider: provider, - requestId: tracker.requestId, - workspaceId: resolvedWorkspaceId, - pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream), - orchestrateOptions: { - userId: authenticatedUserId, - workflowId, - chatId: actualChatId, - executionId, - runId, - goRoute: '/api/copilot', - autoExecuteTools: true, - interactive: true, - onComplete: async (result: OrchestratorResult) => { - if (!actualChatId) return - if (!result.success) return - - const assistantMessage: Record = { - id: generateId(), - role: 'assistant' as const, - content: result.content, - timestamp: new Date().toISOString(), - ...(result.requestId ? { requestId: result.requestId } : {}), - } - if (result.toolCalls.length > 0) { - assistantMessage.toolCalls = result.toolCalls - } - if (result.contentBlocks.length > 0) { - assistantMessage.contentBlocks = result.contentBlocks.map((block) => { - const stored: Record = { type: block.type } - if (block.content) stored.content = block.content - if (block.type === 'tool_call' && block.toolCall) { - const state = - block.toolCall.result?.success !== undefined - ? block.toolCall.result.success - ? 'success' - : 'error' - : block.toolCall.status - const isSubagentTool = !!block.calledBy - const isNonTerminal = - state === 'cancelled' || state === 'pending' || state === 'executing' - stored.toolCall = { - id: block.toolCall.id, - name: block.toolCall.name, - state, - ...(isSubagentTool && isNonTerminal ? {} : { result: block.toolCall.result }), - ...(isSubagentTool && isNonTerminal - ? {} - : block.toolCall.params - ? { params: block.toolCall.params } - : {}), - ...(block.calledBy ? { calledBy: block.calledBy } : {}), - } - } - return stored - }) - } - - try { - const [row] = await db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(eq(copilotChats.id, actualChatId)) - .limit(1) - - const msgs: any[] = Array.isArray(row?.messages) ? row.messages : [] - const userIdx = msgs.findIndex((m: any) => m.id === userMessageIdToUse) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < msgs.length && - (msgs[userIdx + 1] as any)?.role === 'assistant' - - if (!alreadyHasResponse) { - await db - .update(copilotChats) - .set({ - messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, - conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageIdToUse} THEN NULL ELSE ${copilotChats.conversationId} END`, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, actualChatId)) - } - } catch (error) { - reqLogger.error('Failed to persist chat messages', { - chatId: actualChatId, - error: error instanceof Error ? error.message : 'Unknown error', - }) - } - }, - }, - }) - pendingChatStreamHandedOff = true - - return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS }) - } - - const nsExecutionId = generateId() - const nsRunId = generateId() + // 8. Create SSE stream with onComplete for assistant message persistence + const executionId = crypto.randomUUID() + const runId = crypto.randomUUID() - if (actualChatId) { - await createRunSegment({ - id: nsRunId, - executionId: nsExecutionId, - chatId: actualChatId, - userId: authenticatedUserId, - workflowId, - streamId: userMessageIdToUse, - }).catch(() => {}) - } - - const nonStreamingResult = await orchestrateCopilotStream(requestPayload, { + const sseStream = createSSEStream({ + requestPayload, userId: authenticatedUserId, - workflowId, + streamId: userMessageIdToUse, + executionId, + runId, chatId: actualChatId, - executionId: nsExecutionId, - runId: nsRunId, - goRoute: '/api/copilot', - autoExecuteTools: true, - interactive: true, - }) - - const responseData = { - content: nonStreamingResult.content, - toolCalls: nonStreamingResult.toolCalls, - model: selectedModel, - provider: typeof requestPayload?.provider === 'string' ? requestPayload.provider : undefined, - } - - reqLogger.info('Non-streaming response from orchestrator', { - hasContent: !!responseData.content, - contentLength: responseData.content?.length || 0, - model: responseData.model, - provider: responseData.provider, - toolCallsCount: responseData.toolCalls?.length || 0, - }) - - // Save messages if we have a chat - if (currentChat && responseData.content) { - const userMessage = { - id: userMessageIdToUse, // Consistent ID used for request and persistence - role: 'user', - content: message, - timestamp: new Date().toISOString(), - ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), - ...(Array.isArray(normalizedContexts) && - normalizedContexts.length > 0 && { - contexts: normalizedContexts, - }), - ...(Array.isArray(normalizedContexts) && - normalizedContexts.length > 0 && { - contentBlocks: [ - { type: 'contexts', contexts: normalizedContexts as any, timestamp: Date.now() }, - ], - }), - } - - const assistantMessage = { - id: generateId(), - role: 'assistant', - content: responseData.content, - timestamp: new Date().toISOString(), - } - - const updatedMessages = [...conversationHistory, userMessage, assistantMessage] - - // Start title generation in parallel if this is first message (non-streaming) - if (actualChatId && !currentChat.title && conversationHistory.length === 0) { - reqLogger.info('Starting title generation for non-streaming response') - requestChatTitle({ message, model: selectedModel, provider, messageId: userMessageIdToUse }) - .then(async (title) => { - if (title) { - await db - .update(copilotChats) - .set({ - title, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, actualChatId!)) - reqLogger.info(`Generated and saved title: ${title}`) - } - }) - .catch((error) => { - reqLogger.error('Title generation failed', error) - }) - } - - // Update chat in database immediately (without blocking for title) - await db - .update(copilotChats) - .set({ - messages: updatedMessages, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, actualChatId!)) - } - - reqLogger.info('Returning non-streaming response', { - duration: tracker.getDuration(), - chatId: actualChatId, - responseLength: responseData.content?.length || 0, - }) - - return NextResponse.json({ - success: true, - response: responseData, - chatId: actualChatId, - metadata: { - requestId: tracker.requestId, - message, - duration: tracker.getDuration(), + currentChat, + isNewChat: conversationHistory.length === 0, + message, + titleModel: selectedModel, + titleProvider: provider, + requestId: tracker.requestId, + workspaceId: resolvedWorkspaceId, + orchestrateOptions: { + userId: authenticatedUserId, + workflowId, + chatId: actualChatId, + executionId, + runId, + goRoute: '/api/copilot', + autoExecuteTools: true, + interactive: true, + onComplete: buildOnComplete(actualChatId, userMessageIdToUse, tracker.requestId), }, }) + + return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS }) } catch (error) { - if ( - actualChatId && - pendingChatStreamAcquired && - !pendingChatStreamHandedOff && - pendingChatStreamID - ) { - await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {}) + if (chatStreamLockAcquired && actualChatId && userMessageIdToUse) { + await releasePendingChatStream(actualChatId, userMessageIdToUse) } const duration = tracker.getDuration() if (error instanceof z.ZodError) { - logger - .withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined }) - .error('Validation error', { - duration, - errors: error.errors, - }) + logger.error(`[${tracker.requestId}] Validation error:`, { duration, errors: error.errors }) return NextResponse.json( { error: 'Invalid request data', details: error.errors }, { status: 400 } ) } - logger - .withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined }) - .error('Error handling copilot chat', { - duration, - error: error instanceof Error ? error.message : 'Unknown error', - stack: error instanceof Error ? error.stack : undefined, - }) + logger.error(`[${tracker.requestId}] Error handling copilot chat:`, { + duration, + error: error instanceof Error ? error.message : 'Unknown error', + stack: error instanceof Error ? error.stack : undefined, + }) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, @@ -673,132 +409,55 @@ export async function POST(req: NextRequest) { } } -export async function GET(req: NextRequest) { - try { - const { searchParams } = new URL(req.url) - const workflowId = searchParams.get('workflowId') - const workspaceId = searchParams.get('workspaceId') - const chatId = searchParams.get('chatId') - - const { userId: authenticatedUserId, isAuthenticated } = - await authenticateCopilotRequestSessionOnly() - if (!isAuthenticated || !authenticatedUserId) { - return createUnauthorizedResponse() - } - - if (chatId) { - const chat = await getAccessibleCopilotChat(chatId, authenticatedUserId) +// --------------------------------------------------------------------------- +// onComplete: persist assistant message after streaming finishes +// --------------------------------------------------------------------------- - if (!chat) { - return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) - } +function buildOnComplete( + chatId: string | undefined, + userMessageId: string, + requestId: string +): (result: OrchestratorResult) => Promise { + return async (result) => { + if (!chatId || !result.success) return - let streamSnapshot: { - events: Array<{ eventId: number; streamId: string; event: Record }> - status: string - } | null = null - - if (chat.conversationId) { - try { - const [meta, events] = await Promise.all([ - getStreamMeta(chat.conversationId), - readStreamEvents(chat.conversationId, 0), - ]) - streamSnapshot = { - events: events || [], - status: meta?.status || 'unknown', - } - } catch (err) { - logger - .withMetadata({ messageId: chat.conversationId || undefined }) - .warn('Failed to read stream snapshot for chat', { - chatId, - conversationId: chat.conversationId, - error: err instanceof Error ? err.message : String(err), - }) - } - } + const assistantMessage = buildPersistedAssistantMessage(result, result.requestId) - const transformedChat = { - id: chat.id, - title: chat.title, - model: chat.model, - messages: Array.isArray(chat.messages) ? chat.messages : [], - messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0, - planArtifact: chat.planArtifact || null, - config: chat.config || null, - conversationId: chat.conversationId || null, - resources: Array.isArray(chat.resources) ? chat.resources : [], - createdAt: chat.createdAt, - updatedAt: chat.updatedAt, - ...(streamSnapshot ? { streamSnapshot } : {}), + try { + const [row] = await db + .select({ messages: copilotChats.messages }) + .from(copilotChats) + .where(eq(copilotChats.id, chatId)) + .limit(1) + + const msgs: Record[] = Array.isArray(row?.messages) ? row.messages : [] + const userIdx = msgs.findIndex((m: Record) => m.id === userMessageId) + const alreadyHasResponse = + userIdx >= 0 && + userIdx + 1 < msgs.length && + (msgs[userIdx + 1] as Record)?.role === 'assistant' + + if (!alreadyHasResponse) { + await db + .update(copilotChats) + .set({ + messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, + conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`, + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, chatId)) } - - logger - .withMetadata({ messageId: chat.conversationId || undefined }) - .info(`Retrieved chat ${chatId}`) - return NextResponse.json({ success: true, chat: transformedChat }) - } - - if (!workflowId && !workspaceId) { - return createBadRequestResponse('workflowId, workspaceId, or chatId is required') - } - - if (workspaceId) { - await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId) - } - - if (workflowId) { - const authorization = await authorizeWorkflowByWorkspacePermission({ - workflowId, - userId: authenticatedUserId, - action: 'read', + } catch (error) { + logger.error(`[${requestId}] Failed to persist chat messages`, { + chatId, + error: error instanceof Error ? error.message : 'Unknown error', }) - if (!authorization.allowed) { - return createUnauthorizedResponse() - } } - - const scopeFilter = workflowId - ? eq(copilotChats.workflowId, workflowId) - : eq(copilotChats.workspaceId, workspaceId!) - - const chats = await db - .select({ - id: copilotChats.id, - title: copilotChats.title, - model: copilotChats.model, - messages: copilotChats.messages, - planArtifact: copilotChats.planArtifact, - config: copilotChats.config, - createdAt: copilotChats.createdAt, - updatedAt: copilotChats.updatedAt, - }) - .from(copilotChats) - .where(and(eq(copilotChats.userId, authenticatedUserId), scopeFilter)) - .orderBy(desc(copilotChats.updatedAt)) - - const transformedChats = chats.map((chat) => ({ - id: chat.id, - title: chat.title, - model: chat.model, - messages: Array.isArray(chat.messages) ? chat.messages : [], - messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0, - planArtifact: chat.planArtifact || null, - config: chat.config || null, - createdAt: chat.createdAt, - updatedAt: chat.updatedAt, - })) - - const scope = workflowId ? `workflow ${workflowId}` : `workspace ${workspaceId}` - logger.info(`Retrieved ${transformedChats.length} chats for ${scope}`) - - return NextResponse.json({ - success: true, - chats: transformedChats, - }) - } catch (error) { - logger.error('Error fetching copilot chats', error) - return createInternalServerErrorResponse('Failed to fetch chats') } } + +// --------------------------------------------------------------------------- +// GET handler (read-only queries, extracted to queries.ts) +// --------------------------------------------------------------------------- + +export { GET } from './queries' diff --git a/apps/sim/app/api/copilot/chat/stream/route.test.ts b/apps/sim/app/api/copilot/chat/stream/route.test.ts index 993f10501a8..ff5115e637a 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.test.ts @@ -4,25 +4,67 @@ import { NextRequest } from 'next/server' import { beforeEach, describe, expect, it, vi } from 'vitest' +import { + MothershipStreamV1CompletionStatus, + MothershipStreamV1EventType, +} from '@/lib/copilot/generated/mothership-stream-v1' -const { getStreamMeta, readStreamEvents, authenticateCopilotRequestSessionOnly } = vi.hoisted( - () => ({ - getStreamMeta: vi.fn(), - readStreamEvents: vi.fn(), - authenticateCopilotRequestSessionOnly: vi.fn(), - }) -) +const { + getLatestRunForStream, + readEvents, + checkForReplayGap, + authenticateCopilotRequestSessionOnly, +} = vi.hoisted(() => ({ + getLatestRunForStream: vi.fn(), + readEvents: vi.fn(), + checkForReplayGap: vi.fn(), + authenticateCopilotRequestSessionOnly: vi.fn(), +})) + +vi.mock('@/lib/copilot/async-runs/repository', () => ({ + getLatestRunForStream, +})) -vi.mock('@/lib/copilot/orchestrator/stream/buffer', () => ({ - getStreamMeta, - readStreamEvents, +vi.mock('@/lib/copilot/request/session', () => ({ + readEvents, + checkForReplayGap, + createEvent: (event: Record) => ({ + stream: { + streamId: event.streamId, + cursor: event.cursor, + }, + seq: event.seq, + trace: { requestId: event.requestId ?? '' }, + type: event.type, + payload: event.payload, + }), + encodeSSEEnvelope: (event: Record) => + new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`), + SSE_RESPONSE_HEADERS: { + 'Content-Type': 'text/event-stream', + }, })) -vi.mock('@/lib/copilot/request-helpers', () => ({ +vi.mock('@/lib/copilot/request/http', () => ({ authenticateCopilotRequestSessionOnly, })) -import { GET } from '@/app/api/copilot/chat/stream/route' +import { GET } from './route' + +async function readAllChunks(response: Response): Promise { + const reader = response.body?.getReader() + expect(reader).toBeTruthy() + + const chunks: string[] = [] + while (true) { + const { done, value } = await reader!.read() + if (done) { + break + } + chunks.push(new TextDecoder().decode(value)) + } + return chunks +} describe('copilot chat stream replay route', () => { beforeEach(() => { @@ -31,29 +73,54 @@ describe('copilot chat stream replay route', () => { userId: 'user-1', isAuthenticated: true, }) - readStreamEvents.mockResolvedValue([]) + readEvents.mockResolvedValue([]) + checkForReplayGap.mockResolvedValue(null) }) - it('stops replay polling when stream meta becomes cancelled', async () => { - getStreamMeta + it('stops replay polling when run becomes cancelled', async () => { + getLatestRunForStream .mockResolvedValueOnce({ status: 'active', - userId: 'user-1', + executionId: 'exec-1', + id: 'run-1', }) .mockResolvedValueOnce({ status: 'cancelled', - userId: 'user-1', + executionId: 'exec-1', + id: 'run-1', }) const response = await GET( - new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1') + new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0') ) - const reader = response.body?.getReader() - expect(reader).toBeTruthy() + const chunks = await readAllChunks(response) + expect(chunks.join('')).toContain( + JSON.stringify({ + status: MothershipStreamV1CompletionStatus.cancelled, + reason: 'terminal_status', + }) + ) + expect(getLatestRunForStream).toHaveBeenCalledTimes(2) + }) + + it('emits structured terminal replay error when run metadata disappears', async () => { + getLatestRunForStream + .mockResolvedValueOnce({ + status: 'active', + executionId: 'exec-1', + id: 'run-1', + }) + .mockResolvedValueOnce(null) + + const response = await GET( + new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0') + ) - const first = await reader!.read() - expect(first.done).toBe(true) - expect(getStreamMeta).toHaveBeenCalledTimes(2) + const chunks = await readAllChunks(response) + const body = chunks.join('') + expect(body).toContain(`"type":"${MothershipStreamV1EventType.error}"`) + expect(body).toContain('"code":"resume_run_unavailable"') + expect(body).toContain(`"type":"${MothershipStreamV1EventType.complete}"`) }) }) diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts index b56d9471817..9ccf07b3994 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -1,12 +1,18 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' +import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' import { - getStreamMeta, - readStreamEvents, - type StreamMeta, -} from '@/lib/copilot/orchestrator/stream/buffer' -import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' -import { SSE_HEADERS } from '@/lib/core/utils/sse' + MothershipStreamV1CompletionStatus, + MothershipStreamV1EventType, +} from '@/lib/copilot/generated/mothership-stream-v1' +import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http' +import { + checkForReplayGap, + createEvent, + encodeSSEEnvelope, + readEvents, + SSE_RESPONSE_HEADERS, +} from '@/lib/copilot/request/session' export const maxDuration = 3600 @@ -14,8 +20,59 @@ const logger = createLogger('CopilotChatStreamAPI') const POLL_INTERVAL_MS = 250 const MAX_STREAM_MS = 60 * 60 * 1000 -function encodeEvent(event: Record): Uint8Array { - return new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`) +function isTerminalStatus( + status: string | null | undefined +): status is MothershipStreamV1CompletionStatus { + return ( + status === MothershipStreamV1CompletionStatus.complete || + status === MothershipStreamV1CompletionStatus.error || + status === MothershipStreamV1CompletionStatus.cancelled + ) +} + +function buildResumeTerminalEnvelopes(options: { + streamId: string + afterCursor: string + status: MothershipStreamV1CompletionStatus + message?: string + code: string + reason?: string +}) { + const baseSeq = Number(options.afterCursor || '0') + const seq = Number.isFinite(baseSeq) ? baseSeq : 0 + const envelopes: ReturnType[] = [] + + if (options.status === MothershipStreamV1CompletionStatus.error) { + envelopes.push( + createEvent({ + streamId: options.streamId, + cursor: String(seq + 1), + seq: seq + 1, + requestId: '', + type: MothershipStreamV1EventType.error, + payload: { + message: options.message || 'Stream recovery failed before completion.', + code: options.code, + }, + }) + ) + } + + envelopes.push( + createEvent({ + streamId: options.streamId, + cursor: String(seq + envelopes.length + 1), + seq: seq + envelopes.length + 1, + requestId: '', + type: MothershipStreamV1EventType.complete, + payload: { + status: options.status, + ...(options.reason ? { reason: options.reason } : {}), + }, + }) + ) + + return envelopes } export async function GET(request: NextRequest) { @@ -28,58 +85,49 @@ export async function GET(request: NextRequest) { const url = new URL(request.url) const streamId = url.searchParams.get('streamId') || '' - const fromParam = url.searchParams.get('from') || '0' - const fromEventId = Number(fromParam || 0) - // If batch=true, return buffered events as JSON instead of SSE + const afterCursor = url.searchParams.get('after') || '' const batchMode = url.searchParams.get('batch') === 'true' - const toParam = url.searchParams.get('to') - const toEventId = toParam ? Number(toParam) : undefined - - const reqLogger = logger.withMetadata({ messageId: streamId || undefined }) - - reqLogger.info('[Resume] Received resume request', { - streamId: streamId || undefined, - fromEventId, - toEventId, - batchMode, - }) if (!streamId) { return NextResponse.json({ error: 'streamId is required' }, { status: 400 }) } - const meta = (await getStreamMeta(streamId)) as StreamMeta | null - reqLogger.info('[Resume] Stream lookup', { + const run = await getLatestRunForStream(streamId, authenticatedUserId).catch((err) => { + logger.warn('Failed to fetch latest run for stream', { + streamId, + error: err instanceof Error ? err.message : String(err), + }) + return null + }) + logger.info('[Resume] Stream lookup', { streamId, - fromEventId, - toEventId, + afterCursor, batchMode, - hasMeta: !!meta, - metaStatus: meta?.status, + hasRun: !!run, + runStatus: run?.status, }) - if (!meta) { + if (!run) { return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) } - if (meta.userId && meta.userId !== authenticatedUserId) { - return NextResponse.json({ error: 'Unauthorized' }, { status: 403 }) - } - // Batch mode: return all buffered events as JSON if (batchMode) { - const events = await readStreamEvents(streamId, fromEventId) - const filteredEvents = toEventId ? events.filter((e) => e.eventId <= toEventId) : events - reqLogger.info('[Resume] Batch response', { + const afterSeq = afterCursor || '0' + const events = await readEvents(streamId, afterSeq) + const batchEvents = events.map((envelope) => ({ + eventId: envelope.seq, + streamId: envelope.stream.streamId, + event: envelope, + })) + logger.info('[Resume] Batch response', { streamId, - fromEventId, - toEventId, - eventCount: filteredEvents.length, + afterCursor: afterSeq, + eventCount: batchEvents.length, + runStatus: run.status, }) return NextResponse.json({ success: true, - events: filteredEvents, - status: meta.status, - executionId: meta.executionId, - runId: meta.runId, + events: batchEvents, + status: run.status, }) } @@ -87,9 +135,9 @@ export async function GET(request: NextRequest) { const stream = new ReadableStream({ async start(controller) { - let lastEventId = Number.isFinite(fromEventId) ? fromEventId : 0 - let latestMeta = meta + let cursor = afterCursor || '0' let controllerClosed = false + let sawTerminalEvent = false const closeController = () => { if (controllerClosed) return @@ -97,14 +145,14 @@ export async function GET(request: NextRequest) { try { controller.close() } catch { - // Controller already closed by runtime/client - treat as normal. + // Controller already closed by runtime/client } } - const enqueueEvent = (payload: Record) => { + const enqueueEvent = (payload: unknown) => { if (controllerClosed) return false try { - controller.enqueue(encodeEvent(payload)) + controller.enqueue(encodeSSEEnvelope(payload)) return true } catch { controllerClosed = true @@ -118,47 +166,96 @@ export async function GET(request: NextRequest) { request.signal.addEventListener('abort', abortListener, { once: true }) const flushEvents = async () => { - const events = await readStreamEvents(streamId, lastEventId) + const events = await readEvents(streamId, cursor) if (events.length > 0) { - reqLogger.info('[Resume] Flushing events', { + logger.info('[Resume] Flushing events', { streamId, - fromEventId: lastEventId, + afterCursor: cursor, eventCount: events.length, }) } - for (const entry of events) { - lastEventId = entry.eventId - const payload = { - ...entry.event, - eventId: entry.eventId, - streamId: entry.streamId, - executionId: latestMeta?.executionId, - runId: latestMeta?.runId, + for (const envelope of events) { + cursor = envelope.stream.cursor ?? String(envelope.seq) + if (envelope.type === MothershipStreamV1EventType.complete) { + sawTerminalEvent = true } - if (!enqueueEvent(payload)) { + if (!enqueueEvent(envelope)) { + break + } + } + } + + const emitTerminalIfMissing = ( + status: MothershipStreamV1CompletionStatus, + options?: { message?: string; code: string; reason?: string } + ) => { + if (controllerClosed || sawTerminalEvent) { + return + } + for (const envelope of buildResumeTerminalEnvelopes({ + streamId, + afterCursor: cursor, + status, + message: options?.message, + code: options?.code ?? 'resume_terminal', + reason: options?.reason, + })) { + cursor = envelope.stream.cursor ?? String(envelope.seq) + if (envelope.type === MothershipStreamV1EventType.complete) { + sawTerminalEvent = true + } + if (!enqueueEvent(envelope)) { break } } } try { + const gap = await checkForReplayGap(streamId, afterCursor) + if (gap) { + for (const envelope of gap.envelopes) { + enqueueEvent(envelope) + } + return + } + await flushEvents() while (!controllerClosed && Date.now() - startTime < MAX_STREAM_MS) { - const currentMeta = await getStreamMeta(streamId) - if (!currentMeta) break - latestMeta = currentMeta + const currentRun = await getLatestRunForStream(streamId, authenticatedUserId).catch( + (err) => { + logger.warn('Failed to poll latest run for stream', { + streamId, + error: err instanceof Error ? err.message : String(err), + }) + return null + } + ) + if (!currentRun) { + emitTerminalIfMissing(MothershipStreamV1CompletionStatus.error, { + message: 'The stream could not be recovered because its run metadata is unavailable.', + code: 'resume_run_unavailable', + reason: 'run_unavailable', + }) + break + } await flushEvents() if (controllerClosed) { break } - if ( - currentMeta.status === 'complete' || - currentMeta.status === 'error' || - currentMeta.status === 'cancelled' - ) { + if (isTerminalStatus(currentRun.status)) { + emitTerminalIfMissing(currentRun.status, { + message: + currentRun.status === MothershipStreamV1CompletionStatus.error + ? typeof currentRun.error === 'string' + ? currentRun.error + : 'The recovered stream ended with an error.' + : undefined, + code: 'resume_terminal_status', + reason: 'terminal_status', + }) break } @@ -169,12 +266,24 @@ export async function GET(request: NextRequest) { await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)) } + if (!controllerClosed && Date.now() - startTime >= MAX_STREAM_MS) { + emitTerminalIfMissing(MothershipStreamV1CompletionStatus.error, { + message: 'The stream recovery timed out before completion.', + code: 'resume_timeout', + reason: 'timeout', + }) + } } catch (error) { if (!controllerClosed && !request.signal.aborted) { - reqLogger.warn('Stream replay failed', { + logger.warn('Stream replay failed', { streamId, error: error instanceof Error ? error.message : String(error), }) + emitTerminalIfMissing(MothershipStreamV1CompletionStatus.error, { + message: 'The stream replay failed before completion.', + code: 'resume_internal', + reason: 'stream_replay_failed', + }) } } finally { request.signal.removeEventListener('abort', abortListener) @@ -183,5 +292,5 @@ export async function GET(request: NextRequest) { }, }) - return new Response(stream, { headers: SSE_HEADERS }) + return new Response(stream, { headers: SSE_RESPONSE_HEADERS }) } diff --git a/apps/sim/app/api/copilot/chat/update-messages/route.test.ts b/apps/sim/app/api/copilot/chat/update-messages/route.test.ts index 0376005c283..512a05cfd84 100644 --- a/apps/sim/app/api/copilot/chat/update-messages/route.test.ts +++ b/apps/sim/app/api/copilot/chat/update-messages/route.test.ts @@ -327,7 +327,35 @@ describe('Copilot Chat Update Messages API Route', () => { }) expect(mockSet).toHaveBeenCalledWith({ - messages, + messages: [ + { + id: 'msg-1', + role: 'user', + content: 'Hello', + timestamp: '2024-01-01T10:00:00.000Z', + }, + { + id: 'msg-2', + role: 'assistant', + content: 'Hi there!', + timestamp: '2024-01-01T10:01:00.000Z', + contentBlocks: [ + { + type: 'text', + content: 'Here is the weather information', + }, + { + type: 'tool', + phase: 'call', + toolCall: { + id: 'tool-1', + name: 'get_weather', + state: 'pending', + }, + }, + ], + }, + ], updatedAt: expect.any(Date), }) }) diff --git a/apps/sim/app/api/copilot/chat/update-messages/route.ts b/apps/sim/app/api/copilot/chat/update-messages/route.ts index 574c2241ede..ee2dfee0bb7 100644 --- a/apps/sim/app/api/copilot/chat/update-messages/route.ts +++ b/apps/sim/app/api/copilot/chat/update-messages/route.ts @@ -4,15 +4,16 @@ import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' -import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle' -import { COPILOT_MODES } from '@/lib/copilot/models' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { COPILOT_MODES } from '@/lib/copilot/constants' import { authenticateCopilotRequestSessionOnly, createInternalServerErrorResponse, createNotFoundResponse, createRequestTracker, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' const logger = createLogger('CopilotChatUpdateAPI') @@ -78,12 +79,15 @@ export async function POST(req: NextRequest) { } const { chatId, messages, planArtifact, config } = UpdateMessagesSchema.parse(body) + const normalizedMessages: PersistedMessage[] = messages.map((message) => + normalizeMessage(message as Record) + ) // Debug: Log what we're about to save - const lastMsgParsed = messages[messages.length - 1] + const lastMsgParsed = normalizedMessages[normalizedMessages.length - 1] if (lastMsgParsed?.role === 'assistant') { logger.info(`[${tracker.requestId}] Parsed messages to save`, { - messageCount: messages.length, + messageCount: normalizedMessages.length, lastMsgId: lastMsgParsed.id, lastMsgContentLength: lastMsgParsed.content?.length || 0, lastMsgContentBlockCount: lastMsgParsed.contentBlocks?.length || 0, @@ -99,8 +103,8 @@ export async function POST(req: NextRequest) { } // Update chat with new messages, plan artifact, and config - const updateData: Record = { - messages: messages, + const updateData: Record = { + messages: normalizedMessages, updatedAt: new Date(), } @@ -116,14 +120,14 @@ export async function POST(req: NextRequest) { logger.info(`[${tracker.requestId}] Successfully updated chat`, { chatId, - newMessageCount: messages.length, + newMessageCount: normalizedMessages.length, hasPlanArtifact: !!planArtifact, hasConfig: !!config, }) return NextResponse.json({ success: true, - messageCount: messages.length, + messageCount: normalizedMessages.length, }) } catch (error) { logger.error(`[${tracker.requestId}] Error updating chat messages:`, error) diff --git a/apps/sim/app/api/copilot/chats/route.test.ts b/apps/sim/app/api/copilot/chats/route.test.ts index 32088fe093a..3dbbf2791f8 100644 --- a/apps/sim/app/api/copilot/chats/route.test.ts +++ b/apps/sim/app/api/copilot/chats/route.test.ts @@ -66,7 +66,7 @@ vi.mock('drizzle-orm', () => ({ sql: vi.fn(), })) -vi.mock('@/lib/copilot/request-helpers', () => ({ +vi.mock('@/lib/copilot/request/http', () => ({ authenticateCopilotRequestSessionOnly: mockAuthenticate, createUnauthorizedResponse: mockCreateUnauthorizedResponse, createInternalServerErrorResponse: mockCreateInternalServerErrorResponse, diff --git a/apps/sim/app/api/copilot/chats/route.ts b/apps/sim/app/api/copilot/chats/route.ts index 7010d84e92b..b0142c27f7b 100644 --- a/apps/sim/app/api/copilot/chats/route.ts +++ b/apps/sim/app/api/copilot/chats/route.ts @@ -4,14 +4,14 @@ import { createLogger } from '@sim/logger' import { and, desc, eq, isNull, or, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' -import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' +import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, createInternalServerErrorResponse, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' -import { taskPubSub } from '@/lib/copilot/task-events' +} from '@/lib/copilot/request/http' +import { taskPubSub } from '@/lib/copilot/tasks' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils' @@ -37,7 +37,7 @@ export async function GET(_request: NextRequest) { title: copilotChats.title, workflowId: copilotChats.workflowId, workspaceId: copilotChats.workspaceId, - conversationId: copilotChats.conversationId, + activeStreamId: copilotChats.conversationId, updatedAt: copilotChats.updatedAt, }) .from(copilotChats) diff --git a/apps/sim/app/api/copilot/checkpoints/revert/route.test.ts b/apps/sim/app/api/copilot/checkpoints/revert/route.test.ts index 7fd68b4925e..fe4fb76f4d1 100644 --- a/apps/sim/app/api/copilot/checkpoints/revert/route.test.ts +++ b/apps/sim/app/api/copilot/checkpoints/revert/route.test.ts @@ -43,7 +43,7 @@ vi.mock('@/lib/workflows/utils', () => ({ authorizeWorkflowByWorkspacePermission: mockAuthorize, })) -vi.mock('@/lib/copilot/chat-lifecycle', () => ({ +vi.mock('@/lib/copilot/chat/lifecycle', () => ({ getAccessibleCopilotChat: mockGetAccessibleCopilotChat, })) diff --git a/apps/sim/app/api/copilot/checkpoints/revert/route.ts b/apps/sim/app/api/copilot/checkpoints/revert/route.ts index dd73477f5ec..b49d21f3841 100644 --- a/apps/sim/app/api/copilot/checkpoints/revert/route.ts +++ b/apps/sim/app/api/copilot/checkpoints/revert/route.ts @@ -4,14 +4,14 @@ import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' -import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' import { authenticateCopilotRequestSessionOnly, createInternalServerErrorResponse, createNotFoundResponse, createRequestTracker, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' import { getInternalApiBaseUrl } from '@/lib/core/utils/urls' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' import { isUuidV4 } from '@/executor/constants' diff --git a/apps/sim/app/api/copilot/checkpoints/route.test.ts b/apps/sim/app/api/copilot/checkpoints/route.test.ts index eedf688af37..e1b3a1f4e81 100644 --- a/apps/sim/app/api/copilot/checkpoints/route.test.ts +++ b/apps/sim/app/api/copilot/checkpoints/route.test.ts @@ -62,7 +62,7 @@ vi.mock('drizzle-orm', () => ({ desc: vi.fn((field: unknown) => ({ field, type: 'desc' })), })) -vi.mock('@/lib/copilot/chat-lifecycle', () => ({ +vi.mock('@/lib/copilot/chat/lifecycle', () => ({ getAccessibleCopilotChat: mockGetAccessibleCopilotChat, })) diff --git a/apps/sim/app/api/copilot/checkpoints/route.ts b/apps/sim/app/api/copilot/checkpoints/route.ts index 58b4cde4bb2..c800e519542 100644 --- a/apps/sim/app/api/copilot/checkpoints/route.ts +++ b/apps/sim/app/api/copilot/checkpoints/route.ts @@ -4,14 +4,14 @@ import { createLogger } from '@sim/logger' import { and, desc, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' -import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, createInternalServerErrorResponse, createRequestTracker, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' const logger = createLogger('WorkflowCheckpointsAPI') diff --git a/apps/sim/app/api/copilot/confirm/route.test.ts b/apps/sim/app/api/copilot/confirm/route.test.ts index 8570d637646..0b40d981e84 100644 --- a/apps/sim/app/api/copilot/confirm/route.test.ts +++ b/apps/sim/app/api/copilot/confirm/route.test.ts @@ -38,7 +38,7 @@ const { publishToolConfirmation: vi.fn(), })) -vi.mock('@/lib/copilot/request-helpers', () => ({ +vi.mock('@/lib/copilot/request/http', () => ({ authenticateCopilotRequestSessionOnly, createBadRequestResponse, createInternalServerErrorResponse, @@ -54,7 +54,7 @@ vi.mock('@/lib/copilot/async-runs/repository', () => ({ completeAsyncToolCall, })) -vi.mock('@/lib/copilot/orchestrator/persistence', () => ({ +vi.mock('@/lib/copilot/persistence/tool-confirm', () => ({ publishToolConfirmation, })) diff --git a/apps/sim/app/api/copilot/confirm/route.ts b/apps/sim/app/api/copilot/confirm/route.ts index bf154487f05..7246381105a 100644 --- a/apps/sim/app/api/copilot/confirm/route.ts +++ b/apps/sim/app/api/copilot/confirm/route.ts @@ -1,13 +1,14 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' +import { ASYNC_TOOL_STATUS } from '@/lib/copilot/async-runs/lifecycle' import { completeAsyncToolCall, getAsyncToolCall, getRunSegment, upsertAsyncToolCall, } from '@/lib/copilot/async-runs/repository' -import { publishToolConfirmation } from '@/lib/copilot/orchestrator/persistence' +import { publishToolConfirmation } from '@/lib/copilot/persistence/tool-confirm' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -16,7 +17,7 @@ import { createRequestTracker, createUnauthorizedResponse, type NotificationStatus, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' const logger = createLogger('CopilotConfirmAPI') @@ -42,17 +43,17 @@ async function updateToolCallStatus( const toolCallId = existing.toolCallId const durableStatus = status === 'success' - ? 'completed' + ? ASYNC_TOOL_STATUS.completed : status === 'cancelled' - ? 'cancelled' + ? ASYNC_TOOL_STATUS.cancelled : status === 'error' || status === 'rejected' - ? 'failed' - : 'pending' + ? ASYNC_TOOL_STATUS.failed + : ASYNC_TOOL_STATUS.pending try { if ( - durableStatus === 'completed' || - durableStatus === 'failed' || - durableStatus === 'cancelled' + durableStatus === ASYNC_TOOL_STATUS.completed || + durableStatus === ASYNC_TOOL_STATUS.failed || + durableStatus === ASYNC_TOOL_STATUS.cancelled ) { await completeAsyncToolCall({ toolCallId, @@ -107,13 +108,25 @@ export async function POST(req: NextRequest) { const body = await req.json() const { toolCallId, status, message, data } = ConfirmationSchema.parse(body) - const existing = await getAsyncToolCall(toolCallId).catch(() => null) + const existing = await getAsyncToolCall(toolCallId).catch((err) => { + logger.warn('Failed to fetch async tool call', { + toolCallId, + error: err instanceof Error ? err.message : String(err), + }) + return null + }) if (!existing) { return createNotFoundResponse('Tool call not found') } - const run = await getRunSegment(existing.runId).catch(() => null) + const run = await getRunSegment(existing.runId).catch((err) => { + logger.warn('Failed to fetch run segment', { + runId: existing.runId, + error: err instanceof Error ? err.message : String(err), + }) + return null + }) if (!run) { return createNotFoundResponse('Tool call run not found') } diff --git a/apps/sim/app/api/copilot/credentials/route.ts b/apps/sim/app/api/copilot/credentials/route.ts index 2f764429d74..82d031c9e64 100644 --- a/apps/sim/app/api/copilot/credentials/route.ts +++ b/apps/sim/app/api/copilot/credentials/route.ts @@ -1,5 +1,5 @@ import { type NextRequest, NextResponse } from 'next/server' -import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' +import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http' import { routeExecution } from '@/lib/copilot/tools/server/router' /** diff --git a/apps/sim/app/api/copilot/feedback/route.test.ts b/apps/sim/app/api/copilot/feedback/route.test.ts index f74aecf77a7..3f3a28598a6 100644 --- a/apps/sim/app/api/copilot/feedback/route.test.ts +++ b/apps/sim/app/api/copilot/feedback/route.test.ts @@ -57,7 +57,7 @@ vi.mock('drizzle-orm', () => ({ eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), })) -vi.mock('@/lib/copilot/request-helpers', () => ({ +vi.mock('@/lib/copilot/request/http', () => ({ authenticateCopilotRequestSessionOnly: mockAuthenticate, createUnauthorizedResponse: mockCreateUnauthorizedResponse, createBadRequestResponse: mockCreateBadRequestResponse, diff --git a/apps/sim/app/api/copilot/feedback/route.ts b/apps/sim/app/api/copilot/feedback/route.ts index 92abaa1c3e9..175bc995d50 100644 --- a/apps/sim/app/api/copilot/feedback/route.ts +++ b/apps/sim/app/api/copilot/feedback/route.ts @@ -10,7 +10,7 @@ import { createInternalServerErrorResponse, createRequestTracker, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('CopilotFeedbackAPI') diff --git a/apps/sim/app/api/copilot/models/route.ts b/apps/sim/app/api/copilot/models/route.ts index d1773797453..7e23e38df69 100644 --- a/apps/sim/app/api/copilot/models/route.ts +++ b/apps/sim/app/api/copilot/models/route.ts @@ -1,8 +1,14 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' -import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers' -import type { AvailableModel } from '@/lib/copilot/types' +import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http' + +interface AvailableModel { + id: string + friendlyName: string + provider: string +} + import { env } from '@/lib/core/config/env' const logger = createLogger('CopilotModelsAPI') diff --git a/apps/sim/app/api/copilot/stats/route.test.ts b/apps/sim/app/api/copilot/stats/route.test.ts index 176a97eb371..ca6e97704f0 100644 --- a/apps/sim/app/api/copilot/stats/route.test.ts +++ b/apps/sim/app/api/copilot/stats/route.test.ts @@ -23,7 +23,7 @@ const { mockFetch: vi.fn(), })) -vi.mock('@/lib/copilot/request-helpers', () => ({ +vi.mock('@/lib/copilot/request/http', () => ({ authenticateCopilotRequestSessionOnly: mockAuthenticateCopilotRequestSessionOnly, createUnauthorizedResponse: mockCreateUnauthorizedResponse, createBadRequestResponse: mockCreateBadRequestResponse, diff --git a/apps/sim/app/api/copilot/stats/route.ts b/apps/sim/app/api/copilot/stats/route.ts index 493f6e4ec90..75ed6d096b1 100644 --- a/apps/sim/app/api/copilot/stats/route.ts +++ b/apps/sim/app/api/copilot/stats/route.ts @@ -7,7 +7,7 @@ import { createInternalServerErrorResponse, createRequestTracker, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' import { env } from '@/lib/core/config/env' const BodySchema = z.object({ diff --git a/apps/sim/app/api/copilot/training/examples/route.ts b/apps/sim/app/api/copilot/training/examples/route.ts index 934ce256875..a9318940b91 100644 --- a/apps/sim/app/api/copilot/training/examples/route.ts +++ b/apps/sim/app/api/copilot/training/examples/route.ts @@ -4,7 +4,7 @@ import { z } from 'zod' import { authenticateCopilotRequestSessionOnly, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' import { env } from '@/lib/core/config/env' const logger = createLogger('CopilotTrainingExamplesAPI') diff --git a/apps/sim/app/api/copilot/training/route.ts b/apps/sim/app/api/copilot/training/route.ts index e6e58f59bb0..e30918b8212 100644 --- a/apps/sim/app/api/copilot/training/route.ts +++ b/apps/sim/app/api/copilot/training/route.ts @@ -4,7 +4,7 @@ import { z } from 'zod' import { authenticateCopilotRequestSessionOnly, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' +} from '@/lib/copilot/request/http' import { env } from '@/lib/core/config/env' const logger = createLogger('CopilotTrainingAPI') diff --git a/apps/sim/app/api/files/serve/[...path]/route.test.ts b/apps/sim/app/api/files/serve/[...path]/route.test.ts index bc5b35647c0..3c0c1fa7f6c 100644 --- a/apps/sim/app/api/files/serve/[...path]/route.test.ts +++ b/apps/sim/app/api/files/serve/[...path]/route.test.ts @@ -75,6 +75,16 @@ vi.mock('@/lib/uploads/utils/file-utils', () => ({ vi.mock('@/lib/uploads/setup.server', () => ({})) +vi.mock('@/lib/execution/doc-vm', () => ({ + generatePdfFromCode: vi.fn().mockResolvedValue(Buffer.from('%PDF-compiled')), + generateDocxFromCode: vi.fn().mockResolvedValue(Buffer.from('PK\x03\x04compiled')), + generatePptxFromCode: vi.fn().mockResolvedValue(Buffer.from('PK\x03\x04compiled')), +})) + +vi.mock('@/lib/uploads/contexts/workspace/workspace-file-manager', () => ({ + parseWorkspaceFileKey: vi.fn().mockReturnValue(undefined), +})) + vi.mock('@/app/api/files/utils', () => ({ FileNotFoundError, createFileResponse: mockCreateFileResponse, diff --git a/apps/sim/app/api/files/serve/[...path]/route.ts b/apps/sim/app/api/files/serve/[...path]/route.ts index bc14086395a..0f15198c328 100644 --- a/apps/sim/app/api/files/serve/[...path]/route.ts +++ b/apps/sim/app/api/files/serve/[...path]/route.ts @@ -4,7 +4,11 @@ import { createLogger } from '@sim/logger' import type { NextRequest } from 'next/server' import { NextResponse } from 'next/server' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' -import { generatePptxFromCode } from '@/lib/execution/pptx-vm' +import { + generateDocxFromCode, + generatePdfFromCode, + generatePptxFromCode, +} from '@/lib/execution/doc-vm' import { CopilotFiles, isUsingCloudStorage } from '@/lib/uploads' import type { StorageContext } from '@/lib/uploads/config' import { parseWorkspaceFileKey } from '@/lib/uploads/contexts/workspace/workspace-file-manager' @@ -22,47 +26,73 @@ import { const logger = createLogger('FilesServeAPI') const ZIP_MAGIC = Buffer.from([0x50, 0x4b, 0x03, 0x04]) +const PDF_MAGIC = Buffer.from([0x25, 0x50, 0x44, 0x46, 0x2d]) // %PDF- + +interface CompilableFormat { + magic: Buffer + compile: (code: string, workspaceId: string) => Promise + contentType: string +} + +const COMPILABLE_FORMATS: Record = { + '.pptx': { + magic: ZIP_MAGIC, + compile: generatePptxFromCode, + contentType: 'application/vnd.openxmlformats-officedocument.presentationml.presentation', + }, + '.docx': { + magic: ZIP_MAGIC, + compile: generateDocxFromCode, + contentType: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + }, + '.pdf': { + magic: PDF_MAGIC, + compile: generatePdfFromCode, + contentType: 'application/pdf', + }, +} -const MAX_COMPILED_PPTX_CACHE = 10 -const compiledPptxCache = new Map() +const MAX_COMPILED_DOC_CACHE = 10 +const compiledDocCache = new Map() function compiledCacheSet(key: string, buffer: Buffer): void { - if (compiledPptxCache.size >= MAX_COMPILED_PPTX_CACHE) { - compiledPptxCache.delete(compiledPptxCache.keys().next().value as string) + if (compiledDocCache.size >= MAX_COMPILED_DOC_CACHE) { + compiledDocCache.delete(compiledDocCache.keys().next().value as string) } - compiledPptxCache.set(key, buffer) + compiledDocCache.set(key, buffer) } -async function compilePptxIfNeeded( +async function compileDocumentIfNeeded( buffer: Buffer, filename: string, workspaceId?: string, raw?: boolean ): Promise<{ buffer: Buffer; contentType: string }> { - const isPptx = filename.toLowerCase().endsWith('.pptx') - if (raw || !isPptx || buffer.subarray(0, 4).equals(ZIP_MAGIC)) { + if (raw) return { buffer, contentType: getContentType(filename) } + + const ext = filename.slice(filename.lastIndexOf('.')).toLowerCase() + const format = COMPILABLE_FORMATS[ext] + if (!format) return { buffer, contentType: getContentType(filename) } + + const magicLen = format.magic.length + if (buffer.length >= magicLen && buffer.subarray(0, magicLen).equals(format.magic)) { return { buffer, contentType: getContentType(filename) } } const code = buffer.toString('utf-8') const cacheKey = createHash('sha256') + .update(ext) .update(code) .update(workspaceId ?? '') .digest('hex') - const cached = compiledPptxCache.get(cacheKey) + const cached = compiledDocCache.get(cacheKey) if (cached) { - return { - buffer: cached, - contentType: 'application/vnd.openxmlformats-officedocument.presentationml.presentation', - } + return { buffer: cached, contentType: format.contentType } } - const compiled = await generatePptxFromCode(code, workspaceId || '') + const compiled = await format.compile(code, workspaceId || '') compiledCacheSet(cacheKey, compiled) - return { - buffer: compiled, - contentType: 'application/vnd.openxmlformats-officedocument.presentationml.presentation', - } + return { buffer: compiled, contentType: format.contentType } } const STORAGE_KEY_PREFIX_RE = /^\d{13}-[a-z0-9]{7}-/ @@ -169,7 +199,7 @@ async function handleLocalFile( const segment = filename.split('/').pop() || filename const displayName = stripStorageKeyPrefix(segment) const workspaceId = getWorkspaceIdForCompile(filename) - const { buffer: fileBuffer, contentType } = await compilePptxIfNeeded( + const { buffer: fileBuffer, contentType } = await compileDocumentIfNeeded( rawBuffer, displayName, workspaceId, @@ -226,7 +256,7 @@ async function handleCloudProxy( const segment = cloudKey.split('/').pop() || 'download' const displayName = stripStorageKeyPrefix(segment) const workspaceId = getWorkspaceIdForCompile(cloudKey) - const { buffer: fileBuffer, contentType } = await compilePptxIfNeeded( + const { buffer: fileBuffer, contentType } = await compileDocumentIfNeeded( rawBuffer, displayName, workspaceId, diff --git a/apps/sim/app/api/function/execute/route.test.ts b/apps/sim/app/api/function/execute/route.test.ts index 02db5236704..766bbb88fad 100644 --- a/apps/sim/app/api/function/execute/route.test.ts +++ b/apps/sim/app/api/function/execute/route.test.ts @@ -24,6 +24,27 @@ vi.mock('@/lib/auth/hybrid', () => ({ vi.mock('@/lib/execution/e2b', () => ({ executeInE2B: mockExecuteInE2B, + executeShellInE2B: vi.fn(), +})) + +vi.mock('@/lib/copilot/request/tools/files', () => ({ + FORMAT_TO_CONTENT_TYPE: { + json: 'application/json', + csv: 'text/csv', + txt: 'text/plain', + md: 'text/markdown', + html: 'text/html', + }, + normalizeOutputWorkspaceFileName: vi.fn((p: string) => p.replace(/^files\//, '')), + resolveOutputFormat: vi.fn(() => 'json'), +})) + +vi.mock('@/lib/uploads/contexts/workspace/workspace-file-manager', () => ({ + uploadWorkspaceFile: vi.fn(), +})) + +vi.mock('@/lib/workflows/utils', () => ({ + getWorkflowById: vi.fn(), })) vi.mock('@/lib/core/config/feature-flags', () => ({ @@ -32,6 +53,7 @@ vi.mock('@/lib/core/config/feature-flags', () => ({ isProd: false, isDev: false, isTest: true, + isEmailVerificationEnabled: false, })) import { validateProxyUrl } from '@/lib/core/security/input-validation' diff --git a/apps/sim/app/api/function/execute/route.ts b/apps/sim/app/api/function/execute/route.ts index 24e992401b7..2328bf11fbb 100644 --- a/apps/sim/app/api/function/execute/route.ts +++ b/apps/sim/app/api/function/execute/route.ts @@ -1,11 +1,18 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { checkInternalAuth } from '@/lib/auth/hybrid' +import { + FORMAT_TO_CONTENT_TYPE, + normalizeOutputWorkspaceFileName, + resolveOutputFormat, +} from '@/lib/copilot/request/tools/files' import { isE2bEnabled } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' -import { executeInE2B } from '@/lib/execution/e2b' +import { executeInE2B, executeShellInE2B } from '@/lib/execution/e2b' import { executeInIsolatedVM } from '@/lib/execution/isolated-vm' import { CodeLanguage, DEFAULT_CODE_LANGUAGE, isValidCodeLanguage } from '@/lib/execution/languages' +import { uploadWorkspaceFile } from '@/lib/uploads/contexts/workspace/workspace-file-manager' +import { getWorkflowById } from '@/lib/workflows/utils' import { escapeRegExp, normalizeName, REFERENCE } from '@/executor/constants' import { type OutputSchema, resolveBlockReference } from '@/executor/utils/block-reference' import { formatLiteralForCode } from '@/executor/utils/code-formatting' @@ -580,6 +587,107 @@ function cleanStdout(stdout: string): string { return stdout } +async function maybeExportSandboxFileToWorkspace(args: { + authUserId: string + workflowId?: string + workspaceId?: string + outputPath?: string + outputFormat?: string + outputMimeType?: string + outputSandboxPath?: string + exportedFileContent?: string + stdout: string + executionTime: number +}) { + const { + authUserId, + workflowId, + workspaceId, + outputPath, + outputFormat, + outputMimeType, + outputSandboxPath, + exportedFileContent, + stdout, + executionTime, + } = args + + if (!outputSandboxPath) return null + + if (!outputPath) { + return NextResponse.json( + { + success: false, + error: + 'outputSandboxPath requires outputPath. Set outputPath to the destination workspace file, e.g. "files/result.csv".', + output: { result: null, stdout: cleanStdout(stdout), executionTime }, + }, + { status: 400 } + ) + } + + const resolvedWorkspaceId = + workspaceId || (workflowId ? (await getWorkflowById(workflowId))?.workspaceId : undefined) + + if (!resolvedWorkspaceId) { + return NextResponse.json( + { + success: false, + error: 'Workspace context required to save sandbox file to workspace', + output: { result: null, stdout: cleanStdout(stdout), executionTime }, + }, + { status: 400 } + ) + } + + if (exportedFileContent === undefined) { + return NextResponse.json( + { + success: false, + error: `Sandbox file "${outputSandboxPath}" was not found or could not be read`, + output: { result: null, stdout: cleanStdout(stdout), executionTime }, + }, + { status: 500 } + ) + } + + const fileName = normalizeOutputWorkspaceFileName(outputPath) + + const TEXT_MIMES = new Set(Object.values(FORMAT_TO_CONTENT_TYPE)) + const resolvedMimeType = + outputMimeType || + FORMAT_TO_CONTENT_TYPE[resolveOutputFormat(fileName, outputFormat)] || + 'application/octet-stream' + const isBinary = !TEXT_MIMES.has(resolvedMimeType) + const fileBuffer = isBinary + ? Buffer.from(exportedFileContent, 'base64') + : Buffer.from(exportedFileContent, 'utf-8') + + const uploaded = await uploadWorkspaceFile( + resolvedWorkspaceId, + authUserId, + fileBuffer, + fileName, + resolvedMimeType + ) + + return NextResponse.json({ + success: true, + output: { + result: { + message: `Sandbox file exported to files/${fileName}`, + fileId: uploaded.id, + fileName, + downloadUrl: uploaded.url, + sandboxPath: outputSandboxPath, + }, + stdout: cleanStdout(stdout), + executionTime, + }, + resources: [{ type: 'file', id: uploaded.id, title: fileName }], + }) +} + export async function POST(req: NextRequest) { const requestId = generateRequestId() const startTime = Date.now() @@ -603,12 +711,17 @@ export async function POST(req: NextRequest) { params = {}, timeout = DEFAULT_EXECUTION_TIMEOUT_MS, language = DEFAULT_CODE_LANGUAGE, + outputPath, + outputFormat, + outputMimeType, + outputSandboxPath, envVars = {}, blockData = {}, blockNameMapping = {}, blockOutputSchemas = {}, workflowVariables = {}, workflowId, + workspaceId, isCustomTool = false, _sandboxFiles, } = body @@ -626,18 +739,25 @@ export async function POST(req: NextRequest) { const lang = isValidCodeLanguage(language) ? language : DEFAULT_CODE_LANGUAGE - const codeResolution = resolveCodeVariables( - code, - executionParams, - envVars, - blockData, - blockNameMapping, - blockOutputSchemas, - workflowVariables, - lang - ) - resolvedCode = codeResolution.resolvedCode - const contextVariables = codeResolution.contextVariables + let contextVariables: Record = {} + if (lang === CodeLanguage.Shell) { + // For shell, env vars are injected as OS env vars via shellEnvs. + // Replace {{VAR}} placeholders with $VAR so the shell can access them natively. + resolvedCode = code.replace(/\{\{([A-Za-z_][A-Za-z0-9_]*)\}\}/g, '$$$1') + } else { + const codeResolution = resolveCodeVariables( + code, + executionParams, + envVars, + blockData, + blockNameMapping, + blockOutputSchemas, + workflowVariables, + lang + ) + resolvedCode = codeResolution.resolvedCode + contextVariables = codeResolution.contextVariables + } let jsImports = '' let jsRemainingCode = resolvedCode @@ -652,6 +772,83 @@ export async function POST(req: NextRequest) { hasImports = jsImports.trim().length > 0 || hasRequireStatements } + if (lang === CodeLanguage.Shell) { + if (!isE2bEnabled) { + throw new Error( + 'Shell execution requires E2B to be enabled. Please contact your administrator to enable E2B.' + ) + } + + const shellEnvs: Record = {} + for (const [k, v] of Object.entries(envVars)) { + shellEnvs[k] = String(v) + } + for (const [k, v] of Object.entries(contextVariables)) { + shellEnvs[k] = String(v) + } + + logger.info(`[${requestId}] E2B shell execution`, { + enabled: isE2bEnabled, + hasApiKey: Boolean(process.env.E2B_API_KEY), + envVarCount: Object.keys(shellEnvs).length, + }) + + const execStart = Date.now() + const { + result: shellResult, + stdout: shellStdout, + sandboxId, + error: shellError, + exportedFileContent, + } = await executeShellInE2B({ + code: resolvedCode, + envs: shellEnvs, + timeoutMs: timeout, + sandboxFiles: _sandboxFiles, + outputSandboxPath, + }) + const executionTime = Date.now() - execStart + + logger.info(`[${requestId}] E2B shell sandbox`, { + sandboxId, + stdoutPreview: shellStdout?.slice(0, 200), + error: shellError, + executionTime, + }) + + if (shellError) { + return NextResponse.json( + { + success: false, + error: shellError, + output: { result: null, stdout: cleanStdout(shellStdout), executionTime }, + }, + { status: 500 } + ) + } + + if (outputSandboxPath) { + const fileExportResponse = await maybeExportSandboxFileToWorkspace({ + authUserId: auth.userId, + workflowId, + workspaceId, + outputPath, + outputFormat, + outputMimeType, + outputSandboxPath, + exportedFileContent, + stdout: shellStdout, + executionTime, + }) + if (fileExportResponse) return fileExportResponse + } + + return NextResponse.json({ + success: true, + output: { result: shellResult ?? null, stdout: cleanStdout(shellStdout), executionTime }, + }) + } + if (lang === CodeLanguage.Python && !isE2bEnabled) { throw new Error( 'Python execution requires E2B to be enabled. Please contact your administrator to enable E2B, or use JavaScript instead.' @@ -719,11 +916,13 @@ export async function POST(req: NextRequest) { stdout: e2bStdout, sandboxId, error: e2bError, + exportedFileContent, } = await executeInE2B({ code: codeForE2B, language: CodeLanguage.JavaScript, timeoutMs: timeout, sandboxFiles: _sandboxFiles, + outputSandboxPath, }) const executionTime = Date.now() - execStart stdout += e2bStdout @@ -752,6 +951,22 @@ export async function POST(req: NextRequest) { ) } + if (outputSandboxPath) { + const fileExportResponse = await maybeExportSandboxFileToWorkspace({ + authUserId: auth.userId, + workflowId, + workspaceId, + outputPath, + outputFormat, + outputMimeType, + outputSandboxPath, + exportedFileContent, + stdout, + executionTime, + }) + if (fileExportResponse) return fileExportResponse + } + return NextResponse.json({ success: true, output: { result: e2bResult ?? null, stdout: cleanStdout(stdout), executionTime }, @@ -783,11 +998,13 @@ export async function POST(req: NextRequest) { stdout: e2bStdout, sandboxId, error: e2bError, + exportedFileContent, } = await executeInE2B({ code: codeForE2B, language: CodeLanguage.Python, timeoutMs: timeout, sandboxFiles: _sandboxFiles, + outputSandboxPath, }) const executionTime = Date.now() - execStart stdout += e2bStdout @@ -816,6 +1033,22 @@ export async function POST(req: NextRequest) { ) } + if (outputSandboxPath) { + const fileExportResponse = await maybeExportSandboxFileToWorkspace({ + authUserId: auth.userId, + workflowId, + workspaceId, + outputPath, + outputFormat, + outputMimeType, + outputSandboxPath, + exportedFileContent, + stdout, + executionTime, + }) + if (fileExportResponse) return fileExportResponse + } + return NextResponse.json({ success: true, output: { result: e2bResult ?? null, stdout: cleanStdout(stdout), executionTime }, diff --git a/apps/sim/app/api/mcp/copilot/route.ts b/apps/sim/app/api/mcp/copilot/route.ts index f1377d5ad13..fca77e30cbb 100644 --- a/apps/sim/app/api/mcp/copilot/route.ts +++ b/apps/sim/app/api/mcp/copilot/route.ts @@ -17,14 +17,11 @@ import { eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { validateOAuthAccessToken } from '@/lib/auth/oauth-token' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' -import { createRunSegment } from '@/lib/copilot/async-runs/repository' import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants' -import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' -import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent' -import { - executeToolServerSide, - prepareExecutionContext, -} from '@/lib/copilot/orchestrator/tool-executor' +import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run' +import { orchestrateSubagentStream } from '@/lib/copilot/request/subagent' +import { ensureHandlersRegistered, executeTool } from '@/lib/copilot/tool-executor' +import { prepareExecutionContext } from '@/lib/copilot/tools/handlers/context' import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/definitions' import { env } from '@/lib/core/config/env' import { RateLimiter } from '@/lib/core/rate-limiter' @@ -125,12 +122,10 @@ Sim is a workflow automation platform. Workflows are visual pipelines of connect 1. \`list_workspaces\` → know where to work 2. \`create_workflow(name, workspaceId)\` → get a workflowId -3. \`sim_build(request, workflowId)\` → plan and build in one pass +3. \`sim_workflow(request, workflowId)\` → plan and build in one pass 4. \`sim_test(request, workflowId)\` → verify it works 5. \`sim_deploy("deploy as api", workflowId)\` → make it accessible externally (optional) -For fine-grained control, use \`sim_plan\` → \`sim_edit\` instead of \`sim_build\`. Pass the plan object from sim_plan EXACTLY as-is to sim_edit's context.plan field. - ### Working with Existing Workflows When the user refers to a workflow by name or description ("the email one", "my Slack bot"): @@ -645,7 +640,8 @@ async function handleDirectToolCall( startTime: Date.now(), } - const result = await executeToolServerSide(toolCall, execContext) + ensureHandlersRegistered() + const result = await executeTool(toolCall.name, toolCall.params || {}, execContext) return { content: [ @@ -672,7 +668,7 @@ async function handleDirectToolCall( /** * Build mode uses the main chat orchestrator with the 'fast' command instead of - * the subagent endpoint. In Go, 'build' is not a registered subagent — it's a mode + * the subagent endpoint. In Go, 'workflow' is not a registered subagent — it's a mode * (ModeFast) on the main chat processor that bypasses subagent orchestration and * executes all tools directly. */ @@ -728,25 +724,10 @@ async function handleBuildToolCall( chatId, } - const executionId = generateId() - const runId = generateId() - const messageId = requestPayload.messageId as string - - await createRunSegment({ - id: runId, - executionId, - chatId, - userId, - workflowId: resolved.workflowId, - streamId: messageId, - }).catch(() => {}) - - const result = await orchestrateCopilotStream(requestPayload, { + const result = await runCopilotLifecycle(requestPayload, { userId, workflowId: resolved.workflowId, chatId, - executionId, - runId, goRoute: '/api/mcp', autoExecuteTools: true, timeout: ORCHESTRATION_TIMEOUT_MS, @@ -785,7 +766,7 @@ async function handleSubagentToolCall( userId: string, abortSignal?: AbortSignal ): Promise { - if (toolDef.agentId === 'build') { + if (toolDef.agentId === 'workflow') { return handleBuildToolCall(args, userId, abortSignal) } diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index 09dea73a050..d6275e33e74 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -5,19 +5,26 @@ import { eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' -import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' -import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' +import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' +import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload' +import { + buildPersistedAssistantMessage, + buildPersistedUserMessage, +} from '@/lib/copilot/chat/persisted-message' +import { + processContextsServer, + resolveActiveResourceContext, +} from '@/lib/copilot/chat/process-contents' +import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context' +import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request/http' +import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/request/lifecycle/start' import { acquirePendingChatStream, - createSSEStream, - SSE_RESPONSE_HEADERS, -} from '@/lib/copilot/chat-streaming' -import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' -import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents' -import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers' -import { taskPubSub } from '@/lib/copilot/task-events' -import { generateWorkspaceContext } from '@/lib/copilot/workspace-context' -import { generateId } from '@/lib/core/utils/uuid' + getPendingChatStreamId, + releasePendingChatStream, +} from '@/lib/copilot/request/session' +import type { OrchestratorResult } from '@/lib/copilot/request/types' +import { taskPubSub } from '@/lib/copilot/tasks' import { assertActiveWorkspaceAccess, getUserEntityPermissions, @@ -38,7 +45,6 @@ const FileAttachmentSchema = z.object({ const ResourceAttachmentSchema = z.object({ type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder']), id: z.string().min(1), - title: z.string().optional(), active: z.boolean().optional(), }) @@ -90,7 +96,9 @@ const MothershipMessageSchema = z.object({ */ export async function POST(req: NextRequest) { const tracker = createRequestTracker() - let userMessageIdForLogs: string | undefined + let lockChatId: string | undefined + let lockStreamId = '' + let chatStreamLockAcquired = false try { const session = await getSession() @@ -112,28 +120,24 @@ export async function POST(req: NextRequest) { userTimezone, } = MothershipMessageSchema.parse(body) - const userMessageId = providedMessageId || generateId() - userMessageIdForLogs = userMessageId - const reqLogger = logger.withMetadata({ - requestId: tracker.requestId, - messageId: userMessageId, - }) + const userMessageId = providedMessageId || crypto.randomUUID() + lockStreamId = userMessageId - reqLogger.info('Received mothership chat start request', { - workspaceId, - chatId, - createNewChat, - hasContexts: Array.isArray(contexts) && contexts.length > 0, - contextsCount: Array.isArray(contexts) ? contexts.length : 0, - hasResourceAttachments: Array.isArray(resourceAttachments) && resourceAttachments.length > 0, - resourceAttachmentCount: Array.isArray(resourceAttachments) ? resourceAttachments.length : 0, - hasFileAttachments: Array.isArray(fileAttachments) && fileAttachments.length > 0, - fileAttachmentCount: Array.isArray(fileAttachments) ? fileAttachments.length : 0, - }) + // Phase 1: workspace access + chat resolution in parallel + const [accessResult, chatResult] = await Promise.allSettled([ + assertActiveWorkspaceAccess(workspaceId, authenticatedUserId), + chatId || createNewChat + ? resolveOrCreateChat({ + chatId, + userId: authenticatedUserId, + workspaceId, + model: 'claude-opus-4-6', + type: 'mothership', + }) + : null, + ]) - try { - await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId) - } catch { + if (accessResult.status === 'rejected') { return NextResponse.json({ error: 'Workspace not found or access denied' }, { status: 403 }) } @@ -141,18 +145,12 @@ export async function POST(req: NextRequest) { let conversationHistory: any[] = [] let actualChatId = chatId - if (chatId || createNewChat) { - const chatResult = await resolveOrCreateChat({ - chatId, - userId: authenticatedUserId, - workspaceId, - model: 'claude-opus-4-6', - type: 'mothership', - }) - currentChat = chatResult.chat - actualChatId = chatResult.chatId || chatId - conversationHistory = Array.isArray(chatResult.conversationHistory) - ? chatResult.conversationHistory + if (chatResult.status === 'fulfilled' && chatResult.value) { + const resolved = chatResult.value + currentChat = resolved.chat + actualChatId = resolved.chatId || chatId + conversationHistory = Array.isArray(resolved.conversationHistory) + ? resolved.conversationHistory : [] if (chatId && !currentChat) { @@ -160,77 +158,73 @@ export async function POST(req: NextRequest) { } } - let agentContexts: Array<{ type: string; content: string }> = [] - if (Array.isArray(contexts) && contexts.length > 0) { - try { - agentContexts = await processContextsServer( - contexts as any, - authenticatedUserId, - message, - workspaceId, - actualChatId + if (actualChatId) { + chatStreamLockAcquired = await acquirePendingChatStream(actualChatId, userMessageId) + if (!chatStreamLockAcquired) { + const activeStreamId = await getPendingChatStreamId(actualChatId) + return NextResponse.json( + { + error: 'A response is already in progress for this chat.', + ...(activeStreamId ? { activeStreamId } : {}), + }, + { status: 409 } ) - } catch (e) { - reqLogger.error('Failed to process contexts', e) } + lockChatId = actualChatId } - if (Array.isArray(resourceAttachments) && resourceAttachments.length > 0) { - const results = await Promise.allSettled( - resourceAttachments.map(async (r) => { - const ctx = await resolveActiveResourceContext( - r.type, - r.id, - workspaceId, + // Phase 2: contexts + workspace context + user message persistence in parallel + const contextPromise = (async () => { + let agentCtxs: Array<{ type: string; content: string }> = [] + if (Array.isArray(contexts) && contexts.length > 0) { + try { + agentCtxs = await processContextsServer( + contexts as any, authenticatedUserId, + message, + workspaceId, actualChatId ) - if (!ctx) return null - return { - ...ctx, - tag: r.active ? '@active_tab' : '@open_tab', + } catch (e) { + logger.error(`[${tracker.requestId}] Failed to process contexts`, e) + } + } + if (Array.isArray(resourceAttachments) && resourceAttachments.length > 0) { + const results = await Promise.allSettled( + resourceAttachments.map(async (r) => { + const ctx = await resolveActiveResourceContext( + r.type, + r.id, + workspaceId, + authenticatedUserId, + actualChatId + ) + if (!ctx) return null + return { ...ctx, tag: r.active ? '@active_tab' : '@open_tab' } + }) + ) + for (const result of results) { + if (result.status === 'fulfilled' && result.value) { + agentCtxs.push(result.value) + } else if (result.status === 'rejected') { + logger.error( + `[${tracker.requestId}] Failed to resolve resource attachment`, + result.reason + ) } - }) - ) - for (const result of results) { - if (result.status === 'fulfilled' && result.value) { - agentContexts.push(result.value) - } else if (result.status === 'rejected') { - reqLogger.error('Failed to resolve resource attachment', result.reason) } } - } + return agentCtxs + })() - if (actualChatId) { - const userMsg = { + const userMsgPromise = (async () => { + if (!actualChatId) return + const userMsg = buildPersistedUserMessage({ id: userMessageId, - role: 'user' as const, content: message, - timestamp: new Date().toISOString(), - ...(fileAttachments && - fileAttachments.length > 0 && { - fileAttachments: fileAttachments.map((f) => ({ - id: f.id, - key: f.key, - filename: f.filename, - media_type: f.media_type, - size: f.size, - })), - }), - ...(contexts && - contexts.length > 0 && { - contexts: contexts.map((c) => ({ - kind: c.kind, - label: c.label, - ...(c.workflowId && { workflowId: c.workflowId }), - ...(c.knowledgeId && { knowledgeId: c.knowledgeId }), - ...(c.tableId && { tableId: c.tableId }), - ...(c.fileId && { fileId: c.fileId }), - ...(c.folderId && { folderId: c.folderId }), - })), - }), - } - + fileAttachments, + contexts, + }) const [updated] = await db .update(copilotChats) .set({ @@ -246,11 +240,15 @@ export async function POST(req: NextRequest) { conversationHistory = freshMessages.filter((m: any) => m.id !== userMessageId) taskPubSub?.publishStatusChanged({ workspaceId, chatId: actualChatId, type: 'started' }) } - } + })() - const [workspaceContext, userPermission] = await Promise.all([ - generateWorkspaceContext(workspaceId, authenticatedUserId), - getUserEntityPermissions(authenticatedUserId, 'workspace', workspaceId).catch(() => null), + const [agentContexts, [workspaceContext, userPermission]] = await Promise.all([ + contextPromise, + Promise.all([ + generateWorkspaceContext(workspaceId, authenticatedUserId), + getUserEntityPermissions(authenticatedUserId, 'workspace', workspaceId).catch(() => null), + ]), + userMsgPromise, ]) const requestPayload = await buildCopilotRequestPayload( @@ -271,21 +269,8 @@ export async function POST(req: NextRequest) { { selectedModel: '' } ) - if (actualChatId) { - const acquired = await acquirePendingChatStream(actualChatId, userMessageId) - if (!acquired) { - return NextResponse.json( - { - error: - 'A response is already in progress for this chat. Wait for it to finish or use Stop.', - }, - { status: 409 } - ) - } - } - - const executionId = generateId() - const runId = generateId() + const executionId = crypto.randomUUID() + const runId = crypto.randomUUID() const stream = createSSEStream({ requestPayload, userId: authenticatedUserId, @@ -299,7 +284,6 @@ export async function POST(req: NextRequest) { titleModel: 'claude-opus-4-6', requestId: tracker.requestId, workspaceId, - pendingChatStreamAlreadyRegistered: Boolean(actualChatId), orchestrateOptions: { userId: authenticatedUserId, workspaceId, @@ -313,46 +297,7 @@ export async function POST(req: NextRequest) { if (!actualChatId) return if (!result.success) return - const assistantMessage: Record = { - id: generateId(), - role: 'assistant' as const, - content: result.content, - timestamp: new Date().toISOString(), - ...(result.requestId ? { requestId: result.requestId } : {}), - } - if (result.toolCalls.length > 0) { - assistantMessage.toolCalls = result.toolCalls - } - if (result.contentBlocks.length > 0) { - assistantMessage.contentBlocks = result.contentBlocks.map((block) => { - const stored: Record = { type: block.type } - if (block.content) stored.content = block.content - if (block.type === 'tool_call' && block.toolCall) { - const state = - block.toolCall.result?.success !== undefined - ? block.toolCall.result.success - ? 'success' - : 'error' - : block.toolCall.status - const isSubagentTool = !!block.calledBy - const isNonTerminal = - state === 'cancelled' || state === 'pending' || state === 'executing' - stored.toolCall = { - id: block.toolCall.id, - name: block.toolCall.name, - state, - ...(isSubagentTool && isNonTerminal ? {} : { result: block.toolCall.result }), - ...(isSubagentTool && isNonTerminal - ? {} - : block.toolCall.params - ? { params: block.toolCall.params } - : {}), - ...(block.calledBy ? { calledBy: block.calledBy } : {}), - } - } - return stored - }) - } + const assistantMessage = buildPersistedAssistantMessage(result, result.requestId) try { const [row] = await db @@ -385,7 +330,7 @@ export async function POST(req: NextRequest) { }) } } catch (error) { - reqLogger.error('Failed to persist chat messages', { + logger.error(`[${tracker.requestId}] Failed to persist chat messages`, { chatId: actualChatId, error: error instanceof Error ? error.message : 'Unknown error', }) @@ -396,6 +341,9 @@ export async function POST(req: NextRequest) { return new Response(stream, { headers: SSE_RESPONSE_HEADERS }) } catch (error) { + if (chatStreamLockAcquired && lockChatId && lockStreamId) { + await releasePendingChatStream(lockChatId, lockStreamId) + } if (error instanceof z.ZodError) { return NextResponse.json( { error: 'Invalid request data', details: error.errors }, @@ -403,11 +351,9 @@ export async function POST(req: NextRequest) { ) } - logger - .withMetadata({ requestId: tracker.requestId, messageId: userMessageIdForLogs }) - .error('Error handling mothership chat', { - error: error instanceof Error ? error.message : 'Unknown error', - }) + logger.error(`[${tracker.requestId}] Error handling mothership chat:`, { + error: error instanceof Error ? error.message : 'Unknown error', + }) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, diff --git a/apps/sim/app/api/mothership/chat/stop/route.ts b/apps/sim/app/api/mothership/chat/stop/route.ts index 70dc1df7c87..cc5cac32a29 100644 --- a/apps/sim/app/api/mothership/chat/stop/route.ts +++ b/apps/sim/app/api/mothership/chat/stop/route.ts @@ -5,9 +5,9 @@ import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' -import { releasePendingChatStream } from '@/lib/copilot/chat-streaming' -import { taskPubSub } from '@/lib/copilot/task-events' -import { generateId } from '@/lib/core/utils/uuid' +import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { releasePendingChatStream } from '@/lib/copilot/request/session' +import { taskPubSub } from '@/lib/copilot/tasks' const logger = createLogger('MothershipChatStopAPI') @@ -27,15 +27,25 @@ const StoredToolCallSchema = z display: z .object({ text: z.string().optional(), + title: z.string().optional(), + phaseLabel: z.string().optional(), }) .optional(), calledBy: z.string().optional(), + durationMs: z.number().optional(), + error: z.string().optional(), }) .nullable() const ContentBlockSchema = z.object({ type: z.string(), + lane: z.enum(['main', 'subagent']).optional(), content: z.string().optional(), + channel: z.enum(['assistant', 'thinking']).optional(), + phase: z.enum(['call', 'args_delta', 'result']).optional(), + kind: z.enum(['subagent', 'structured_result', 'subagent_result']).optional(), + lifecycle: z.enum(['start', 'end']).optional(), + status: z.enum(['complete', 'error', 'cancelled']).optional(), toolCall: StoredToolCallSchema.optional(), }) @@ -71,15 +81,14 @@ export async function POST(req: NextRequest) { const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0 if (hasContent || hasBlocks) { - const assistantMessage: Record = { - id: generateId(), - role: 'assistant' as const, + const normalized = normalizeMessage({ + id: crypto.randomUUID(), + role: 'assistant', content, timestamp: new Date().toISOString(), - } - if (hasBlocks) { - assistantMessage.contentBlocks = contentBlocks - } + ...(hasBlocks ? { contentBlocks } : {}), + }) + const assistantMessage: PersistedMessage = normalized setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb` } diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index 3101e681589..5800507f9f1 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -4,15 +4,15 @@ import { createLogger } from '@sim/logger' import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' -import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle' -import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' +import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, createInternalServerErrorResponse, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' -import { taskPubSub } from '@/lib/copilot/task-events' +} from '@/lib/copilot/request/http' +import { readEvents } from '@/lib/copilot/request/session/buffer' +import { taskPubSub } from '@/lib/copilot/tasks' import { captureServerEvent } from '@/lib/posthog/server' const logger = createLogger('MothershipChatAPI') @@ -47,29 +47,24 @@ export async function GET( } let streamSnapshot: { - events: Array<{ eventId: number; streamId: string; event: Record }> + events: unknown[] status: string } | null = null if (chat.conversationId) { try { - const [meta, events] = await Promise.all([ - getStreamMeta(chat.conversationId), - readStreamEvents(chat.conversationId, 0), - ]) + const events = await readEvents(chat.conversationId, '0') streamSnapshot = { events: events || [], - status: meta?.status || 'unknown', + status: events.length > 0 ? 'active' : 'unknown', } } catch (error) { - logger - .withMetadata({ messageId: chat.conversationId || undefined }) - .warn('Failed to read stream snapshot for mothership chat', { - chatId, - conversationId: chat.conversationId, - error: error instanceof Error ? error.message : String(error), - }) + logger.warn('Failed to read stream snapshot for mothership chat', { + chatId, + conversationId: chat.conversationId, + error: error instanceof Error ? error.message : String(error), + }) } } diff --git a/apps/sim/app/api/mothership/chats/read/route.ts b/apps/sim/app/api/mothership/chats/read/route.ts new file mode 100644 index 00000000000..344687ddfdc --- /dev/null +++ b/apps/sim/app/api/mothership/chats/read/route.ts @@ -0,0 +1,43 @@ +import { db } from '@sim/db' +import { copilotChats } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, sql } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { + authenticateCopilotRequestSessionOnly, + createBadRequestResponse, + createInternalServerErrorResponse, + createUnauthorizedResponse, +} from '@/lib/copilot/request/http' + +const logger = createLogger('MarkTaskReadAPI') + +const MarkReadSchema = z.object({ + chatId: z.string().min(1), +}) + +export async function POST(request: NextRequest) { + try { + const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly() + if (!isAuthenticated || !userId) { + return createUnauthorizedResponse() + } + + const body = await request.json() + const { chatId } = MarkReadSchema.parse(body) + + await db + .update(copilotChats) + .set({ lastSeenAt: sql`GREATEST(${copilotChats.updatedAt}, NOW())` }) + .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId))) + + return NextResponse.json({ success: true }) + } catch (error) { + if (error instanceof z.ZodError) { + return createBadRequestResponse('chatId is required') + } + logger.error('Error marking task as read:', error) + return createInternalServerErrorResponse('Failed to mark task as read') + } +} diff --git a/apps/sim/app/api/mothership/chats/route.ts b/apps/sim/app/api/mothership/chats/route.ts index bc694d1d9fe..99bd6fd7390 100644 --- a/apps/sim/app/api/mothership/chats/route.ts +++ b/apps/sim/app/api/mothership/chats/route.ts @@ -9,8 +9,8 @@ import { createBadRequestResponse, createInternalServerErrorResponse, createUnauthorizedResponse, -} from '@/lib/copilot/request-helpers' -import { taskPubSub } from '@/lib/copilot/task-events' +} from '@/lib/copilot/request/http' +import { taskPubSub } from '@/lib/copilot/tasks' import { captureServerEvent } from '@/lib/posthog/server' import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils' @@ -39,7 +39,7 @@ export async function GET(request: NextRequest) { id: copilotChats.id, title: copilotChats.title, updatedAt: copilotChats.updatedAt, - conversationId: copilotChats.conversationId, + activeStreamId: copilotChats.conversationId, lastSeenAt: copilotChats.lastSeenAt, }) .from(copilotChats) diff --git a/apps/sim/app/api/mothership/events/route.ts b/apps/sim/app/api/mothership/events/route.ts index 38abba7b33f..4f1646f6e34 100644 --- a/apps/sim/app/api/mothership/events/route.ts +++ b/apps/sim/app/api/mothership/events/route.ts @@ -7,7 +7,7 @@ * Auth is handled via session cookies (EventSource sends cookies automatically). */ -import { taskPubSub } from '@/lib/copilot/task-events' +import { taskPubSub } from '@/lib/copilot/tasks' import { createWorkspaceSSE } from '@/lib/events/sse-endpoint' export const dynamic = 'force-dynamic' diff --git a/apps/sim/app/api/mothership/execute/route.ts b/apps/sim/app/api/mothership/execute/route.ts index 619e0135726..06654a61b82 100644 --- a/apps/sim/app/api/mothership/execute/route.ts +++ b/apps/sim/app/api/mothership/execute/route.ts @@ -2,10 +2,9 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkInternalAuth } from '@/lib/auth/hybrid' -import { createRunSegment } from '@/lib/copilot/async-runs/repository' -import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload' -import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' -import { generateWorkspaceContext } from '@/lib/copilot/workspace-context' +import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload' +import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context' +import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run' import { generateId } from '@/lib/core/utils/uuid' import { assertActiveWorkspaceAccess, @@ -73,34 +72,25 @@ export async function POST(req: NextRequest) { ...(userPermission ? { userPermission } : {}), } - const executionId = generateId() - const runId = generateId() - - await createRunSegment({ - id: runId, - executionId, - chatId: effectiveChatId, - userId, - workspaceId, - streamId: messageId, - }).catch(() => {}) - - const result = await orchestrateCopilotStream(requestPayload, { + const result = await runCopilotLifecycle(requestPayload, { userId, workspaceId, chatId: effectiveChatId, - executionId, - runId, goRoute: '/api/mothership/execute', autoExecuteTools: true, interactive: false, }) if (!result.success) { - reqLogger.error('Mothership execute failed', { - error: result.error, - errors: result.errors, - }) + logger.error( + messageId + ? `Mothership execute failed [messageId:${messageId}]` + : 'Mothership execute failed', + { + error: result.error, + errors: result.errors, + } + ) return NextResponse.json( { error: result.error || 'Mothership execution failed', @@ -136,9 +126,12 @@ export async function POST(req: NextRequest) { ) } - logger.withMetadata({ messageId }).error('Mothership execute error', { - error: error instanceof Error ? error.message : 'Unknown error', - }) + logger.error( + messageId ? `Mothership execute error [messageId:${messageId}]` : 'Mothership execute error', + { + error: error instanceof Error ? error.message : 'Unknown error', + } + ) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, diff --git a/apps/sim/app/api/templates/approved/sanitized/route.ts b/apps/sim/app/api/templates/approved/sanitized/route.ts index 2b6fad9652a..dd72ef80464 100644 --- a/apps/sim/app/api/templates/approved/sanitized/route.ts +++ b/apps/sim/app/api/templates/approved/sanitized/route.ts @@ -3,7 +3,7 @@ import { templates } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' -import { checkInternalApiKey } from '@/lib/copilot/utils' +import { checkInternalApiKey } from '@/lib/copilot/request/http' import { generateRequestId } from '@/lib/core/utils/request' import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer' diff --git a/apps/sim/app/api/v1/copilot/chat/route.ts b/apps/sim/app/api/v1/copilot/chat/route.ts index 52be435d681..5c366c91ff7 100644 --- a/apps/sim/app/api/v1/copilot/chat/route.ts +++ b/apps/sim/app/api/v1/copilot/chat/route.ts @@ -1,9 +1,8 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' -import { createRunSegment } from '@/lib/copilot/async-runs/repository' -import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' -import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' +import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants' +import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run' import { generateId } from '@/lib/core/utils/uuid' import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils' import { authenticateV1Request } from '@/app/api/v1/auth' @@ -83,16 +82,20 @@ export async function POST(req: NextRequest) { // Always generate a chatId - required for artifacts system to work with subagents const chatId = parsed.chatId || generateId() - messageId = generateId() - const reqLogger = logger.withMetadata({ messageId }) - reqLogger.info('Received headless copilot chat start request', { - workflowId: resolved.workflowId, - workflowName: parsed.workflowName, - chatId, - mode: transportMode, - autoExecuteTools: parsed.autoExecuteTools, - timeout: parsed.timeout, - }) + messageId = crypto.randomUUID() + logger.info( + messageId + ? `Received headless copilot chat start request [messageId:${messageId}]` + : 'Received headless copilot chat start request', + { + workflowId: resolved.workflowId, + workflowName: parsed.workflowName, + chatId, + mode: transportMode, + autoExecuteTools: parsed.autoExecuteTools, + timeout: parsed.timeout, + } + ) const requestPayload = { message: parsed.message, workflowId: resolved.workflowId, @@ -103,24 +106,10 @@ export async function POST(req: NextRequest) { chatId, } - const executionId = generateId() - const runId = generateId() - - await createRunSegment({ - id: runId, - executionId, - chatId, - userId: auth.userId, - workflowId: resolved.workflowId, - streamId: messageId, - }).catch(() => {}) - - const result = await orchestrateCopilotStream(requestPayload, { + const result = await runCopilotLifecycle(requestPayload, { userId: auth.userId, workflowId: resolved.workflowId, chatId, - executionId, - runId, goRoute: '/api/mcp', autoExecuteTools: parsed.autoExecuteTools, timeout: parsed.timeout, @@ -142,9 +131,14 @@ export async function POST(req: NextRequest) { ) } - logger.withMetadata({ messageId }).error('Headless copilot request failed', { - error: error instanceof Error ? error.message : String(error), - }) + logger.error( + messageId + ? `Headless copilot request failed [messageId:${messageId}]` + : 'Headless copilot request failed', + { + error: error instanceof Error ? error.message : String(error), + } + ) return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 }) } } diff --git a/apps/sim/app/api/workspaces/[id]/pptx/preview/route.ts b/apps/sim/app/api/workspaces/[id]/pptx/preview/route.ts index 65543049416..6de747246d8 100644 --- a/apps/sim/app/api/workspaces/[id]/pptx/preview/route.ts +++ b/apps/sim/app/api/workspaces/[id]/pptx/preview/route.ts @@ -1,7 +1,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { getSession } from '@/lib/auth' -import { generatePptxFromCode } from '@/lib/execution/pptx-vm' +import { generatePptxFromCode } from '@/lib/execution/doc-vm' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' export const dynamic = 'force-dynamic' diff --git a/apps/sim/app/workspace/[workspaceId]/files/[fileId]/view/file-viewer.tsx b/apps/sim/app/workspace/[workspaceId]/files/[fileId]/view/file-viewer.tsx index a450bd374da..cbed424d13b 100644 --- a/apps/sim/app/workspace/[workspaceId]/files/[fileId]/view/file-viewer.tsx +++ b/apps/sim/app/workspace/[workspaceId]/files/[fileId]/view/file-viewer.tsx @@ -17,7 +17,7 @@ export function FileViewer() { return null } - const serveUrl = `/api/files/serve/${encodeURIComponent(file.key)}?context=workspace` + const serveUrl = `/api/files/serve/${encodeURIComponent(file.key)}?context=workspace&t=${file.size}` return (
diff --git a/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx b/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx index f62caa1f51c..3235931ec74 100644 --- a/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx +++ b/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx @@ -57,7 +57,7 @@ const TEXT_EDITABLE_EXTENSIONS = new Set([ ...SUPPORTED_CODE_EXTENSIONS, ]) -const IFRAME_PREVIEWABLE_MIME_TYPES = new Set(['application/pdf']) +const IFRAME_PREVIEWABLE_MIME_TYPES = new Set(['application/pdf', 'text/x-pdflibjs']) const IFRAME_PREVIEWABLE_EXTENSIONS = new Set(['pdf']) const IMAGE_PREVIEWABLE_MIME_TYPES = new Set(['image/png', 'image/jpeg', 'image/gif', 'image/webp']) @@ -65,11 +65,13 @@ const IMAGE_PREVIEWABLE_EXTENSIONS = new Set(['png', 'jpg', 'jpeg', 'gif', 'webp const PPTX_PREVIEWABLE_MIME_TYPES = new Set([ 'application/vnd.openxmlformats-officedocument.presentationml.presentation', + 'text/x-pptxgenjs', ]) const PPTX_PREVIEWABLE_EXTENSIONS = new Set(['pptx']) const DOCX_PREVIEWABLE_MIME_TYPES = new Set([ 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'text/x-docxjs', ]) const DOCX_PREVIEWABLE_EXTENSIONS = new Set(['docx']) @@ -91,8 +93,8 @@ function resolveFileCategory(mimeType: string | null, filename: string): FileCat if (mimeType && TEXT_EDITABLE_MIME_TYPES.has(mimeType)) return 'text-editable' if (mimeType && IFRAME_PREVIEWABLE_MIME_TYPES.has(mimeType)) return 'iframe-previewable' if (mimeType && IMAGE_PREVIEWABLE_MIME_TYPES.has(mimeType)) return 'image-previewable' - if (mimeType && PPTX_PREVIEWABLE_MIME_TYPES.has(mimeType)) return 'pptx-previewable' if (mimeType && DOCX_PREVIEWABLE_MIME_TYPES.has(mimeType)) return 'docx-previewable' + if (mimeType && PPTX_PREVIEWABLE_MIME_TYPES.has(mimeType)) return 'pptx-previewable' if (mimeType && XLSX_PREVIEWABLE_MIME_TYPES.has(mimeType)) return 'xlsx-previewable' const ext = getFileExtension(filename) @@ -100,8 +102,8 @@ function resolveFileCategory(mimeType: string | null, filename: string): FileCat if (TEXT_EDITABLE_EXTENSIONS.has(nameKey)) return 'text-editable' if (IFRAME_PREVIEWABLE_EXTENSIONS.has(ext)) return 'iframe-previewable' if (IMAGE_PREVIEWABLE_EXTENSIONS.has(ext)) return 'image-previewable' - if (PPTX_PREVIEWABLE_EXTENSIONS.has(ext)) return 'pptx-previewable' if (DOCX_PREVIEWABLE_EXTENSIONS.has(ext)) return 'docx-previewable' + if (PPTX_PREVIEWABLE_EXTENSIONS.has(ext)) return 'pptx-previewable' if (XLSX_PREVIEWABLE_EXTENSIONS.has(ext)) return 'xlsx-previewable' return 'unsupported' @@ -128,6 +130,7 @@ interface FileViewerProps { onSaveStatusChange?: (status: 'idle' | 'saving' | 'saved' | 'error') => void saveRef?: React.MutableRefObject<(() => Promise) | null> streamingContent?: string + streamingMode?: 'append' | 'replace' } export function FileViewer({ @@ -141,6 +144,7 @@ export function FileViewer({ onSaveStatusChange, saveRef, streamingContent, + streamingMode, }: FileViewerProps) { const category = resolveFileCategory(file.type, file.name) @@ -156,26 +160,27 @@ export function FileViewer({ onSaveStatusChange={onSaveStatusChange} saveRef={saveRef} streamingContent={streamingContent} + streamingMode={streamingMode} /> ) } if (category === 'iframe-previewable') { - return + return } if (category === 'image-previewable') { return } - if (category === 'pptx-previewable') { - return - } - if (category === 'docx-previewable') { return } + if (category === 'pptx-previewable') { + return + } + if (category === 'xlsx-previewable') { return } @@ -193,6 +198,7 @@ interface TextEditorProps { onSaveStatusChange?: (status: 'idle' | 'saving' | 'saved' | 'error') => void saveRef?: React.MutableRefObject<(() => Promise) | null> streamingContent?: string + streamingMode?: 'append' | 'replace' } function TextEditor({ @@ -205,6 +211,7 @@ function TextEditor({ onSaveStatusChange, saveRef, streamingContent, + streamingMode = 'append', }: TextEditorProps) { const initializedRef = useRef(false) const contentRef = useRef('') @@ -219,7 +226,14 @@ function TextEditor({ isLoading, error, dataUpdatedAt, - } = useWorkspaceFileContent(workspaceId, file.id, file.key, file.type === 'text/x-pptxgenjs') + } = useWorkspaceFileContent( + workspaceId, + file.id, + file.key, + file.type === 'text/x-pptxgenjs' || + file.type === 'text/x-docxjs' || + file.type === 'text/x-pdflibjs' + ) const updateContent = useUpdateWorkspaceFileContent() const updateContentRef = useRef(updateContent) @@ -228,15 +242,49 @@ function TextEditor({ const [content, setContent] = useState('') const [savedContent, setSavedContent] = useState('') const savedContentRef = useRef('') + const wasStreamingRef = useRef(false) + const pendingStreamReconcileRef = useRef(false) + const lastStreamedContentRef = useRef(null) useEffect(() => { if (streamingContent !== undefined) { - setContent(streamingContent) - contentRef.current = streamingContent + wasStreamingRef.current = true + const nextContent = + streamingMode === 'replace' || fetchedContent === undefined + ? streamingContent + : fetchedContent.endsWith(streamingContent) || + fetchedContent.endsWith(`\n${streamingContent}`) + ? fetchedContent + : `${fetchedContent}\n${streamingContent}` + pendingStreamReconcileRef.current = true + lastStreamedContentRef.current = nextContent + setContent(nextContent) + contentRef.current = nextContent initializedRef.current = true return } + if (wasStreamingRef.current && pendingStreamReconcileRef.current) { + const lastStreamed = lastStreamedContentRef.current + const hasFetchedAdvanced = + fetchedContent !== undefined && fetchedContent !== savedContentRef.current + const fetchedMatchesLastStream = + fetchedContent !== undefined && lastStreamed !== null && fetchedContent === lastStreamed + + if (hasFetchedAdvanced || fetchedMatchesLastStream) { + pendingStreamReconcileRef.current = false + wasStreamingRef.current = false + lastStreamedContentRef.current = null + if (lastStreamed !== null && contentRef.current === lastStreamed) { + setContent(fetchedContent) + contentRef.current = fetchedContent + } + setSavedContent(fetchedContent) + savedContentRef.current = fetchedContent + return + } + } + if (fetchedContent === undefined) return if (!initializedRef.current) { @@ -260,7 +308,7 @@ function TextEditor({ savedContentRef.current = fetchedContent contentRef.current = fetchedContent } - }, [streamingContent, fetchedContent, dataUpdatedAt, autoFocus]) + }, [streamingContent, fetchedContent, streamingMode, dataUpdatedAt, autoFocus]) const handleContentChange = useCallback((value: string) => { setContent(value) @@ -340,7 +388,8 @@ function TextEditor({ ) const isStreaming = streamingContent !== undefined - const revealedContent = useStreamingText(content, isStreaming) + const shouldAnimateStreaming = isStreaming && streamingMode === 'append' + const revealedContent = useStreamingText(content, shouldAnimateStreaming) const textareaStuckRef = useRef(true) @@ -445,13 +494,36 @@ function TextEditor({ ) } -const IframePreview = memo(function IframePreview({ file }: { file: WorkspaceFileRecord }) { - const serveUrl = `/api/files/serve/${encodeURIComponent(file.key)}?context=workspace` +const IframePreview = memo(function IframePreview({ + file, + workspaceId, +}: { + file: WorkspaceFileRecord + workspaceId: string +}) { + const { data: fileData, isLoading } = useWorkspaceFileBinary(workspaceId, file.id, file.key) + const [blobUrl, setBlobUrl] = useState(null) + + useEffect(() => { + if (!fileData) return + const blob = new Blob([fileData], { type: 'application/pdf' }) + const url = URL.createObjectURL(blob) + setBlobUrl(url) + return () => URL.revokeObjectURL(url) + }, [fileData]) + + if (isLoading || !blobUrl) { + return ( +
+ +
+ ) + } return (