From 0398a615de0ef970a3f261444b4205f0b8eefdff Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 28 May 2026 09:26:49 -0700 Subject: [PATCH 1/5] fix(mothership): persist queued messages, edit-in-place preserves order --- .../mothership-chat/mothership-chat.tsx | 9 + .../queued-messages/queued-messages.tsx | 229 ++++++++++------- .../app/workspace/[workspaceId]/home/home.tsx | 6 + .../[workspaceId]/home/hooks/use-chat.ts | 239 ++++++++++++++---- .../w/[workflowId]/components/panel/panel.tsx | 6 + apps/sim/hooks/queries/tasks.ts | 4 + .../sim/stores/mothership-queue/store.test.ts | 167 ++++++++++++ apps/sim/stores/mothership-queue/store.ts | 186 ++++++++++++++ apps/sim/stores/mothership-queue/types.ts | 36 +++ 9 files changed, 743 insertions(+), 139 deletions(-) create mode 100644 apps/sim/stores/mothership-queue/store.test.ts create mode 100644 apps/sim/stores/mothership-queue/store.ts create mode 100644 apps/sim/stores/mothership-queue/types.ts diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx index 809c190ffea..79fc042f5de 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-chat/mothership-chat.tsx @@ -41,9 +41,12 @@ interface MothershipChatProps { ) => void onStopGeneration: () => void messageQueue: QueuedMessage[] + editingQueuedId: string | null + dispatchingHeadId: string | null onRemoveQueuedMessage: (id: string) => void onSendQueuedMessage: (id: string) => Promise onEditQueuedMessage: (id: string) => QueuedMessage | undefined + onCancelQueueEdit: () => void userId?: string chatId?: string onContextAdd?: (context: ChatContext) => void @@ -183,9 +186,12 @@ export function MothershipChat({ onSubmit, onStopGeneration, messageQueue, + editingQueuedId, + dispatchingHeadId, onRemoveQueuedMessage, onSendQueuedMessage, onEditQueuedMessage, + onCancelQueueEdit, userId, chatId, onContextAdd, @@ -313,9 +319,12 @@ export function MothershipChat({
void onSendNow: (id: string) => Promise onEdit: (id: string) => void + onCancelEdit: () => void } -export function QueuedMessages({ messageQueue, onRemove, onSendNow, onEdit }: QueuedMessagesProps) { +export function QueuedMessages({ + messageQueue, + editingQueuedId, + dispatchingHeadId, + onRemove, + onSendNow, + onEdit, + onCancelEdit, +}: QueuedMessagesProps) { const [isExpanded, setIsExpanded] = useState(true) const [isNarrow, setIsNarrow] = useState(false) const roRef = useRef(null) @@ -57,101 +69,138 @@ export function QueuedMessages({ messageQueue, onRemove, onSendNow, onEdit }: Qu {isExpanded && (
- {messageQueue.map((msg) => ( -
-
-
-
+ {messageQueue.map((msg) => { + const isEditing = msg.id === editingQueuedId + const isDispatching = msg.id === dispatchingHeadId + return ( +
+
+
+
-
- -
+
+ +
- {msg.fileAttachments && msg.fileAttachments.length > 0 && ( - - - {isNarrow ? ( - - {msg.fileAttachments.length} - + {msg.fileAttachments && msg.fileAttachments.length > 0 && ( + + + {isNarrow ? ( + + {msg.fileAttachments.length} + + ) : ( + <> + {msg.fileAttachments[0].filename} + {msg.fileAttachments.length > 1 && ( + + +{msg.fileAttachments.length - 1} + + )} + + )} + + )} + +
+ {isEditing ? ( + + + + + + Cancel edit + + ) : ( <> - {msg.fileAttachments[0].filename} - {msg.fileAttachments.length > 1 && ( - - +{msg.fileAttachments.length - 1} - - )} - - )} - - )} - -
- - - - - - Edit queued message - - + + + + + + {isDispatching ? 'Sending now' : 'Edit queued message'} + + - - - - - - Send now - - + + + + + + Send now + + - - - - - - Remove from queue - - + + + + + + Remove from queue + + + + )} +
-
- ))} + ) + })}
)}
diff --git a/apps/sim/app/workspace/[workspaceId]/home/home.tsx b/apps/sim/app/workspace/[workspaceId]/home/home.tsx index 52a3bf4442c..2b89156f90f 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/home.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/home.tsx @@ -141,6 +141,9 @@ export function Home({ chatId }: HomeProps = {}) { removeFromQueue, sendNow, editQueuedMessage, + cancelQueueEdit, + editingQueuedId, + dispatchingHeadId, previewSession, genericResourceData, getCurrentRequestId, @@ -349,9 +352,12 @@ export function Home({ chatId }: HomeProps = {}) { onSubmit={handleSubmit} onStopGeneration={handleStopGeneration} messageQueue={messageQueue} + editingQueuedId={editingQueuedId} + dispatchingHeadId={dispatchingHeadId} onRemoveQueuedMessage={removeFromQueue} onSendQueuedMessage={sendNow} onEditQueuedMessage={editQueuedMessage} + onCancelQueueEdit={cancelQueueEdit} userId={session?.user?.id} chatId={resolvedChatId} onContextAdd={handleContextAdd} diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 3ec0168918f..374e46e17de 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -2,7 +2,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { getErrorMessage, toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' -import { generateId } from '@sim/utils/id' +import { generateId, generateShortId } from '@sim/utils/id' import { useQueryClient } from '@tanstack/react-query' import { usePathname, useRouter } from 'next/navigation' import { requestJson } from '@/lib/api/client/request' @@ -133,6 +133,11 @@ import { workflowKeys } from '@/hooks/queries/workflows' import { workspaceFilesKeys } from '@/hooks/queries/workspace-files' import { useExecutionStream } from '@/hooks/use-execution-stream' import { useExecutionStore } from '@/stores/execution/store' +import { useMothershipQueueStore } from '@/stores/mothership-queue/store' +import type { + QueuedMothershipMessage, + QueuedSendHandoffSeed, +} from '@/stores/mothership-queue/types' import type { ChatContext } from '@/stores/panel' import { useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' @@ -174,6 +179,9 @@ export interface UseChatReturn { removeFromQueue: (id: string) => void sendNow: (id: string) => Promise editQueuedMessage: (id: string) => QueuedMessage | undefined + cancelQueueEdit: () => void + editingQueuedId: string | null + dispatchingHeadId: string | null previewSession: FilePreviewSession | null genericResourceData: GenericResourceData | null getCurrentRequestId: () => string | undefined @@ -209,6 +217,13 @@ const QUEUED_SEND_HANDOFF_CLAIM_TTL_MS = 30_000 const QUEUED_SEND_HANDOFF_RETRY_BASE_MS = 1000 const QUEUED_SEND_HANDOFF_RETRY_MAX_MS = 30_000 +/** + * Stable reference returned by the queue selector when a chat has no bucket. + * Sharing one array keeps Zustand's equality check from re-rendering on + * every store write. + */ +const EMPTY_MESSAGE_QUEUE: QueuedMothershipMessage[] = [] + const logger = createLogger('useChat') type StreamPayload = Record @@ -237,17 +252,6 @@ interface QueuedSendHandoffState { resolveAttempts?: number } -interface QueuedSendHandoffSeed { - id: string - chatId?: string - supersededStreamId: string | null - userMessageId?: string -} - -type QueuedChatMessage = QueuedMessage & { - queuedSendHandoff?: QueuedSendHandoffSeed -} - interface QueuedSendHandoffClaim { id: string ownerId: string @@ -1540,6 +1544,7 @@ export function useChat( queueDispatchActionsRef.current = [] queuedMessageDispatchIdsRef.current.clear() queueDispatchTaskRef.current = null + setDispatchingHeadId(null) }, []) const resourcesRef = useRef(resources) resourcesRef.current = resources @@ -1708,9 +1713,21 @@ export function useChat( [removePreviewSession, syncPreviewSessionRefs] ) - const [messageQueue, setMessageQueue] = useState([]) - const messageQueueRef = useRef([]) - messageQueueRef.current = messageQueue + /** + * `pendingChatKeyRef` always holds the sentinel key used while no `chatId` + * is resolved. On first send, `adoptResolvedChatId` migrates the sentinel + * bucket to the real `chatId`. When the route drops back to home, we mint + * a fresh sentinel so a new pending chat starts with an empty bucket. + */ + const pendingChatKeyRef = useRef(`pending::${generateShortId()}`) + const [chatKey, setChatKey] = useState(initialChatId ?? pendingChatKeyRef.current) + const chatKeyRef = useRef(chatKey) + chatKeyRef.current = chatKey + const messageQueue = useMothershipQueueStore( + (state) => state.queues[chatKey] ?? EMPTY_MESSAGE_QUEUE + ) + const editingQueuedId = useMothershipQueueStore((state) => state.editing[chatKey] ?? null) + const [dispatchingHeadId, setDispatchingHeadId] = useState(null) const queuedMessageDispatchIdsRef = useRef>(new Set()) const queueDispatchActionsRef = useRef([]) const queueDispatchTaskRef = useRef | null>(null) @@ -1976,7 +1993,17 @@ export function useChat( inFlightResourceAddsRef.current.clear() reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() - setMessageQueue([]) + /** + * Queue state is owned by `useMothershipQueueStore` and persists across + * navigation. The dispatch loop is per-instance, so cancel it but leave + * the queued messages — they belong to the chat we just left. The editing + * binding, by contrast, is scoped to this hook's composer and cannot + * survive a chat switch, so release it on the chat we just left. + */ + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + pendingChatKeyRef.current = `pending::${generateShortId()}` + chatKeyRef.current = pendingChatKeyRef.current + setChatKey(pendingChatKeyRef.current) clearQueueDispatchState() }, [ cancelActiveStreamRecovery, @@ -2029,6 +2056,26 @@ export function useChat( (chatId: string, options?: { replaceHomeHistory?: boolean; invalidateList?: boolean }) => { const selectedChatId = selectedChatIdRef.current chatIdRef.current = chatId + /** + * Move the queue bucket that holds messages enqueued during this stream + * onto the resolved chatId. The source is the pending sentinel that was + * active when this stream started — not `chatKeyRef.current`, which may + * have moved to a different chat if the user navigated away mid-stream. + * Using the sentinel keeps the other chat's queue untouched. + */ + if (pendingChatKeyRef.current !== chatId) { + useMothershipQueueStore.getState().migrate(pendingChatKeyRef.current, chatId) + } + /** + * Only point our visible chatKey at the resolved id when the user is + * still viewing the chat that just resolved — otherwise we'd steal them + * back to the original session. + */ + const stillViewingResolvedChat = !selectedChatId || selectedChatId === chatId + if (stillViewingResolvedChat && chatKeyRef.current !== chatId) { + chatKeyRef.current = chatId + setChatKey(chatId) + } if (!selectedChatId || selectedChatId === chatId) { setResolvedChatId(chatId) } @@ -2287,7 +2334,26 @@ export function useChat( inFlightResourceAddsRef.current.clear() reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() - setMessageQueue([]) + /** + * Queue buckets are owned by `useMothershipQueueStore` and persist across + * chat switches. We just rotate the bucket key so selectors read the new + * chat's queue; the previous chat's queue is left intact for its return. + * The editing binding is composer-scoped, so we release it on the chat + * we're leaving before swapping buckets. + */ + if (chatKeyRef.current !== (initialChatId ?? '')) { + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + } + if (initialChatId) { + if (chatKeyRef.current !== initialChatId) { + chatKeyRef.current = initialChatId + setChatKey(initialChatId) + } + } else { + pendingChatKeyRef.current = `pending::${generateShortId()}` + chatKeyRef.current = pendingChatKeyRef.current + setChatKey(pendingChatKeyRef.current) + } clearQueueDispatchState() }, [ initialChatId, @@ -4409,7 +4475,8 @@ export function useChat( */ const notifyTurnEnded = useCallback( (options: { error: boolean; skipQueueDispatch?: boolean }) => { - const hasQueuedFollowUp = !options.error && messageQueueRef.current.length > 0 + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const hasQueuedFollowUp = !options.error && (queue?.length ?? 0) > 0 if (!options.error) { const cid = chatIdRef.current if (cid && onStreamEndRef.current) { @@ -4429,7 +4496,7 @@ export function useChat( message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[] - ): QueuedChatMessage => { + ): QueuedMothershipMessage => { const id = generateId() const handoffChatId = selectedChatIdRef.current ?? chatIdRef.current const cachedActiveStreamId = handoffChatId @@ -4464,7 +4531,8 @@ export function useChat( const finalize = useCallback( (options?: { error?: boolean; targetChatId?: string }) => { const isError = !!options?.error - const hasQueuedFollowUp = !isError && messageQueueRef.current.length > 0 + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const hasQueuedFollowUp = !isError && (queue?.length ?? 0) > 0 reconcileTerminalPreviewSessions() locallyTerminalStreamIdRef.current = streamIdRef.current ?? activeTurnRef.current?.userMessageId ?? undefined @@ -4893,19 +4961,38 @@ export function useChat( async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => { if (!message.trim() || !workspaceId) return + const queueStore = useMothershipQueueStore.getState() + const activeChatKey = chatKeyRef.current + const editingId = queueStore.editing[activeChatKey] ?? null + + /** + * Edit-in-place: the user submitted while a queued slot was bound to the + * composer. Replace at the original index instead of appending — same id, + * same position. If the slot was dispatched between edit and submit (only + * possible for the head if the UI guard was bypassed) fall through and + * enqueue as a normal new message. + */ + if (editingId) { + const existing = queueStore.queues[activeChatKey] ?? [] + if (existing.some((m) => m.id === editingId)) { + queueStore.replaceAt(activeChatKey, editingId, { + content: message, + fileAttachments, + contexts, + }) + queueStore.setEditing(activeChatKey, null) + return + } + queueStore.setEditing(activeChatKey, null) + } + if (sendingRef.current) { - setMessageQueue((prev) => [ - ...prev, - createQueuedMessage(message, fileAttachments, contexts), - ]) + queueStore.enqueue(activeChatKey, createQueuedMessage(message, fileAttachments, contexts)) return } if (pendingStopPromiseRef.current) { - setMessageQueue((prev) => [ - ...prev, - createQueuedMessage(message, fileAttachments, contexts), - ]) + queueStore.enqueue(activeChatKey, createQueuedMessage(message, fileAttachments, contexts)) void enqueueQueueDispatchRef.current({ type: 'send_head' }) return } @@ -5429,7 +5516,7 @@ export function useChat( const dispatchQueuedMessage = useCallback( async ( - msg: QueuedChatMessage, + msg: QueuedMothershipMessage, options: { epoch: number pendingStop?: Promise | null @@ -5441,19 +5528,24 @@ export function useChat( } queuedMessageDispatchIdsRef.current.add(msg.id) - let originalIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id) + const dispatchChatKey = chatKeyRef.current + const queueAtStart = + useMothershipQueueStore.getState().queues[dispatchChatKey] ?? EMPTY_MESSAGE_QUEUE + let originalIndex = queueAtStart.findIndex((queued) => queued.id === msg.id) if (originalIndex === -1) { queuedMessageDispatchIdsRef.current.delete(msg.id) return } + setDispatchingHeadId(msg.id) + let removedFromQueue = false const removeQueuedMessage = () => { if (removedFromQueue || options.epoch !== queueDispatchEpochRef.current) { return } removedFromQueue = true - setMessageQueue((prev) => prev.filter((queued) => queued.id !== msg.id)) + useMothershipQueueStore.getState().remove(dispatchChatKey, msg.id) } const restoreQueuedMessage = (handoff?: QueuedSendHandoffSeed) => { @@ -5464,26 +5556,32 @@ export function useChat( if (!removedFromQueue || options.epoch !== queueDispatchEpochRef.current) { return } - setMessageQueue((prev) => { - if (prev.some((queued) => queued.id === msg.id)) return prev - const next = [...prev] - next.splice(Math.min(originalIndex, next.length), 0, msg) - return next - }) + useMothershipQueueStore.getState().insertAt(dispatchChatKey, originalIndex, msg) } - const activeQueuedSendHandoff = options.queuedSendHandoff ?? msg.queuedSendHandoff + let activeQueuedSendHandoff: QueuedSendHandoffSeed | undefined = + options.queuedSendHandoff ?? msg.queuedSendHandoff try { - const currentIndex = messageQueueRef.current.findIndex((queued) => queued.id === msg.id) + const queueAtSend = + useMothershipQueueStore.getState().queues[dispatchChatKey] ?? EMPTY_MESSAGE_QUEUE + const currentIndex = queueAtSend.findIndex((queued) => queued.id === msg.id) if (currentIndex === -1) { return } originalIndex = currentIndex + /** + * Read the live message from the store, not the closure-captured `msg`. + * Between when the dispatcher was scheduled and now, the user may have + * applied an in-place edit (`replaceAt`) — we must send the latest + * content, not the pre-edit snapshot. + */ + const liveMsg = queueAtSend[currentIndex] + activeQueuedSendHandoff = options.queuedSendHandoff ?? liveMsg.queuedSendHandoff const consumed = await startSendMessage( - msg.content, - msg.fileAttachments, - msg.contexts, + liveMsg.content, + liveMsg.fileAttachments, + liveMsg.contexts, options.pendingStop, removeQueuedMessage, activeQueuedSendHandoff @@ -5495,6 +5593,7 @@ export function useChat( } catch { restoreQueuedMessage(activeQueuedSendHandoff) } finally { + setDispatchingHeadId((current) => (current === msg.id ? null : current)) queuedMessageDispatchIdsRef.current.delete(msg.id) } }, @@ -5515,7 +5614,8 @@ export function useChat( continue } - const msg = messageQueueRef.current[0] + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const msg = queue?.[0] if (!msg) continue await dispatchQueuedMessage(msg, { epoch: action.epoch }) @@ -5545,12 +5645,13 @@ export function useChat( const removeFromQueue = useCallback((id: string) => { clearQueuedSendHandoffState(id) clearQueuedSendHandoffClaim(id) - setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + useMothershipQueueStore.getState().remove(chatKeyRef.current, id) }, []) const sendQueuedMessageImmediately = useCallback( async (id: string) => { - const msg = messageQueueRef.current.find((queued) => queued.id === id) + const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] + const msg = queue?.find((queued) => queued.id === id) if (!msg) return if (queuedMessageDispatchIdsRef.current.has(msg.id)) return @@ -5603,14 +5704,45 @@ export function useChat( ) const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { - const msg = messageQueueRef.current.find((m) => m.id === id) + /** + * Reject edits on a message that is currently being dispatched — the send + * is already in flight and `removeQueuedMessage` will drop the slot when + * it completes, so accepting the edit would silently lose the user's + * new content. The UI also disables this affordance via `dispatchingHeadId`. + */ + if (queuedMessageDispatchIdsRef.current.has(id)) return undefined + const activeChatKey = chatKeyRef.current + const queue = useMothershipQueueStore.getState().queues[activeChatKey] ?? EMPTY_MESSAGE_QUEUE + const msg = queue.find((m) => m.id === id) if (!msg) return undefined - clearQueuedSendHandoffState(id) - clearQueuedSendHandoffClaim(id) - setMessageQueue((prev) => prev.filter((m) => m.id !== id)) + useMothershipQueueStore.getState().setEditing(activeChatKey, id) return msg }, []) + const cancelQueueEdit = useCallback(() => { + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + }, []) + + /** + * Resume draining when a non-empty queue rehydrates with no active stream — + * e.g. user returns to a chat after navigating away. We wait until the chat + * history confirms there is no `activeStreamId` so we don't race the + * reconnect path; when a stream is in flight, `finalize -> notifyTurnEnded` + * already drains the queue on completion. Idempotent — the dispatch loop + * short-circuits if a task is already running. + */ + const chatHistoryReady = chatHistory !== undefined + const remoteActiveStreamId = chatHistory?.activeStreamId ?? null + useEffect(() => { + if (!workspaceId) return + if (messageQueue.length === 0) return + if (sendingRef.current || pendingStopPromiseRef.current) return + if (queueDispatchTaskRef.current) return + if (resolvedChatId && !chatHistoryReady) return + if (remoteActiveStreamId) return + void enqueueQueueDispatchRef.current({ type: 'send_head' }) + }, [workspaceId, messageQueue.length, resolvedChatId, chatHistoryReady, remoteActiveStreamId]) + useEffect(() => { return () => { cancelActiveStreamRecovery() @@ -5621,6 +5753,12 @@ export function useChat( abortControllerRef.current = null clearActiveTurn() sendingRef.current = false + /** + * Editing state binds a queued slot to this hook's composer ref; once + * the hook unmounts there is nothing left to drive the bound composer, + * so we release the slot. The queued message itself stays intact. + */ + useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) } }, [ cancelActiveStreamRecovery, @@ -5647,6 +5785,9 @@ export function useChat( removeFromQueue, sendNow, editQueuedMessage, + cancelQueueEdit, + editingQueuedId, + dispatchingHeadId, previewSession, genericResourceData, getCurrentRequestId, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx index 059db45a53a..251bb733fc0 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/panel.tsx @@ -347,6 +347,9 @@ export const Panel = memo(function Panel({ workspaceId: propWorkspaceId }: Panel removeFromQueue: copilotRemoveFromQueue, sendNow: copilotSendNow, editQueuedMessage: copilotEditQueuedMessage, + cancelQueueEdit: copilotCancelQueueEdit, + editingQueuedId: copilotEditingQueuedId, + dispatchingHeadId: copilotDispatchingHeadId, getCurrentRequestId: getCopilotCurrentRequestId, } = useChat( workspaceId, @@ -885,9 +888,12 @@ export const Panel = memo(function Panel({ workspaceId: propWorkspaceId }: Panel onSubmit={handleCopilotSubmit} onStopGeneration={handleCopilotStopGeneration} messageQueue={copilotMessageQueue} + editingQueuedId={copilotEditingQueuedId} + dispatchingHeadId={copilotDispatchingHeadId} onRemoveQueuedMessage={copilotRemoveFromQueue} onSendQueuedMessage={copilotSendNow} onEditQueuedMessage={copilotEditQueuedMessage} + onCancelQueueEdit={copilotCancelQueueEdit} userId={session?.user?.id} chatId={copilotResolvedChatId} layout='copilot-view' diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 0aff534a77b..6a35bfbf363 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -27,6 +27,7 @@ import { } from '@/lib/copilot/request/session/file-preview-session-contract' import { isStreamBatchEvent, type StreamBatchEvent } from '@/lib/copilot/request/session/types' import { type MothershipResource, MothershipResourceType } from '@/lib/copilot/resources/types' +import { useMothershipQueueStore } from '@/stores/mothership-queue/store' export interface TaskMetadata { id: string @@ -281,6 +282,7 @@ export function useDeleteTask(workspaceId?: string) { onSettled: (_data, _error, chatId) => { queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) queryClient.removeQueries({ queryKey: taskKeys.detail(chatId) }) + useMothershipQueueStore.getState().clearChat(chatId) }, }) } @@ -296,8 +298,10 @@ export function useDeleteTasks(workspaceId?: string) { }, onSettled: (_data, _error, chatIds) => { queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) + const queueStore = useMothershipQueueStore.getState() for (const chatId of chatIds) { queryClient.removeQueries({ queryKey: taskKeys.detail(chatId) }) + queueStore.clearChat(chatId) } }, }) diff --git a/apps/sim/stores/mothership-queue/store.test.ts b/apps/sim/stores/mothership-queue/store.test.ts new file mode 100644 index 00000000000..344b8c10911 --- /dev/null +++ b/apps/sim/stores/mothership-queue/store.test.ts @@ -0,0 +1,167 @@ +/** + * @vitest-environment node + */ + +import { beforeEach, describe, expect, it } from 'vitest' +import { useMothershipQueueStore } from '@/stores/mothership-queue/store' +import type { QueuedMothershipMessage } from '@/stores/mothership-queue/types' + +const message = (id: string, content = `content-${id}`): QueuedMothershipMessage => ({ + id, + content, +}) + +describe('useMothershipQueueStore', () => { + beforeEach(() => { + useMothershipQueueStore.getState().reset() + }) + + describe('enqueue / remove', () => { + it('appends to the chat bucket', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m2')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.map((m) => m.id)).toEqual([ + 'm1', + 'm2', + ]) + }) + + it('keeps buckets isolated per chat', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-B', message('n1')) + const state = useMothershipQueueStore.getState() + expect(state.queues['chat-A']?.map((m) => m.id)).toEqual(['m1']) + expect(state.queues['chat-B']?.map((m) => m.id)).toEqual(['n1']) + }) + + it('removes the chat bucket entirely when the last message is removed', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().remove('chat-A', 'm1') + expect(useMothershipQueueStore.getState().queues['chat-A']).toBeUndefined() + }) + + it('clears editing when the editing message is removed', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + useMothershipQueueStore.getState().remove('chat-A', 'm1') + expect(useMothershipQueueStore.getState().editing['chat-A']).toBeUndefined() + }) + + it('preserves editing when a different message is removed', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m2')) + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + useMothershipQueueStore.getState().remove('chat-A', 'm2') + expect(useMothershipQueueStore.getState().editing['chat-A']).toBe('m1') + }) + }) + + describe('insertAt', () => { + it('inserts at the requested index', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m3')) + useMothershipQueueStore.getState().insertAt('chat-A', 1, message('m2')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.map((m) => m.id)).toEqual([ + 'm1', + 'm2', + 'm3', + ]) + }) + + it('clamps an out-of-range index to the end', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().insertAt('chat-A', 99, message('m2')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.map((m) => m.id)).toEqual([ + 'm1', + 'm2', + ]) + }) + + it('ignores duplicate ids', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().insertAt('chat-A', 0, message('m1')) + expect(useMothershipQueueStore.getState().queues['chat-A']?.length).toBe(1) + }) + }) + + describe('replaceAt', () => { + it('overwrites content while preserving id and index', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1', 'orig-1')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m2', 'orig-2')) + useMothershipQueueStore.getState().enqueue('chat-A', message('m3', 'orig-3')) + + useMothershipQueueStore.getState().replaceAt('chat-A', 'm2', { content: 'edited-2' }) + + const queue = useMothershipQueueStore.getState().queues['chat-A'] + expect(queue?.map((m) => m.id)).toEqual(['m1', 'm2', 'm3']) + expect(queue?.[1]?.content).toBe('edited-2') + }) + + it('is a no-op when the id is no longer in the queue', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + const before = useMothershipQueueStore.getState().queues['chat-A'] + useMothershipQueueStore.getState().replaceAt('chat-A', 'missing', { content: 'x' }) + expect(useMothershipQueueStore.getState().queues['chat-A']).toBe(before) + }) + + it('strips queuedSendHandoff on edit so a fresh handoff is minted at send time', () => { + const original: QueuedMothershipMessage = { + id: 'm1', + content: 'orig', + queuedSendHandoff: { id: 'm1', supersededStreamId: 'stream-x' }, + } + useMothershipQueueStore.getState().enqueue('chat-A', original) + useMothershipQueueStore.getState().replaceAt('chat-A', 'm1', { content: 'edited' }) + const replaced = useMothershipQueueStore.getState().queues['chat-A']?.[0] + expect(replaced?.queuedSendHandoff).toBeUndefined() + expect(replaced?.content).toBe('edited') + }) + }) + + describe('migrate', () => { + it('moves both queue and editing from sentinel to resolved chatId', () => { + const pendingKey = 'pending::abc' + useMothershipQueueStore.getState().enqueue(pendingKey, message('m1')) + useMothershipQueueStore.getState().setEditing(pendingKey, 'm1') + useMothershipQueueStore.getState().migrate(pendingKey, 'chat-X') + const state = useMothershipQueueStore.getState() + expect(state.queues[pendingKey]).toBeUndefined() + expect(state.editing[pendingKey]).toBeUndefined() + expect(state.queues['chat-X']?.map((m) => m.id)).toEqual(['m1']) + expect(state.editing['chat-X']).toBe('m1') + }) + + it('is a no-op when source and target are the same', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + const before = useMothershipQueueStore.getState().queues['chat-A'] + useMothershipQueueStore.getState().migrate('chat-A', 'chat-A') + expect(useMothershipQueueStore.getState().queues['chat-A']).toBe(before) + }) + + it('is a no-op when the source bucket is empty', () => { + const before = useMothershipQueueStore.getState().queues + useMothershipQueueStore.getState().migrate('nope', 'chat-X') + expect(useMothershipQueueStore.getState().queues).toBe(before) + }) + }) + + describe('clearChat', () => { + it('drops queue and editing for the chat', () => { + useMothershipQueueStore.getState().enqueue('chat-A', message('m1')) + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + useMothershipQueueStore.getState().clearChat('chat-A') + const state = useMothershipQueueStore.getState() + expect(state.queues['chat-A']).toBeUndefined() + expect(state.editing['chat-A']).toBeUndefined() + }) + }) + + describe('setEditing', () => { + it('stores and clears the editing id', () => { + useMothershipQueueStore.getState().setEditing('chat-A', 'm1') + expect(useMothershipQueueStore.getState().editing['chat-A']).toBe('m1') + useMothershipQueueStore.getState().setEditing('chat-A', null) + expect(useMothershipQueueStore.getState().editing['chat-A']).toBeUndefined() + }) + }) +}) diff --git a/apps/sim/stores/mothership-queue/store.ts b/apps/sim/stores/mothership-queue/store.ts new file mode 100644 index 00000000000..1087ea3bb0e --- /dev/null +++ b/apps/sim/stores/mothership-queue/store.ts @@ -0,0 +1,186 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { create } from 'zustand' +import { createJSONStorage, devtools, persist } from 'zustand/middleware' +import type { MothershipQueueState, QueuedMothershipMessage } from '@/stores/mothership-queue/types' + +const logger = createLogger('MothershipQueueStore') + +/** + * Per-tab sessionStorage adapter. No-ops on SSR and tolerates quota errors so + * a transient storage failure can never crash a render. + * + * The queue persists to **sessionStorage** rather than localStorage (which is + * what `mothership-drafts` uses) on purpose: a queued message carries + * intent-to-send, and the rehydrate path auto-drains the queue once chat + * history confirms there's no active server stream. Tab close should not + * fire those sends days later, so sessionStorage caps the replay window at + * the lifetime of the tab. + */ +const sessionStorageAdapter = { + getItem: (name: string): string | null => { + if (typeof sessionStorage === 'undefined') return null + try { + return sessionStorage.getItem(name) + } catch (error) { + logger.warn('Failed to read mothership queue from sessionStorage', toError(error)) + return null + } + }, + setItem: (name: string, value: string): void => { + if (typeof sessionStorage === 'undefined') return + try { + sessionStorage.setItem(name, value) + } catch (error) { + logger.warn('Failed to persist mothership queue to sessionStorage', toError(error)) + } + }, + removeItem: (name: string): void => { + if (typeof sessionStorage === 'undefined') return + try { + sessionStorage.removeItem(name) + } catch (error) { + logger.warn('Failed to remove mothership queue from sessionStorage', toError(error)) + } + }, +} + +const initialState = { + queues: {} as Record, + editing: {} as Record, +} + +const omitKey = (record: Record, key: string): Record => { + if (!(key in record)) return record + const { [key]: _removed, ...rest } = record + return rest +} + +const setQueueForChat = ( + queues: Record, + chatKey: string, + next: QueuedMothershipMessage[] +): Record => + next.length === 0 ? omitKey(queues, chatKey) : { ...queues, [chatKey]: next } + +/** + * Drops the volatile `queuedSendHandoff` so the persisted snapshot only carries + * data that remains meaningful after reload. Reconstruction on next dispatch is + * unnecessary because rehydrate happens outside any active-stream lifecycle. + */ +const stripVolatile = (message: QueuedMothershipMessage): QueuedMothershipMessage => { + if (!message.queuedSendHandoff) return message + const { queuedSendHandoff: _drop, ...rest } = message + return rest +} + +export const useMothershipQueueStore = create()( + devtools( + persist( + (set) => ({ + ...initialState, + + enqueue: (chatKey, message) => + set((state) => ({ + queues: setQueueForChat(state.queues, chatKey, [ + ...(state.queues[chatKey] ?? []), + message, + ]), + })), + + insertAt: (chatKey, index, message) => + set((state) => { + const current = state.queues[chatKey] ?? [] + if (current.some((m) => m.id === message.id)) return state + const next = [...current] + next.splice(Math.max(0, Math.min(index, next.length)), 0, message) + return { queues: setQueueForChat(state.queues, chatKey, next) } + }), + + replaceAt: (chatKey, id, patch) => + set((state) => { + const current = state.queues[chatKey] ?? [] + const index = current.findIndex((m) => m.id === id) + if (index === -1) return state + const next = [...current] + /** + * Strip `queuedSendHandoff` on edit. The original handoff was + * created when the slot was first enqueued and references the + * stream that was active at that moment; once the user changes + * the payload, that handoff is no longer valid. The dispatcher + * (or `sendQueuedMessageImmediately`) will mint a fresh handoff + * at send time if the active-stream lifecycle still needs one. + */ + const { queuedSendHandoff: _stale, ...rest } = next[index] + next[index] = { + ...rest, + content: patch.content, + fileAttachments: patch.fileAttachments, + contexts: patch.contexts, + } + return { queues: setQueueForChat(state.queues, chatKey, next) } + }), + + remove: (chatKey, id) => + set((state) => { + const current = state.queues[chatKey] ?? [] + const next = current.filter((m) => m.id !== id) + const wasEditingThis = state.editing[chatKey] === id + if (next.length === current.length) { + return wasEditingThis ? { editing: omitKey(state.editing, chatKey) } : state + } + return { + queues: setQueueForChat(state.queues, chatKey, next), + ...(wasEditingThis ? { editing: omitKey(state.editing, chatKey) } : {}), + } + }), + + setEditing: (chatKey, id) => + set((state) => ({ + editing: + id === null ? omitKey(state.editing, chatKey) : { ...state.editing, [chatKey]: id }, + })), + + migrate: (fromKey, toKey) => + set((state) => { + if (fromKey === toKey) return state + const fromQueue = state.queues[fromKey] + const fromEditing = state.editing[fromKey] + if (!fromQueue && fromEditing === undefined) return state + + const queues = omitKey(state.queues, fromKey) + if (fromQueue && fromQueue.length > 0) { + queues[toKey] = fromQueue + } + const editing = omitKey(state.editing, fromKey) + if (fromEditing !== undefined) { + editing[toKey] = fromEditing + } + return { queues, editing } + }), + + clearChat: (chatKey) => + set((state) => ({ + queues: omitKey(state.queues, chatKey), + editing: omitKey(state.editing, chatKey), + })), + + reset: () => set(initialState), + }), + { + name: 'mothership-queue', + storage: createJSONStorage(() => sessionStorageAdapter), + partialize: (state) => ({ + queues: Object.fromEntries( + Object.entries(state.queues).map(([key, messages]) => [ + key, + messages.map(stripVolatile), + ]) + ), + editing: state.editing, + }), + } + ), + { name: 'mothership-queue-store' } + ) +) diff --git a/apps/sim/stores/mothership-queue/types.ts b/apps/sim/stores/mothership-queue/types.ts new file mode 100644 index 00000000000..c28636b2945 --- /dev/null +++ b/apps/sim/stores/mothership-queue/types.ts @@ -0,0 +1,36 @@ +import type { QueuedMessage } from '@/app/workspace/[workspaceId]/home/types' + +/** + * Volatile metadata attached to a queued message so the dispatcher can claim + * an in-flight stream's slot at hand-off time. Not persisted across reload. + */ +export interface QueuedSendHandoffSeed { + id: string + chatId?: string + supersededStreamId: string | null + userMessageId?: string +} + +export type QueuedMothershipMessage = QueuedMessage & { + queuedSendHandoff?: QueuedSendHandoffSeed +} + +/** + * Mutable fields that an in-place edit may overwrite. `id` and index are + * preserved by {@link MothershipQueueState.replaceAt}. + */ +export type QueuedMessageEditPatch = Pick + +export interface MothershipQueueState { + queues: Record + editing: Record + + enqueue: (chatKey: string, message: QueuedMothershipMessage) => void + insertAt: (chatKey: string, index: number, message: QueuedMothershipMessage) => void + replaceAt: (chatKey: string, id: string, patch: QueuedMessageEditPatch) => void + remove: (chatKey: string, id: string) => void + setEditing: (chatKey: string, id: string | null) => void + migrate: (fromKey: string, toKey: string) => void + clearChat: (chatKey: string) => void + reset: () => void +} From 7482443ad1fe27c8e2c5b468cfa23365dccea9f4 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 28 May 2026 10:00:36 -0700 Subject: [PATCH 2/5] fix(mothership): pause drain while head is in edit, restore handoff cleanup on edit, merge on migrate --- .../[workspaceId]/home/hooks/use-chat.ts | 36 +++++++++++++++++-- .../sim/stores/mothership-queue/store.test.ts | 13 +++++++ apps/sim/stores/mothership-queue/store.ts | 11 +++++- 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 374e46e17de..9350f0bf9d8 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -4981,6 +4981,14 @@ export function useChat( contexts, }) queueStore.setEditing(activeChatKey, null) + /** + * If the dispatcher paused on this slot (head-was-editing), resume. + * `enqueueQueueDispatch` is idempotent against an in-flight loop and + * the dispatcher itself guards against double-sending the same id. + */ + if (!sendingRef.current && !pendingStopPromiseRef.current) { + void enqueueQueueDispatchRef.current({ type: 'send_head' }) + } return } queueStore.setEditing(activeChatKey, null) @@ -5614,9 +5622,17 @@ export function useChat( continue } - const queue = useMothershipQueueStore.getState().queues[chatKeyRef.current] - const msg = queue?.[0] + const queueState = useMothershipQueueStore.getState() + const activeChatKey = chatKeyRef.current + const msg = queueState.queues[activeChatKey]?.[0] if (!msg) continue + /** + * Pause draining while the head is bound to the composer. Dispatching + * would either send the pre-edit content (losing the user's in-flight + * edit) or remove the slot mid-edit (turning the eventual submit into + * a tail-append). The next kick after `setEditing(null)` resumes us. + */ + if (queueState.editing[activeChatKey] === msg.id) continue await dispatchQueuedMessage(msg, { epoch: action.epoch }) } @@ -5715,12 +5731,28 @@ export function useChat( const queue = useMothershipQueueStore.getState().queues[activeChatKey] ?? EMPTY_MESSAGE_QUEUE const msg = queue.find((m) => m.id === id) if (!msg) return undefined + /** + * Evict any sessionStorage handoff record for this id. If a prior dispatch + * attempt failed after writing the handoff (which carries a snapshot of + * the message content), leaving it in place would let the recovery effect + * replay the stale pre-edit content even though the in-memory store has + * the new content. + */ + clearQueuedSendHandoffState(id) + clearQueuedSendHandoffClaim(id) useMothershipQueueStore.getState().setEditing(activeChatKey, id) return msg }, []) const cancelQueueEdit = useCallback(() => { useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) + /** + * Resume dispatch if it paused on this slot. Original content remains in + * the queue, so the next drain will send what was there before the edit. + */ + if (!sendingRef.current && !pendingStopPromiseRef.current) { + void enqueueQueueDispatchRef.current({ type: 'send_head' }) + } }, []) /** diff --git a/apps/sim/stores/mothership-queue/store.test.ts b/apps/sim/stores/mothership-queue/store.test.ts index 344b8c10911..c3091927308 100644 --- a/apps/sim/stores/mothership-queue/store.test.ts +++ b/apps/sim/stores/mothership-queue/store.test.ts @@ -143,6 +143,19 @@ describe('useMothershipQueueStore', () => { useMothershipQueueStore.getState().migrate('nope', 'chat-X') expect(useMothershipQueueStore.getState().queues).toBe(before) }) + + it('merges into an existing destination bucket instead of overwriting', () => { + useMothershipQueueStore.getState().enqueue('chat-X', message('existing-1')) + useMothershipQueueStore.getState().enqueue('chat-X', message('existing-2')) + useMothershipQueueStore.getState().enqueue('pending::abc', message('pending-1')) + useMothershipQueueStore.getState().migrate('pending::abc', 'chat-X') + expect(useMothershipQueueStore.getState().queues['chat-X']?.map((m) => m.id)).toEqual([ + 'existing-1', + 'existing-2', + 'pending-1', + ]) + expect(useMothershipQueueStore.getState().queues['pending::abc']).toBeUndefined() + }) }) describe('clearChat', () => { diff --git a/apps/sim/stores/mothership-queue/store.ts b/apps/sim/stores/mothership-queue/store.ts index 1087ea3bb0e..aecd2e3a843 100644 --- a/apps/sim/stores/mothership-queue/store.ts +++ b/apps/sim/stores/mothership-queue/store.ts @@ -150,7 +150,16 @@ export const useMothershipQueueStore = create()( const queues = omitKey(state.queues, fromKey) if (fromQueue && fromQueue.length > 0) { - queues[toKey] = fromQueue + /** + * Merge into any existing destination bucket rather than + * overwriting. In the normal `adoptResolvedChatId` flow the + * destination is a fresh chatId with no prior bucket, but if + * a stale entry survives in sessionStorage we'd silently lose + * the user's pending messages on overwrite. Appending keeps + * FIFO order (existing first, then the resolved-stream sends). + */ + const existing = state.queues[toKey] ?? [] + queues[toKey] = [...existing, ...fromQueue] } const editing = omitKey(state.editing, fromKey) if (fromEditing !== undefined) { From 3d1ec54b54484ac44b08ae933b785ea80b5687c1 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 28 May 2026 10:28:26 -0700 Subject: [PATCH 3/5] improvement(mothership): strip editing on persist; tighten comments to codebase style --- .../[workspaceId]/home/hooks/use-chat.ts | 119 +++++------------- apps/sim/stores/mothership-queue/store.ts | 44 +++---- apps/sim/stores/mothership-queue/types.ts | 10 +- 3 files changed, 44 insertions(+), 129 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 9350f0bf9d8..2b2806f7735 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -217,11 +217,8 @@ const QUEUED_SEND_HANDOFF_CLAIM_TTL_MS = 30_000 const QUEUED_SEND_HANDOFF_RETRY_BASE_MS = 1000 const QUEUED_SEND_HANDOFF_RETRY_MAX_MS = 30_000 -/** - * Stable reference returned by the queue selector when a chat has no bucket. - * Sharing one array keeps Zustand's equality check from re-rendering on - * every store write. - */ +// Stable empty array — sharing one reference keeps the selector from +// re-rendering on unrelated store writes. const EMPTY_MESSAGE_QUEUE: QueuedMothershipMessage[] = [] const logger = createLogger('useChat') @@ -1713,12 +1710,9 @@ export function useChat( [removePreviewSession, syncPreviewSessionRefs] ) - /** - * `pendingChatKeyRef` always holds the sentinel key used while no `chatId` - * is resolved. On first send, `adoptResolvedChatId` migrates the sentinel - * bucket to the real `chatId`. When the route drops back to home, we mint - * a fresh sentinel so a new pending chat starts with an empty bucket. - */ + // Sentinel used while no `chatId` is resolved; `adoptResolvedChatId` + // migrates this bucket onto the real chatId on first send. Rotated on + // home reset so a new pending chat starts with an empty bucket. const pendingChatKeyRef = useRef(`pending::${generateShortId()}`) const [chatKey, setChatKey] = useState(initialChatId ?? pendingChatKeyRef.current) const chatKeyRef = useRef(chatKey) @@ -1993,13 +1987,7 @@ export function useChat( inFlightResourceAddsRef.current.clear() reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() - /** - * Queue state is owned by `useMothershipQueueStore` and persists across - * navigation. The dispatch loop is per-instance, so cancel it but leave - * the queued messages — they belong to the chat we just left. The editing - * binding, by contrast, is scoped to this hook's composer and cannot - * survive a chat switch, so release it on the chat we just left. - */ + // Editing binds to this hook's composer — release it before rotating chatKey. useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) pendingChatKeyRef.current = `pending::${generateShortId()}` chatKeyRef.current = pendingChatKeyRef.current @@ -2056,21 +2044,12 @@ export function useChat( (chatId: string, options?: { replaceHomeHistory?: boolean; invalidateList?: boolean }) => { const selectedChatId = selectedChatIdRef.current chatIdRef.current = chatId - /** - * Move the queue bucket that holds messages enqueued during this stream - * onto the resolved chatId. The source is the pending sentinel that was - * active when this stream started — not `chatKeyRef.current`, which may - * have moved to a different chat if the user navigated away mid-stream. - * Using the sentinel keeps the other chat's queue untouched. - */ + // Migrate from the pending sentinel (not chatKeyRef — user may have + // navigated to a different chat mid-stream, and we mustn't steal it). if (pendingChatKeyRef.current !== chatId) { useMothershipQueueStore.getState().migrate(pendingChatKeyRef.current, chatId) } - /** - * Only point our visible chatKey at the resolved id when the user is - * still viewing the chat that just resolved — otherwise we'd steal them - * back to the original session. - */ + // Only rebind chatKey if the user is still viewing the resolved chat. const stillViewingResolvedChat = !selectedChatId || selectedChatId === chatId if (stillViewingResolvedChat && chatKeyRef.current !== chatId) { chatKeyRef.current = chatId @@ -2334,13 +2313,8 @@ export function useChat( inFlightResourceAddsRef.current.clear() reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() - /** - * Queue buckets are owned by `useMothershipQueueStore` and persist across - * chat switches. We just rotate the bucket key so selectors read the new - * chat's queue; the previous chat's queue is left intact for its return. - * The editing binding is composer-scoped, so we release it on the chat - * we're leaving before swapping buckets. - */ + // Rotate the bucket key; the previous chat's queue stays in the store. + // Release editing on the chat we're leaving (composer-scoped). if (chatKeyRef.current !== (initialChatId ?? '')) { useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) } @@ -4965,13 +4939,8 @@ export function useChat( const activeChatKey = chatKeyRef.current const editingId = queueStore.editing[activeChatKey] ?? null - /** - * Edit-in-place: the user submitted while a queued slot was bound to the - * composer. Replace at the original index instead of appending — same id, - * same position. If the slot was dispatched between edit and submit (only - * possible for the head if the UI guard was bypassed) fall through and - * enqueue as a normal new message. - */ + // Edit-in-place: replace at the original index. If the slot was already + // dispatched mid-edit (UI-guard race), fall through to a tail-append. if (editingId) { const existing = queueStore.queues[activeChatKey] ?? [] if (existing.some((m) => m.id === editingId)) { @@ -4981,11 +4950,7 @@ export function useChat( contexts, }) queueStore.setEditing(activeChatKey, null) - /** - * If the dispatcher paused on this slot (head-was-editing), resume. - * `enqueueQueueDispatch` is idempotent against an in-flight loop and - * the dispatcher itself guards against double-sending the same id. - */ + // Resume dispatch if it paused on this slot. if (!sendingRef.current && !pendingStopPromiseRef.current) { void enqueueQueueDispatchRef.current({ type: 'send_head' }) } @@ -5578,12 +5543,8 @@ export function useChat( } originalIndex = currentIndex - /** - * Read the live message from the store, not the closure-captured `msg`. - * Between when the dispatcher was scheduled and now, the user may have - * applied an in-place edit (`replaceAt`) — we must send the latest - * content, not the pre-edit snapshot. - */ + // Re-read live: the user may have applied an in-place edit (`replaceAt`) + // between dispatch scheduling and this send. const liveMsg = queueAtSend[currentIndex] activeQueuedSendHandoff = options.queuedSendHandoff ?? liveMsg.queuedSendHandoff const consumed = await startSendMessage( @@ -5626,12 +5587,8 @@ export function useChat( const activeChatKey = chatKeyRef.current const msg = queueState.queues[activeChatKey]?.[0] if (!msg) continue - /** - * Pause draining while the head is bound to the composer. Dispatching - * would either send the pre-edit content (losing the user's in-flight - * edit) or remove the slot mid-edit (turning the eventual submit into - * a tail-append). The next kick after `setEditing(null)` resumes us. - */ + // Pause draining if the head is bound to the composer; dispatching now + // would race the eventual submit. The next kick on edit-resolve resumes us. if (queueState.editing[activeChatKey] === msg.id) continue await dispatchQueuedMessage(msg, { epoch: action.epoch }) @@ -5720,24 +5677,15 @@ export function useChat( ) const editQueuedMessage = useCallback((id: string): QueuedMessage | undefined => { - /** - * Reject edits on a message that is currently being dispatched — the send - * is already in flight and `removeQueuedMessage` will drop the slot when - * it completes, so accepting the edit would silently lose the user's - * new content. The UI also disables this affordance via `dispatchingHeadId`. - */ + // Reject edits on a message already mid-dispatch; the slot is about to be + // dropped. UI also disables this via `dispatchingHeadId`. if (queuedMessageDispatchIdsRef.current.has(id)) return undefined const activeChatKey = chatKeyRef.current const queue = useMothershipQueueStore.getState().queues[activeChatKey] ?? EMPTY_MESSAGE_QUEUE const msg = queue.find((m) => m.id === id) if (!msg) return undefined - /** - * Evict any sessionStorage handoff record for this id. If a prior dispatch - * attempt failed after writing the handoff (which carries a snapshot of - * the message content), leaving it in place would let the recovery effect - * replay the stale pre-edit content even though the in-memory store has - * the new content. - */ + // Evict any sessionStorage handoff — a failed prior dispatch may have left + // a pre-edit content snapshot that the recovery effect would otherwise replay. clearQueuedSendHandoffState(id) clearQueuedSendHandoffClaim(id) useMothershipQueueStore.getState().setEditing(activeChatKey, id) @@ -5746,23 +5694,16 @@ export function useChat( const cancelQueueEdit = useCallback(() => { useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) - /** - * Resume dispatch if it paused on this slot. Original content remains in - * the queue, so the next drain will send what was there before the edit. - */ + // Resume dispatch if it paused on this slot. if (!sendingRef.current && !pendingStopPromiseRef.current) { void enqueueQueueDispatchRef.current({ type: 'send_head' }) } }, []) - /** - * Resume draining when a non-empty queue rehydrates with no active stream — - * e.g. user returns to a chat after navigating away. We wait until the chat - * history confirms there is no `activeStreamId` so we don't race the - * reconnect path; when a stream is in flight, `finalize -> notifyTurnEnded` - * already drains the queue on completion. Idempotent — the dispatch loop - * short-circuits if a task is already running. - */ + // Resume draining when a non-empty queue rehydrates with no active stream + // (e.g. nav-back). Wait for chat history to confirm no `activeStreamId` to + // avoid racing the reconnect path; mid-stream completions go through + // `notifyTurnEnded`. Idempotent — the dispatch loop dedupes. const chatHistoryReady = chatHistory !== undefined const remoteActiveStreamId = chatHistory?.activeStreamId ?? null useEffect(() => { @@ -5785,11 +5726,7 @@ export function useChat( abortControllerRef.current = null clearActiveTurn() sendingRef.current = false - /** - * Editing state binds a queued slot to this hook's composer ref; once - * the hook unmounts there is nothing left to drive the bound composer, - * so we release the slot. The queued message itself stays intact. - */ + // Release the editing slot — the composer it binds to is unmounting. useMothershipQueueStore.getState().setEditing(chatKeyRef.current, null) } }, [ diff --git a/apps/sim/stores/mothership-queue/store.ts b/apps/sim/stores/mothership-queue/store.ts index aecd2e3a843..5c3449a5eee 100644 --- a/apps/sim/stores/mothership-queue/store.ts +++ b/apps/sim/stores/mothership-queue/store.ts @@ -7,15 +7,11 @@ import type { MothershipQueueState, QueuedMothershipMessage } from '@/stores/mot const logger = createLogger('MothershipQueueStore') /** - * Per-tab sessionStorage adapter. No-ops on SSR and tolerates quota errors so - * a transient storage failure can never crash a render. + * Per-tab sessionStorage adapter — no-ops on SSR and tolerates quota errors. * - * The queue persists to **sessionStorage** rather than localStorage (which is - * what `mothership-drafts` uses) on purpose: a queued message carries - * intent-to-send, and the rehydrate path auto-drains the queue once chat - * history confirms there's no active server stream. Tab close should not - * fire those sends days later, so sessionStorage caps the replay window at - * the lifetime of the tab. + * We persist to sessionStorage (not localStorage like `mothership-drafts`) + * because the queue auto-drains on rehydrate: tab close should not fire those + * sends days later. */ const sessionStorageAdapter = { getItem: (name: string): string | null => { @@ -63,11 +59,9 @@ const setQueueForChat = ( ): Record => next.length === 0 ? omitKey(queues, chatKey) : { ...queues, [chatKey]: next } -/** - * Drops the volatile `queuedSendHandoff` so the persisted snapshot only carries - * data that remains meaningful after reload. Reconstruction on next dispatch is - * unnecessary because rehydrate happens outside any active-stream lifecycle. - */ +// Drop the volatile `queuedSendHandoff` from the persisted snapshot — its +// stream reference is meaningless after reload; the dispatcher mints a fresh +// one at send time if needed. const stripVolatile = (message: QueuedMothershipMessage): QueuedMothershipMessage => { if (!message.queuedSendHandoff) return message const { queuedSendHandoff: _drop, ...rest } = message @@ -103,14 +97,8 @@ export const useMothershipQueueStore = create()( const index = current.findIndex((m) => m.id === id) if (index === -1) return state const next = [...current] - /** - * Strip `queuedSendHandoff` on edit. The original handoff was - * created when the slot was first enqueued and references the - * stream that was active at that moment; once the user changes - * the payload, that handoff is no longer valid. The dispatcher - * (or `sendQueuedMessageImmediately`) will mint a fresh handoff - * at send time if the active-stream lifecycle still needs one. - */ + // Strip `queuedSendHandoff` — references the stream active at + // original enqueue time; the dispatcher mints a fresh one at send. const { queuedSendHandoff: _stale, ...rest } = next[index] next[index] = { ...rest, @@ -150,14 +138,8 @@ export const useMothershipQueueStore = create()( const queues = omitKey(state.queues, fromKey) if (fromQueue && fromQueue.length > 0) { - /** - * Merge into any existing destination bucket rather than - * overwriting. In the normal `adoptResolvedChatId` flow the - * destination is a fresh chatId with no prior bucket, but if - * a stale entry survives in sessionStorage we'd silently lose - * the user's pending messages on overwrite. Appending keeps - * FIFO order (existing first, then the resolved-stream sends). - */ + // Merge defensively in case a stale bucket survived in + // sessionStorage. FIFO: existing first, then the resolved stream. const existing = state.queues[toKey] ?? [] queues[toKey] = [...existing, ...fromQueue] } @@ -186,7 +168,9 @@ export const useMothershipQueueStore = create()( messages.map(stripVolatile), ]) ), - editing: state.editing, + // Don't persist `editing` — the composer that holds the edit text + // is component-local and empty after reload. + editing: {}, }), } ), diff --git a/apps/sim/stores/mothership-queue/types.ts b/apps/sim/stores/mothership-queue/types.ts index c28636b2945..b39dff9b8e4 100644 --- a/apps/sim/stores/mothership-queue/types.ts +++ b/apps/sim/stores/mothership-queue/types.ts @@ -1,9 +1,6 @@ import type { QueuedMessage } from '@/app/workspace/[workspaceId]/home/types' -/** - * Volatile metadata attached to a queued message so the dispatcher can claim - * an in-flight stream's slot at hand-off time. Not persisted across reload. - */ +// Volatile — lets the dispatcher claim an in-flight stream's slot. Not persisted. export interface QueuedSendHandoffSeed { id: string chatId?: string @@ -15,10 +12,7 @@ export type QueuedMothershipMessage = QueuedMessage & { queuedSendHandoff?: QueuedSendHandoffSeed } -/** - * Mutable fields that an in-place edit may overwrite. `id` and index are - * preserved by {@link MothershipQueueState.replaceAt}. - */ +// Mutable fields an in-place edit overwrites; id and index are preserved by `replaceAt`. export type QueuedMessageEditPatch = Pick export interface MothershipQueueState { From 05edbac62a61c64a8717796b8995c7f72677d70b Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 28 May 2026 10:34:23 -0700 Subject: [PATCH 4/5] improvement(mothership): omit editing from partialize entirely --- apps/sim/stores/mothership-queue/store.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/sim/stores/mothership-queue/store.ts b/apps/sim/stores/mothership-queue/store.ts index 5c3449a5eee..5697d9ac475 100644 --- a/apps/sim/stores/mothership-queue/store.ts +++ b/apps/sim/stores/mothership-queue/store.ts @@ -161,6 +161,9 @@ export const useMothershipQueueStore = create()( { name: 'mothership-queue', storage: createJSONStorage(() => sessionStorageAdapter), + // `editing` is intentionally omitted — the composer that holds the + // edit text is component-local and empty after reload, so a persisted + // editing flag would render an in-edit row with nothing bound. partialize: (state) => ({ queues: Object.fromEntries( Object.entries(state.queues).map(([key, messages]) => [ @@ -168,9 +171,6 @@ export const useMothershipQueueStore = create()( messages.map(stripVolatile), ]) ), - // Don't persist `editing` — the composer that holds the edit text - // is component-local and empty after reload. - editing: {}, }), } ), From 036bbb37fe0fb3f3451d7433516e180c400f40eb Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Thu, 28 May 2026 10:51:55 -0700 Subject: [PATCH 5/5] fix(mothership): honor user removal during dispatch in failure-restore path --- .../[workspaceId]/home/hooks/use-chat.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 2b2806f7735..5ecc05fdc91 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1540,6 +1540,7 @@ export function useChat( queueDispatchEpochRef.current++ queueDispatchActionsRef.current = [] queuedMessageDispatchIdsRef.current.clear() + userRemovedDuringDispatchRef.current.clear() queueDispatchTaskRef.current = null setDispatchingHeadId(null) }, []) @@ -1723,6 +1724,10 @@ export function useChat( const editingQueuedId = useMothershipQueueStore((state) => state.editing[chatKey] ?? null) const [dispatchingHeadId, setDispatchingHeadId] = useState(null) const queuedMessageDispatchIdsRef = useRef>(new Set()) + // Ids the user explicitly removed while a dispatch was in flight — used to + // suppress the dispatch's failure-restore path, which would otherwise undo + // the user's removal silently. + const userRemovedDuringDispatchRef = useRef>(new Set()) const queueDispatchActionsRef = useRef([]) const queueDispatchTaskRef = useRef | null>(null) const queueDispatchEpochRef = useRef(0) @@ -5529,6 +5534,11 @@ export function useChat( if (!removedFromQueue || options.epoch !== queueDispatchEpochRef.current) { return } + // If the user explicitly removed this message during dispatch, honor + // that and don't re-insert on failure. + if (userRemovedDuringDispatchRef.current.delete(msg.id)) { + return + } useMothershipQueueStore.getState().insertAt(dispatchChatKey, originalIndex, msg) } @@ -5564,6 +5574,7 @@ export function useChat( } finally { setDispatchingHeadId((current) => (current === msg.id ? null : current)) queuedMessageDispatchIdsRef.current.delete(msg.id) + userRemovedDuringDispatchRef.current.delete(msg.id) } }, [startSendMessage] @@ -5616,6 +5627,11 @@ export function useChat( enqueueQueueDispatchRef.current = enqueueQueueDispatch const removeFromQueue = useCallback((id: string) => { + // If the message is mid-dispatch, mark it so the dispatch's failure-restore + // path won't silently undo the user's removal. + if (queuedMessageDispatchIdsRef.current.has(id)) { + userRemovedDuringDispatchRef.current.add(id) + } clearQueuedSendHandoffState(id) clearQueuedSendHandoffClaim(id) useMothershipQueueStore.getState().remove(chatKeyRef.current, id)