From 228c6980d5db67216668fce56ed55fccbfd0407b Mon Sep 17 00:00:00 2001
From: waleed
Date: Wed, 24 Jun 2026 08:48:56 -0700
Subject: [PATCH] refactor(sse): consolidate client SSE readers behind a single typed primitive

Replace four hand-rolled client SSE decode loops with two layered
primitives in lib/core/utils/sse.ts:

- readSSELines: the single byte-stream decode engine. Splits on \n,
  strips trailing \r, tolerates data: with/without a leading space,
  skips the [DONE] sentinel, honors an AbortSignal before each chunk and
  between events, and releases the reader lock only when it acquired it.
- readSSEEvents: a thin JSON layer that parses each payload and routes
  unparseable lines to onParseError (default: skip).

An SSESource union accepts a Response, a ReadableStream, or an
already-acquired reader so callers that must stash the reader for
external cancellation keep ownership of the lock.

Migrates use-execution-stream, chat use-chat-streaming, home use-chat
(via readSSELines for schema-validated decode), and the workflow chat
panel. Legacy server/wand exports (encodeSSE, SSE_HEADERS,
readSSEStream) are untouched. Behavior is preserved across abort, RAF
batching, TTS, [DONE], delimiter tolerance, and reader-lock ownership.

Tests in sse.test.ts pin the prior behavior: \n and \n\n framing,
mid-chunk splits, [DONE], data: with/without leading space, \r\n
stripping, sync/async early-stop, pre-aborted and mid-stream abort, lock
release/non-release per source, lock release on a throwing handler, and
Response/stream/reader sources.
--- apps/sim/app/chat/hooks/use-chat-streaming.ts | 437 +++++++++--------- .../[workspaceId]/home/hooks/use-chat.ts | 98 ++-- .../w/[workflowId]/components/chat/chat.tsx | 82 ++-- apps/sim/hooks/use-execution-stream.test.ts | 42 ++ apps/sim/hooks/use-execution-stream.ts | 40 +- apps/sim/lib/core/utils/sse.test.ts | 321 ++++++++++++- apps/sim/lib/core/utils/sse.ts | 175 +++++++ 7 files changed, 821 insertions(+), 374 deletions(-) diff --git a/apps/sim/app/chat/hooks/use-chat-streaming.ts b/apps/sim/app/chat/hooks/use-chat-streaming.ts index f4c9fcb9b97..dd315dafe73 100644 --- a/apps/sim/app/chat/hooks/use-chat-streaming.ts +++ b/apps/sim/app/chat/hooks/use-chat-streaming.ts @@ -3,6 +3,7 @@ import { useRef, useState } from 'react' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' +import { readSSEEvents } from '@/lib/core/utils/sse' import { isUserFileWithMetadata } from '@/lib/core/utils/user-file' import type { ChatFile, ChatMessage } from '@/app/chat/components/message/message' import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants' @@ -125,14 +126,12 @@ export function useChatStreaming() { streamingOptions?.voiceSettings?.autoPlayResponses && streamingOptions?.audioStreamHandler - const reader = response.body?.getReader() - if (!reader) { + if (!response.body) { setIsLoading(false) setIsStreamingResponse(false) return } - const decoder = new TextDecoder() let accumulatedText = '' let lastAudioPosition = 0 @@ -192,264 +191,252 @@ export function useChatStreaming() { setIsLoading(false) + let terminated = false + try { - while (true) { - // Check if aborted - if (abortControllerRef.current === null) { - break + await readSSEEvents<{ + blockId?: string + chunk?: string + event?: string + error?: string + data?: { + success: boolean + error?: string | { message?: string } + output?: Record> } - - const { done, value } = await reader.read() - - if (done) { - flushUI() - // Stream any remaining text for TTS - if ( - 
shouldPlayAudio && - streamingOptions?.audioStreamHandler && - accumulatedText.length > lastAudioPosition - ) { - const remainingText = accumulatedText.substring(lastAudioPosition).trim() - if (remainingText) { - try { - await streamingOptions.audioStreamHandler(remainingText) - } catch (error) { - logger.error('TTS error for remaining text:', error) - } - } + }>(response.body, { + signal: abortControllerRef.current.signal, + onParseError: (_data, parseError) => { + logger.error('Error parsing stream data:', parseError) + }, + onEvent: async (json) => { + const { blockId, chunk: contentChunk, event: eventType } = json + + if (eventType === 'error' || json.event === 'error') { + const errorMessage = json.error || CHAT_ERROR_MESSAGES.GENERIC_ERROR + setMessages((prev) => + prev.map((msg) => + msg.id === messageId + ? { + ...msg, + content: errorMessage, + isStreaming: false, + type: 'assistant' as const, + } + : msg + ) + ) + setIsLoading(false) + terminated = true + return true } - break - } - const chunk = decoder.decode(value, { stream: true }) - const lines = chunk.split('\n\n') + if (eventType === 'final' && json.data) { + flushUI() + const finalData = json.data - for (const line of lines) { - if (line.startsWith('data: ')) { - const data = line.substring(6) + const outputConfigs = streamingOptions?.outputConfigs + const formattedOutputs: string[] = [] + let extractedFiles: ChatFile[] = [] - if (data === '[DONE]') { - continue - } + const formatValue = (value: any): string | null => { + if (value === null || value === undefined) { + return null + } - try { - const json = JSON.parse(data) - const { blockId, chunk: contentChunk, event: eventType } = json - - if (eventType === 'error' || json.event === 'error') { - const errorMessage = json.error || CHAT_ERROR_MESSAGES.GENERIC_ERROR - setMessages((prev) => - prev.map((msg) => - msg.id === messageId - ? 
{ - ...msg, - content: errorMessage, - isStreaming: false, - type: 'assistant' as const, - } - : msg - ) - ) - setIsLoading(false) - return + if (isUserFileWithMetadata(value)) { + return null } - if (eventType === 'final' && json.data) { - flushUI() - const finalData = json.data as { - success: boolean - error?: string | { message?: string } - output?: Record> + if (Array.isArray(value) && value.length === 0) { + return null + } + + if (typeof value === 'string') { + return value + } + + if (typeof value === 'object') { + try { + return `\`\`\`json\n${JSON.stringify(value, null, 2)}\n\`\`\`` + } catch { + return String(value) } + } - const outputConfigs = streamingOptions?.outputConfigs - const formattedOutputs: string[] = [] - let extractedFiles: ChatFile[] = [] + return String(value) + } - const formatValue = (value: any): string | null => { - if (value === null || value === undefined) { - return null - } + const getOutputValue = (blockOutputs: Record, path?: string) => { + if (!path || path === 'content') { + if (blockOutputs.content !== undefined) return blockOutputs.content + if (blockOutputs.result !== undefined) return blockOutputs.result + return blockOutputs + } - if (isUserFileWithMetadata(value)) { - return null - } + if (blockOutputs[path] !== undefined) { + return blockOutputs[path] + } - if (Array.isArray(value) && value.length === 0) { - return null + if (path.includes('.')) { + return path.split('.').reduce((current, segment) => { + if (current && typeof current === 'object' && segment in current) { + return current[segment] } + return undefined + }, blockOutputs) + } - if (typeof value === 'string') { - return value - } + return undefined + } - if (typeof value === 'object') { - try { - return `\`\`\`json\n${JSON.stringify(value, null, 2)}\n\`\`\`` - } catch { - return String(value) - } - } + if (outputConfigs?.length && finalData.output) { + for (const config of outputConfigs) { + const blockOutputs = finalData.output[config.blockId] + if 
(!blockOutputs) continue + + const value = getOutputValue(blockOutputs, config.path) + + if (isUserFileWithMetadata(value)) { + extractedFiles.push({ + id: value.id, + name: value.name, + url: value.url, + key: value.key, + size: value.size, + type: value.type, + context: value.context, + }) + continue + } - return String(value) + const nestedFiles = extractFilesFromData(value) + if (nestedFiles.length > 0) { + extractedFiles = [...extractedFiles, ...nestedFiles] + continue } - const getOutputValue = (blockOutputs: Record, path?: string) => { - if (!path || path === 'content') { - if (blockOutputs.content !== undefined) return blockOutputs.content - if (blockOutputs.result !== undefined) return blockOutputs.result - return blockOutputs - } + const formatted = formatValue(value) + if (formatted) { + formattedOutputs.push(formatted) + } + } + } - if (blockOutputs[path] !== undefined) { - return blockOutputs[path] - } + let finalContent = accumulatedText - if (path.includes('.')) { - return path.split('.').reduce((current, segment) => { - if (current && typeof current === 'object' && segment in current) { - return current[segment] - } - return undefined - }, blockOutputs) - } + if (formattedOutputs.length > 0) { + const nonEmptyOutputs = formattedOutputs.filter((output) => output.trim()) + if (nonEmptyOutputs.length > 0) { + const combinedOutputs = nonEmptyOutputs.join('\n\n') + finalContent = finalContent + ? 
`${finalContent.trim()}\n\n${combinedOutputs}` + : combinedOutputs + } + } - return undefined + if (!finalContent && extractedFiles.length === 0) { + if (finalData.error) { + if (typeof finalData.error === 'string') { + finalContent = finalData.error + } else if (typeof finalData.error?.message === 'string') { + finalContent = finalData.error.message } + } else if (finalData.success && finalData.output) { + const fallbackOutput = Object.values(finalData.output) + .map((block) => formatValue(block)?.trim()) + .filter(Boolean)[0] + if (fallbackOutput) { + finalContent = fallbackOutput + } + } + } - if (outputConfigs?.length && finalData.output) { - for (const config of outputConfigs) { - const blockOutputs = finalData.output[config.blockId] - if (!blockOutputs) continue - - const value = getOutputValue(blockOutputs, config.path) - - if (isUserFileWithMetadata(value)) { - extractedFiles.push({ - id: value.id, - name: value.name, - url: value.url, - key: value.key, - size: value.size, - type: value.type, - context: value.context, - }) - continue - } - - const nestedFiles = extractFilesFromData(value) - if (nestedFiles.length > 0) { - extractedFiles = [...extractedFiles, ...nestedFiles] - continue + setMessages((prev) => + prev.map((msg) => + msg.id === messageId + ? { + ...msg, + isStreaming: false, + content: finalContent ?? msg.content, + files: extractedFiles.length > 0 ? 
extractedFiles : undefined, } + : msg + ) + ) - const formatted = formatValue(value) - if (formatted) { - formattedOutputs.push(formatted) - } - } - } + accumulatedTextRef.current = '' + lastStreamedPositionRef.current = 0 + lastDisplayedPositionRef.current = 0 + audioStreamingActiveRef.current = false - let finalContent = accumulatedText + terminated = true + return true + } - if (formattedOutputs.length > 0) { - const nonEmptyOutputs = formattedOutputs.filter((output) => output.trim()) - if (nonEmptyOutputs.length > 0) { - const combinedOutputs = nonEmptyOutputs.join('\n\n') - finalContent = finalContent - ? `${finalContent.trim()}\n\n${combinedOutputs}` - : combinedOutputs - } - } + if (blockId && contentChunk) { + if (!messageIdMap.has(blockId)) { + messageIdMap.set(blockId, messageId) + } - if (!finalContent && extractedFiles.length === 0) { - if (finalData.error) { - if (typeof finalData.error === 'string') { - finalContent = finalData.error - } else if (typeof finalData.error?.message === 'string') { - finalContent = finalData.error.message - } - } else if (finalData.success && finalData.output) { - const fallbackOutput = Object.values(finalData.output) - .map((block) => formatValue(block)?.trim()) - .filter(Boolean)[0] - if (fallbackOutput) { - finalContent = fallbackOutput - } - } + accumulatedText += contentChunk + accumulatedTextRef.current = accumulatedText + logger.debug('[useChatStreaming] Received chunk', { + blockId, + chunkLength: contentChunk.length, + totalLength: accumulatedText.length, + messageId, + chunk: contentChunk.substring(0, 20), + }) + uiDirty = true + scheduleUIFlush() + + if (shouldPlayAudio && streamingOptions?.audioStreamHandler) { + const newText = accumulatedText.substring(lastAudioPosition) + const sentenceEndings = ['. ', '! ', '? 
', '.\n', '!\n', '?\n', '.', '!', '?'] + let sentenceEnd = -1 + + for (const ending of sentenceEndings) { + const index = newText.indexOf(ending) + if (index > 0) { + sentenceEnd = index + ending.length + break } - - setMessages((prev) => - prev.map((msg) => - msg.id === messageId - ? { - ...msg, - isStreaming: false, - content: finalContent ?? msg.content, - files: extractedFiles.length > 0 ? extractedFiles : undefined, - } - : msg - ) - ) - - accumulatedTextRef.current = '' - lastStreamedPositionRef.current = 0 - lastDisplayedPositionRef.current = 0 - audioStreamingActiveRef.current = false - - return } - if (blockId && contentChunk) { - if (!messageIdMap.has(blockId)) { - messageIdMap.set(blockId, messageId) - } - - accumulatedText += contentChunk - accumulatedTextRef.current = accumulatedText - logger.debug('[useChatStreaming] Received chunk', { - blockId, - chunkLength: contentChunk.length, - totalLength: accumulatedText.length, - messageId, - chunk: contentChunk.substring(0, 20), - }) - uiDirty = true - scheduleUIFlush() - - // Real-time TTS for voice mode - if (shouldPlayAudio && streamingOptions?.audioStreamHandler) { - const newText = accumulatedText.substring(lastAudioPosition) - const sentenceEndings = ['. ', '! ', '? 
', '.\n', '!\n', '?\n', '.', '!', '?'] - let sentenceEnd = -1 - - for (const ending of sentenceEndings) { - const index = newText.indexOf(ending) - if (index > 0) { - sentenceEnd = index + ending.length - break - } - } - - if (sentenceEnd > 0) { - const sentence = newText.substring(0, sentenceEnd).trim() - if (sentence && sentence.length >= 3) { - try { - await streamingOptions.audioStreamHandler(sentence) - lastAudioPosition += sentenceEnd - } catch (error) { - logger.error('TTS error:', error) - } - } + if (sentenceEnd > 0) { + const sentence = newText.substring(0, sentenceEnd).trim() + if (sentence && sentence.length >= 3) { + try { + await streamingOptions.audioStreamHandler(sentence) + lastAudioPosition += sentenceEnd + } catch (error) { + logger.error('TTS error:', error) } } - } else if (blockId && eventType === 'end') { - setMessages((prev) => - prev.map((msg) => (msg.id === messageId ? { ...msg, isStreaming: false } : msg)) - ) } - } catch (parseError) { - logger.error('Error parsing stream data:', parseError) + } + } else if (blockId && eventType === 'end') { + setMessages((prev) => + prev.map((msg) => (msg.id === messageId ? 
{ ...msg, isStreaming: false } : msg)) + ) + } + }, + }) + + if (!terminated) { + flushUI() + if ( + shouldPlayAudio && + streamingOptions?.audioStreamHandler && + accumulatedText.length > lastAudioPosition + ) { + const remainingText = accumulatedText.substring(lastAudioPosition).trim() + if (remainingText) { + try { + await streamingOptions.audioStreamHandler(remainingText) + } catch (error) { + logger.error('TTS error for remaining text:', error) } } } diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index b1ec30520af..e81ec0603f9 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -63,6 +63,7 @@ import { } from '@/lib/copilot/tools/client/run-tool-execution' import { setCurrentChatTraceparent } from '@/lib/copilot/tools/client/trace-context' import { isWorkflowToolName } from '@/lib/copilot/tools/workflow-tools' +import { readSSELines } from '@/lib/core/utils/sse' import { getQueryClient } from '@/app/_shell/providers/get-query-client' import { useFilePreviewController } from '@/app/workspace/[workspaceId]/home/hooks/preview' import { @@ -1934,7 +1935,6 @@ export function useChat( shouldContinue?: () => boolean } ) => { - const decoder = new TextDecoder() const ctx = createStreamLoopContext({ workspaceId, queryClient, @@ -1987,71 +1987,47 @@ export function useChat( return { sawStreamError: false, sawComplete: false } } streamReaderRef.current = reader - let buffer = '' try { - const pendingLines: string[] = [] - - while (true) { - if (pendingLines.length === 0) { - // Don't read another chunk after `complete` has drained. 
- if (state.sawCompleteEvent) break - const { done, value } = await reader.read() - if (done) break - if (ops.isStale()) continue - - buffer += decoder.decode(value, { stream: true }) - const lines = buffer.split('\n') - buffer = lines.pop() || '' - pendingLines.push(...lines) - if (pendingLines.length === 0) { - continue + await readSSELines(reader, { + onData: (raw) => { + if (state.sawCompleteEvent) return true + if (ops.isStale()) return + + const parsedResult = parsePersistedStreamEventEnvelopeJson(raw) + if (!parsedResult.ok) { + const error = createStreamSchemaValidationError(parsedResult, 'Live SSE event.') + logger.error('Rejected chat SSE event due to client-side schema enforcement', { + reason: parsedResult.reason, + message: parsedResult.message, + errors: parsedResult.errors, + error: error.message, + }) + throw error } - } - - const line = pendingLines.shift() - if (line === undefined) { - continue - } - if (ops.isStale()) { - pendingLines.length = 0 - continue - } - if (!line.startsWith('data: ')) continue - const raw = line.slice(6) - - const parsedResult = parsePersistedStreamEventEnvelopeJson(raw) - if (!parsedResult.ok) { - const error = createStreamSchemaValidationError(parsedResult, 'Live SSE event.') - logger.error('Rejected chat SSE event due to client-side schema enforcement', { - reason: parsedResult.reason, - message: parsedResult.message, - errors: parsedResult.errors, - error: error.message, - }) - throw error - } - const parsed = parsedResult.event + const parsed = parsedResult.event - if (parsed.trace?.requestId && parsed.trace.requestId !== state.streamRequestId) { - state.streamRequestId = parsed.trace.requestId - streamRequestIdRef.current = state.streamRequestId - ops.flush() - } - if (parsed.stream?.streamId) { - streamIdRef.current = parsed.stream.streamId - } - const eventCursor = parsed.stream?.cursor ?? 
String(parsed.seq) - if (isAlreadyProcessedStreamCursor(eventCursor, lastCursorRef.current)) { - continue - } - if (eventCursor) { - lastCursorRef.current = eventCursor - } + if (parsed.trace?.requestId && parsed.trace.requestId !== state.streamRequestId) { + state.streamRequestId = parsed.trace.requestId + streamRequestIdRef.current = state.streamRequestId + ops.flush() + } + if (parsed.stream?.streamId) { + streamIdRef.current = parsed.stream.streamId + } + const eventCursor = parsed.stream?.cursor ?? String(parsed.seq) + if (isAlreadyProcessedStreamCursor(eventCursor, lastCursorRef.current)) { + return + } + if (eventCursor) { + lastCursorRef.current = eventCursor + } - logger.debug('SSE event received', parsed) - dispatchStreamEvent(ctx, parsed) - } + logger.debug('SSE event received', parsed) + dispatchStreamEvent(ctx, parsed) + if (state.sawCompleteEvent) return true + }, + }) } finally { if (state.sawStreamError && !state.sawCompleteEvent) { applyTurnTerminal(state.model, 'error') diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx index 588ff6da5cc..d4caa19424f 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx @@ -25,6 +25,7 @@ import { extractPathFromOutputId, parseOutputContentSafely, } from '@/lib/core/utils/response-format' +import { readSSEEvents } from '@/lib/core/utils/sse' import { CHAT_ACCEPT_ATTRIBUTE } from '@/lib/uploads/utils/validation' import { normalizeInputFormatValue } from '@/lib/workflows/input-format' import { StartBlockPath, TriggerUtils } from '@/lib/workflows/triggers/triggers' @@ -520,12 +521,10 @@ export function Chat() { * @param responseMessageId - ID of the message to update with streamed content */ const processStreamingResponse = useCallback( - async (stream: ReadableStream, 
responseMessageId: string) => { + async (stream: ReadableStream, responseMessageId: string) => { const reader = stream.getReader() streamReaderRef.current = reader - const decoder = new TextDecoder() let accumulatedContent = '' - let buffer = '' const BATCH_MAX_MS = 50 let pendingChunks = '' @@ -563,63 +562,34 @@ export function Chat() { } try { - while (true) { - const { done, value } = await reader.read() - if (done) { - flushChunks() - finalizeMessageStream(responseMessageId) - break - } - - const chunk = decoder.decode(value, { stream: true }) - buffer += chunk - - const separatorIndex = buffer.lastIndexOf('\n\n') - if (separatorIndex === -1) { - continue - } - - const processable = buffer.slice(0, separatorIndex) - buffer = buffer.slice(separatorIndex + 2) - - const lines = processable.split('\n\n') - - for (const line of lines) { - if (!line.startsWith('data: ')) continue - - const data = line.substring(6) - if (data === '[DONE]') continue - - try { - const json = JSON.parse(data) - const { event, data: eventData, chunk: contentChunk } = json - - if (event === 'final' && eventData) { - const result = eventData as ExecutionResult - - if ('success' in result && !result.success) { - const errorMessage = result.error || 'Workflow execution failed' - flushChunks() - appendMessageContent( - responseMessageId, - `${accumulatedContent ? 
'\n\n' : ''}Error: ${errorMessage}` - ) - finalizeMessageStream(responseMessageId) - return - } + await readSSEEvents<{ event?: string; data?: ExecutionResult; chunk?: string }>(reader, { + onParseError: (_data, e) => { + logger.error('Error parsing stream data:', e) + }, + onEvent: (json) => { + const { event, data: eventData, chunk: contentChunk } = json + if (event === 'final' && eventData) { + if ('success' in eventData && !eventData.success) { + const errorMessage = eventData.error || 'Workflow execution failed' flushChunks() - finalizeMessageStream(responseMessageId) - } else if (contentChunk) { - accumulatedContent += contentChunk - pendingChunks += contentChunk - scheduleFlush() + appendMessageContent( + responseMessageId, + `${accumulatedContent ? '\n\n' : ''}Error: ${errorMessage}` + ) } - } catch (e) { - logger.error('Error parsing stream data:', e) + return true } - } - } + + if (contentChunk) { + accumulatedContent += contentChunk + pendingChunks += contentChunk + scheduleFlush() + } + }, + }) + flushChunks() + finalizeMessageStream(responseMessageId) } catch (error) { if ((error as Error)?.name !== 'AbortError') { logger.error('Error processing stream:', error) diff --git a/apps/sim/hooks/use-execution-stream.test.ts b/apps/sim/hooks/use-execution-stream.test.ts index da52635ff99..f38f028c805 100644 --- a/apps/sim/hooks/use-execution-stream.test.ts +++ b/apps/sim/hooks/use-execution-stream.test.ts @@ -84,4 +84,46 @@ describe('processSSEStream', () => { expect(onEventId).not.toHaveBeenCalled() }) + + it('releases the reader lock after the stream completes', async () => { + const stream = streamEvents([]) + const reader = stream.getReader() + expect(stream.locked).toBe(true) + + await processSSEStream(reader, {}, 'test') + + expect(stream.locked).toBe(false) + }) + + it('releases the reader lock even when a handler throws', async () => { + const event: ExecutionEvent = { + type: 'block:started', + eventId: 7, + timestamp: new Date().toISOString(), + 
executionId: 'exec-1', + workflowId: 'wf-1', + data: { + blockId: 'block-1', + blockName: 'Block 1', + blockType: 'function', + executionOrder: 1, + }, + } + const stream = streamEvents([event]) + const reader = stream.getReader() + + await expect( + processSSEStream( + reader, + { + onBlockStarted: () => { + throw new Error('boom') + }, + }, + 'test' + ) + ).rejects.toThrow('boom') + + expect(stream.locked).toBe(false) + }) }) diff --git a/apps/sim/hooks/use-execution-stream.ts b/apps/sim/hooks/use-execution-stream.ts index 24a6e0cad4a..ffe862c4710 100644 --- a/apps/sim/hooks/use-execution-stream.ts +++ b/apps/sim/hooks/use-execution-stream.ts @@ -1,6 +1,7 @@ import { useCallback } from 'react' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' +import { readSSEEvents } from '@/lib/core/utils/sse' import type { BlockChildWorkflowStartedData, BlockCompletedData, @@ -82,36 +83,12 @@ export async function processSSEStream( callbacks: ExecutionStreamCallbacks, logPrefix: string ): Promise { - const decoder = new TextDecoder() - let buffer = '' - try { - while (true) { - const { done, value } = await reader.read() - - if (done) break - - buffer += decoder.decode(value, { stream: true }) - const lines = buffer.split('\n\n') - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.trim() || !line.startsWith('data: ')) continue - - const data = line.substring(6).trim() - if (data === '[DONE]') { - logger.info(`${logPrefix} stream completed`) - continue - } - - let event: ExecutionEvent - try { - event = JSON.parse(data) as ExecutionEvent - } catch (error) { - logger.error('Failed to parse SSE event:', error, { data }) - continue - } - + await readSSEEvents(reader, { + onParseError: (data, error) => { + logger.error('Failed to parse SSE event:', error, { data }) + }, + onEvent: async (event) => { try { switch (event.type) { case 'execution:started': @@ -168,8 +145,9 @@ export async function processSSEStream( error 
) } - } - } + }, + }) + logger.debug(`${logPrefix} stream completed`) } finally { reader.releaseLock() } diff --git a/apps/sim/lib/core/utils/sse.test.ts b/apps/sim/lib/core/utils/sse.test.ts index 524c00b83d3..579e23fd607 100644 --- a/apps/sim/lib/core/utils/sse.test.ts +++ b/apps/sim/lib/core/utils/sse.test.ts @@ -2,7 +2,13 @@ * @vitest-environment node */ import { describe, expect, it, vi } from 'vitest' -import { encodeSSE, readSSEStream, SSE_HEADERS } from '@/lib/core/utils/sse' +import { + encodeSSE, + readSSEEvents, + readSSELines, + readSSEStream, + SSE_HEADERS, +} from '@/lib/core/utils/sse' function createStreamFromChunks(chunks: Uint8Array[]): ReadableStream { let index = 0 @@ -311,3 +317,316 @@ describe('readSSEStream', () => { }) }) }) + +function streamFromStringChunks(chunks: string[]): ReadableStream { + const encoder = new TextEncoder() + return createStreamFromChunks(chunks.map((c) => encoder.encode(c))) +} + +describe('readSSEEvents', () => { + it('parses `\\n\\n`-framed events', async () => { + const stream = streamFromStringChunks([ + 'data: {"n":1}\n\n', + 'data: {"n":2}\n\n', + 'data: {"n":3}\n\n', + ]) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([1, 2, 3]) + }) + + it('parses `\\n`-framed events', async () => { + const stream = streamFromStringChunks(['data: {"n":1}\ndata: {"n":2}\ndata: {"n":3}\n']) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([1, 2, 3]) + }) + + it('reassembles events split across chunk boundaries', async () => { + const stream = streamFromStringChunks(['data: {"ms', 'g":"hel', 'lo"}\n\n']) + const events: Array<{ msg: string }> = [] + await readSSEEvents<{ msg: string }>(stream, { + onEvent: (e) => { + events.push(e) + }, + }) + expect(events).toEqual([{ msg: 'hello' }]) + }) + + it('skips the [DONE] sentinel', 
async () => { + const stream = streamFromStringChunks(['data: {"n":1}\n\n', 'data: [DONE]\n\n']) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([1]) + }) + + it('accepts `data:` with and without a leading space', async () => { + const stream = streamFromStringChunks(['data:{"n":1}\n\n', 'data: {"n":2}\n\n']) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([1, 2]) + }) + + it('strips trailing carriage returns (\\r\\n framing)', async () => { + const stream = streamFromStringChunks(['data: {"n":1}\r\n\r\n', 'data: {"n":2}\r\n\r\n']) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([1, 2]) + }) + + it('routes unparseable payloads to onParseError and continues', async () => { + const stream = streamFromStringChunks(['data: not-json\n\n', 'data: {"n":2}\n\n']) + const events: number[] = [] + const onParseError = vi.fn() + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + onParseError, + }) + expect(events).toEqual([2]) + expect(onParseError).toHaveBeenCalledTimes(1) + expect(onParseError).toHaveBeenCalledWith('not-json', expect.any(Error)) + }) + + it('stops early when onEvent returns true', async () => { + const stream = streamFromStringChunks([ + 'data: {"n":1}\n\n', + 'data: {"n":2}\n\n', + 'data: {"n":3}\n\n', + ]) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + return e.n === 2 + }, + }) + expect(events).toEqual([1, 2]) + }) + + it('does not process events once the signal is aborted', async () => { + const controller = new AbortController() + const stream = streamFromStringChunks(['data: {"n":1}\n\n', 'data: {"n":2}\n\n']) + const events: number[] = [] 
+ await readSSEEvents<{ n: number }>(stream, { + signal: controller.signal, + onEvent: (e) => { + events.push(e.n) + controller.abort() + }, + }) + expect(events).toEqual([1]) + }) + + it('returns immediately when the signal is already aborted', async () => { + const controller = new AbortController() + controller.abort() + const stream = streamFromStringChunks(['data: {"n":1}\n\n']) + const onEvent = vi.fn() + await readSSEEvents(stream, { signal: controller.signal, onEvent }) + expect(onEvent).not.toHaveBeenCalled() + }) + + it('releases the reader lock for a stream source', async () => { + const stream = streamFromStringChunks(['data: {"n":1}\n\n']) + await readSSEEvents<{ n: number }>(stream, { onEvent: () => {} }) + expect(() => stream.getReader()).not.toThrow() + }) + + it('does not release the lock for a reader source', async () => { + const stream = streamFromStringChunks(['data: {"n":1}\n\n']) + const reader = stream.getReader() + await readSSEEvents<{ n: number }>(reader, { onEvent: () => {} }) + expect(() => stream.getReader()).toThrow() + reader.releaseLock() + }) + + it('accepts a Response source', async () => { + const response = new Response(streamFromStringChunks(['data: {"n":7}\n\n'])) + const events: number[] = [] + await readSSEEvents<{ n: number }>(response, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([7]) + }) + + it('silently skips unparseable payloads when no onParseError is provided', async () => { + const stream = streamFromStringChunks(['data: not-json\n\n', 'data: {"n":2}\n\n']) + const events: number[] = [] + await expect( + readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + ).resolves.toBeUndefined() + expect(events).toEqual([2]) + }) + + it('surfaces a fatal parse error when onParseError throws', async () => { + const stream = streamFromStringChunks(['data: not-json\n\n', 'data: {"n":2}\n\n']) + const events: number[] = [] + await expect( + readSSEEvents<{ n: number 
}>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + onParseError: () => { + throw new Error('boom') + }, + }) + ).rejects.toThrow('boom') + expect(events).toEqual([]) + }) + + it('stops early when onEvent resolves true asynchronously', async () => { + const stream = streamFromStringChunks([ + 'data: {"n":1}\n\n', + 'data: {"n":2}\n\n', + 'data: {"n":3}\n\n', + ]) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: async (e) => { + events.push(e.n) + return e.n === 2 + }, + }) + expect(events).toEqual([1, 2]) + }) + + it('throws "No response body" for a Response without a body', async () => { + const response = new Response(null) + await expect(readSSEEvents(response, { onEvent: () => {} })).rejects.toThrow('No response body') + }) +}) + +describe('readSSELines', () => { + it('delivers raw (un-parsed) data payloads', async () => { + const stream = streamFromStringChunks(['data: raw-one\n\n', 'data: {"keep":"asString"}\n\n']) + const lines: string[] = [] + await readSSELines(stream, { + onData: (raw) => { + lines.push(raw) + }, + }) + expect(lines).toEqual(['raw-one', '{"keep":"asString"}']) + }) + + it('skips [DONE] and blank separator lines', async () => { + const stream = streamFromStringChunks(['data: a\n\ndata: b\n\ndata: [DONE]\n\n']) + const lines: string[] = [] + await readSSELines(stream, { + onData: (raw) => { + lines.push(raw) + }, + }) + expect(lines).toEqual(['a', 'b']) + }) + + it('preserves the raw payload verbatim (no JSON parsing)', async () => { + const stream = streamFromStringChunks(['data: {"unterminated\n\n', 'data:no-space\n\n']) + const lines: string[] = [] + await readSSELines(stream, { + onData: (raw) => { + lines.push(raw) + }, + }) + expect(lines).toEqual(['{"unterminated', 'no-space']) + }) + + it('strips a trailing carriage return from each line', async () => { + const stream = streamFromStringChunks(['data: one\r\n\r\ndata: two\r\n\r\n']) + const lines: string[] = [] + await 
readSSELines(stream, { + onData: (raw) => { + lines.push(raw) + }, + }) + expect(lines).toEqual(['one', 'two']) + }) + + it('stops early when onData returns true', async () => { + const stream = streamFromStringChunks(['data: a\n\ndata: b\n\ndata: c\n\n']) + const lines: string[] = [] + await readSSELines(stream, { + onData: (raw) => { + lines.push(raw) + return raw === 'b' + }, + }) + expect(lines).toEqual(['a', 'b']) + }) + + it('does not deliver any line when the signal is already aborted', async () => { + const controller = new AbortController() + controller.abort() + const stream = streamFromStringChunks(['data: a\n\n']) + const onData = vi.fn() + await readSSELines(stream, { signal: controller.signal, onData }) + expect(onData).not.toHaveBeenCalled() + }) + + it('stops between events in the same chunk once aborted mid-stream', async () => { + const controller = new AbortController() + const stream = streamFromStringChunks(['data: a\n\ndata: b\n\ndata: c\n\n']) + const lines: string[] = [] + await readSSELines(stream, { + signal: controller.signal, + onData: (raw) => { + lines.push(raw) + if (raw === 'a') controller.abort() + }, + }) + expect(lines).toEqual(['a']) + }) + + it('releases the lock for a stream source', async () => { + const stream = streamFromStringChunks(['data: a\n\n']) + await readSSELines(stream, { onData: () => {} }) + expect(() => stream.getReader()).not.toThrow() + }) + + it('does not release the lock for a reader source', async () => { + const stream = streamFromStringChunks(['data: a\n\n']) + const reader = stream.getReader() + await readSSELines(reader, { onData: () => {} }) + expect(() => stream.getReader()).toThrow() + reader.releaseLock() + }) + + it('releases the lock for a stream source even when onData throws', async () => { + const stream = streamFromStringChunks(['data: a\n\n']) + await expect( + readSSELines(stream, { + onData: () => { + throw new Error('handler failed') + }, + }) + ).rejects.toThrow('handler failed') + 
expect(() => stream.getReader()).not.toThrow() + }) +}) diff --git a/apps/sim/lib/core/utils/sse.ts b/apps/sim/lib/core/utils/sse.ts index 9d9d3f785a5..50c758f0013 100644 --- a/apps/sim/lib/core/utils/sse.ts +++ b/apps/sim/lib/core/utils/sse.ts @@ -20,6 +20,181 @@ export function encodeSSE(data: any): Uint8Array { return new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`) } +/** + * The sentinel value servers emit to signal end-of-stream. Lines carrying this + * payload are skipped before reaching the consumer's `onEvent` callback. + */ +const DONE_SENTINEL = '[DONE]' + +/** + * A source the SSE reader can consume: a fetch `Response`, its `ReadableStream` + * body, or an already-acquired reader. Passing a `Response`/stream lets the + * primitive own `getReader()` and the reader lifecycle (lock release); passing a + * reader is supported for callers that must acquire it first (e.g. to stash it + * for external cancellation). + */ +export type SSESource = + | Response + | ReadableStream + | ReadableStreamDefaultReader + +/** + * The result of an SSE event/line callback. Only the literal `true` (returned + * synchronously or resolved from a `Promise`) stops processing and returns + * early — useful for terminal events. Any other value (including the + * `undefined` a handler that returns nothing produces) keeps processing. + * + * Typed as `unknown` rather than `boolean | void | Promise` so + * both sync and `async` handlers — including `async` handlers that return + * nothing (`Promise`) — stay assignable, without the confusing + * `void`-inside-a-`Promise` union that the precise type would require. + */ +export type SSEStopSignal = unknown + +/** + * Options for {@link readSSELines} — the low-level line engine that delivers the + * raw `data:` payload string (no JSON parsing). + */ +export interface ReadSSELinesOptions { + /** Invoked once per SSE `data:` line with the raw (un-parsed) payload string. 
*/ + onData: (rawData: string) => SSEStopSignal + /** Aborts the read; checked before each chunk and between events. */ + signal?: AbortSignal +} + +/** + * Options for {@link readSSEEvents} — the JSON convenience layer over + * {@link readSSELines}. + */ +export interface ReadSSEEventsOptions { + /** + * Invoked once per parsed SSE `data:` event with the JSON-parsed payload. + * Return (or resolve) `true` to stop processing and return early. Callers + * narrow the typed payload. + */ + onEvent: (event: T) => SSEStopSignal + /** + * Invoked for a `data:` line whose payload is not valid JSON. Defaults to + * silently skipping the line. Throw from here to surface a fatal parse error. + */ + onParseError?: (rawData: string, error: unknown) => void + /** Aborts the read; checked before each chunk and between events. */ + signal?: AbortSignal +} + +/** + * Resolves an {@link SSESource} to a reader, reporting whether this call + * acquired the lock (and is therefore responsible for releasing it). + */ +function toReader(source: SSESource): { + reader: ReadableStreamDefaultReader + ownsLock: boolean +} { + if (source instanceof ReadableStream) { + return { reader: source.getReader(), ownsLock: true } + } + if (source instanceof Response) { + if (!source.body) throw new Error('No response body') + return { reader: source.body.getReader(), ownsLock: true } + } + return { reader: source, ownsLock: false } +} + +/** + * Strips an optional trailing carriage return from a single SSE line, so both + * `\n`- and `\r\n`-terminated framings parse identically. + */ +function stripCarriageReturn(line: string): string { + return line.endsWith('\r') ? line.slice(0, -1) : line +} + +/** + * The single client-side SSE decode engine. Reads a byte stream, decodes it + * incrementally, splits it into lines, and invokes `onData` once per `data:` + * line with its raw (un-parsed) payload string. 
+ * + * It splits on `\n` and processes each `data:` line individually, which makes it + * tolerant of BOTH `\n`- and `\n\n`-separated framings (the blank separator + * lines between events are simply ignored). Trailing `\r` is stripped, a single + * optional space after `data:` is consumed, and the `[DONE]` sentinel is + * skipped. The reader's lock is always released on completion, abort, or error + * (only when this function acquired it). + * + * This is the low-level engine. Most callers want {@link readSSEEvents}, which + * adds JSON parsing on top. Reach for `readSSELines` only when the payload needs + * custom parsing (e.g. schema-validated decoding). + * + * @param source - A `Response`, `ReadableStream`, or stream reader. + * @param options - The `onData` callback plus an optional `signal`. + */ +export async function readSSELines(source: SSESource, options: ReadSSELinesOptions): Promise { + const { onData, signal } = options + const { reader, ownsLock } = toReader(source) + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + if (signal?.aborted) break + + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + + for (const rawLine of lines) { + if (signal?.aborted) return + + const line = stripCarriageReturn(rawLine) + if (!line.startsWith('data:')) continue + + let data = line.slice(5) + if (data.startsWith(' ')) data = data.slice(1) + if (data === DONE_SENTINEL) continue + + if ((await onData(data)) === true) return + } + } + } finally { + if (ownsLock) reader.releaseLock() + } +} + +/** + * The JSON convenience layer over {@link readSSELines}: invokes `onEvent` once + * per `data:` event with its JSON-parsed payload. Unparseable lines are passed + * to `onParseError` (default: silently skipped). All framing, `\r`, `[DONE]`, + * abort, and reader-lifecycle behavior is inherited from `readSSELines`. 
+ * + * Higher-level concerns — UI batching, reconnect, error classification, event + * dispatch — belong in the caller's `onEvent`, not here. + * + * @typeParam T - The parsed event type the caller expects (defaults to `unknown`). + * @param source - A `Response`, `ReadableStream`, or stream reader. + * @param options - The `onEvent` callback plus optional `signal`/`onParseError`. + */ +export async function readSSEEvents( + source: SSESource, + options: ReadSSEEventsOptions +): Promise { + const { onEvent, onParseError, signal } = options + await readSSELines(source, { + signal, + onData: (data) => { + let parsed: T + try { + parsed = JSON.parse(data) as T + } catch (error) { + onParseError?.(data, error) + return + } + return onEvent(parsed) + }, + }) +} + /** * Options for reading SSE stream */