From 264095633514e45c361a3071840d1ede18402168 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 15 Apr 2026 16:08:15 -0700 Subject: [PATCH 1/4] fix(mothership): chat streaming structure --- apps/sim/app/api/copilot/chat/queries.ts | 14 + .../app/api/copilot/chat/stop/route.test.ts | 160 +++++ apps/sim/app/api/copilot/chat/stop/route.ts | 13 +- .../api/mothership/chats/[chatId]/route.ts | 15 +- .../[workspaceId]/home/hooks/use-chat.ts | 647 ++++++++++++++---- apps/sim/hooks/use-task-events.test.ts | 19 +- apps/sim/hooks/use-task-events.ts | 13 +- .../copilot/chat/effective-transcript.test.ts | 260 +++++++ .../lib/copilot/chat/effective-transcript.ts | 412 +++++++++++ .../lib/copilot/chat/stream-tool-outcome.ts | 46 ++ 10 files changed, 1442 insertions(+), 157 deletions(-) create mode 100644 apps/sim/app/api/copilot/chat/stop/route.test.ts create mode 100644 apps/sim/lib/copilot/chat/effective-transcript.test.ts create mode 100644 apps/sim/lib/copilot/chat/effective-transcript.ts create mode 100644 apps/sim/lib/copilot/chat/stream-tool-outcome.ts diff --git a/apps/sim/app/api/copilot/chat/queries.ts b/apps/sim/app/api/copilot/chat/queries.ts index 4828a15aa7..9977f5419b 100644 --- a/apps/sim/app/api/copilot/chat/queries.ts +++ b/apps/sim/app/api/copilot/chat/queries.ts @@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger' import { and, desc, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' +import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -113,11 +115,23 @@ export async function GET(req: NextRequest) { } } + const normalizedMessages = Array.isArray(chat.messages) + ? chat.messages + .filter((message): message is Record => Boolean(message)) + .map(normalizeMessage) + : [] + const effectiveMessages = buildEffectiveChatTranscript({ + messages: normalizedMessages, + activeStreamId: chat.conversationId || null, + ...(streamSnapshot ? { streamSnapshot } : {}), + }) + logger.info(`Retrieved chat ${chatId}`) return NextResponse.json({ success: true, chat: { ...transformChat(chat), + messages: effectiveMessages, ...(streamSnapshot ? { streamSnapshot } : {}), }, }) diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts new file mode 100644 index 0000000000..a624817bab --- /dev/null +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -0,0 +1,160 @@ +/** + * @vitest-environment node + */ +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { + mockGetSession, + mockSelect, + mockFrom, + mockWhereSelect, + mockLimit, + mockUpdate, + mockSet, + mockWhereUpdate, + mockReturning, + mockPublishStatusChanged, + mockSql, +} = vi.hoisted(() => ({ + mockGetSession: vi.fn(), + mockSelect: vi.fn(), + mockFrom: vi.fn(), + mockWhereSelect: vi.fn(), + mockLimit: vi.fn(), + mockUpdate: vi.fn(), + mockSet: vi.fn(), + mockWhereUpdate: vi.fn(), + mockReturning: vi.fn(), + mockPublishStatusChanged: vi.fn(), + mockSql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })), +})) + +vi.mock('@/lib/auth', () => ({ + getSession: mockGetSession, +})) + +vi.mock('@sim/db', () => ({ + db: { + select: mockSelect, + update: mockUpdate, + }, +})) + +vi.mock('@sim/db/schema', () => ({ + copilotChats: { + id: 'id', + userId: 'userId', + workspaceId: 'workspaceId', + messages: 'messages', + conversationId: 'conversationId', + }, +})) + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })), + eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), + sql: mockSql, +})) + +vi.mock('@/lib/copilot/tasks', () => ({ + taskPubSub: { + publishStatusChanged: mockPublishStatusChanged, + }, +})) + +import { POST } from './route' + +function createRequest(body: Record) { + return new NextRequest('http://localhost:3000/api/copilot/chat/stop', { + method: 'POST', + body: JSON.stringify(body), + headers: { 'Content-Type': 'application/json' }, + }) +} + +describe('copilot chat stop route', () => { + beforeEach(() => { + vi.clearAllMocks() + + mockGetSession.mockResolvedValue({ user: { id: 'user-1' } }) + + mockLimit.mockResolvedValue([ + { + workspaceId: 'ws-1', + messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], + }, + ]) + mockWhereSelect.mockReturnValue({ limit: mockLimit }) + mockFrom.mockReturnValue({ where: mockWhereSelect }) + mockSelect.mockReturnValue({ from: mockFrom }) + + mockReturning.mockResolvedValue([{ workspaceId: 'ws-1' }]) + mockWhereUpdate.mockReturnValue({ returning: mockReturning }) + mockSet.mockReturnValue({ where: mockWhereUpdate }) + mockUpdate.mockReturnValue({ set: mockSet }) + }) + + it('returns 401 when unauthenticated', async () => { + mockGetSession.mockResolvedValueOnce(null) + + const response = await POST( + createRequest({ + chatId: 'chat-1', + streamId: 'stream-1', + content: '', + }) + ) + + expect(response.status).toBe(401) + expect(await response.json()).toEqual({ error: 'Unauthorized' }) + }) + + it('is a no-op when the chat is missing', async () => { + mockLimit.mockResolvedValueOnce([]) + + const response = await POST( + createRequest({ + chatId: 'missing-chat', + streamId: 'stream-1', + content: '', + }) + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ success: true }) + expect(mockUpdate).not.toHaveBeenCalled() + }) + + it('appends a stopped assistant message even with no content', async () => { + const response = await POST( + createRequest({ + chatId: 'chat-1', + streamId: 'stream-1', + content: '', + }) + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ success: true }) + + const setArg = mockSet.mock.calls[0]?.[0] + expect(setArg).toBeTruthy() + expect(setArg.conversationId).toBeNull() + expect(setArg.messages).toBeTruthy() + + const appendedPayload = JSON.parse(setArg.messages.values[1] as string) + expect(appendedPayload).toHaveLength(1) + expect(appendedPayload[0]).toMatchObject({ + role: 'assistant', + content: '', + contentBlocks: [{ type: 'complete', status: 'cancelled' }], + }) + + expect(mockPublishStatusChanged).toHaveBeenCalledWith({ + workspaceId: 'ws-1', + chatId: 'chat-1', + type: 'completed', + }) + }) +}) diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index 8a742d7080..b610736dc4 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -70,7 +70,6 @@ export async function POST(req: NextRequest) { } const { chatId, streamId, content, contentBlocks } = StopSchema.parse(await req.json()) - const [row] = await db .select({ workspaceId: copilotChats.workspaceId, @@ -106,14 +105,20 @@ export async function POST(req: NextRequest) { const hasContent = content.trim().length > 0 const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0 - - if ((hasContent || hasBlocks) && canAppendAssistant) { + const synthesizedStoppedBlocks = hasBlocks + ? contentBlocks + : hasContent + ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }] + : [{ type: 'stopped' }] + const shouldAppendAssistant = canAppendAssistant + + if (shouldAppendAssistant) { const normalized = normalizeMessage({ id: crypto.randomUUID(), role: 'assistant', content, timestamp: new Date().toISOString(), - ...(hasBlocks ? { contentBlocks } : {}), + contentBlocks: synthesizedStoppedBlocks, }) const assistantMessage: PersistedMessage = normalized setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb` diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index e5fc73f301..cf94fdda83 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -5,7 +5,9 @@ import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' +import { buildEffectiveChatTranscript } from '@/lib/copilot/chat/effective-transcript' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -93,12 +95,23 @@ export async function GET( } } + const normalizedMessages = Array.isArray(chat.messages) + ? chat.messages + .filter((message): message is Record => Boolean(message)) + .map(normalizeMessage) + : [] + const effectiveMessages = buildEffectiveChatTranscript({ + messages: normalizedMessages, + activeStreamId: chat.conversationId || null, + ...(streamSnapshot ? { streamSnapshot } : {}), + }) + return NextResponse.json({ success: true, chat: { id: chat.id, title: chat.title, - messages: Array.isArray(chat.messages) ? chat.messages : [], + messages: effectiveMessages, conversationId: chat.conversationId || null, resources: Array.isArray(chat.resources) ? chat.resources : [], createdAt: chat.createdAt, diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 4b7b9ff5d0..3a8e12ce7e 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -3,16 +3,20 @@ import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { usePathname, useRouter } from 'next/navigation' import { toDisplayMessage } from '@/lib/copilot/chat/display-message' +import { getLiveAssistantMessageId } from '@/lib/copilot/chat/effective-transcript' import type { PersistedFileAttachment, PersistedMessage, } from '@/lib/copilot/chat/persisted-message' -import { MOTHERSHIP_CHAT_API_PATH } from '@/lib/copilot/constants' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' +import { resolveStreamToolOutcome } from '@/lib/copilot/chat/stream-tool-outcome' +import { MOTHERSHIP_CHAT_API_PATH, STREAM_STORAGE_KEY } from '@/lib/copilot/constants' import type { MothershipStreamV1ErrorPayload, MothershipStreamV1ToolUI, } from '@/lib/copilot/generated/mothership-stream-v1' import { + MothershipStreamV1CompletionStatus, MothershipStreamV1EventType, MothershipStreamV1ResourceOp, MothershipStreamV1RunKind, @@ -172,6 +176,8 @@ const RECONNECT_TAIL_ERROR = const MAX_RECONNECT_ATTEMPTS = 10 const RECONNECT_BASE_DELAY_MS = 1000 const RECONNECT_MAX_DELAY_MS = 30_000 +const QUEUED_SEND_HANDOFF_STORAGE_KEY = `${STREAM_STORAGE_KEY}:queued-send-handoff` +const QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY = `${STREAM_STORAGE_KEY}:queued-send-handoff-claim` const logger = createLogger('useChat') @@ -188,6 +194,100 @@ type ActiveTurn = { optimisticAssistantMessage: ChatMessage } +interface QueuedSendHandoffState { + id: string + chatId: string + workspaceId: string + supersededStreamId: string | null + userMessageId: string + message: string + fileAttachments?: FileAttachmentForApi[] + contexts?: ChatContext[] + requestedAt: number +} + +interface QueuedSendHandoffSeed { + id: string + chatId: string + supersededStreamId: string | null + userMessageId?: string +} + +function readQueuedSendHandoffState(): QueuedSendHandoffState | null { + if (typeof window === 'undefined') return null + + try { + const raw = window.sessionStorage.getItem(QUEUED_SEND_HANDOFF_STORAGE_KEY) + if (!raw) return null + + const parsed = JSON.parse(raw) as Partial + if ( + typeof parsed?.id !== 'string' || + typeof parsed.chatId !== 'string' || + typeof parsed.workspaceId !== 'string' || + typeof parsed.userMessageId !== 'string' || + typeof parsed.message !== 'string' || + typeof parsed.requestedAt !== 'number' + ) { + return null + } + + return { + id: parsed.id, + chatId: parsed.chatId, + workspaceId: parsed.workspaceId, + supersededStreamId: + typeof parsed.supersededStreamId === 'string' ? parsed.supersededStreamId : null, + userMessageId: parsed.userMessageId, + message: parsed.message, + ...(Array.isArray(parsed.fileAttachments) + ? { fileAttachments: parsed.fileAttachments as FileAttachmentForApi[] } + : {}), + ...(Array.isArray(parsed.contexts) ? { contexts: parsed.contexts as ChatContext[] } : {}), + requestedAt: parsed.requestedAt, + } + } catch { + return null + } +} + +function writeQueuedSendHandoffState(state: QueuedSendHandoffState) { + if (typeof window === 'undefined') return + window.sessionStorage.setItem(QUEUED_SEND_HANDOFF_STORAGE_KEY, JSON.stringify(state)) +} + +function clearQueuedSendHandoffState(expectedId?: string) { + if (typeof window === 'undefined') return + if (expectedId) { + const current = readQueuedSendHandoffState() + if (current && current.id !== expectedId) { + return + } + } + window.sessionStorage.removeItem(QUEUED_SEND_HANDOFF_STORAGE_KEY) +} + +function readQueuedSendHandoffClaim(): string | null { + if (typeof window === 'undefined') return null + return window.sessionStorage.getItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY) +} + +function writeQueuedSendHandoffClaim(id: string) { + if (typeof window === 'undefined') return + window.sessionStorage.setItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY, id) +} + +function clearQueuedSendHandoffClaim(expectedId?: string) { + if (typeof window === 'undefined') return + if (expectedId) { + const current = readQueuedSendHandoffClaim() + if (current && current !== expectedId) { + return + } + } + window.sessionStorage.removeItem(QUEUED_SEND_HANDOFF_CLAIM_STORAGE_KEY) +} + function stringParam(value: unknown): string | undefined { return typeof value === 'string' && value.trim() ? value.trim() : undefined } @@ -594,6 +694,122 @@ function parseStreamBatchResponse(value: unknown): StreamBatchResponse { } } +function toRawPersistedContentBlock(block: ContentBlock): Record | null { + switch (block.type) { + case 'text': + return { + type: MothershipStreamV1EventType.text, + ...(block.subagent ? { lane: 'subagent' } : {}), + content: block.content ?? '', + } + case 'tool_call': + if (!block.toolCall) { + return null + } + return { + type: MothershipStreamV1EventType.tool, + phase: MothershipStreamV1ToolPhase.call, + toolCall: { + id: block.toolCall.id, + name: block.toolCall.name, + state: block.toolCall.status, + ...(block.toolCall.params ? { params: block.toolCall.params } : {}), + ...(block.toolCall.result ? { result: block.toolCall.result } : {}), + ...(block.toolCall.calledBy ? { calledBy: block.toolCall.calledBy } : {}), + ...(block.toolCall.displayTitle + ? { + display: { + title: block.toolCall.displayTitle, + }, + } + : {}), + }, + } + case 'subagent': + return { + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.start, + content: block.content ?? '', + } + case 'subagent_end': + return { + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.end, + } + case 'stopped': + return { + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + } + default: + return null + } +} + +function buildAssistantSnapshotMessage(params: { + id: string + content: string + contentBlocks: ContentBlock[] + requestId?: string +}): PersistedMessage { + const rawContentBlocks = params.contentBlocks + .map(toRawPersistedContentBlock) + .filter((block): block is Record => block !== null) + + return normalizeMessage({ + id: params.id, + role: 'assistant', + content: params.content, + timestamp: new Date().toISOString(), + ...(params.requestId ? { requestId: params.requestId } : {}), + ...(rawContentBlocks.length > 0 ? { contentBlocks: rawContentBlocks } : {}), + }) +} + +function markMessageStopped(message: PersistedMessage): PersistedMessage { + if (!message.contentBlocks?.some((block) => block.toolCall?.state === 'executing')) { + return message + } + + const nextBlocks = message.contentBlocks.map((block) => { + if (block.toolCall?.state !== 'executing') { + return block + } + + return { + ...block, + toolCall: { + ...block.toolCall, + state: 'cancelled' as const, + display: { + ...(block.toolCall.display ?? {}), + title: 'Stopped by user', + }, + }, + } + }) + + if ( + !nextBlocks.some( + (block) => + block.type === MothershipStreamV1EventType.complete && + block.status === MothershipStreamV1CompletionStatus.cancelled + ) + ) { + nextBlocks.push({ + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + }) + } + + return normalizeMessage({ + ...message, + contentBlocks: nextBlocks, + }) +} + function buildChatHistoryHydrationKey(chatHistory: TaskChatHistory): string { const resourceKey = chatHistory.resources .map((resource) => `${resource.type}:${resource.id}:${resource.title}`) @@ -667,22 +883,10 @@ function resolveLiveToolStatus( payload: Partial<{ status: string success: boolean + output: unknown }> ): ToolCallStatus { - switch (payload.status) { - case MothershipStreamV1ToolOutcome.success: - return ToolCallStatus.success - case MothershipStreamV1ToolOutcome.error: - return ToolCallStatus.error - case MothershipStreamV1ToolOutcome.cancelled: - return ToolCallStatus.cancelled - case MothershipStreamV1ToolOutcome.skipped: - return ToolCallStatus.skipped - case MothershipStreamV1ToolOutcome.rejected: - return ToolCallStatus.rejected - default: - return payload.success === true ? ToolCallStatus.success : ToolCallStatus.error - } + return resolveStreamToolOutcome(payload) as ToolCallStatus } /** Adds a workflow to the React Query cache with a top-insertion sort order if it doesn't already exist. */ @@ -808,7 +1012,7 @@ export function useChat( const pathname = usePathname() const router = useRouter() const queryClient = useQueryClient() - const [messages, setMessages] = useState([]) + const [pendingMessages, setPendingMessages] = useState([]) const [isSending, setIsSending] = useState(false) const [isReconnecting, setIsReconnecting] = useState(false) const [error, setError] = useState(null) @@ -855,6 +1059,22 @@ export function useChat( const activeResourceIdRef = useRef(effectiveActiveResourceId) activeResourceIdRef.current = effectiveActiveResourceId + const upsertTaskChatHistory = useCallback( + (chatId: string, updater: (current: TaskChatHistory) => TaskChatHistory) => { + queryClient.setQueryData(taskKeys.detail(chatId), (current) => { + const base: TaskChatHistory = current ?? { + id: chatId, + title: null, + messages: [], + activeStreamId: null, + resources: resourcesRef.current, + } + return updater(base) + }) + }, + [queryClient] + ) + const { previewSession, previewSessionsById, @@ -975,6 +1195,7 @@ export function useChat( (opts: { streamId: string; assistantId: string; gen: number }) => Promise >(async () => false) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) + const recoveringQueuedSendHandoffIdRef = useRef(null) const resetEphemeralPreviewState = useCallback( (options?: { removeStreamingResource?: boolean }) => { @@ -1101,7 +1322,7 @@ export function useChat( setResolvedChatId(undefined) appliedChatHistoryKeyRef.current = undefined abortControllerRef.current = null - setMessages([]) + setPendingMessages([]) setError(null) setTransportIdle() setResources([]) @@ -1111,38 +1332,11 @@ export function useChat( clearQueueDispatchState() }, [clearActiveTurn, clearQueueDispatchState, resetEphemeralPreviewState, setTransportIdle]) - const mergeServerMessagesWithActiveTurn = useCallback( - (serverMessages: ChatMessage[], previousMessages: ChatMessage[]) => { - const activeTurn = activeTurnRef.current - if (!activeTurn || !sendingRef.current) { - return serverMessages - } - - const nextMessages = [...serverMessages] - const localStreamingUser = - previousMessages.find( - (message) => message.id === activeTurn.userMessageId && message.role === 'user' - ) ?? activeTurn.optimisticUserMessage - const localStreamingAssistant = - previousMessages.find( - (message) => message.id === activeTurn.assistantMessageId && message.role === 'assistant' - ) ?? activeTurn.optimisticAssistantMessage - - if (!nextMessages.some((message) => message.id === localStreamingUser.id)) { - nextMessages.push(localStreamingUser) - } - - if (!nextMessages.some((message) => message.id === localStreamingAssistant.id)) { - nextMessages.push(localStreamingAssistant) - } - - return nextMessages - }, - [] + const { data: chatHistory } = useChatHistory(resolvedChatId) + const messages = useMemo( + () => chatHistory?.messages.map(toDisplayMessage) ?? pendingMessages, + [chatHistory, pendingMessages] ) - - const { data: chatHistory } = useChatHistory(initialChatId) - const addResource = useCallback((resource: MothershipResource): boolean => { if (resourcesRef.current.some((r) => r.type === resource.type && r.id === resource.id)) { return false @@ -1268,12 +1462,12 @@ export function useChat( ) useEffect(() => { + const streamOwnerId = chatIdRef.current + const navigatedToDifferentChat = + sendingRef.current && + initialChatId !== streamOwnerId && + (initialChatId !== undefined || streamOwnerId !== undefined) if (sendingRef.current) { - const streamOwnerId = chatIdRef.current - const navigatedToDifferentChat = - initialChatId !== streamOwnerId && - (initialChatId !== undefined || streamOwnerId !== undefined) - if (navigatedToDifferentChat) { const abandonedChatId = streamOwnerId // Detach the current UI from the old stream without cancelling it on the server. @@ -1296,7 +1490,7 @@ export function useChat( clearActiveTurn() setResolvedChatId(initialChatId) appliedChatHistoryKeyRef.current = undefined - setMessages([]) + setPendingMessages([]) setError(null) setTransportIdle() setResources([]) @@ -1344,13 +1538,6 @@ export function useChat( if (!activeStreamId && locallyTerminalStreamIdRef.current) { locallyTerminalStreamIdRef.current = undefined } - const shouldPreserveLocalActiveTurn = sendingRef.current && activeTurnRef.current !== null - - if (shouldPreserveLocalActiveTurn) { - setMessages((prev) => mergeServerMessagesWithActiveTurn(mappedMessages, prev)) - } else { - setMessages(mappedMessages) - } void recoverPendingClientWorkflowTools(mappedMessages) @@ -1399,7 +1586,7 @@ export function useChat( lastCursorRef.current = '0' setTransportReconnecting() - const assistantId = generateId() + const assistantId = getLiveAssistantMessageId(activeStreamId) const reconnect = async () => { const initialSnapshot = chatHistory.streamSnapshot @@ -1458,7 +1645,6 @@ export function useChat( queryClient, recoverPendingClientWorkflowTools, seedPreviewSessions, - mergeServerMessagesWithActiveTurn, setTransportIdle, setTransportReconnecting, ]) @@ -1569,22 +1755,41 @@ export function useChat( const flush = () => { if (isStale()) return streamingBlocksRef.current = [...blocks] - const snapshot: Partial = { - content: runningText, - contentBlocks: [...blocks], - } - if (streamRequestId) snapshot.requestId = streamRequestId - setMessages((prev) => { - if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev - const idx = prev.findIndex((m) => m.id === assistantId) - if (idx >= 0) { - return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) + const activeChatId = chatIdRef.current + if (!activeChatId) { + const snapshot: Partial = { + content: runningText, + contentBlocks: [...blocks], } - return [ - ...prev, - { id: assistantId, role: 'assistant' as const, content: '', ...snapshot }, - ] + if (streamRequestId) snapshot.requestId = streamRequestId + setPendingMessages((prev) => { + if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev + const idx = prev.findIndex((m) => m.id === assistantId) + if (idx >= 0) { + return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) + } + return [ + ...prev, + { id: assistantId, role: 'assistant' as const, content: '', ...snapshot }, + ] + }) + return + } + + const assistantMessage = buildAssistantSnapshotMessage({ + id: assistantId, + content: runningText, + contentBlocks: blocks, + ...(streamRequestId ? { requestId: streamRequestId } : {}), }) + upsertTaskChatHistory(activeChatId, (current) => ({ + ...current, + messages: [ + ...current.messages.filter((message) => message.id !== assistantId), + assistantMessage, + ], + activeStreamId: streamIdRef.current ?? current.activeStreamId, + })) } const flushText = () => { @@ -1690,14 +1895,26 @@ export function useChat( const userMsg = pendingUserMsgRef.current const activeStreamId = streamIdRef.current if (userMsg && activeStreamId) { + const assistantMessage = buildAssistantSnapshotMessage({ + id: + activeTurnRef.current?.assistantMessageId ?? + getLiveAssistantMessageId(activeStreamId), + content: streamingContentRef.current, + contentBlocks: streamingBlocksRef.current, + }) + const seededMessages = + assistantMessage.content || assistantMessage.contentBlocks?.length + ? [userMsg, assistantMessage] + : [userMsg] queryClient.setQueryData(taskKeys.detail(payloadChatId), { id: payloadChatId, title: null, - messages: [userMsg], + messages: seededMessages, activeStreamId, resources: resourcesRef.current, }) } + setPendingMessages([]) if (!workflowIdRef.current) { window.history.replaceState( null, @@ -2273,6 +2490,7 @@ export function useChat( workspaceId, router, queryClient, + upsertTaskChatHistory, addResource, removeResource, applyPreviewSessionUpdate, @@ -2691,28 +2909,32 @@ export function useChat( [] ) - const invalidateChatQueries = useCallback(() => { - const activeChatId = chatIdRef.current - if (activeChatId) { - queryClient.invalidateQueries({ - queryKey: taskKeys.detail(activeChatId), - }) - } - queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) - }, [workspaceId, queryClient]) + const invalidateChatQueries = useCallback( + (options?: { includeDetail?: boolean }) => { + const activeChatId = chatIdRef.current + if (options?.includeDetail !== false && activeChatId) { + queryClient.invalidateQueries({ + queryKey: taskKeys.detail(activeChatId), + }) + } + queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) + }, + [workspaceId, queryClient] + ) const messagesRef = useRef(messages) messagesRef.current = messages const finalize = useCallback( (options?: { error?: boolean }) => { + const hasQueuedFollowUp = !options?.error && messageQueueRef.current.length > 0 reconcileTerminalPreviewSessions() locallyTerminalStreamIdRef.current = streamIdRef.current ?? activeTurnRef.current?.userMessageId ?? undefined clearActiveTurn() setTransportIdle() abortControllerRef.current = null - invalidateChatQueries() + invalidateChatQueries({ includeDetail: !hasQueuedFollowUp }) if (!options?.error) { const cid = chatIdRef.current @@ -2725,7 +2947,7 @@ export function useChat( return } - if (messageQueueRef.current.length > 0) { + if (hasQueuedFollowUp) { void enqueueQueueDispatchRef.current({ type: 'send_head' }) } }, @@ -2738,7 +2960,9 @@ export function useChat( message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[], - pendingStopOverride?: Promise | null + pendingStopOverride?: Promise | null, + onOptimisticSendApplied?: () => void, + queuedSendHandoff?: QueuedSendHandoffSeed ) => { if (!message.trim() || !workspaceId) return false const pendingStop = pendingStopOverride ?? pendingStopPromiseRef.current @@ -2750,8 +2974,8 @@ export function useChat( setTransportStreaming() locallyTerminalStreamIdRef.current = undefined - const userMessageId = generateId() - const assistantId = generateId() + const userMessageId = queuedSendHandoff?.userMessageId ?? generateId() + const assistantId = getLiveAssistantMessageId(userMessageId) streamIdRef.current = userMessageId lastCursorRef.current = '0' @@ -2769,6 +2993,19 @@ export function useChat( : undefined const requestChatId = selectedChatIdRef.current ?? chatIdRef.current + if (queuedSendHandoff) { + writeQueuedSendHandoffState({ + id: queuedSendHandoff.id, + chatId: queuedSendHandoff.chatId, + workspaceId, + supersededStreamId: queuedSendHandoff.supersededStreamId, + userMessageId, + message, + ...(fileAttachments ? { fileAttachments } : {}), + ...(contexts ? { contexts } : {}), + requestedAt: Date.now(), + }) + } const messageContexts = contexts?.map((c) => ({ kind: c.kind, label: c.label, @@ -2818,21 +3055,33 @@ export function useChat( optimisticAssistantMessage, } + if (requestChatId) { + await queryClient.cancelQueries({ queryKey: taskKeys.detail(requestChatId) }) + } + const applyOptimisticSend = () => { + const assistantSnapshot = buildAssistantSnapshotMessage({ + id: assistantId, + content: '', + contentBlocks: [], + }) if (requestChatId) { - queryClient.setQueryData(taskKeys.detail(requestChatId), (old) => { - if (!old) return undefined - const nextMessages = old.messages.filter((m) => m.id !== userMessageId) - return { - ...old, - resources: old.resources.filter((r) => r.id !== 'streaming-file'), - messages: [...nextMessages, cachedUserMsg], - activeStreamId: userMessageId, - } - }) + upsertTaskChatHistory(requestChatId, (current) => ({ + ...current, + resources: current.resources.filter((resource) => resource.id !== 'streaming-file'), + messages: [ + ...current.messages.filter( + (persistedMessage) => + persistedMessage.id !== userMessageId && persistedMessage.id !== assistantId + ), + cachedUserMsg, + assistantSnapshot, + ], + activeStreamId: userMessageId, + })) } - setMessages((prev) => { + setPendingMessages((prev) => { const nextMessages = prev.filter((m) => m.id !== userMessageId && m.id !== assistantId) return [...nextMessages, optimisticUserMessage, optimisticAssistantMessage] }) @@ -2840,20 +3089,27 @@ export function useChat( const rollbackOptimisticSend = () => { if (requestChatId) { - queryClient.setQueryData(taskKeys.detail(requestChatId), (old) => { - if (!old) return undefined - return { - ...old, - messages: old.messages.filter((m) => m.id !== userMessageId), - activeStreamId: old.activeStreamId === userMessageId ? null : old.activeStreamId, - } - }) + upsertTaskChatHistory(requestChatId, (current) => ({ + ...current, + messages: current.messages.filter( + (persistedMessage) => + persistedMessage.id !== userMessageId && persistedMessage.id !== assistantId + ), + activeStreamId: + current.activeStreamId === userMessageId ? null : current.activeStreamId, + })) } - setMessages((prev) => prev.filter((m) => m.id !== userMessageId && m.id !== assistantId)) + setPendingMessages((prev) => + prev.filter( + (pendingMessage) => + pendingMessage.id !== userMessageId && pendingMessage.id !== assistantId + ) + ) } applyOptimisticSend() + onOptimisticSendApplied?.() consumedByTranscript = true const abortController = new AbortController() @@ -2863,8 +3119,9 @@ export function useChat( if (pendingStop) { try { await pendingStop - // Query invalidation from the stop barrier can briefly stomp the optimistic tail. - // Re-apply it before the real POST so the mothership UI stays immediate. + if (requestChatId) { + await queryClient.cancelQueries({ queryKey: taskKeys.detail(requestChatId) }) + } applyOptimisticSend() } catch (err) { rollbackOptimisticSend() @@ -2928,6 +3185,10 @@ export function useChat( throw new Error(errorData.error || `Request failed: ${response.status}`) } + if (queuedSendHandoff) { + clearQueuedSendHandoffState(queuedSendHandoff.id) + } + if (!response.body) throw new Error('No response body') const streamResult = await processSSEStream(response.body.getReader(), assistantId, gen) @@ -2986,6 +3247,7 @@ export function useChat( [ workspaceId, queryClient, + upsertTaskChatHistory, processSSEStream, finalize, resumeOrFinalize, @@ -3015,6 +3277,69 @@ export function useChat( }, [workspaceId, startSendMessage] ) + useEffect(() => { + if (typeof window === 'undefined') return + + const clearClaim = () => { + clearQueuedSendHandoffClaim() + } + + window.addEventListener('pagehide', clearClaim) + window.addEventListener('beforeunload', clearClaim) + return () => { + window.removeEventListener('pagehide', clearClaim) + window.removeEventListener('beforeunload', clearClaim) + } + }, []) + useEffect(() => { + if (!workspaceId || !chatHistory || sendingRef.current || pendingStopPromiseRef.current) return + + const handoff = readQueuedSendHandoffState() + if (!handoff) return + if (handoff.workspaceId !== workspaceId || handoff.chatId !== chatHistory.id) return + if (recoveringQueuedSendHandoffIdRef.current === handoff.id) return + if (readQueuedSendHandoffClaim() === handoff.id) return + + if ( + chatHistory.activeStreamId === handoff.userMessageId || + chatHistory.messages.some((message) => message.id === handoff.userMessageId) + ) { + clearQueuedSendHandoffState(handoff.id) + clearQueuedSendHandoffClaim(handoff.id) + return + } + + if (chatHistory.activeStreamId === handoff.supersededStreamId) { + return + } + + if (chatHistory.activeStreamId && chatHistory.activeStreamId !== handoff.supersededStreamId) { + clearQueuedSendHandoffState(handoff.id) + clearQueuedSendHandoffClaim(handoff.id) + return + } + + recoveringQueuedSendHandoffIdRef.current = handoff.id + writeQueuedSendHandoffClaim(handoff.id) + void startSendMessage( + handoff.message, + handoff.fileAttachments, + handoff.contexts, + null, + undefined, + { + id: handoff.id, + chatId: handoff.chatId, + supersededStreamId: handoff.supersededStreamId, + userMessageId: handoff.userMessageId, + } + ).finally(() => { + if (recoveringQueuedSendHandoffIdRef.current === handoff.id) { + recoveringQueuedSendHandoffIdRef.current = null + } + clearQueuedSendHandoffClaim(handoff.id) + }) + }, [workspaceId, chatHistory, startSendMessage]) const cancelActiveWorkflowExecutions = useCallback(() => { const execState = useExecutionStore.getState() const consoleStore = useTerminalConsoleStore.getState() @@ -3066,6 +3391,7 @@ export function useChat( } const wasSending = sendingRef.current + const activeChatId = chatIdRef.current const sid = streamIdRef.current || activeTurnRef.current?.userMessageId || @@ -3088,24 +3414,36 @@ export function useChat( abortControllerRef.current = null setTransportIdle() - setMessages((prev) => - prev.map((msg) => { - if (!msg.contentBlocks?.some((b) => b.toolCall?.status === 'executing')) return msg - const updated = msg.contentBlocks!.map((block) => { - if (block.toolCall?.status !== 'executing') return block - return { - ...block, - toolCall: { - ...block.toolCall, - status: 'cancelled' as const, - displayTitle: 'Stopped by user', - }, + if (activeChatId) { + await queryClient.cancelQueries({ queryKey: taskKeys.detail(activeChatId) }) + upsertTaskChatHistory(activeChatId, (current) => ({ + ...current, + messages: current.messages.map(markMessageStopped), + })) + } else { + setPendingMessages((prev) => + prev.map((msg) => { + if (!msg.contentBlocks?.some((block) => block.toolCall?.status === 'executing')) { + return msg } + const updatedBlocks = msg.contentBlocks.map((block) => { + if (block.toolCall?.status !== 'executing') { + return block + } + return { + ...block, + toolCall: { + ...block.toolCall, + status: 'cancelled' as const, + displayTitle: 'Stopped by user', + }, + } + }) + updatedBlocks.push({ type: 'stopped' as const }) + return { ...msg, contentBlocks: updatedBlocks } }) - updated.push({ type: 'stopped' as const }) - return { ...msg, contentBlocks: updated } - }) - ) + ) + } // Cancel active run-tool executions before waiting for the server-side stream // shutdown barrier; otherwise the abort settle can sit behind tool execution teardown. @@ -3175,6 +3513,7 @@ export function useChat( persistPartialResponse, queryClient, resetEphemeralPreviewState, + upsertTaskChatHistory, clearActiveTurn, setTransportIdle, ]) @@ -3198,16 +3537,27 @@ export function useChat( let originalIndex = 0 let removedFromQueue = false + const removeQueuedMessage = () => { + if (removedFromQueue || action.epoch !== queueDispatchEpochRef.current) { + return + } + removedFromQueue = true + setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + } try { const currentIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id) if (currentIndex !== -1) { originalIndex = currentIndex - removedFromQueue = true - setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) } - const consumed = await startSendMessage(msg.content, msg.fileAttachments, msg.contexts) + const consumed = await startSendMessage( + msg.content, + msg.fileAttachments, + msg.contexts, + undefined, + removeQueuedMessage + ) if (!consumed && removedFromQueue && action.epoch === queueDispatchEpochRef.current) { setMessageQueue((prev) => { if (prev.some((queued) => queued.id === msg.id)) return prev @@ -3250,6 +3600,8 @@ export function useChat( enqueueQueueDispatchRef.current = enqueueQueueDispatch const removeFromQueue = useCallback((id: string) => { + clearQueuedSendHandoffState(id) + clearQueuedSendHandoffClaim(id) setMessageQueue((prev) => prev.filter((m) => m.id !== id)) }, []) @@ -3272,6 +3624,13 @@ export function useChat( let originalIndex = initialIndex let removedFromQueue = false + const removeQueuedMessage = () => { + if (removedFromQueue || epoch !== queueDispatchEpochRef.current) { + return + } + removedFromQueue = true + setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + } const restoreQueuedMessage = () => { if (!removedFromQueue || epoch !== queueDispatchEpochRef.current) { return @@ -3291,15 +3650,29 @@ export function useChat( } originalIndex = currentIndex - removedFromQueue = true - setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + const queuedSendHandoff = + sendingRef.current && workspaceId + ? { + id: msg.id, + chatId: selectedChatIdRef.current ?? chatIdRef.current ?? '', + supersededStreamId: + streamIdRef.current || + activeTurnRef.current?.userMessageId || + queryClient.getQueryData( + taskKeys.detail(selectedChatIdRef.current ?? chatIdRef.current) + )?.activeStreamId || + null, + } + : undefined const pendingStop = sendingRef.current ? stopGeneration() : pendingStopPromiseRef.current const consumed = await startSendMessage( msg.content, msg.fileAttachments, msg.contexts, - pendingStop + pendingStop, + removeQueuedMessage, + queuedSendHandoff?.chatId ? queuedSendHandoff : undefined ) if (!consumed) { @@ -3324,6 +3697,8 @@ export function useChat( const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { const msg = messageQueueRef.current.find((m) => m.id === id) if (!msg) return undefined + clearQueuedSendHandoffState(id) + clearQueuedSendHandoffClaim(id) setMessageQueue((prev) => prev.filter((m) => m.id !== id)) return msg }, []) diff --git a/apps/sim/hooks/use-task-events.test.ts b/apps/sim/hooks/use-task-events.test.ts index ac58e6cf27..d62b32696a 100644 --- a/apps/sim/hooks/use-task-events.test.ts +++ b/apps/sim/hooks/use-task-events.test.ts @@ -16,9 +16,10 @@ describe('handleTaskStatusEvent', () => { vi.clearAllMocks() }) - it('invalidates the task list and completed chat detail', () => { + it('invalidates only the task list for completed task events', () => { handleTaskStatusEvent( queryClient, + 'ws-1', JSON.stringify({ chatId: 'chat-1', type: 'completed', @@ -26,18 +27,16 @@ describe('handleTaskStatusEvent', () => { }) ) - expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(2) - expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(1, { - queryKey: taskKeys.lists(), - }) - expect(queryClient.invalidateQueries).toHaveBeenNthCalledWith(2, { - queryKey: taskKeys.detail('chat-1'), + expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) + expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ + queryKey: taskKeys.list('ws-1'), }) }) it('keeps list invalidation only for non-completed task events', () => { handleTaskStatusEvent( queryClient, + 'ws-1', JSON.stringify({ chatId: 'chat-1', type: 'started', @@ -47,16 +46,16 @@ describe('handleTaskStatusEvent', () => { expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ - queryKey: taskKeys.lists(), + queryKey: taskKeys.list('ws-1'), }) }) it('preserves list invalidation when task event payload is invalid', () => { - handleTaskStatusEvent(queryClient, '{') + handleTaskStatusEvent(queryClient, 'ws-1', '{') expect(queryClient.invalidateQueries).toHaveBeenCalledTimes(1) expect(queryClient.invalidateQueries).toHaveBeenCalledWith({ - queryKey: taskKeys.lists(), + queryKey: taskKeys.list('ws-1'), }) }) }) diff --git a/apps/sim/hooks/use-task-events.ts b/apps/sim/hooks/use-task-events.ts index 04bff3df49..e10a7599b1 100644 --- a/apps/sim/hooks/use-task-events.ts +++ b/apps/sim/hooks/use-task-events.ts @@ -38,19 +38,16 @@ function parseTaskStatusEventPayload(data: unknown): TaskStatusEventPayload | nu export function handleTaskStatusEvent( queryClient: Pick, + workspaceId: string, data: unknown ): void { - queryClient.invalidateQueries({ queryKey: taskKeys.lists() }) + queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) const payload = parseTaskStatusEventPayload(data) if (!payload) { logger.warn('Received invalid task_status payload') return } - - if (payload.type === 'completed' && payload.chatId) { - queryClient.invalidateQueries({ queryKey: taskKeys.detail(payload.chatId) }) - } } /** @@ -67,7 +64,11 @@ export function useTaskEvents(workspaceId: string | undefined) { ) eventSource.addEventListener('task_status', (event) => { - handleTaskStatusEvent(queryClient, event instanceof MessageEvent ? event.data : undefined) + handleTaskStatusEvent( + queryClient, + workspaceId, + event instanceof MessageEvent ? event.data : undefined + ) }) eventSource.onerror = () => { diff --git a/apps/sim/lib/copilot/chat/effective-transcript.test.ts b/apps/sim/lib/copilot/chat/effective-transcript.test.ts new file mode 100644 index 0000000000..2b17872241 --- /dev/null +++ b/apps/sim/lib/copilot/chat/effective-transcript.test.ts @@ -0,0 +1,260 @@ +/** + * @vitest-environment node + */ + +import { describe, expect, it } from 'vitest' +import { + MothershipStreamV1CompletionStatus, + MothershipStreamV1EventType, + MothershipStreamV1SessionKind, + MothershipStreamV1TextChannel, +} from '@/lib/copilot/generated/mothership-stream-v1' +import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' +import { buildEffectiveChatTranscript, getLiveAssistantMessageId } from './effective-transcript' +import { normalizeMessage } from './persisted-message' + +function toBatchEvent(eventId: number, event: StreamBatchEvent['event']): StreamBatchEvent { + return { + eventId, + streamId: event.stream.streamId, + event, + } +} + +function buildUserMessage(id: string, content: string) { + return normalizeMessage({ + id, + role: 'user', + content, + timestamp: '2026-04-15T12:00:00.000Z', + }) +} + +describe('buildEffectiveChatTranscript', () => { + it('returns the existing transcript when the stream owner is no longer the trailing user', () => { + const messages = [ + buildUserMessage('stream-1', 'Hello'), + normalizeMessage({ + id: 'assistant-1', + role: 'assistant', + content: 'Persisted response', + timestamp: '2026-04-15T12:00:01.000Z', + }), + ] + + const result = buildEffectiveChatTranscript({ + messages, + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.text, + stream: { streamId: 'stream-1' }, + payload: { + channel: MothershipStreamV1TextChannel.assistant, + text: 'Live response', + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toEqual(messages) + }) + + it('appends a placeholder assistant while an active stream has not produced text yet', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.session, + stream: { streamId: 'stream-1' }, + payload: { + kind: MothershipStreamV1SessionKind.start, + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]).toEqual( + expect.objectContaining({ + id: getLiveAssistantMessageId('stream-1'), + role: 'assistant', + content: '', + }) + ) + }) + + it('materializes a live assistant response from redis-backed stream events', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.session, + stream: { streamId: 'stream-1' }, + trace: { requestId: 'req-1' }, + payload: { + kind: MothershipStreamV1SessionKind.trace, + requestId: 'req-1', + }, + }), + toBatchEvent(2, { + v: 1, + seq: 2, + ts: '2026-04-15T12:00:02.000Z', + type: MothershipStreamV1EventType.text, + stream: { streamId: 'stream-1' }, + trace: { requestId: 'req-1' }, + payload: { + channel: MothershipStreamV1TextChannel.assistant, + text: 'Live response', + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]).toEqual( + expect.objectContaining({ + id: getLiveAssistantMessageId('stream-1'), + role: 'assistant', + content: 'Live response', + requestId: 'req-1', + }) + ) + }) + + it('does not duplicate thinking-only text into a second assistant block', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.text, + stream: { streamId: 'stream-1' }, + payload: { + channel: MothershipStreamV1TextChannel.thinking, + text: 'Internal reasoning', + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]).toEqual( + expect.objectContaining({ + content: 'Internal reasoning', + contentBlocks: [ + expect.objectContaining({ + type: MothershipStreamV1EventType.text, + content: 'Internal reasoning', + }), + ], + }) + ) + }) + + it('treats user-cancelled tool results as cancelled', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.tool, + stream: { streamId: 'stream-1' }, + payload: { + phase: 'result', + toolCallId: 'tool-1', + toolName: 'workspace_file', + executor: 'go', + mode: 'sync', + success: false, + output: { + reason: 'user_cancelled', + }, + }, + }), + ], + previewSessions: [], + status: 'active', + }, + }) + + expect(result[1]?.contentBlocks).toEqual([ + expect.objectContaining({ + type: MothershipStreamV1EventType.tool, + toolCall: expect.objectContaining({ + id: 'tool-1', + name: 'workspace_file', + state: MothershipStreamV1CompletionStatus.cancelled, + }), + }), + ]) + }) + + it('materializes a cancelled assistant tail when the stream ends before persistence', () => { + const result = buildEffectiveChatTranscript({ + messages: [buildUserMessage('stream-1', 'Hello')], + activeStreamId: 'stream-1', + streamSnapshot: { + events: [ + toBatchEvent(1, { + v: 1, + seq: 1, + ts: '2026-04-15T12:00:01.000Z', + type: MothershipStreamV1EventType.complete, + stream: { streamId: 'stream-1' }, + payload: { + status: MothershipStreamV1CompletionStatus.cancelled, + }, + }), + ], + previewSessions: [], + status: MothershipStreamV1CompletionStatus.cancelled, + }, + }) + + expect(result).toHaveLength(2) + expect(result[1]?.contentBlocks).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + }), + ]) + ) + }) +}) diff --git a/apps/sim/lib/copilot/chat/effective-transcript.ts b/apps/sim/lib/copilot/chat/effective-transcript.ts new file mode 100644 index 0000000000..ba770a8c2b --- /dev/null +++ b/apps/sim/lib/copilot/chat/effective-transcript.ts @@ -0,0 +1,412 @@ +import { + MothershipStreamV1CompletionStatus, + type MothershipStreamV1ErrorPayload, + MothershipStreamV1EventType, + MothershipStreamV1RunKind, + MothershipStreamV1SessionKind, + MothershipStreamV1SpanLifecycleEvent, + MothershipStreamV1SpanPayloadKind, + MothershipStreamV1ToolOutcome, + MothershipStreamV1ToolPhase, +} from '@/lib/copilot/generated/mothership-stream-v1' +import type { FilePreviewSession } from '@/lib/copilot/request/session/file-preview-session-contract' +import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' +import { normalizeMessage, type PersistedMessage } from './persisted-message' +import { resolveStreamToolOutcome } from './stream-tool-outcome' + +interface StreamSnapshotLike { + events: StreamBatchEvent[] + previewSessions: FilePreviewSession[] + status: string +} + +interface BuildEffectiveChatTranscriptParams { + messages: PersistedMessage[] + activeStreamId: string | null + streamSnapshot?: StreamSnapshotLike | null +} + +type RawPersistedBlock = Record + +export function getLiveAssistantMessageId(streamId: string): string { + return `live-assistant:${streamId}` +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === 'object' && !Array.isArray(value) +} + +function asPayloadRecord(value: unknown): Record | undefined { + return isRecord(value) ? value : undefined +} + +function isTerminalStreamStatus(status: string | null | undefined): boolean { + return ( + status === MothershipStreamV1CompletionStatus.complete || + status === MothershipStreamV1CompletionStatus.error || + status === MothershipStreamV1CompletionStatus.cancelled + ) +} + +function buildInlineErrorTag(payload: MothershipStreamV1ErrorPayload): string { + const message = + (typeof payload.displayMessage === 'string' ? payload.displayMessage : undefined) || + (typeof payload.message === 'string' ? payload.message : undefined) || + (typeof payload.error === 'string' ? payload.error : undefined) || + 'An unexpected error occurred' + const provider = typeof payload.provider === 'string' ? payload.provider : undefined + const code = typeof payload.code === 'string' ? payload.code : undefined + return `${JSON.stringify({ + message, + ...(code ? { code } : {}), + ...(provider ? { provider } : {}), + })}` +} + +function resolveToolDisplayTitle(ui: unknown): string | undefined { + if (!isRecord(ui)) return undefined + return typeof ui.title === 'string' + ? ui.title + : typeof ui.phaseLabel === 'string' + ? ui.phaseLabel + : undefined +} + +function appendTextBlock( + blocks: RawPersistedBlock[], + content: string, + options: { + lane?: 'subagent' + } +): void { + if (!content) return + const last = blocks[blocks.length - 1] + if (last?.type === MothershipStreamV1EventType.text && last.lane === options.lane) { + last.content = `${typeof last.content === 'string' ? last.content : ''}${content}` + return + } + + blocks.push({ + type: MothershipStreamV1EventType.text, + ...(options.lane ? { lane: options.lane } : {}), + content, + }) +} + +function buildLiveAssistantMessage(params: { + streamId: string + events: StreamBatchEvent[] + status: string | null | undefined +}): PersistedMessage | null { + const { streamId, events, status } = params + const blocks: RawPersistedBlock[] = [] + const toolIndexById = new Map() + const subagentByParentToolCallId = new Map() + let activeSubagent: string | undefined + let activeSubagentParentToolCallId: string | undefined + let activeCompactionId: string | undefined + let runningText = '' + let lastContentSource: 'main' | 'subagent' | null = null + let requestId: string | undefined + let lastTimestamp: string | undefined + + const resolveScopedSubagent = ( + agentId: string | undefined, + parentToolCallId: string | undefined + ): string | undefined => { + if (agentId) return agentId + if (parentToolCallId) { + const scoped = subagentByParentToolCallId.get(parentToolCallId) + if (scoped) return scoped + } + return activeSubagent + } + + const ensureToolBlock = (input: { + toolCallId: string + toolName: string + calledBy?: string + displayTitle?: string + params?: Record + result?: { success: boolean; output?: unknown; error?: string } + state?: string + }): RawPersistedBlock => { + const existingIndex = toolIndexById.get(input.toolCallId) + if (existingIndex !== undefined) { + const existing = blocks[existingIndex] + const existingToolCall = asPayloadRecord(existing.toolCall) + existing.toolCall = { + ...(existingToolCall ?? {}), + id: input.toolCallId, + name: input.toolName, + state: + input.state ?? + (typeof existingToolCall?.state === 'string' ? existingToolCall.state : 'executing'), + ...(input.calledBy ? { calledBy: input.calledBy } : {}), + ...(input.params ? { params: input.params } : {}), + ...(input.result ? { result: input.result } : {}), + ...(input.displayTitle + ? { + display: { + title: input.displayTitle, + }, + } + : existingToolCall?.display + ? { display: existingToolCall.display } + : {}), + } + return existing + } + + const nextBlock: RawPersistedBlock = { + type: MothershipStreamV1EventType.tool, + phase: MothershipStreamV1ToolPhase.call, + toolCall: { + id: input.toolCallId, + name: input.toolName, + state: input.state ?? 'executing', + ...(input.calledBy ? { calledBy: input.calledBy } : {}), + ...(input.params ? { params: input.params } : {}), + ...(input.result ? { result: input.result } : {}), + ...(input.displayTitle + ? { + display: { + title: input.displayTitle, + }, + } + : {}), + }, + } + toolIndexById.set(input.toolCallId, blocks.length) + blocks.push(nextBlock) + return nextBlock + } + + for (const entry of events) { + const parsed = entry.event + lastTimestamp = parsed.ts + if (typeof parsed.trace?.requestId === 'string') { + requestId = parsed.trace.requestId + } + const scopedParentToolCallId = + typeof parsed.scope?.parentToolCallId === 'string' ? parsed.scope.parentToolCallId : undefined + const scopedAgentId = + typeof parsed.scope?.agentId === 'string' ? parsed.scope.agentId : undefined + const scopedSubagent = resolveScopedSubagent(scopedAgentId, scopedParentToolCallId) + + switch (parsed.type) { + case MothershipStreamV1EventType.session: { + if (parsed.payload.kind === MothershipStreamV1SessionKind.chat) { + continue + } + if (parsed.payload.kind === MothershipStreamV1SessionKind.start) { + continue + } + if (parsed.payload.kind === MothershipStreamV1SessionKind.trace) { + requestId = parsed.payload.requestId + } + continue + } + case MothershipStreamV1EventType.text: { + const chunk = parsed.payload.text + if (!chunk) { + continue + } + const contentSource: 'main' | 'subagent' = scopedSubagent ? 'subagent' : 'main' + const needsBoundaryNewline = + lastContentSource !== null && + lastContentSource !== contentSource && + runningText.length > 0 && + !runningText.endsWith('\n') + const normalizedChunk = needsBoundaryNewline ? `\n${chunk}` : chunk + appendTextBlock(blocks, normalizedChunk, { + ...(scopedSubagent ? { lane: 'subagent' as const } : {}), + }) + runningText += normalizedChunk + lastContentSource = contentSource + continue + } + case MothershipStreamV1EventType.tool: { + const payload = parsed.payload + const toolCallId = payload.toolCallId + const displayTitle = resolveToolDisplayTitle('ui' in payload ? payload.ui : undefined) + + if ('previewPhase' in payload) { + continue + } + + if (payload.phase === MothershipStreamV1ToolPhase.args_delta) { + continue + } + + if (payload.phase === MothershipStreamV1ToolPhase.result) { + ensureToolBlock({ + toolCallId, + toolName: payload.toolName, + calledBy: scopedSubagent, + state: resolveStreamToolOutcome(payload), + result: { + success: payload.success, + ...(payload.output !== undefined ? { output: payload.output } : {}), + ...(typeof payload.error === 'string' ? { error: payload.error } : {}), + }, + }) + continue + } + + ensureToolBlock({ + toolCallId, + toolName: payload.toolName, + calledBy: scopedSubagent, + displayTitle, + params: isRecord(payload.arguments) ? payload.arguments : undefined, + state: typeof payload.status === 'string' ? payload.status : 'executing', + }) + continue + } + case MothershipStreamV1EventType.span: { + if (parsed.payload.kind !== MothershipStreamV1SpanPayloadKind.subagent) { + continue + } + + const spanData = asPayloadRecord(parsed.payload.data) + const parentToolCallId = + scopedParentToolCallId ?? + (typeof spanData?.tool_call_id === 'string' ? spanData.tool_call_id : undefined) + const name = typeof parsed.payload.agent === 'string' ? parsed.payload.agent : scopedAgentId + if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.start && name) { + if (parentToolCallId) { + subagentByParentToolCallId.set(parentToolCallId, name) + } + activeSubagent = name + activeSubagentParentToolCallId = parentToolCallId + blocks.push({ + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.start, + content: name, + }) + continue + } + + if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.end) { + if (spanData?.pending === true) { + continue + } + if (parentToolCallId) { + subagentByParentToolCallId.delete(parentToolCallId) + } + if ( + !parentToolCallId || + parentToolCallId === activeSubagentParentToolCallId || + name === activeSubagent + ) { + activeSubagent = undefined + activeSubagentParentToolCallId = undefined + } + blocks.push({ + type: MothershipStreamV1EventType.span, + kind: MothershipStreamV1SpanPayloadKind.subagent, + lifecycle: MothershipStreamV1SpanLifecycleEvent.end, + }) + } + continue + } + case MothershipStreamV1EventType.run: { + if (parsed.payload.kind === MothershipStreamV1RunKind.compaction_start) { + activeCompactionId = `compaction_${entry.eventId}` + ensureToolBlock({ + toolCallId: activeCompactionId, + toolName: 'context_compaction', + displayTitle: 'Compacting context...', + state: 'executing', + }) + continue + } + + if (parsed.payload.kind === MothershipStreamV1RunKind.compaction_done) { + const compactionId = activeCompactionId ?? `compaction_${entry.eventId}` + activeCompactionId = undefined + ensureToolBlock({ + toolCallId: compactionId, + toolName: 'context_compaction', + displayTitle: 'Compacted context', + state: MothershipStreamV1ToolOutcome.success, + }) + } + continue + } + case MothershipStreamV1EventType.error: { + const tag = buildInlineErrorTag(parsed.payload) + if (runningText.includes(tag)) { + continue + } + const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : '' + const content = `${prefix}${tag}` + appendTextBlock(blocks, content, { + ...(scopedSubagent ? { lane: 'subagent' as const } : {}), + }) + runningText += content + continue + } + case MothershipStreamV1EventType.complete: { + if (parsed.payload.status === MothershipStreamV1CompletionStatus.cancelled) { + blocks.push({ + type: MothershipStreamV1EventType.complete, + status: parsed.payload.status, + }) + } + continue + } + case MothershipStreamV1EventType.resource: { + continue + } + default: { + continue + } + } + } + + if (blocks.length === 0 && !runningText && isTerminalStreamStatus(status)) { + return null + } + + return normalizeMessage({ + id: getLiveAssistantMessageId(streamId), + role: 'assistant', + content: runningText, + timestamp: lastTimestamp ?? new Date().toISOString(), + ...(requestId ? { requestId } : {}), + ...(blocks.length > 0 ? { contentBlocks: blocks } : {}), + }) +} + +export function buildEffectiveChatTranscript({ + messages, + activeStreamId, + streamSnapshot, +}: BuildEffectiveChatTranscriptParams): PersistedMessage[] { + if (!activeStreamId || !streamSnapshot) { + return messages + } + + const trailingMessage = messages[messages.length - 1] + if ( + !trailingMessage || + trailingMessage.role !== 'user' || + trailingMessage.id !== activeStreamId + ) { + return messages + } + + const liveAssistant = buildLiveAssistantMessage({ + streamId: activeStreamId, + events: streamSnapshot.events, + status: streamSnapshot.status, + }) + if (!liveAssistant) { + return messages + } + + return [...messages, liveAssistant] +} diff --git a/apps/sim/lib/copilot/chat/stream-tool-outcome.ts b/apps/sim/lib/copilot/chat/stream-tool-outcome.ts new file mode 100644 index 0000000000..863c47f98e --- /dev/null +++ b/apps/sim/lib/copilot/chat/stream-tool-outcome.ts @@ -0,0 +1,46 @@ +import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1' + +type TerminalToolOutcome = + | typeof MothershipStreamV1ToolOutcome.success + | typeof MothershipStreamV1ToolOutcome.error + | typeof MothershipStreamV1ToolOutcome.cancelled + | typeof MothershipStreamV1ToolOutcome.skipped + | typeof MothershipStreamV1ToolOutcome.rejected + +interface ResolveStreamToolOutcomeParams { + output?: unknown + status?: string + success?: boolean +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === 'object' && !Array.isArray(value) +} + +export function resolveStreamToolOutcome({ + output, + status, + success, +}: ResolveStreamToolOutcomeParams): TerminalToolOutcome { + const outputRecord = isRecord(output) ? output : undefined + const isCancelled = + outputRecord?.reason === 'user_cancelled' || + outputRecord?.cancelledByUser === true || + status === MothershipStreamV1ToolOutcome.cancelled + + if (isCancelled) { + return MothershipStreamV1ToolOutcome.cancelled + } + + switch (status) { + case MothershipStreamV1ToolOutcome.success: + case MothershipStreamV1ToolOutcome.error: + case MothershipStreamV1ToolOutcome.skipped: + case MothershipStreamV1ToolOutcome.rejected: + return status + default: + return success === true + ? MothershipStreamV1ToolOutcome.success + : MothershipStreamV1ToolOutcome.error + } +} From 34173328ddfeae998a7a162a9be1dc976307c188 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 15 Apr 2026 16:16:03 -0700 Subject: [PATCH 2/4] fix logs resource thinking bug" --- apps/sim/lib/copilot/chat/post.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index b15e84db69..8581621d1f 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -52,7 +52,7 @@ const FileAttachmentSchema = z.object({ }) const ResourceAttachmentSchema = z.object({ - type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder']), + type: z.enum(['workflow', 'table', 'file', 'knowledgebase', 'folder', 'task', 'log', 'generic']), id: z.string().min(1), title: z.string().optional(), active: z.boolean().optional(), @@ -64,6 +64,9 @@ const GENERIC_RESOURCE_TITLE: Record['t file: 'File', knowledgebase: 'Knowledge Base', folder: 'Folder', + task: 'Task', + log: 'Log', + generic: 'Resource', } const ChatContextSchema = z.object({ From 7d4f7a097419d0b9340d3238d25003f203c1a346 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 15 Apr 2026 16:26:45 -0700 Subject: [PATCH 3/4] address comments --- apps/sim/app/api/copilot/chat/stop/route.test.ts | 2 +- apps/sim/app/api/copilot/chat/stop/route.ts | 7 +++---- apps/sim/lib/copilot/chat/effective-transcript.test.ts | 7 +++++-- apps/sim/lib/copilot/chat/effective-transcript.ts | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts index a624817bab..21c32e38a7 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -63,7 +63,7 @@ vi.mock('@/lib/copilot/tasks', () => ({ }, })) -import { POST } from './route' +import { POST } from '@/app/api/copilot/chat/stop/route' function createRequest(body: Record) { return new NextRequest('http://localhost:3000/api/copilot/chat/stop', { diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index b610736dc4..05e5935aa4 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -7,6 +7,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { taskPubSub } from '@/lib/copilot/tasks' +import { generateId } from '@/lib/core/utils/uuid' const logger = createLogger('CopilotChatStopAPI') @@ -110,11 +111,9 @@ export async function POST(req: NextRequest) { : hasContent ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }] : [{ type: 'stopped' }] - const shouldAppendAssistant = canAppendAssistant - - if (shouldAppendAssistant) { + if (canAppendAssistant) { const normalized = normalizeMessage({ - id: crypto.randomUUID(), + id: generateId(), role: 'assistant', content, timestamp: new Date().toISOString(), diff --git a/apps/sim/lib/copilot/chat/effective-transcript.test.ts b/apps/sim/lib/copilot/chat/effective-transcript.test.ts index 2b17872241..285743d37a 100644 --- a/apps/sim/lib/copilot/chat/effective-transcript.test.ts +++ b/apps/sim/lib/copilot/chat/effective-transcript.test.ts @@ -3,6 +3,11 @@ */ import { describe, expect, it } from 'vitest' +import { + buildEffectiveChatTranscript, + getLiveAssistantMessageId, +} from '@/lib/copilot/chat/effective-transcript' +import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' import { MothershipStreamV1CompletionStatus, MothershipStreamV1EventType, @@ -10,8 +15,6 @@ import { MothershipStreamV1TextChannel, } from '@/lib/copilot/generated/mothership-stream-v1' import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' -import { buildEffectiveChatTranscript, getLiveAssistantMessageId } from './effective-transcript' -import { normalizeMessage } from './persisted-message' function toBatchEvent(eventId: number, event: StreamBatchEvent['event']): StreamBatchEvent { return { diff --git a/apps/sim/lib/copilot/chat/effective-transcript.ts b/apps/sim/lib/copilot/chat/effective-transcript.ts index ba770a8c2b..339a326d12 100644 --- a/apps/sim/lib/copilot/chat/effective-transcript.ts +++ b/apps/sim/lib/copilot/chat/effective-transcript.ts @@ -1,3 +1,5 @@ +import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { resolveStreamToolOutcome } from '@/lib/copilot/chat/stream-tool-outcome' import { MothershipStreamV1CompletionStatus, type MothershipStreamV1ErrorPayload, @@ -11,8 +13,6 @@ import { } from '@/lib/copilot/generated/mothership-stream-v1' import type { FilePreviewSession } from '@/lib/copilot/request/session/file-preview-session-contract' import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' -import { normalizeMessage, type PersistedMessage } from './persisted-message' -import { resolveStreamToolOutcome } from './stream-tool-outcome' interface StreamSnapshotLike { events: StreamBatchEvent[] From c956577794649ea16298b34e9b2946fab37da358 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 15 Apr 2026 16:34:12 -0700 Subject: [PATCH 4/4] address comments --- apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 3a8e12ce7e..8575e9a1b4 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1902,10 +1902,7 @@ export function useChat( content: streamingContentRef.current, contentBlocks: streamingBlocksRef.current, }) - const seededMessages = - assistantMessage.content || assistantMessage.contentBlocks?.length - ? [userMsg, assistantMessage] - : [userMsg] + const seededMessages = [userMsg, assistantMessage] queryClient.setQueryData(taskKeys.detail(payloadChatId), { id: payloadChatId, title: null,