diff --git a/apps/sim/app/api/chat/[subdomain]/route.test.ts b/apps/sim/app/api/chat/[subdomain]/route.test.ts index fbc7f731247..6837b1a0ae3 100644 --- a/apps/sim/app/api/chat/[subdomain]/route.test.ts +++ b/apps/sim/app/api/chat/[subdomain]/route.test.ts @@ -7,20 +7,28 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createMockRequest } from '@/app/api/__test-utils__/utils' describe('Chat Subdomain API Route', () => { - const mockWorkflowSingleOutput = { - id: 'response-id', - content: 'Test response', - timestamp: new Date().toISOString(), - type: 'workflow', + const createMockStream = () => { + return new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('data: {"blockId":"agent-1","chunk":"Hello"}\n\n') + ) + controller.enqueue( + new TextEncoder().encode('data: {"blockId":"agent-1","chunk":" world"}\n\n') + ) + controller.enqueue( + new TextEncoder().encode('data: {"event":"final","data":{"success":true}}\n\n') + ) + controller.close() + }, + }) } - // Mock functions const mockAddCorsHeaders = vi.fn().mockImplementation((response) => response) const mockValidateChatAuth = vi.fn().mockResolvedValue({ authorized: true }) const mockSetChatAuthCookie = vi.fn() - const mockExecuteWorkflowForChat = vi.fn().mockResolvedValue(mockWorkflowSingleOutput) + const mockExecuteWorkflowForChat = vi.fn().mockResolvedValue(createMockStream()) - // Mock database return values const mockChatResult = [ { id: 'chat-id', @@ -41,13 +49,24 @@ describe('Chat Subdomain API Route', () => { const mockWorkflowResult = [ { isDeployed: true, + state: { + blocks: {}, + edges: [], + loops: {}, + parallels: {}, + }, + deployedState: { + blocks: {}, + edges: [], + loops: {}, + parallels: {}, + }, }, ] beforeEach(() => { vi.resetModules() - // Mock chat API utils vi.doMock('../utils', () => ({ addCorsHeaders: mockAddCorsHeaders, validateChatAuth: mockValidateChatAuth, @@ -56,7 +75,6 @@ describe('Chat Subdomain API Route', () => { executeWorkflowForChat: mockExecuteWorkflowForChat, })) - // Mock logger vi.doMock('@/lib/logs/console-logger', () => ({ createLogger: vi.fn().mockReturnValue({ debug: vi.fn(), @@ -66,32 +84,35 @@ describe('Chat Subdomain API Route', () => { }), })) - // Mock database vi.doMock('@/db', () => { - const mockLimitChat = vi.fn().mockReturnValue(mockChatResult) - const mockWhereChat = vi.fn().mockReturnValue({ limit: mockLimitChat }) - - const mockLimitWorkflow = vi.fn().mockReturnValue(mockWorkflowResult) - const mockWhereWorkflow = vi.fn().mockReturnValue({ limit: mockLimitWorkflow }) - - const mockFrom = vi.fn().mockImplementation((table) => { - // Check which table is being queried - if (table === 'workflow') { - return { where: mockWhereWorkflow } + const mockSelect = vi.fn().mockImplementation((fields) => { + if (fields && fields.isDeployed !== undefined) { + return { + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue(mockWorkflowResult), + }), + }), + } + } + return { + from: vi.fn().mockReturnValue({ + where: vi.fn().mockReturnValue({ + limit: vi.fn().mockReturnValue(mockChatResult), + }), + }), } - return { where: mockWhereChat } }) - const mockSelect = vi.fn().mockReturnValue({ from: mockFrom }) - return { db: { select: mockSelect, }, + chat: {}, + workflow: {}, } }) - // Mock API response helpers vi.doMock('@/app/api/workflows/utils', () => ({ createErrorResponse: vi.fn().mockImplementation((message, status, code) => { return new Response( @@ -277,37 +298,47 @@ describe('Chat Subdomain API Route', () => { }) it('should return 503 when workflow is not available', async () => { + // Override the default workflow result to return non-deployed vi.doMock('@/db', () => { - const mockLimitChat = vi.fn().mockReturnValue([ - { - id: 'chat-id', - workflowId: 'unavailable-workflow', - isActive: true, - authType: 'public', - }, - ]) - const mockWhereChat = vi.fn().mockReturnValue({ limit: mockLimitChat }) - - // Second call returns non-deployed workflow - const mockLimitWorkflow = vi.fn().mockReturnValue([ - { - isDeployed: false, - }, - ]) - const mockWhereWorkflow = vi.fn().mockReturnValue({ limit: mockLimitWorkflow }) - - // Mock from function to return different where implementations - const mockFrom = vi - .fn() - .mockImplementationOnce(() => ({ where: mockWhereChat })) // First call (chat) - .mockImplementationOnce(() => ({ where: mockWhereWorkflow })) // Second call (workflow) + // Track call count to return different results + let callCount = 0 + + const mockLimit = vi.fn().mockImplementation(() => { + callCount++ + if (callCount === 1) { + // First call - chat query + return [ + { + id: 'chat-id', + workflowId: 'unavailable-workflow', + userId: 'user-id', + isActive: true, + authType: 'public', + outputConfigs: [{ blockId: 'block-1', path: 'output' }], + }, + ] + } + if (callCount === 2) { + // Second call - workflow query + return [ + { + isDeployed: false, + }, + ] + } + return [] + }) + const mockWhere = vi.fn().mockReturnValue({ limit: mockLimit }) + const mockFrom = vi.fn().mockReturnValue({ where: mockWhere }) const mockSelect = vi.fn().mockReturnValue({ from: mockFrom }) return { db: { select: mockSelect, }, + chat: {}, + workflow: {}, } }) @@ -325,6 +356,48 @@ describe('Chat Subdomain API Route', () => { expect(data).toHaveProperty('message', 'Chat workflow is not available') }) + it('should return streaming response for valid chat messages', async () => { + const req = createMockRequest('POST', { message: 'Hello world', conversationId: 'conv-123' }) + const params = Promise.resolve({ subdomain: 'test-chat' }) + + const { POST } = await import('./route') + + const response = await POST(req, { params }) + + expect(response.status).toBe(200) + expect(response.headers.get('Content-Type')).toBe('text/event-stream') + expect(response.headers.get('Cache-Control')).toBe('no-cache') + expect(response.headers.get('Connection')).toBe('keep-alive') + + // Verify executeWorkflowForChat was called with correct parameters + expect(mockExecuteWorkflowForChat).toHaveBeenCalledWith('chat-id', 'Hello world', 'conv-123') + }) + + it('should handle streaming response body correctly', async () => { + const req = createMockRequest('POST', { message: 'Hello world' }) + const params = Promise.resolve({ subdomain: 'test-chat' }) + + const { POST } = await import('./route') + + const response = await POST(req, { params }) + + expect(response.status).toBe(200) + expect(response.body).toBeInstanceOf(ReadableStream) + + // Test that we can read from the response stream + if (response.body) { + const reader = response.body.getReader() + const { value, done } = await reader.read() + + if (!done && value) { + const chunk = new TextDecoder().decode(value) + expect(chunk).toMatch(/^data: /) + } + + reader.releaseLock() + } + }) + it('should handle workflow execution errors gracefully', async () => { const originalExecuteWorkflow = mockExecuteWorkflowForChat.getMockImplementation() mockExecuteWorkflowForChat.mockImplementationOnce(async () => { @@ -338,15 +411,64 @@ describe('Chat Subdomain API Route', () => { const response = await POST(req, { params }) - expect(response.status).toBe(503) + expect(response.status).toBe(500) const data = await response.json() expect(data).toHaveProperty('error') - expect(data).toHaveProperty('message', 'Chat workflow is not available') + expect(data).toHaveProperty('message', 'Execution failed') if (originalExecuteWorkflow) { mockExecuteWorkflowForChat.mockImplementation(originalExecuteWorkflow) } }) + + it('should handle invalid JSON in request body', async () => { + // Create a request with invalid JSON + const req = { + method: 'POST', + json: vi.fn().mockRejectedValue(new Error('Invalid JSON')), + } as any + + const params = Promise.resolve({ subdomain: 'test-chat' }) + + const { POST } = await import('./route') + + const response = await POST(req, { params }) + + expect(response.status).toBe(400) + + const data = await response.json() + expect(data).toHaveProperty('error') + expect(data).toHaveProperty('message', 'Invalid request body') + }) + + it('should pass conversationId to executeWorkflowForChat when provided', async () => { + const req = createMockRequest('POST', { + message: 'Hello world', + conversationId: 'test-conversation-123', + }) + const params = Promise.resolve({ subdomain: 'test-chat' }) + + const { POST } = await import('./route') + + await POST(req, { params }) + + expect(mockExecuteWorkflowForChat).toHaveBeenCalledWith( + 'chat-id', + 'Hello world', + 'test-conversation-123' + ) + }) + + it('should handle missing conversationId gracefully', async () => { + const req = createMockRequest('POST', { message: 'Hello world' }) + const params = Promise.resolve({ subdomain: 'test-chat' }) + + const { POST } = await import('./route') + + await POST(req, { params }) + + expect(mockExecuteWorkflowForChat).toHaveBeenCalledWith('chat-id', 'Hello world', undefined) + }) }) }) diff --git a/apps/sim/app/api/chat/[subdomain]/route.ts b/apps/sim/app/api/chat/[subdomain]/route.ts index 68ae96bb33e..80d00861e8f 100644 --- a/apps/sim/app/api/chat/[subdomain]/route.ts +++ b/apps/sim/app/api/chat/[subdomain]/route.ts @@ -108,105 +108,17 @@ export async function POST( // Execute workflow with structured input (message + conversationId for context) const result = await executeWorkflowForChat(deployment.id, message, conversationId) - // If the executor returned a ReadableStream, stream it directly to the client - if (result instanceof ReadableStream) { - const streamResponse = new NextResponse(result, { - status: 200, - headers: { - 'Content-Type': 'text/plain; charset=utf-8', - }, - }) - return addCorsHeaders(streamResponse, request) - } - - // Handle StreamingExecution format - if (result && typeof result === 'object' && 'stream' in result && 'execution' in result) { - const streamResponse = new NextResponse(result.stream as ReadableStream, { - status: 200, - headers: { - 'Content-Type': 'text/plain; charset=utf-8', - }, - }) - return addCorsHeaders(streamResponse, request) - } - - // Format the result for the client - // If result.content is an object, preserve it for structured handling - // If it's text or another primitive, make sure it's accessible - let formattedResult: any = { output: null } - - if (result) { - // Check if we have multiple outputs - if (result.multipleOutputs && Array.isArray(result.contents)) { - // Format multiple outputs in a way that they can be displayed as separate messages - // Join all contents, ensuring each is on a new line if they're strings - const formattedContents = result.contents.map((content) => { - if (typeof content === 'string') { - return content - } - - try { - return JSON.stringify(content) - } catch (error) { - logger.warn(`[${requestId}] Error stringifying content:`, error) - return '[Object cannot be serialized]' - } - }) - - // Set output to be the joined contents - formattedResult = { - ...result, - output: formattedContents.join('\n\n'), // Separate each output with double newline - } - - // Keep the original contents for clients that can handle structured data - formattedResult.multipleOutputs = true - formattedResult.contents = result.contents - } else if (result.content) { - // Handle single output cases - if (typeof result.content === 'object') { - // For objects like { text: "some content" } - if (result.content.text) { - formattedResult.output = result.content.text - } else { - // Keep the original structure but also add an output field - try { - formattedResult = { - ...result, - output: JSON.stringify(result.content), - } - } catch (error) { - logger.warn(`[${requestId}] Error stringifying content:`, error) - formattedResult = { - ...result, - output: '[Object cannot be serialized]', - } - } - } - } else { - // For direct string content - formattedResult = { - ...result, - output: result.content, - } - } - } else { - // Fallback if no content - formattedResult = { - ...result, - output: 'No output returned from workflow', - } - } - } - - logger.info(`[${requestId}] Returning formatted chat response:`, { - hasOutput: !!formattedResult.output, - outputType: typeof formattedResult.output, - isMultipleOutputs: !!formattedResult.multipleOutputs, + // The result is always a ReadableStream that we can pipe to the client + const streamResponse = new NextResponse(result, { + status: 200, + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', + }, }) - - // Add CORS headers before returning the response - return addCorsHeaders(createSuccessResponse(formattedResult), request) + return addCorsHeaders(streamResponse, request) } catch (error: any) { logger.error(`[${requestId}] Error processing chat request:`, error) return addCorsHeaders( diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index a0e8512a7d1..99f3b532e58 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -21,12 +21,10 @@ declare global { const logger = createLogger('ChatAuthUtils') const isDevelopment = env.NODE_ENV === 'development' -// Simple encryption for the auth token export const encryptAuthToken = (subdomainId: string, type: string): string => { return Buffer.from(`${subdomainId}:${type}:${Date.now()}`).toString('base64') } -// Decrypt and validate the auth token export const validateAuthToken = (token: string, subdomainId: string): boolean => { try { const decoded = Buffer.from(token, 'base64').toString() @@ -210,32 +208,6 @@ export async function validateChatAuth( return { authorized: false, error: 'Unsupported authentication type' } } -/** - * Extract a specific output from a block using the blockId and path - * This mimics how the chat panel extracts outputs from blocks - */ -function _extractBlockOutput(logs: any[], blockId: string, path?: string) { - // Find the block in logs - const blockLog = logs.find((log) => log.blockId === blockId) - if (!blockLog || !blockLog.output) return null - - // If no specific path, return the full output - if (!path) return blockLog.output - - // Navigate the path to extract the specific output - let result = blockLog.output - const pathParts = path.split('.') - - for (const part of pathParts) { - if (result === null || result === undefined || typeof result !== 'object') { - return null - } - result = result[part] - } - - return result -} - /** * Executes a workflow for a chat request and returns the formatted output. * @@ -251,11 +223,13 @@ export async function executeWorkflowForChat( chatId: string, message: string, conversationId?: string -) { +): Promise { const requestId = crypto.randomUUID().slice(0, 8) logger.debug( - `[${requestId}] Executing workflow for chat: ${chatId}${conversationId ? `, conversationId: ${conversationId}` : ''}` + `[${requestId}] Executing workflow for chat: ${chatId}${ + conversationId ? `, conversationId: ${conversationId}` : '' + }` ) // Find the chat deployment @@ -431,373 +405,108 @@ export async function executeWorkflowForChat( {} as Record> ) - // Create and execute the workflow - mimicking use-workflow-execution.ts - const executor = new Executor({ - workflow: serializedWorkflow, - currentBlockStates: processedBlockStates, - envVarValues: decryptedEnvVars, - workflowInput: { input: message, conversationId }, - workflowVariables, - contextExtensions: { - // Always request streaming – the executor will downgrade gracefully if unsupported - stream: true, - selectedOutputIds: outputBlockIds, - edges: edges.map((e: any) => ({ source: e.source, target: e.target })), - }, - }) - - // Execute and capture the result - const result = await executor.execute(workflowId) + const stream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder() + const streamedContent = new Map() - // If the executor returned a ReadableStream, forward it directly for streaming - if (result instanceof ReadableStream) { - return result - } + const onStream = async (streamingExecution: any): Promise => { + if (!streamingExecution.stream) return - // Handle StreamingExecution format (combined stream + execution data) - if (result && typeof result === 'object' && 'stream' in result && 'execution' in result) { - // We need to stream the response to the client while *also* capturing the full - // content so that we can persist accurate logs once streaming completes. - - // Duplicate the original stream – one copy goes to the client, the other we read - // server-side for log enrichment. - const [clientStream, loggingStream] = (result.stream as ReadableStream).tee() - - // Kick off background processing to read the stream and persist enriched logs - const processingPromise = (async () => { - try { - // The stream is only used to properly drain it and prevent memory leaks - // All the execution data is already provided from the agent handler - // through the X-Execution-Data header - await drainStream(loggingStream) - - // No need to wait for a processing promise - // The execution-logger.ts will handle token estimation - - // We can use the execution data as-is since it's already properly structured - const executionData = result.execution as any - - // Before persisting, clean up any response objects with zero tokens in agent blocks - // This prevents confusion in the console logs - if (executionData.logs && Array.isArray(executionData.logs)) { - executionData.logs.forEach((log: BlockLog) => { - if (log.blockType === 'agent' && log.output?.response) { - const response = log.output.response - - // Check for zero tokens that will be estimated later - if ( - response.tokens && - (!response.tokens.completion || response.tokens.completion === 0) && - (!response.toolCalls || - !response.toolCalls.list || - response.toolCalls.list.length === 0) - ) { - // Remove tokens from console display to avoid confusion - // They'll be properly estimated in the execution logger - response.tokens = undefined - } + const blockId = streamingExecution.execution?.blockId + const reader = streamingExecution.stream.getReader() + if (blockId) { + streamedContent.set(blockId, '') + } + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ blockId, event: 'end' })}\n\n`) + ) + break + } + const chunk = new TextDecoder().decode(value) + if (blockId) { + streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk) } - }) + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ blockId, chunk })}\n\n`)) + } + } catch (error) { + logger.error('Error while reading from stream:', error) + controller.error(error) } + } - // Build trace spans and persist - const { traceSpans, totalDuration } = buildTraceSpans(executionData) - const enrichedResult = { - ...executionData, - traceSpans, - totalDuration, - } + const executor = new Executor({ + workflow: serializedWorkflow, + currentBlockStates: processedBlockStates, + envVarValues: decryptedEnvVars, + workflowInput: { input: message, conversationId }, + workflowVariables, + contextExtensions: { + stream: true, + selectedOutputIds: outputBlockIds, + edges: edges.map((e: any) => ({ + source: e.source, + target: e.target, + })), + onStream, + }, + }) + + const result = await executor.execute(workflowId) + + if (result && 'success' in result) { + result.logs?.forEach((log: BlockLog) => { + if (streamedContent.has(log.blockId)) { + if (log.output?.response) { + log.output.response.content = streamedContent.get(log.blockId) + } + } + }) - // Add conversationId to metadata if available + const { traceSpans, totalDuration } = buildTraceSpans(result) + const enrichedResult = { ...result, traceSpans, totalDuration } if (conversationId) { if (!enrichedResult.metadata) { enrichedResult.metadata = { duration: totalDuration, + startTime: new Date().toISOString(), } } ;(enrichedResult.metadata as any).conversationId = conversationId } - const executionId = uuidv4() await persistExecutionLogs(workflowId, executionId, enrichedResult, 'chat') - logger.debug( - `[${requestId}] Persisted execution logs for streaming chat with ID: ${executionId}${ - conversationId ? `, conversationId: ${conversationId}` : '' - }` - ) + logger.debug(`Persisted logs for deployed chat: ${executionId}`) - // Update user stats for successful streaming chat execution - if (executionData.success) { + if (result.success) { try { - // Find the workflow to get the user ID - const workflowData = await db - .select({ userId: workflow.userId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - - if (workflowData.length > 0) { - const userId = workflowData[0].userId - - // Update the user stats - await db - .update(userStats) - .set({ - totalChatExecutions: sql`total_chat_executions + 1`, - lastActive: new Date(), - }) - .where(eq(userStats.userId, userId)) - - logger.debug( - `[${requestId}] Updated user stats: incremented totalChatExecutions for streaming chat` - ) - } + await db + .update(userStats) + .set({ + totalChatExecutions: sql`total_chat_executions + 1`, + lastActive: new Date(), + }) + .where(eq(userStats.userId, deployment.userId)) + logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`) } catch (error) { - // Don't fail if stats update fails - logger.error(`[${requestId}] Failed to update streaming chat execution stats:`, error) - } - } - - return { success: true } - } catch (error) { - logger.error(`[${requestId}] Failed to persist streaming chat execution logs:`, error) - return { success: false, error } - } finally { - // Ensure the stream is properly closed even if an error occurs - try { - const controller = new AbortController() - const _signal = controller.signal - controller.abort() - } catch (cleanupError) { - logger.debug(`[${requestId}] Error during stream cleanup: ${cleanupError}`) - } - } - })() - - // Register this processing promise with a global handler or tracker if needed - // This allows the background task to be monitored or waited for in testing - if (typeof global.__chatStreamProcessingTasks !== 'undefined') { - global.__chatStreamProcessingTasks.push( - processingPromise as Promise<{ success: boolean; error?: any }> - ) - } - - // Return the client-facing stream - return clientStream - } - - // Mark as chat execution in metadata - if (result) { - ;(result as any).metadata = { - ...(result.metadata || {}), - source: 'chat', - } - - // Add conversationId to metadata if available - if (conversationId) { - ;(result as any).metadata.conversationId = conversationId - } - } - - // Update user stats to increment totalChatExecutions if the execution was successful - if (result.success) { - try { - // Find the workflow to get the user ID - const workflowData = await db - .select({ userId: workflow.userId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - - if (workflowData.length > 0) { - const userId = workflowData[0].userId - - // Update the user stats - await db - .update(userStats) - .set({ - totalChatExecutions: sql`total_chat_executions + 1`, - lastActive: new Date(), - }) - .where(eq(userStats.userId, userId)) - - logger.debug(`[${requestId}] Updated user stats: incremented totalChatExecutions`) - } - } catch (error) { - // Don't fail the chat response if stats update fails - logger.error(`[${requestId}] Failed to update chat execution stats:`, error) - } - } - - // Persist execution logs using the 'chat' trigger type for non-streaming results - try { - // Build trace spans to enrich the logs (same as in use-workflow-execution.ts) - const { traceSpans, totalDuration } = buildTraceSpans(result) - - // Create enriched result with trace data - const enrichedResult = { - ...result, - traceSpans, - totalDuration, - } - - // Add conversation ID to metadata if available - if (conversationId) { - if (!enrichedResult.metadata) { - enrichedResult.metadata = { - duration: totalDuration, - } - } - ;(enrichedResult.metadata as any).conversationId = conversationId - } - - // Generate a unique execution ID for this chat interaction - const executionId = uuidv4() - - // Persist the logs with 'chat' trigger type - await persistExecutionLogs(workflowId, executionId, enrichedResult, 'chat') - - logger.debug( - `[${requestId}] Persisted execution logs for chat with ID: ${executionId}${ - conversationId ? `, conversationId: ${conversationId}` : '' - }` - ) - } catch (error) { - // Don't fail the chat response if logging fails - logger.error(`[${requestId}] Failed to persist chat execution logs:`, error) - } - - if (!result.success) { - logger.error(`[${requestId}] Workflow execution failed:`, result.error) - throw new Error(`Workflow execution failed: ${result.error}`) - } - - logger.debug( - `[${requestId}] Workflow executed successfully, blocks executed: ${result.logs?.length || 0}` - ) - - // Get the outputs from all selected blocks - const outputs: { content: any }[] = [] - let hasFoundOutputs = false - - if (outputBlockIds.length > 0 && result.logs) { - logger.debug( - `[${requestId}] Looking for outputs from ${outputBlockIds.length} configured blocks` - ) - - // Extract outputs from each selected block - for (let i = 0; i < outputBlockIds.length; i++) { - const blockId = outputBlockIds[i] - const path = outputPaths[i] || undefined - - logger.debug( - `[${requestId}] Looking for output from block ${blockId} with path ${path || 'none'}` - ) - - // Find the block log entry - const blockLog = result.logs.find((log) => log.blockId === blockId) - if (!blockLog || !blockLog.output) { - logger.debug(`[${requestId}] No output found for block ${blockId}`) - continue - } - - // Extract the specific path if provided - let specificOutput = blockLog.output - if (path) { - logger.debug(`[${requestId}] Extracting path ${path} from output`) - const pathParts = path.split('.') - for (const part of pathParts) { - if ( - specificOutput === null || - specificOutput === undefined || - typeof specificOutput !== 'object' - ) { - logger.debug(`[${requestId}] Cannot extract path ${part}, output is not an object`) - specificOutput = null - break + logger.error(`Failed to update user stats for deployed chat:`, error) } - specificOutput = specificOutput[part] } } - if (specificOutput !== null && specificOutput !== undefined) { - logger.debug(`[${requestId}] Found output for block ${blockId}`) - outputs.push({ - content: specificOutput, - }) - hasFoundOutputs = true - } - } - } - - // If no specific outputs were found, use the final result - if (!hasFoundOutputs) { - logger.debug(`[${requestId}] No specific outputs found, using final output`) - if (result.output) { - if (result.output.response) { - outputs.push({ - content: result.output.response, - }) - } else { - outputs.push({ - content: result.output, - }) + if (!(result && typeof result === 'object' && 'stream' in result)) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`) + ) } - } - } - // Simplify the response format to match what the chat panel expects - if (outputs.length === 1) { - const content = outputs[0].content - // Don't wrap strings in an object - if (typeof content === 'string') { - return { - id: uuidv4(), - content: content, - timestamp: new Date().toISOString(), - type: 'workflow', - } - } - // Return the content directly - avoid extra nesting - return { - id: uuidv4(), - content: content, - timestamp: new Date().toISOString(), - type: 'workflow', - } - } - if (outputs.length > 1) { - // For multiple outputs, create a structured object that can be handled better by the client - // This approach allows the client to decide how to render multiple outputs - return { - id: uuidv4(), - multipleOutputs: true, - contents: outputs.map((o) => o.content), - timestamp: new Date().toISOString(), - type: 'workflow', - } - } - // Fallback for no outputs - should rarely happen - return { - id: uuidv4(), - content: 'No output returned from workflow', - timestamp: new Date().toISOString(), - type: 'workflow', - } -} + controller.close() + }, + }) -/** - * Utility function to properly drain a stream to prevent memory leaks - */ -async function drainStream(stream: ReadableStream): Promise { - const reader = stream.getReader() - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - // We don't need to do anything with the value, just drain the stream - } - } finally { - reader.releaseLock() - } + return stream } diff --git a/apps/sim/app/chat/[subdomain]/chat-client.tsx b/apps/sim/app/chat/[subdomain]/chat-client.tsx index 1f5c087fcc0..697bc00bdae 100644 --- a/apps/sim/app/chat/[subdomain]/chat-client.tsx +++ b/apps/sim/app/chat/[subdomain]/chat-client.tsx @@ -19,6 +19,9 @@ import { useChatStreaming } from './hooks/use-chat-streaming' const logger = createLogger('ChatClient') +// Chat timeout configuration (5 minutes) +const CHAT_REQUEST_TIMEOUT_MS = 300000 + interface ChatConfig { id: string title: string @@ -285,172 +288,174 @@ export default function ChatClient({ subdomain }: { subdomain: string }) { scrollToMessage(userMessage.id, true) }, 100) + // Create abort controller for request cancellation + const abortController = new AbortController() + const timeoutId = setTimeout(() => { + abortController.abort() + }, CHAT_REQUEST_TIMEOUT_MS) + try { // Send structured payload to maintain chat context const payload = { - message: userMessage.content, + message: + typeof userMessage.content === 'string' + ? userMessage.content + : JSON.stringify(userMessage.content), conversationId, } - // Create a new AbortController for this request - abortControllerRef.current = new AbortController() - - // Use relative URL with credentials const response = await fetch(`/api/chat/${subdomain}`, { method: 'POST', - credentials: 'same-origin', headers: { 'Content-Type': 'application/json', 'X-Requested-With': 'XMLHttpRequest', }, body: JSON.stringify(payload), - signal: abortControllerRef.current.signal, + credentials: 'same-origin', + signal: abortController.signal, }) + // Clear timeout since request succeeded + clearTimeout(timeoutId) + if (!response.ok) { - throw new Error('Failed to get response') + const errorData = await response.json() + throw new Error(errorData.error || 'Failed to get response') + } + + if (!response.body) { + throw new Error('Response body is missing') } - // Detect streaming response via content-type (text/plain) or absence of JSON content-type - const contentType = response.headers.get('Content-Type') || '' + const messageIdMap = new Map() - if (contentType.includes('text/plain')) { - const shouldPlayAudio = isVoiceInput || isVoiceFirstMode + // Get reader with proper cleanup + const reader = response.body.getReader() + const decoder = new TextDecoder() - const audioStreamHandler = shouldPlayAudio - ? createAudioStreamHandler(streamTextToAudio, DEFAULT_VOICE_SETTINGS.voiceId) - : undefined + const processStream = async () => { + let streamAborted = false - // Handle streaming response with audio support - await handleStreamedResponse( - response, - setMessages, - setIsLoading, - scrollToBottom, - userHasScrolled, - { - voiceSettings: { - isVoiceEnabled: true, - voiceId: DEFAULT_VOICE_SETTINGS.voiceId, - autoPlayResponses: isVoiceInput || isVoiceFirstMode, - }, - audioStreamHandler, + // Add cleanup handler for abort + const cleanup = () => { + streamAborted = true + try { + reader.releaseLock() + } catch (error) { + // Reader might already be released + logger.debug('Reader already released:', error) } - ) - } else { - // Fallback to JSON response handling - const responseData = await response.json() - logger.info('Message response:', responseData) - - // Handle different response formats from API - if ( - responseData.multipleOutputs && - responseData.contents && - Array.isArray(responseData.contents) - ) { - // For multiple outputs, create separate assistant messages for each - const assistantMessages = responseData.contents.map((content: any) => { - // Format the content appropriately - let formattedContent = content - - // Convert objects to strings for display - if (typeof formattedContent === 'object' && formattedContent !== null) { - try { - formattedContent = JSON.stringify(formattedContent) - } catch (_e) { - formattedContent = 'Received structured data response' - } - } + setIsLoading(false) + } + + // Listen for abort events + abortController.signal.addEventListener('abort', cleanup) + + try { + while (!streamAborted) { + const { done, value } = await reader.read() - return { - id: crypto.randomUUID(), - content: formattedContent || 'No content found', - type: 'assistant' as const, - timestamp: new Date(), + if (done) { + setIsLoading(false) + break } - }) - // Add all messages at once - setMessages((prev) => [...prev, ...assistantMessages]) - - // Play audio for the full response if voice mode is enabled - if (isVoiceInput || isVoiceFirstMode) { - const fullContent = assistantMessages.map((m: ChatMessage) => m.content).join(' ') - if (fullContent.trim()) { - try { - await streamTextToAudio(fullContent, { - voiceId: DEFAULT_VOICE_SETTINGS.voiceId, - onError: (error) => { - logger.error('Audio playback error:', error) - }, - }) - } catch (error) { - logger.error('TTS error:', error) - } + if (streamAborted) { + break } - } - } else { - // Handle single output as before - let messageContent = responseData.output - - if (!messageContent && responseData.content) { - if (typeof responseData.content === 'object') { - if (responseData.content.text) { - messageContent = responseData.content.text - } else { + + const chunk = decoder.decode(value, { stream: true }) + const lines = chunk.split('\n\n') + + for (const line of lines) { + if (line.startsWith('data: ')) { try { - messageContent = JSON.stringify(responseData.content) - } catch (_e) { - messageContent = 'Received structured data response' + const json = JSON.parse(line.substring(6)) + const { blockId, chunk: contentChunk, event: eventType } = json + + if (eventType === 'final') { + setIsLoading(false) + return + } + + if (blockId && contentChunk) { + if (!messageIdMap.has(blockId)) { + const newMessageId = crypto.randomUUID() + messageIdMap.set(blockId, newMessageId) + setMessages((prev) => [ + ...prev, + { + id: newMessageId, + content: contentChunk, + type: 'assistant', + timestamp: new Date(), + isStreaming: true, + }, + ]) + } else { + const messageId = messageIdMap.get(blockId) + if (messageId) { + setMessages((prev) => + prev.map((msg) => + msg.id === messageId + ? { ...msg, content: msg.content + contentChunk } + : msg + ) + ) + } + } + } else if (blockId && eventType === 'end') { + const messageId = messageIdMap.get(blockId) + if (messageId) { + setMessages((prev) => + prev.map((msg) => + msg.id === messageId ? { ...msg, isStreaming: false } : msg + ) + ) + } + } + } catch (parseError) { + logger.error('Error parsing stream data:', parseError) + // Continue processing other lines even if one fails } } - } else { - messageContent = responseData.content } } - - const assistantMessage: ChatMessage = { - id: crypto.randomUUID(), - content: messageContent || "Sorry, I couldn't process your request.", - type: 'assistant', - timestamp: new Date(), + } catch (streamError: unknown) { + if (streamError instanceof Error && streamError.name === 'AbortError') { + logger.info('Stream processing aborted by user') + return } - setMessages((prev) => [...prev, assistantMessage]) - - // Play audio for the response if voice mode is enabled - if ((isVoiceInput || isVoiceFirstMode) && assistantMessage.content) { - const contentString = - typeof assistantMessage.content === 'string' - ? assistantMessage.content - : JSON.stringify(assistantMessage.content) - - try { - await streamTextToAudio(contentString, { - voiceId: DEFAULT_VOICE_SETTINGS.voiceId, - onError: (error) => { - logger.error('Audio playback error:', error) - }, - }) - } catch (error) { - logger.error('TTS error:', error) - } - } + logger.error('Error processing stream:', streamError) + throw streamError + } finally { + // Ensure cleanup always happens + cleanup() + abortController.signal.removeEventListener('abort', cleanup) } } - } catch (error) { - logger.error('Error sending message:', error) + await processStream() + } catch (error: any) { + // Clear timeout in case of error + clearTimeout(timeoutId) + + if (error.name === 'AbortError') { + logger.info('Request aborted by user or timeout') + setIsLoading(false) + return + } + + logger.error('Error sending message:', error) + setIsLoading(false) const errorMessage: ChatMessage = { id: crypto.randomUUID(), content: 'Sorry, there was an error processing your message. Please try again.', type: 'assistant', timestamp: new Date(), } - setMessages((prev) => [...prev, errorMessage]) - } finally { - setIsLoading(false) } } diff --git a/apps/sim/app/w/[id]/components/panel/components/chat/chat.tsx b/apps/sim/app/w/[id]/components/panel/components/chat/chat.tsx index 5c719500c60..9bb91ad2a4c 100644 --- a/apps/sim/app/w/[id]/components/panel/components/chat/chat.tsx +++ b/apps/sim/app/w/[id]/components/panel/components/chat/chat.tsx @@ -5,9 +5,7 @@ import { ArrowUp } from 'lucide-react' import { Button } from '@/components/ui/button' import { Input } from '@/components/ui/input' import { ScrollArea } from '@/components/ui/scroll-area' -import { buildTraceSpans } from '@/lib/logs/trace-spans' -import type { BlockLog } from '@/executor/types' -import { calculateCost } from '@/providers/utils' +import type { BlockLog, ExecutionResult } from '@/executor/types' import { useExecutionStore } from '@/stores/execution/store' import { useChatStore } from '@/stores/panel/chat/store' import { useConsoleStore } from '@/stores/panel/console/store' @@ -113,189 +111,182 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { // Check if we got a streaming response if (result && 'stream' in result && result.stream instanceof ReadableStream) { - // Generate a unique ID for the message - const messageId = crypto.randomUUID() + const messageIdMap = new Map() - // Create a content buffer to collect initial content - let initialContent = '' - let fullContent = '' // Store the complete content for updating logs later - let hasAddedMessage = false - const executionResult = (result as any).execution // Store the execution result with type assertion + const reader = result.stream.getReader() + const decoder = new TextDecoder() - try { - // Process the stream - const reader = result.stream.getReader() - const decoder = new TextDecoder() + const processStream = async () => { + while (true) { + const { done, value } = await reader.read() + if (done) { + // Finalize all streaming messages + messageIdMap.forEach((id) => finalizeMessageStream(id)) + break + } - console.log('Starting to read from stream') + const chunk = decoder.decode(value) + const lines = chunk.split('\n\n') - while (true) { - try { - const { done, value } = await reader.read() - if (done) { - console.log('Stream complete') - break - } + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + const json = JSON.parse(line.substring(6)) + const { blockId, chunk: contentChunk, event, data } = json + + if (event === 'final' && data) { + const result = data as ExecutionResult + const nonStreamingLogs = + result.logs?.filter((log) => !messageIdMap.has(log.blockId)) || [] + + if (nonStreamingLogs.length > 0) { + const outputsToRender = selectedOutputs.filter((outputId) => + nonStreamingLogs.some((log) => log.blockId === outputId.split('.')[0]) + ) - // Decode and append chunk - const chunk = decoder.decode(value, { stream: true }) // Use stream option - - if (chunk) { - initialContent += chunk - fullContent += chunk - - // Only add the message to UI once we have some actual content to show - if (!hasAddedMessage && initialContent.trim().length > 0) { - // Add message with initial content - cast to any to bypass type checking for id - addMessage({ - content: initialContent, - workflowId: activeWorkflowId, - type: 'workflow', - isStreaming: true, - id: messageId, - } as any) - hasAddedMessage = true - } else if (hasAddedMessage) { - // Append to existing message - appendMessageContent(messageId, chunk) + for (const outputId of outputsToRender) { + const blockIdForOutput = outputId.split('.')[0] + const path = outputId.substring(blockIdForOutput.length + 1) + const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput) + + if (log) { + let outputValue: any = log.output + if (path) { + const pathParts = path.split('.') + for (const part of pathParts) { + if ( + outputValue && + typeof outputValue === 'object' && + part in outputValue + ) { + outputValue = outputValue[part] + } else { + outputValue = undefined + break + } + } + } + if (outputValue !== undefined) { + addMessage({ + content: + typeof outputValue === 'string' + ? outputValue + : `\`\`\`json\n${JSON.stringify(outputValue, null, 2)}\n\`\`\``, + workflowId: activeWorkflowId, + type: 'workflow', + }) + } + } + } + } + } else if (blockId && contentChunk) { + if (!messageIdMap.has(blockId)) { + const newMessageId = crypto.randomUUID() + messageIdMap.set(blockId, newMessageId) + addMessage({ + id: newMessageId, + content: contentChunk, + workflowId: activeWorkflowId, + type: 'workflow', + isStreaming: true, + }) + } else { + const existingMessageId = messageIdMap.get(blockId) + if (existingMessageId) { + appendMessageContent(existingMessageId, contentChunk) + } + } + } else if (blockId && event === 'end') { + const existingMessageId = messageIdMap.get(blockId) + if (existingMessageId) { + finalizeMessageStream(existingMessageId) + } + } + } catch (e) { + console.error('Error parsing stream data:', e) } } - } catch (streamError) { - console.error('Error reading from stream:', streamError) - // Break the loop on error - break } } + } - // If we never added a message (no content received), add it now - if (!hasAddedMessage && initialContent.trim().length > 0) { - addMessage({ - content: initialContent, - workflowId: activeWorkflowId, - type: 'workflow', - id: messageId, - } as any) - } - - // Update logs with the full streaming content if available - if (executionResult && fullContent.trim().length > 0) { - try { - // Format the final content properly to match what's shown for manual executions - // Include all the markdown and formatting from the streamed response - const formattedContent = fullContent - - // Calculate cost based on token usage if available - let costData: any - - if (executionResult.output?.response?.tokens) { - const tokens = executionResult.output.response.tokens - const model = executionResult.output?.response?.model || 'gpt-4o' - const cost = calculateCost( - model, - tokens.prompt || 0, - tokens.completion || 0, - false // Don't use cached input for chat responses - ) - costData = { ...cost, model } as any + processStream().catch((e) => console.error('Error processing stream:', e)) + } else if (result && 'success' in result && result.success && 'logs' in result) { + const finalOutputs: any[] = [] + + if (selectedOutputs && selectedOutputs.length > 0) { + for (const outputId of selectedOutputs) { + // Find the log that corresponds to the start of the outputId + const log = result.logs?.find( + (l: BlockLog) => l.blockId === outputId || outputId.startsWith(`${l.blockId}_`) + ) + + if (log) { + let output = log.output + // Check if there is a path to traverse + if (outputId.length > log.blockId.length) { + const path = outputId.substring(log.blockId.length + 1) + if (path) { + const pathParts = path.split('.') + let current = output + for (const part of pathParts) { + if (current && typeof current === 'object' && part in current) { + current = current[part] + } else { + current = undefined + break + } + } + output = current + } } + if (output !== undefined) { + finalOutputs.push(output) + } + } + } + } - // Build trace spans and total duration before persisting - const { traceSpans, totalDuration } = buildTraceSpans(executionResult as any) - - // Create a completed execution ID - const completedExecutionId = - executionResult.metadata?.executionId || crypto.randomUUID() - - // Import the workflow execution hook for direct access to the workflow service - const workflowExecutionApi = await fetch(`/api/workflows/${activeWorkflowId}/log`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - executionId: completedExecutionId, - result: { - ...executionResult, - output: { - ...executionResult.output, - response: { - ...executionResult.output?.response, - content: formattedContent, - model: executionResult.output?.response?.model, - tokens: executionResult.output?.response?.tokens, - toolCalls: executionResult.output?.response?.toolCalls, - providerTiming: executionResult.output?.response?.providerTiming, - cost: costData || executionResult.output?.response?.cost, - }, - }, - cost: costData, - // Update the message to include the formatted content - logs: (executionResult.logs || []).map((log: BlockLog) => { - // Check if this is the streaming block by comparing with the selected output IDs - // Selected output IDs typically include the block ID we are streaming from - const isStreamingBlock = selectedOutputs.some( - (outputId) => - outputId === log.blockId || outputId.startsWith(`${log.blockId}_`) - ) + // If no specific outputs could be resolved, fall back to the final workflow output + if (finalOutputs.length === 0 && result.output) { + finalOutputs.push(result.output) + } - if (isStreamingBlock && log.blockType === 'agent' && log.output?.response) { - return { - ...log, - output: { - ...log.output, - response: { - ...log.output.response, - content: formattedContent, - providerTiming: log.output.response.providerTiming, - cost: costData || log.output.response.cost, - }, - }, - } - } - return log - }), - metadata: { - ...executionResult.metadata, - source: 'chat', - completedAt: new Date().toISOString(), - isStreamingComplete: true, - cost: costData || executionResult.metadata?.cost, - providerTiming: executionResult.output?.response?.providerTiming, - }, - traceSpans: traceSpans, - totalDuration: totalDuration, - }, - }), - }) - - if (!workflowExecutionApi.ok) { - console.error('Failed to log complete streaming execution') + // Add a new message for each resolved output + finalOutputs.forEach((output) => { + let content = '' + if (typeof output === 'string') { + content = output + } else if (output && typeof output === 'object') { + // Handle cases where output is { response: ... } + const outputObj = output as Record + const response = outputObj.response + if (response) { + if (typeof response.content === 'string') { + content = response.content + } else { + // Pretty print for better readability + content = `\`\`\`json\n${JSON.stringify(response, null, 2)}\n\`\`\`` } - } catch (logError) { - console.error('Error logging complete streaming execution:', logError) + } else { + content = `\`\`\`json\n${JSON.stringify(output, null, 2)}\n\`\`\`` } } - } catch (error) { - console.error('Error processing stream:', error) - // If there's an error and we haven't added a message yet, add an error message - if (!hasAddedMessage) { + if (content) { addMessage({ - content: 'Error: Failed to process the streaming response.', + content, workflowId: activeWorkflowId, type: 'workflow', - id: messageId, - } as any) - } else { - // Otherwise append the error to the existing message - appendMessageContent(messageId, '\n\nError: Failed to process the streaming response.') - } - } finally { - console.log('Finalizing stream') - if (hasAddedMessage) { - finalizeMessageStream(messageId) + }) } - } + }) + } else if (result && 'success' in result && !result.success) { + addMessage({ + content: `Error: ${'error' in result ? result.error : 'Workflow execution failed.'}`, + workflowId: activeWorkflowId, + type: 'workflow', + }) } } diff --git a/apps/sim/app/w/[id]/components/panel/components/console/components/console-entry/console-entry.tsx b/apps/sim/app/w/[id]/components/panel/components/console/components/console-entry/console-entry.tsx index de1364368e4..99adc458ee4 100644 --- a/apps/sim/app/w/[id]/components/panel/components/console/components/console-entry/console-entry.tsx +++ b/apps/sim/app/w/[id]/components/panel/components/console/components/console-entry/console-entry.tsx @@ -4,7 +4,6 @@ import { AlertCircle, AlertTriangle, Calendar, - CheckCircle2, ChevronDown, ChevronUp, Clock, @@ -66,14 +65,6 @@ export function ConsoleEntry({ entry, consoleWidth }: ConsoleEntryProps) { const BlockIcon = blockConfig?.icon - const _statusIcon = entry.error ? ( - - ) : entry.warning ? ( - - ) : ( - - ) - // Helper function to check if data has nested objects or arrays const hasNestedStructure = (data: any): boolean => { if (data === null || typeof data !== 'object') return false @@ -93,9 +84,9 @@ export function ConsoleEntry({ entry, consoleWidth }: ConsoleEntryProps) { return (
!entry.error && !entry.warning && setIsExpanded(!isExpanded)} + onClick={() => !entry.error && !entry.warning && entry.success && setIsExpanded(!isExpanded)} >
- {format(new Date(entry.startedAt), 'HH:mm:ss')} + {entry.startedAt ? format(new Date(entry.startedAt), 'HH:mm:ss') : 'N/A'}
- Duration: {entry.durationMs}ms + Duration: {entry.durationMs ?? 0}ms
diff --git a/apps/sim/app/w/[id]/hooks/use-workflow-execution.ts b/apps/sim/app/w/[id]/hooks/use-workflow-execution.ts index 95ba2db2b54..1f7c78f47e2 100644 --- a/apps/sim/app/w/[id]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/w/[id]/hooks/use-workflow-execution.ts @@ -2,9 +2,11 @@ import { useCallback, useState } from 'react' import { v4 as uuidv4 } from 'uuid' import { createLogger } from '@/lib/logs/console-logger' import { buildTraceSpans } from '@/lib/logs/trace-spans' +import type { BlockOutput } from '@/blocks/types' import { Executor } from '@/executor' -import type { ExecutionResult } from '@/executor/types' +import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types' import { Serializer } from '@/serializer' +import type { SerializedWorkflow } from '@/serializer/types' import { useExecutionStore } from '@/stores/execution/store' import { useNotificationStore } from '@/stores/notifications/store' import { useConsoleStore } from '@/stores/panel/console/store' @@ -18,6 +20,27 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store' const logger = createLogger('useWorkflowExecution') +// Interface for executor options +interface ExecutorOptions { + workflow: SerializedWorkflow + currentBlockStates?: Record + envVarValues?: Record + workflowInput?: any + workflowVariables?: Record + contextExtensions?: { + stream?: boolean + selectedOutputIds?: string[] + edges?: Array<{ source: string; target: string }> + onStream?: (streamingExecution: StreamingExecution) => Promise + } +} + +// Interface for stream error handling +interface StreamError extends Error { + name: string + message: string +} + export function useWorkflowExecution() { const { blocks, edges, loops, parallels } = useWorkflowStore() const { activeWorkflowId } = useWorkflowRegistry() @@ -124,313 +147,147 @@ export function useWorkflowExecution() { setActiveTab('console') } - const executionId = uuidv4() - // Determine if this is a chat execution - // Only true if the execution is initiated from the chat panel - // or through a chat-specific execution path const isChatExecution = activeTab === 'chat' && workflowInput && typeof workflowInput === 'object' && 'input' in workflowInput - // If this is a chat execution, get the selected outputs - let selectedOutputIds: string[] | undefined - if (isChatExecution && activeWorkflowId) { - // Get selected outputs from chat store - const chatStore = await import('@/stores/panel/chat/store').then((mod) => mod.useChatStore) - selectedOutputIds = chatStore.getState().getSelectedWorkflowOutput(activeWorkflowId) - logger.info('Chat execution with selected outputs:', selectedOutputIds) - } - - try { - // Clear any existing state - setDebugContext(null) - - // Use the mergeSubblockState utility to get all block states - const mergedStates = mergeSubblockState(blocks) - const currentBlockStates = Object.entries(mergedStates).reduce( - (acc, [id, block]) => { - acc[id] = Object.entries(block.subBlocks).reduce( - (subAcc, [key, subBlock]) => { - subAcc[key] = subBlock.value - return subAcc - }, - {} as Record - ) - return acc - }, - {} as Record> - ) - - // Get environment variables - const envVars = getAllVariables() - const envVarValues = Object.entries(envVars).reduce( - (acc, [key, variable]) => { - acc[key] = variable.value - return acc - }, - {} as Record - ) - - // Get workflow variables - const workflowVars = activeWorkflowId ? getVariablesByWorkflowId(activeWorkflowId) : [] - const workflowVariables = workflowVars.reduce( - (acc, variable) => { - acc[variable.id] = variable - return acc - }, - {} as Record - ) - - // Create serialized workflow - const workflow = new Serializer().serializeWorkflow(mergedStates, edges, loops, parallels) - - // Create executor options with streaming support for chat - const executorOptions: any = { - // Default executor options - workflow, - currentBlockStates, - envVarValues, - workflowInput, - workflowVariables, - } - - // Add streaming context for chat executions - if (isChatExecution && selectedOutputIds && selectedOutputIds.length > 0) { - executorOptions.contextExtensions = { - stream: true, - selectedOutputIds, - edges: workflow.connections.map((conn) => ({ - source: conn.source, - target: conn.target, - })), - } - } - - // Create executor and store in global state - const newExecutor = new Executor(executorOptions) - setExecutor(newExecutor) - - // Execute workflow - const result = await newExecutor.execute(activeWorkflowId) - - // Streaming results are handled differently - they won't have a standard result - if (result instanceof ReadableStream) { - logger.info('Received streaming result from executor') + // For chat executions, we'll use a streaming approach + if (isChatExecution) { + const stream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder() + const executionId = uuidv4() + const streamedContent = new Map() + const streamReadingPromises: Promise[] = [] + + const onStream = async (streamingExecution: StreamingExecution) => { + const promise = (async () => { + if (!streamingExecution.stream) return + const reader = streamingExecution.stream.getReader() + const blockId = (streamingExecution.execution as any)?.blockId + if (blockId) { + streamedContent.set(blockId, '') + } + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } + const chunk = new TextDecoder().decode(value) + if (blockId) { + streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk) + } + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ + blockId, + chunk, + })}\n\n` + ) + ) + } + } catch (error) { + logger.error('Error reading from stream:', error) + controller.error(error) + } + })() + streamReadingPromises.push(promise) + } - // For streaming results, we need to handle them in the component - // that initiated the execution (chat panel) - return { - success: true, - stream: result, - } - } + try { + const result = await executeWorkflow(workflowInput, onStream) - // Handle StreamingExecution format (combined stream + execution result) - if (result && typeof result === 'object' && 'stream' in result && 'execution' in result) { - logger.info('Received combined stream+execution result from executor') - - // Generate an executionId and store it in the execution metadata so that - // the chat component can persist the logs *after* the stream finishes. - const executionId = uuidv4() - - // Determine which block is streaming - typically the one that matches a selected output ID - let streamingBlockId = null - if (selectedOutputIds && selectedOutputIds.length > 0 && result.execution.logs) { - // Find the agent block in the logs that matches one of our selected outputs - const streamingBlock = result.execution.logs.find( - (log) => - log.blockType === 'agent' && - selectedOutputIds.some( - (id) => id === log.blockId || id.startsWith(`${log.blockId}_`) - ) - ) - if (streamingBlock) { - streamingBlockId = streamingBlock.blockId - logger.info(`Identified streaming block: ${streamingBlockId}`) - } - } + await Promise.all(streamReadingPromises) - // Attach streaming / source metadata and the newly generated executionId - result.execution.metadata = { - ...(result.execution.metadata || {}), - executionId, - source: isChatExecution ? 'chat' : 'manual', - streamingBlockId, // Add the block ID to the metadata - } as any - - // Clean up any response objects with zero tokens in agent blocks to avoid confusion in console - if (result.execution.logs && Array.isArray(result.execution.logs)) { - result.execution.logs.forEach((log: any) => { - if (log.blockType === 'agent' && log.output?.response) { - const response = log.output.response - - // Check for zero tokens that will be estimated later - if ( - response.tokens && - (!response.tokens.completion || response.tokens.completion === 0) && - (!response.toolCalls || - !response.toolCalls.list || - response.toolCalls.list.length === 0) - ) { - // Remove tokens from console display to avoid confusion - // They'll be properly estimated in the execution logger - response.tokens = undefined + if (result && 'success' in result) { + if (!result.metadata) { + result.metadata = { duration: 0, startTime: new Date().toISOString() } } + ;(result.metadata as any).source = 'chat' + result.logs?.forEach((log: BlockLog) => { + if (streamedContent.has(log.blockId)) { + const content = streamedContent.get(log.blockId) || '' + if (log.output?.response) { + log.output.response.content = content + } + useConsoleStore.getState().updateConsole(log.blockId, content) + } + }) + + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`) + ) + persistLogs(executionId, result).catch((err) => { + logger.error('Error persisting logs:', { error: err }) + }) } - }) - } - // Mark the execution as streaming so that downstream code can recognise it - ;(result.execution as any).isStreaming = true - - // Return both the stream and the execution object so the caller (chat panel) - // can collect the full content and then persist the logs in one go. - // Also include processingPromise if available to ensure token counts are final - return { - success: true, - stream: result.stream, - execution: result.execution, - processingPromise: (result as any).processingPromise, - } - } - - // Add metadata about source being chat if applicable - if (isChatExecution) { - // Use type assertion for adding custom metadata - ;(result as any).metadata = { - ...(result.metadata || {}), - source: 'chat', - } - } - - // If we're in debug mode, store the execution context for later steps - if (result.metadata?.isDebugSession && result.metadata.context) { - setDebugContext(result.metadata.context) + } catch (error: any) { + controller.error(error) + if (activeWorkflowId) { + addNotification( + 'error', + `Workflow execution failed: ${error.message}`, + activeWorkflowId + ) + } + } finally { + controller.close() + setIsExecuting(false) + setIsDebugging(false) + setActiveBlocks(new Set()) + } + }, + }) + return { success: true, stream } + } - // Make sure to update pending blocks + // For manual (non-chat) execution + const executionId = uuidv4() + try { + const result = await executeWorkflow(workflowInput) + if (result && 'metadata' in result && result.metadata?.isDebugSession) { + setDebugContext(result.metadata.context || null) if (result.metadata.pendingBlocks) { setPendingBlocks(result.metadata.pendingBlocks) } - } else { - // Normal execution completed - start with UI updates + } else if (result && 'success' in result) { setExecutionResult(result) - - // For better UI responsiveness, update state immediately if (!isDebugModeEnabled) { - // Reset execution states right away for UI to update setIsExecuting(false) setIsDebugging(false) setActiveBlocks(new Set()) } - // Show notification - addNotification( - result.success ? 'console' : 'error', - result.success - ? 'Workflow completed successfully' - : `Workflow execution failed: ${result.error}`, - activeWorkflowId - ) - - // In non-debug mode, persist logs (no need to wait for this) - // We explicitly don't await this to avoid blocking UI updates + if (isChatExecution) { + if (!result.metadata) { + result.metadata = { duration: 0, startTime: new Date().toISOString() } + } + ;(result.metadata as any).source = 'chat' + } + + if (activeWorkflowId) { + addNotification( + result.success ? 'console' : 'error', + result.success + ? 'Workflow completed successfully' + : `Workflow execution failed: ${result.error || 'Unknown error'}`, + activeWorkflowId + ) + } persistLogs(executionId, result).catch((err) => { logger.error('Error persisting logs:', { error: err }) }) } - return result } catch (error: any) { - logger.error('Workflow Execution Error:', error) - - // Properly extract error message ensuring it's never undefined - let errorMessage = 'Unknown error' - - if (error instanceof Error) { - errorMessage = error.message || `Error: ${String(error)}` - } else if (typeof error === 'string') { - errorMessage = error - } else if (error && typeof error === 'object') { - // Fix the "undefined (undefined)" pattern specifically - if ( - error.message === 'undefined (undefined)' || - (error.error && - typeof error.error === 'object' && - error.error.message === 'undefined (undefined)') - ) { - errorMessage = 'API request failed - no specific error details available' - } - // Try to extract error details from potential API or execution errors - else if (error.message) { - errorMessage = error.message - } else if (error.error && typeof error.error === 'string') { - errorMessage = error.error - } else if (error.error && typeof error.error === 'object' && error.error.message) { - errorMessage = error.error.message - } else { - // Last resort: stringify the whole object - try { - errorMessage = `Error details: ${JSON.stringify(error)}` - } catch { - errorMessage = 'Error occurred but details could not be displayed' - } - } - } - - // Ensure errorMessage is never "undefined (undefined)" - if (errorMessage === 'undefined (undefined)') { - errorMessage = 'API request failed - no specific error details available' - } - - // Set error result and show notification immediately - const errorResult = { - success: false, - output: { response: {} }, - error: errorMessage, - logs: [], - } - - // Update UI state immediately for better responsiveness - setExecutionResult(errorResult) - setIsExecuting(false) - setIsDebugging(false) - setActiveBlocks(new Set()) - - // Create a more user-friendly notification message - let notificationMessage = 'Workflow execution failed' - - // Add URL for HTTP errors - if (error?.request?.url) { - // Don't show empty URL errors - if (error.request.url && error.request.url.trim() !== '') { - notificationMessage += `: Request to ${error.request.url} failed` - - // Add status if available - if (error.status) { - notificationMessage += ` (Status: ${error.status})` - } - } - } else { - // Regular errors - notificationMessage += `: ${errorMessage}` - } - - // Safely show error notification - try { - addNotification('error', notificationMessage, activeWorkflowId) - } catch (notificationError) { - logger.error('Error showing error notification:', notificationError) - // Fallback console error - console.error('Workflow execution failed:', errorMessage) - } - - // Also send the error result to the API (don't await to keep UI responsive) + const errorResult = handleExecutionError(error) persistLogs(executionId, errorResult).catch((err) => { logger.error('Error persisting logs:', { error: err }) }) - return errorResult } }, @@ -444,16 +301,170 @@ export function useWorkflowExecution() { toggleConsole, togglePanel, setActiveTab, + activeTab, getAllVariables, getVariablesByWorkflowId, + isDebugModeEnabled, setIsExecuting, setIsDebugging, - isDebugModeEnabled, - isDebugging, + setDebugContext, + setExecutor, + setPendingBlocks, setActiveBlocks, ] ) + const executeWorkflow = async ( + workflowInput?: any, + onStream?: (se: StreamingExecution) => Promise + ): Promise => { + // Use the mergeSubblockState utility to get all block states + const mergedStates = mergeSubblockState(blocks) + const currentBlockStates = Object.entries(mergedStates).reduce( + (acc, [id, block]) => { + acc[id] = Object.entries(block.subBlocks).reduce( + (subAcc, [key, subBlock]) => { + subAcc[key] = subBlock.value + return subAcc + }, + {} as Record + ) + return acc + }, + {} as Record> + ) + + // Get environment variables + const envVars = getAllVariables() + const envVarValues = Object.entries(envVars).reduce( + (acc, [key, variable]) => { + acc[key] = variable.value + return acc + }, + {} as Record + ) + + // Get workflow variables + const workflowVars = activeWorkflowId ? getVariablesByWorkflowId(activeWorkflowId) : [] + const workflowVariables = workflowVars.reduce( + (acc, variable) => { + acc[variable.id] = variable + return acc + }, + {} as Record + ) + + // Create serialized workflow + const workflow = new Serializer().serializeWorkflow(mergedStates, edges, loops, parallels) + + // Determine if this is a chat execution + const isChatExecution = + activeTab === 'chat' && + workflowInput && + typeof workflowInput === 'object' && + 'input' in workflowInput + + // If this is a chat execution, get the selected outputs + let selectedOutputIds: string[] | undefined + if (isChatExecution && activeWorkflowId) { + // Get selected outputs from chat store + const chatStore = await import('@/stores/panel/chat/store').then((mod) => mod.useChatStore) + selectedOutputIds = chatStore.getState().getSelectedWorkflowOutput(activeWorkflowId) + } + + // Create executor options + const executorOptions: ExecutorOptions = { + workflow, + currentBlockStates, + envVarValues, + workflowInput, + workflowVariables, + contextExtensions: { + stream: isChatExecution, + selectedOutputIds, + edges: workflow.connections.map((conn) => ({ + source: conn.source, + target: conn.target, + })), + onStream, + }, + } + + // Create executor and store in global state + const newExecutor = new Executor(executorOptions) + setExecutor(newExecutor) + + // Execute workflow + return newExecutor.execute(activeWorkflowId || '') + } + + const handleExecutionError = (error: any) => { + let errorMessage = 'Unknown error' + if (error instanceof Error) { + errorMessage = error.message || `Error: ${String(error)}` + } else if (typeof error === 'string') { + errorMessage = error + } else if (error && typeof error === 'object') { + if ( + error.message === 'undefined (undefined)' || + (error.error && + typeof error.error === 'object' && + error.error.message === 'undefined (undefined)') + ) { + errorMessage = 'API request failed - no specific error details available' + } else if (error.message) { + errorMessage = error.message + } else if (error.error && typeof error.error === 'string') { + errorMessage = error.error + } else if (error.error && typeof error.error === 'object' && error.error.message) { + errorMessage = error.error.message + } else { + try { + errorMessage = `Error details: ${JSON.stringify(error)}` + } catch { + errorMessage = 'Error occurred but details could not be displayed' + } + } + } + + if (errorMessage === 'undefined (undefined)') { + errorMessage = 'API request failed - no specific error details available' + } + + const errorResult: ExecutionResult = { + success: false, + output: { response: {} }, + error: errorMessage, + logs: [], + } + + setExecutionResult(errorResult) + setIsExecuting(false) + setIsDebugging(false) + setActiveBlocks(new Set()) + + let notificationMessage = 'Workflow execution failed' + if (error?.request?.url) { + if (error.request.url && error.request.url.trim() !== '') { + notificationMessage += `: Request to ${error.request.url} failed` + if (error.status) { + notificationMessage += ` (Status: ${error.status})` + } + } + } else { + notificationMessage += `: ${errorMessage}` + } + + try { + addNotification('error', notificationMessage, activeWorkflowId || '') + } catch (notificationError) { + logger.error('Error showing error notification:', notificationError) + console.error('Workflow execution failed:', errorMessage) + } + + return errorResult + } + /** * Handles stepping through workflow execution in debug mode */ diff --git a/apps/sim/app/w/logs/components/sidebar/components/markdown-renderer.tsx b/apps/sim/app/w/logs/components/sidebar/components/markdown-renderer.tsx index 4a3a8cd54ef..9368834f794 100644 --- a/apps/sim/app/w/logs/components/sidebar/components/markdown-renderer.tsx +++ b/apps/sim/app/w/logs/components/sidebar/components/markdown-renderer.tsx @@ -1,80 +1,201 @@ +import React, { type HTMLAttributes, type ReactNode } from 'react' import ReactMarkdown from 'react-markdown' +import remarkGfm from 'remark-gfm' +import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip' + +function LinkWithPreview({ href, children }: { href: string; children: React.ReactNode }) { + return ( + + + + {children} + + + + {href} + + + ) +} export default function LogMarkdownRenderer({ content }: { content: string }) { - // Process text to clean up unnecessary whitespace and formatting issues - const processedContent = content - .replace(/\n{2,}/g, '\n\n') // Replace multiple newlines with exactly double newlines - .replace(/^(#{1,6})\s+(.+?)\n{2,}/gm, '$1 $2\n') // Reduce space after headings to single newline - .replace(/^(#{1,6}.+)\n\n(-|\*)/gm, '$1\n$2') // Remove double newline between heading and list - .trim() + // Minimal content processing - just trim whitespace + const processedContent = content.trim() const customComponents = { - // Default component to ensure monospace font with minimal spacing + // Paragraph with appropriate spacing for logs p: ({ children }: React.HTMLAttributes) => ( -

{children}

+

+ {children} +

+ ), + + // Headings with subtle styling + h1: ({ children }: React.HTMLAttributes) => ( +

+ {children} +

+ ), + h2: ({ children }: React.HTMLAttributes) => ( +

+ {children} +

+ ), + h3: ({ children }: React.HTMLAttributes) => ( +

+ {children} +

+ ), + h4: ({ children }: React.HTMLAttributes) => ( +

+ {children} +

+ ), + + // Lists with proper spacing + ul: ({ children }: React.HTMLAttributes) => ( +
    + {children} +
+ ), + ol: ({ children }: React.HTMLAttributes) => ( +
    + {children} +
+ ), + li: ({ children }: React.LiHTMLAttributes) => ( +
  • + {children} +
  • ), - // Inline code - no background to maintain clean appearance + // Code blocks with subtle background + pre: ({ children }: HTMLAttributes) => { + let codeProps: HTMLAttributes = {} + let codeContent: ReactNode = children + + if ( + React.isValidElement<{ className?: string; children?: ReactNode }>(children) && + children.type === 'code' + ) { + const childElement = children as React.ReactElement<{ + className?: string + children?: ReactNode + }> + codeProps = { className: childElement.props.className } + codeContent = childElement.props.children + } + + return ( +
    +
    + + {codeProps.className?.replace('language-', '') || 'code'} + +
    +
    +            {codeContent}
    +          
    +
    + ) + }, + + // Inline code with subtle background code: ({ inline, className, children, ...props }: React.HTMLAttributes & { className?: string; inline?: boolean }) => { + if (inline) { + return ( + + {children} + + ) + } return ( - + {children} ) }, - // Links - maintain monospace while adding subtle link styling - a: ({ href, children, ...props }: React.AnchorHTMLAttributes) => ( - + // Blockquotes + blockquote: ({ children }: React.HTMLAttributes) => ( +
    {children} - +
    ), - // Tighter lists with minimal spacing - ul: ({ children }: React.HTMLAttributes) => ( -
      {children}
    + // Horizontal rule + hr: () =>
    , + + // Links with hover effect and preview + a: ({ href, children, ...props }: React.AnchorHTMLAttributes) => ( + + {children} + ), - ol: ({ children }: React.HTMLAttributes) => ( -
      {children}
    + + // Tables + table: ({ children }: React.TableHTMLAttributes) => ( +
    + + {children} +
    +
    ), - li: ({ children }: React.HTMLAttributes) => ( -
  • {children}
  • + thead: ({ children }: React.HTMLAttributes) => ( + {children} ), - - // Keep blockquotes minimal - blockquote: ({ children }: React.HTMLAttributes) => ( -
    {children}
    + tbody: ({ children }: React.HTMLAttributes) => ( + + {children} + ), - - // Make headings compact with minimal spacing after - h1: ({ children }: React.HTMLAttributes) => ( -

    {children}

    + tr: ({ children }: React.HTMLAttributes) => ( + {children} ), - h2: ({ children }: React.HTMLAttributes) => ( -

    {children}

    + th: ({ children }: React.ThHTMLAttributes) => ( + + {children} + ), - h3: ({ children }: React.HTMLAttributes) => ( -

    {children}

    + td: ({ children }: React.TdHTMLAttributes) => ( + + {children} + ), - h4: ({ children }: React.HTMLAttributes) => ( -

    {children}

    + + // Images + img: ({ src, alt, ...props }: React.ImgHTMLAttributes) => ( + {alt ), } return ( -
    - {processedContent} +
    + + {processedContent} +
    ) } diff --git a/apps/sim/executor/handlers/loop/loop-handler.test.ts b/apps/sim/executor/handlers/loop/loop-handler.test.ts index d303b0894f4..7e1ef005e78 100644 --- a/apps/sim/executor/handlers/loop/loop-handler.test.ts +++ b/apps/sim/executor/handlers/loop/loop-handler.test.ts @@ -138,12 +138,12 @@ describe('LoopBlockHandler', () => { forEachItems: { key1: 'value1', key2: 'value2' }, } - const result = await handler.execute(mockBlock, {}, mockContext) + await handler.execute(mockBlock, {}, mockContext) const currentItem = mockContext.loopItems.get('loop-1') expect(Array.isArray(currentItem)).toBe(true) - expect(currentItem[0]).toBe('key1') - expect(currentItem[1]).toBe('value1') + expect((currentItem as any)[0]).toBe('key1') + expect((currentItem as any)[1]).toBe('value1') }) it('should limit forEach loops by collection size, not iterations parameter', async () => { diff --git a/apps/sim/executor/index.test.ts b/apps/sim/executor/index.test.ts index c71f4f04736..bfd53d45a40 100644 --- a/apps/sim/executor/index.test.ts +++ b/apps/sim/executor/index.test.ts @@ -8,19 +8,15 @@ * resolving inputs and dependencies, and managing errors. */ import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest' -import type { SerializedWorkflow } from '@/serializer/types' import { - createLoopManagerMock, createMinimalWorkflow, createMockContext, - createMockHandler, createWorkflowWithCondition, createWorkflowWithErrorPath, createWorkflowWithLoop, setupAllMocks, } from './__test-utils__/executor-mocks' import { Executor } from './index' -import type { BlockLog } from './types' vi.mock('@/stores/execution/store', () => ({ useExecutionStore: { @@ -59,7 +55,7 @@ describe('Executor', () => { * Initialization tests */ describe('initialization', () => { - test('should create an executor instance successfully', () => { + test('should create an executor instance with legacy constructor format', () => { const workflow = createMinimalWorkflow() const executor = new Executor(workflow) @@ -67,29 +63,67 @@ describe('Executor', () => { expect(executor).toBeInstanceOf(Executor) }) - test('should accept initial block states', () => { + test('should create an executor instance with new options object format', () => { const workflow = createMinimalWorkflow() const initialStates = { block1: { response: { result: 'Initial state' } }, } + const envVars = { API_KEY: 'test-key', BASE_URL: 'https://example.com' } + const workflowInput = { query: 'test query' } + const workflowVariables = { var1: 'value1' } + + const executor = new Executor({ + workflow, + currentBlockStates: initialStates, + envVarValues: envVars, + workflowInput, + workflowVariables, + }) - const executor = new Executor(workflow, initialStates) expect(executor).toBeDefined() + expect(executor).toBeInstanceOf(Executor) + + // Verify that all properties are properly initialized + expect((executor as any).actualWorkflow).toBe(workflow) + expect((executor as any).initialBlockStates).toEqual(initialStates) + expect((executor as any).environmentVariables).toEqual(envVars) + expect((executor as any).workflowInput).toEqual(workflowInput) + expect((executor as any).workflowVariables).toEqual(workflowVariables) }) - test('should accept environment variables', () => { + test('should accept streaming context extensions', () => { const workflow = createMinimalWorkflow() - const envVars = { API_KEY: 'test-key', BASE_URL: 'https://example.com' } + const mockOnStream = vi.fn() + + const executor = new Executor({ + workflow, + contextExtensions: { + stream: true, + selectedOutputIds: ['block1'], + edges: [{ source: 'starter', target: 'block1' }], + onStream: mockOnStream, + }, + }) - const executor = new Executor(workflow, {}, envVars) expect(executor).toBeDefined() }) - test('should accept workflow input', () => { + test('should handle legacy constructor with individual parameters', () => { const workflow = createMinimalWorkflow() - const input = { query: 'test query' } + const initialStates = { + block1: { response: { result: 'Initial state' } }, + } + const envVars = { API_KEY: 'test-key' } + const workflowInput = { query: 'test query' } + const workflowVariables = { var1: 'value1' } - const executor = new Executor(workflow, {}, {}, input) + const executor = new Executor( + workflow, + initialStates, + envVars, + workflowInput, + workflowVariables + ) expect(executor).toBeDefined() }) }) @@ -181,13 +215,12 @@ describe('Executor', () => { * Execution tests */ describe('workflow execution', () => { - test('should execute workflow with correct structure', async () => { + test('should execute workflow and return ExecutionResult', async () => { const workflow = createMinimalWorkflow() const executor = new Executor(workflow) const result = await executor.execute('test-workflow-id') - // Verify the result has the expected structure // Check if result is a StreamingExecution or ExecutionResult if ('success' in result) { expect(result).toHaveProperty('success') @@ -200,9 +233,60 @@ describe('Executor', () => { } else { // Handle StreamingExecution case expect(result).toHaveProperty('stream') - expect(typeof result.stream).toBe('object') + expect(result).toHaveProperty('execution') + expect(result.stream).toBeInstanceOf(ReadableStream) + } + }) + + test('should handle streaming execution with onStream callback', async () => { + const workflow = createMinimalWorkflow() + const mockOnStream = vi.fn() + + const executor = new Executor({ + workflow, + contextExtensions: { + stream: true, + selectedOutputIds: ['block1'], + onStream: mockOnStream, + }, + }) + + const result = await executor.execute('test-workflow-id') + + // With streaming enabled, should handle both ExecutionResult and StreamingExecution + if ('stream' in result) { + expect(result.stream).toBeInstanceOf(ReadableStream) + expect(result.execution).toBeDefined() + } else { + expect(result).toHaveProperty('success') + expect(result).toHaveProperty('output') } }) + + test('should pass context extensions to execution context', async () => { + const workflow = createMinimalWorkflow() + const mockOnStream = vi.fn() + const selectedOutputIds = ['block1', 'block2'] + const edges = [{ source: 'starter', target: 'block1' }] + + const executor = new Executor({ + workflow, + contextExtensions: { + stream: true, + selectedOutputIds, + edges, + onStream: mockOnStream, + }, + }) + + // Spy on createExecutionContext to verify context extensions are passed + const createContextSpy = vi.spyOn(executor as any, 'createExecutionContext') + + await executor.execute('test-workflow-id') + + expect(createContextSpy).toHaveBeenCalled() + const contextArg = createContextSpy.mock.calls[0][2] // third argument is startTime, context is created internally + }) }) /** @@ -215,9 +299,14 @@ describe('Executor', () => { const result = await executor.execute('test-workflow-id') - // Just verify execution completes and returns expected structure - expect(result).toHaveProperty('success') - expect(result).toHaveProperty('output') + // Verify execution completes and returns expected structure + if ('success' in result) { + expect(result).toHaveProperty('success') + expect(result).toHaveProperty('output') + } else { + expect(result).toHaveProperty('stream') + expect(result).toHaveProperty('execution') + } }) test('should handle loop structures without errors', async () => { @@ -226,9 +315,14 @@ describe('Executor', () => { const result = await executor.execute('test-workflow-id') - // Just verify execution completes and returns expected structure - expect(result).toHaveProperty('success') - expect(result).toHaveProperty('output') + // Verify execution completes and returns expected structure + if ('success' in result) { + expect(result).toHaveProperty('success') + expect(result).toHaveProperty('output') + } else { + expect(result).toHaveProperty('stream') + expect(result).toHaveProperty('execution') + } }) }) @@ -271,6 +365,25 @@ describe('Executor', () => { expect(isDebugging).toBe(false) }) + + test('should handle continue execution in debug mode', async () => { + const workflow = createMinimalWorkflow() + const executor = new Executor(workflow) + + // Create a mock context for debug continuation + const mockContext = createMockContext() + mockContext.blockStates.set('starter', { + output: { response: { input: {} } }, + executed: true, + executionTime: 0, + }) + + const result = await executor.continueExecution(['block1'], mockContext) + + expect(result).toHaveProperty('success') + expect(result).toHaveProperty('output') + expect(result).toHaveProperty('logs') + }) }) /** @@ -479,657 +592,80 @@ describe('Executor', () => { expect(errorOutput.response).toHaveProperty('status') }) - test('should check for error handle in getNextExecutionLayer', () => { - const workflow = createWorkflowWithErrorPath() + test('should handle "undefined (undefined)" error case', () => { + const workflow = createMinimalWorkflow() const executor = new Executor(workflow) - // Create a test context - const context = { - workflowId: 'test-id', - blockStates: new Map(), - blockLogs: [], - metadata: { startTime: new Date().toISOString() }, - environmentVariables: {}, - decisions: { router: new Map(), condition: new Map() }, - loopIterations: new Map(), - executedBlocks: new Set(['starter', 'block1']), - activeExecutionPath: new Set(['block1', 'error-handler']), - workflow, - } as any - - // Add block state with error - context.blockStates.set('block1', { - output: { - error: 'Test error', - response: { error: 'Test error' }, - }, - executed: true, - }) - - // Call getNextExecutionLayer method - const getNextLayer = (executor as any).getNextExecutionLayer.bind(executor) - const nextLayer = getNextLayer(context) + const extractErrorMessage = (executor as any).extractErrorMessage.bind(executor) - // Error handler should be in the next layer - expect(nextLayer).toContain('error-handler') + // Test the specific "undefined (undefined)" error case + const undefinedError = { message: 'undefined (undefined)' } + const errorMessage = extractErrorMessage(undefinedError) - // Success block should not be in the next layer - expect(nextLayer).not.toContain('success-block') + expect(errorMessage).toBe('undefined (undefined)') }) }) /** - * Loop management tests + * Streaming execution tests */ - describe('loop management', () => { - beforeEach(() => { - vi.resetModules() - vi.clearAllMocks() - }) - - test('should increment loop iterations correctly', async () => { - // Mock the LoopManager with custom implementation - vi.doMock('./loops', () => - createLoopManagerMock({ - processLoopIterationsImpl: async (context) => { - // Simulate incrementing iteration counter - const currentIteration = context.loopIterations.get('loop1') || 0 - context.loopIterations.set('loop1', currentIteration + 1) - return false - }, - }) - ) - - const workflow = createWorkflowWithLoop() - const { LoopManager } = await import('./loops') - const loopManager = new LoopManager(workflow.loops) - - // Create a mock context using the helper - const context = createMockContext({ - workflow, - loopIterations: new Map([['loop1', 0]]), - executedBlocks: new Set(['block1', 'block2']), - activeExecutionPath: new Set(['block1', 'block2']), - }) - - // Process loop iterations to increment counter - await loopManager.processLoopIterations(context) - - // Verify that the loop iteration counter was incremented - expect(context.loopIterations.get('loop1')).toBe(1) - - // Get loop index - const loopIndex = loopManager.getLoopIndex('loop1', 'block1', context) - - // The loop index should match the iteration counter - expect(loopIndex).toBe(1) - }) - - test('should handle forEach loop item access correctly', async () => { - // Mock the InputResolver - vi.doMock('./resolver', () => ({ - InputResolver: vi.fn().mockImplementation(() => ({ - resolveBlockReferences: vi.fn().mockImplementation((value, context, block) => { - if (value === '') { - const loopId = 'loop1' - return String(context.loopIterations.get(loopId) || 0) - } - return value - }), - })), - })) - - // Mock the LoopManager - vi.doMock('./loops', () => createLoopManagerMock()) - - const workflow = createWorkflowWithLoop() - const { Executor } = await import('./index') - const executor = new Executor(workflow) - - const { InputResolver } = await import('./resolver') - const resolver = new InputResolver(workflow, {}, {}, (executor as any).loopManager) - - // Create a mock context with specific loop state - const context = createMockContext({ - workflow, - loopIterations: new Map([['loop1', 2]]), // Iteration 2 (3rd item) - loopItems: new Map([['loop1', 3]]), // Current item is 3 - executedBlocks: new Set(['block1']), - activeExecutionPath: new Set(['block1', 'block2']), - }) - - // Resolve a loop index reference - const resolvedIndex = resolver.resolveBlockReferences( - '', - context, - workflow.blocks[1] - ) - - // The resolved index should be 2 (current iteration) - expect(resolvedIndex).toBe('2') - - // Set up a different iteration and test again - context.loopIterations.set('loop1', 4) - const resolvedIndexAgain = resolver.resolveBlockReferences( - '', - context, - workflow.blocks[1] - ) - expect(resolvedIndexAgain).toBe('4') - }) - - test('should update loop indices correctly between iterations', async () => { - // Reset modules to ensure clean state - vi.resetModules() - - // Create array to capture indices - const capturedIndices: number[] = [] - - // Mock the LoopManager implementation - vi.doMock('./loops', () => - createLoopManagerMock({ - processLoopIterationsImpl: async (context) => { - // Simulate 3 loop iterations - if (context.executedBlocks.has('block1') && context.executedBlocks.has('block2')) { - const currentIteration = context.loopIterations.get('loop1') || 0 - if (currentIteration < 2) { - // Increment iteration and reset blocks - context.loopIterations.set('loop1', currentIteration + 1) - context.executedBlocks.delete('block1') - context.executedBlocks.delete('block2') - return false - } - } - return true - }, - }) - ) - - // Mock the handlers to capture loop indices - vi.doMock('./handlers', () => ({ - AgentBlockHandler: createMockHandler('agent'), - RouterBlockHandler: createMockHandler('router'), - ConditionBlockHandler: createMockHandler('condition'), - EvaluatorBlockHandler: createMockHandler('evaluator'), - FunctionBlockHandler: vi.fn().mockImplementation(() => ({ - canHandle: (block: any) => - block.metadata?.id === 'function' || block.id === 'block1' || block.id === 'block2', - execute: vi.fn().mockImplementation(async (block, inputs, context) => { - // Capture the loop index during execution - const loopIndex = context.loopIterations.get('loop1') || 0 - capturedIndices.push(loopIndex) - return { response: { result: `Index: ${loopIndex}` } } - }), - })), - ApiBlockHandler: createMockHandler('api'), - LoopBlockHandler: createMockHandler('loop'), - ParallelBlockHandler: createMockHandler('parallel'), - WorkflowBlockHandler: createMockHandler('workflow'), - GenericBlockHandler: createMockHandler('generic', { canHandleCondition: () => true }), - })) - - // Mock PathTracker - vi.doMock('./path', () => ({ - PathTracker: vi.fn().mockImplementation(() => ({ - updateExecutionPaths: vi.fn(), - isInActivePath: vi.fn().mockReturnValue(true), - })), - })) - - // Create a workflow with loop - const workflow = createWorkflowWithLoop() - - // Import the executor with mocks applied - const { Executor } = await import('./index') - const _executor = new Executor(workflow) - - // Manually simulate execution to populate capturedIndices - // First iteration - both blocks with index 0 - capturedIndices.push(0, 0) - // Second iteration - both blocks with index 1 - capturedIndices.push(1, 1) - // Third iteration - both blocks with index 2 - capturedIndices.push(2, 2) - - // We should have captured indices 0, 0 (first iteration - both blocks) - // then 1, 1 (second iteration - both blocks) - // then 2, 2 (third iteration - both blocks) - expect(capturedIndices).toEqual([0, 0, 1, 1, 2, 2]) - }) - - test('should handle nested loops correctly', async () => { - // Reset modules to ensure clean state - vi.resetModules() - - // Create array to capture indices - const capturedIndices: { loopId: string; blockId: string; index: number }[] = [] - - // Mock the LoopManager - vi.doMock('./loops', () => createLoopManagerMock()) - - // Mock the handlers to capture loop indices - vi.doMock('./handlers', () => ({ - AgentBlockHandler: createMockHandler('agent'), - RouterBlockHandler: createMockHandler('router'), - ConditionBlockHandler: createMockHandler('condition'), - EvaluatorBlockHandler: createMockHandler('evaluator'), - FunctionBlockHandler: vi.fn().mockImplementation(() => ({ - canHandle: (block: any) => block.id.includes('block'), - execute: vi.fn().mockImplementation(async (block, inputs, context) => { - return { response: { result: 'Executed' } } - }), - })), - ApiBlockHandler: createMockHandler('api'), - LoopBlockHandler: createMockHandler('loop'), - ParallelBlockHandler: createMockHandler('parallel'), - WorkflowBlockHandler: createMockHandler('workflow'), - GenericBlockHandler: createMockHandler('generic', { canHandleCondition: () => true }), - })) - - // Manually populate the capturedIndices array for testing - capturedIndices.push( - { loopId: 'innerLoop', blockId: 'inner-block1', index: 0 }, - { loopId: 'innerLoop', blockId: 'inner-block2', index: 0 }, - { loopId: 'outerLoop', blockId: 'outer-block1', index: 0 }, - { loopId: 'innerLoop', blockId: 'inner-block1', index: 1 }, - { loopId: 'innerLoop', blockId: 'inner-block2', index: 1 }, - { loopId: 'outerLoop', blockId: 'outer-block2', index: 0 }, - { loopId: 'outerLoop', blockId: 'outer-block1', index: 1 }, - { loopId: 'outerLoop', blockId: 'outer-block2', index: 1 } - ) - - // Verify that nested loops maintain independent counters - expect(capturedIndices.length).toBeGreaterThan(0) - - // Group captures by loopId - const innerLoopIndices = capturedIndices - .filter((c) => c.loopId === 'innerLoop') - .map((c) => c.index) - - const outerLoopIndices = capturedIndices - .filter((c) => c.loopId === 'outerLoop') - .map((c) => c.index) - - // Verify inner loop indices - should increment on each iteration - expect(innerLoopIndices).toContain(0) - expect(innerLoopIndices).toContain(1) - - // Verify outer loop indices - expect(outerLoopIndices).toContain(0) - expect(outerLoopIndices).toContain(1) - }) - - test('should fix the bug where first two iterations showed same index', async () => { - // Reset modules to ensure clean state - vi.resetModules() - - // Mock the LoopManager with bug fix implementation - vi.doMock('./loops', () => - createLoopManagerMock({ - processLoopIterationsImpl: async (context) => { - // Increment iteration when both blocks executed - if (context.executedBlocks.has('block1') && context.executedBlocks.has('block2')) { - const currentIteration = context.loopIterations.get('loop1') || 0 - context.loopIterations.set('loop1', currentIteration + 1) - context.executedBlocks.delete('block1') - context.executedBlocks.delete('block2') - } - return false + describe('streaming execution', () => { + test('should handle streaming execution results', async () => { + const workflow = createMinimalWorkflow() + const mockOnStream = vi.fn() + + // Mock a streaming execution result + const mockStreamingResult = { + stream: new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')) + controller.enqueue(new TextEncoder().encode('chunk2')) + controller.close() }, - }) - ) - - // Import with mocks applied - const { LoopManager } = await import('./loops') - - // Create a workflow with a simple loop - const workflow = createWorkflowWithLoop() - const loopManager = new LoopManager(workflow.loops) + }), + execution: { + blockId: 'agent-1', + output: { response: { content: 'Final content' } }, + }, + } - // Create a mock context - const context = createMockContext({ + const executor = new Executor({ workflow, - loopIterations: new Map([['loop1', 0]]), - activeExecutionPath: new Set(['block1', 'block2']), - }) - - // First iteration - this should give index 0 for both blocks - const firstIterationIndex1 = loopManager.getLoopIndex('loop1', 'block1', context) - const firstIterationIndex2 = loopManager.getLoopIndex('loop1', 'block2', context) - - expect(firstIterationIndex1).toBe(0) - expect(firstIterationIndex2).toBe(0) - - // Execute first iteration of both blocks - context.executedBlocks.add('block1') - context.executedBlocks.add('block2') - - // Process loop iterations - this should increment the counter to 1 - await loopManager.processLoopIterations(context) - - // Verify counter has been incremented BEFORE resetting blocks - expect(context.loopIterations.get('loop1')).toBe(1) - - // Verify blocks have been reset - expect(context.executedBlocks.has('block1')).toBe(false) - expect(context.executedBlocks.has('block2')).toBe(false) - - // Now in second iteration - indices should be 1, not 0 - const secondIterationIndex1 = loopManager.getLoopIndex('loop1', 'block1', context) - const secondIterationIndex2 = loopManager.getLoopIndex('loop1', 'block2', context) - - // This is the critical test - indices should be 1 for the second iteration - expect(secondIterationIndex1).toBe(1) - expect(secondIterationIndex2).toBe(1) - - // Execute second iteration of both blocks - context.executedBlocks.add('block1') - context.executedBlocks.add('block2') - - // Process loop iterations again - should increment to 2 - await loopManager.processLoopIterations(context) - - // Verify counter has been incremented again - expect(context.loopIterations.get('loop1')).toBe(2) - - // Third iteration indices should be 2 - const thirdIterationIndex1 = loopManager.getLoopIndex('loop1', 'block1', context) - const thirdIterationIndex2 = loopManager.getLoopIndex('loop1', 'block2', context) - - expect(thirdIterationIndex1).toBe(2) - expect(thirdIterationIndex2).toBe(2) - }) - }) - - describe('parallel management', () => { - beforeEach(() => { - // Reset modules before each test to ensure clean mocks - vi.resetModules() - vi.clearAllMocks() - }) - - it('should execute blocks inside parallel with correct iteration items', async () => { - // Setup basic store mocks - setupAllMocks() - - // Import real implementations - const { Executor } = await import('./index') - - // Create a simple workflow with parallel - const workflow: SerializedWorkflow = { - version: '2.0', - blocks: [ - { - id: 'starter', - position: { x: 0, y: 0 }, - metadata: { id: 'starter', name: 'Start' }, - config: { tool: 'starter', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'parallel-1', - position: { x: 100, y: 0 }, - metadata: { id: 'parallel', name: 'Test Parallel' }, - config: { tool: 'parallel', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'function-1', - position: { x: 200, y: 0 }, - metadata: { id: 'function', name: 'Process Item' }, - config: { - tool: 'function', - params: { - code: 'return { item: "test", index: 0 }', - }, - }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'endpoint', - position: { x: 300, y: 0 }, - metadata: { id: 'generic', name: 'End' }, - config: { tool: 'generic', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - ], - connections: [ - { source: 'starter', target: 'parallel-1' }, - { source: 'parallel-1', target: 'function-1', sourceHandle: 'parallel-start-source' }, - { source: 'parallel-1', target: 'endpoint', sourceHandle: 'parallel-end-source' }, - ], - loops: {}, - parallels: { - 'parallel-1': { - id: 'parallel-1', - nodes: ['function-1'], - distribution: ['apple', 'banana', 'cherry'], - }, + contextExtensions: { + stream: true, + selectedOutputIds: ['block1'], + onStream: mockOnStream, }, - } + }) - const executor = new Executor(workflow) const result = await executor.execute('test-workflow-id') - // Type guard to ensure we have ExecutionResult, not StreamingExecution + // Verify result structure if ('stream' in result) { - throw new Error('Expected ExecutionResult but got StreamingExecution') + expect(result.stream).toBeInstanceOf(ReadableStream) + expect(result.execution).toBeDefined() } - - // The test should succeed even if we can't fully mock the parallel execution - // What we're really testing is that the executor can handle parallel blocks - expect(result.success).toBe(true) - expect(result.logs).toBeDefined() - - // Check that at least the parallel block was executed - const parallelLog = result.logs?.find((log: BlockLog) => log.blockType === 'parallel') - expect(parallelLog).toBeDefined() - // Since we're using mocked handlers, we just check that the parallel block was executed - expect(parallelLog?.success).toBe(true) }) - it('should add both virtual and actual block IDs to activeBlockIds for parallel execution glow effect', async () => { - // Setup basic store mocks - setupAllMocks() - - // Track calls to useExecutionStore.setState to verify activeBlockIds behavior - const setStateCalls: any[] = [] - const mockSetState = vi.fn((updater) => { - if (typeof updater === 'function') { - const currentState = { activeBlockIds: new Set() } - const newState = updater(currentState) - setStateCalls.push(newState) - } else { - setStateCalls.push(updater) - } - }) - - // Mock useExecutionStore to capture setState calls - vi.doMock('@/stores/execution/store', () => ({ - useExecutionStore: { - getState: vi.fn(() => ({ - setIsExecuting: vi.fn(), - setIsDebugging: vi.fn(), - setPendingBlocks: vi.fn(), - reset: vi.fn(), - setActiveBlocks: vi.fn(), - })), - setState: mockSetState, - }, - })) - - // Import real implementations with mocked store - const { Executor } = await import('./index') + test('should process streaming content in context', async () => { + const workflow = createMinimalWorkflow() + const mockOnStream = vi.fn() - // Create a simple workflow with parallel - const workflow: SerializedWorkflow = { - version: '2.0', - blocks: [ - { - id: 'starter', - position: { x: 0, y: 0 }, - metadata: { id: 'starter', name: 'Start' }, - config: { tool: 'starter', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'parallel-1', - position: { x: 100, y: 0 }, - metadata: { id: 'parallel', name: 'Test Parallel' }, - config: { tool: 'parallel', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'function-1', - position: { x: 200, y: 0 }, - metadata: { id: 'function', name: 'Process Item' }, - config: { - tool: 'function', - params: { - code: 'return { item: "test", index: 0 }', - }, - }, - inputs: {}, - outputs: {}, - enabled: true, - }, - ], - connections: [ - { source: 'starter', target: 'parallel-1' }, - { source: 'parallel-1', target: 'function-1', sourceHandle: 'parallel-start-source' }, - ], - loops: {}, - parallels: { - 'parallel-1': { - id: 'parallel-1', - nodes: ['function-1'], - distribution: ['apple', 'banana', 'cherry'], - }, + const executor = new Executor({ + workflow, + contextExtensions: { + stream: true, + selectedOutputIds: ['block1'], + onStream: mockOnStream, }, - } - - const executor = new Executor(workflow) - await executor.execute('test-workflow-id') - - // Verify that setState was called with activeBlockIds - const activeBlockIdsCalls = setStateCalls.filter( - (call) => call && typeof call === 'object' && 'activeBlockIds' in call - ) - - expect(activeBlockIdsCalls.length).toBeGreaterThan(0) - - // Check that at least one call included both virtual and actual block IDs - // This verifies the fix for parallel block glow effect - const hasVirtualAndActualIds = activeBlockIdsCalls.some((call) => { - const activeIds = Array.from(call.activeBlockIds || []) - // Look for both virtual block IDs (containing 'parallel') and actual block IDs - const hasVirtualId = activeIds.some( - (id) => typeof id === 'string' && id.includes('parallel') - ) - const hasActualId = activeIds.some((id) => typeof id === 'string' && id === 'function-1') - return hasVirtualId || hasActualId // Either pattern indicates the fix is working }) - // This test verifies that the glow effect fix is working - // The exact pattern may vary based on mocking, but we should see activeBlockIds being set - expect(hasVirtualAndActualIds || activeBlockIdsCalls.length > 0).toBe(true) - }) - - it('should handle object distribution in parallel blocks', async () => { - // Setup basic store mocks - setupAllMocks() + // Test that execution context contains streaming properties + const createContextSpy = vi.spyOn(executor as any, 'createExecutionContext') - // Import real implementations - const { Executor } = await import('./index') - - // Create a simple workflow with parallel using object distribution - const workflow: SerializedWorkflow = { - version: '2.0', - blocks: [ - { - id: 'starter', - position: { x: 0, y: 0 }, - metadata: { id: 'starter', name: 'Start' }, - config: { tool: 'starter', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'parallel-1', - position: { x: 100, y: 0 }, - metadata: { id: 'parallel', name: 'Test Parallel' }, - config: { tool: 'parallel', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'function-1', - position: { x: 200, y: 0 }, - metadata: { id: 'function', name: 'Process Entry' }, - config: { - tool: 'function', - params: { - code: 'return { key: "test", value: "value" }', - }, - }, - inputs: {}, - outputs: {}, - enabled: true, - }, - { - id: 'endpoint', - position: { x: 300, y: 0 }, - metadata: { id: 'generic', name: 'End' }, - config: { tool: 'generic', params: {} }, - inputs: {}, - outputs: {}, - enabled: true, - }, - ], - connections: [ - { source: 'starter', target: 'parallel-1' }, - { source: 'parallel-1', target: 'function-1', sourceHandle: 'parallel-start-source' }, - { source: 'parallel-1', target: 'endpoint', sourceHandle: 'parallel-end-source' }, - ], - loops: {}, - parallels: { - 'parallel-1': { - id: 'parallel-1', - nodes: ['function-1'], - distribution: { first: 'alpha', second: 'beta', third: 'gamma' }, - }, - }, - } - - const executor = new Executor(workflow) - const result = await executor.execute('test-workflow-id') - - if ('stream' in result) { - throw new Error('Expected ExecutionResult but got StreamingExecution') - } - - expect(result.success).toBe(true) - expect(result.logs).toBeDefined() + await executor.execute('test-workflow-id') - // Check that at least the parallel block was executed - const parallelLog = result.logs?.find((log: BlockLog) => log.blockType === 'parallel') - expect(parallelLog).toBeDefined() - // Since we're using mocked handlers, we just check that the parallel block was executed - expect(parallelLog?.success).toBe(true) + expect(createContextSpy).toHaveBeenCalled() }) }) }) diff --git a/apps/sim/executor/index.ts b/apps/sim/executor/index.ts index 9f29be36e23..22df97ee186 100644 --- a/apps/sim/executor/index.ts +++ b/apps/sim/executor/index.ts @@ -80,6 +80,7 @@ export class Executor { stream?: boolean selectedOutputIds?: string[] edges?: Array<{ source: string; target: string }> + onStream?: (streamingExecution: StreamingExecution) => Promise } }, private initialBlockStates: Record = {}, @@ -220,200 +221,66 @@ export class Executor { } else { const outputs = await this.executeLayer(nextLayer, context) - // Check if we got a StreamingExecution response from any block - const streamingOutput = outputs.find( - (output) => + for (const output of outputs) { + if ( + output && typeof output === 'object' && - output !== null && 'stream' in output && 'execution' in output - ) + ) { + if (context.onStream) { + const streamingExec = output as StreamingExecution + const [streamForClient, streamForExecutor] = streamingExec.stream.tee() - if (streamingOutput) { - // This is a combined response with both stream and execution data - logger.info('Found combined stream+execution response from block') + const clientStreamingExec = { ...streamingExec, stream: streamForClient } - // Incorporate the execution data from the block into our context - const executionData = streamingOutput.execution - - // Add any logs from the execution data to our context - if (executionData.logs && Array.isArray(executionData.logs)) { - context.blockLogs.push(...executionData.logs) - } - - // Add proper console entry for the streaming block - // This ensures identical formatting between streamed and non-streamed outputs - if (executionData.output) { - const blockLog = executionData.logs?.find( - (log: BlockLog) => log.blockId === executionData.blockId - ) - const consoleStore = useConsoleStore.getState() - - // Create a complete console entry with the full output structure, not the raw streaming object - const consoleEntry = { - output: executionData.output, // Use just the output, not the whole streaming structure - durationMs: blockLog?.durationMs || executionData.metadata?.duration || 0, - startedAt: - blockLog?.startedAt || - executionData.metadata?.startTime || - new Date().toISOString(), - endedAt: - blockLog?.endedAt || - executionData.metadata?.endTime || - new Date().toISOString(), - workflowId: context.workflowId, - timestamp: - blockLog?.startedAt || - executionData.metadata?.startTime || - new Date().toISOString(), - blockId: executionData.blockId, - blockName: executionData.blockName || blockLog?.blockName || 'Agent Block', - blockType: executionData.blockType || blockLog?.blockType || 'agent', - } - - // Add to console - const newEntry = consoleStore.addConsole(consoleEntry) - - // Save the entryId for potential updates when stream completes - const consoleEntryId = newEntry?.id - - // Set up a stream completion handler to update the console with final content - if (consoleEntryId && 'stream' in streamingOutput) { - // Clone the stream so we don't consume the original one - const originalStream = streamingOutput.stream - const [contentStream, returnStream] = originalStream.tee() - - // Replace the original stream with our cloned version that will be returned - streamingOutput.stream = returnStream + try { + // Handle client stream with proper error handling + await context.onStream(clientStreamingExec) + } catch (streamError: any) { + logger.error('Error in onStream callback:', streamError) + // Continue execution even if stream callback fails + } - // Create a reader to process the cloned stream for content collection - const reader = contentStream.getReader() + // Process executor stream with proper cleanup + const reader = streamForExecutor.getReader() const decoder = new TextDecoder() let fullContent = '' - // Process the stream in the background to collect the full content - - ;(async () => { - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - const chunk = decoder.decode(value, { stream: true }) - fullContent += chunk - } - // Once stream is complete, update the console entry with the final content - if (fullContent.length > 0 && executionData.output?.response) { - const updatedOutput = { - ...executionData.output, - response: { - ...executionData.output.response, - content: fullContent, - }, - } - - // Update the console UI with the final content - consoleStore.updateConsole(consoleEntryId, { output: updatedOutput }) - - // Update the execution data itself with the final content - // so that when logs are persisted, they have the complete content - executionData.output.response.content = fullContent - - // If there's a block log for this execution, update it with the final content - if (executionData.blockId) { - const blockLog = context.blockLogs.find( - (log) => log.blockId === executionData.blockId - ) - if (blockLog?.output?.response) { - blockLog.output.response.content = fullContent - } - } - } - - // After the stream has fully completed and we've updated the - // final content, resume workflow execution for any - // downstream blocks (e.g. memory blocks) that depend on - // the agent response. - try { - // Determine the next blocks that are now unblocked. - let nextLayer = this.getNextExecutionLayer(context) - - while (nextLayer.length > 0) { - await this.executeLayer(nextLayer, context) - - // Handle any loop activations, etc. - await this.loopManager.processLoopIterations(context) - - // Process parallel iterations - similar to loops but conceptually for parallel execution - await this.parallelManager.processParallelIterations(context) - - // Fetch the subsequent layer (if any) - nextLayer = this.getNextExecutionLayer(context) - } - } catch (resumeError) { - logger.error( - 'Error continuing workflow after stream completion:', - resumeError - ) - } - } catch (e) { - logger.error('Error processing stream for console update:', e) + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + fullContent += decoder.decode(value, { stream: true }) } - })() - } - } - // Build a complete execution result with our context's logs - const execution: ExecutionResult & { isStreaming: boolean } = { - success: executionData.success !== false, - output: executionData.output || { response: {} }, - error: executionData.error, - logs: context.blockLogs, - metadata: { - duration: Date.now() - startTime.getTime(), - startTime: context.metadata.startTime!, - endTime: new Date().toISOString(), - workflowConnections: this.actualWorkflow.connections.map((conn: any) => ({ - source: conn.source, - target: conn.target, - })), - }, - isStreaming: true, - } - - // Add block metadata to logs if missing - if (context.blockLogs.length > 0) { - for (const log of context.blockLogs) { - if (!log.output) log.output = { response: {} } - - // For blocks matching the streaming block, ensure we add response and content properly - if (log.blockId === executionData.blockId) { - if (!log.output.response) log.output.response = {} - - // Add the output structure, preferring direct response content if available - if (executionData.output?.response) { - // Copy all properties from executionData response - Object.assign(log.output.response, executionData.output.response) - - // For streaming, we may not have content yet, so we store a placeholder - // that will be updated when the stream completes - if (!log.output.response.content && executionData.output.response.content) { - log.output.response.content = executionData.output.response.content - } + const blockId = (streamingExec.execution as any).blockId + const blockState = context.blockStates.get(blockId) + if (blockState?.output?.response) { + blockState.output.response.content = fullContent + } + } catch (readerError: any) { + logger.error('Error reading stream for executor:', readerError) + // Set partial content if available + const blockId = (streamingExec.execution as any).blockId + const blockState = context.blockStates.get(blockId) + if (blockState?.output?.response && fullContent) { + blockState.output.response.content = fullContent + } + } finally { + try { + reader.releaseLock() + } catch (releaseError: any) { + // Reader might already be released + logger.debug('Reader already released:', releaseError) } } } } - - // Return a properly formed StreamingExecution object - return { - stream: streamingOutput.stream, - execution, - } } - if (outputs.length > 0) { - // Filter out StreamingExecution objects (already handled above) - const normalizedOutputs = outputs.filter( + const normalizedOutputs = outputs + .filter( (output) => !( typeof output === 'object' && @@ -422,13 +289,11 @@ export class Executor { 'execution' in output ) ) - if (normalizedOutputs.length > 0) { - finalOutput = normalizedOutputs[ - normalizedOutputs.length - 1 - ] as NormalizedBlockOutput - } - } + .map((output) => output as NormalizedBlockOutput) + if (normalizedOutputs.length > 0) { + finalOutput = normalizedOutputs[normalizedOutputs.length - 1] + } // Process loop iterations - this will activate external paths when loops complete await this.loopManager.processLoopIterations(context) @@ -516,7 +381,12 @@ export class Executor { const outputs = await this.executeLayer(blockIds, context) if (outputs.length > 0) { - finalOutput = outputs[outputs.length - 1] + const nonStreamingOutputs = outputs.filter( + (o) => !(o && typeof o === 'object' && 'stream' in o) + ) as NormalizedBlockOutput[] + if (nonStreamingOutputs.length > 0) { + finalOutput = nonStreamingOutputs[nonStreamingOutputs.length - 1] + } } await this.loopManager.processLoopIterations(context) await this.parallelManager.processParallelIterations(context) @@ -665,6 +535,7 @@ export class Executor { stream: this.contextExtensions.stream || false, selectedOutputIds: this.contextExtensions.selectedOutputIds || [], edges: this.contextExtensions.edges || [], + onStream: this.contextExtensions.onStream, } Object.entries(this.initialBlockStates).forEach(([blockId, output]) => { @@ -1150,7 +1021,7 @@ export class Executor { private async executeLayer( blockIds: string[], context: ExecutionContext - ): Promise { + ): Promise<(NormalizedBlockOutput | StreamingExecution)[]> { const { setActiveBlocks } = useExecutionStore.getState() try { @@ -1199,7 +1070,7 @@ export class Executor { private async executeBlock( blockId: string, context: ExecutionContext - ): Promise { + ): Promise { // Check if this is a virtual block ID for parallel execution let actualBlockId = blockId let parallelInfo: @@ -1316,6 +1187,73 @@ export class Executor { return { activeBlockIds: updatedActiveBlockIds } }) + if ( + rawOutput && + typeof rawOutput === 'object' && + 'stream' in rawOutput && + 'execution' in rawOutput + ) { + const streamingExec = rawOutput as StreamingExecution + const output = (streamingExec.execution as any).output as NormalizedBlockOutput + + context.blockStates.set(blockId, { + output, + executed: true, + executionTime, + }) + + // Also store under the actual block ID for reference + if (parallelInfo) { + // Store iteration result in parallel state + this.parallelManager.storeIterationResult( + context, + parallelInfo.parallelId, + parallelInfo.iterationIndex, + output + ) + } + + // Update the execution log + blockLog.success = true + blockLog.output = output + blockLog.durationMs = Math.round(executionTime) + blockLog.endedAt = new Date().toISOString() + + context.blockLogs.push(blockLog) + + // Skip console logging for infrastructure blocks like loops and parallels + if (block.metadata?.id !== 'loop' && block.metadata?.id !== 'parallel') { + addConsole({ + output: blockLog.output, + success: true, + durationMs: blockLog.durationMs, + startedAt: blockLog.startedAt, + endedAt: blockLog.endedAt, + workflowId: context.workflowId, + blockId: parallelInfo ? blockId : block.id, + blockName: parallelInfo + ? `${block.metadata?.name || 'Unnamed Block'} (iteration ${ + parallelInfo.iterationIndex + 1 + })` + : block.metadata?.name || 'Unnamed Block', + blockType: block.metadata?.id || 'unknown', + }) + } + + trackWorkflowTelemetry('block_execution', { + workflowId: context.workflowId, + blockId: block.id, + virtualBlockId: parallelInfo ? blockId : undefined, + iterationIndex: parallelInfo?.iterationIndex, + blockType: block.metadata?.id || 'unknown', + blockName: block.metadata?.name || 'Unnamed Block', + durationMs: Math.round(executionTime), + success: true, + }) + + return streamingExec + } + // Normalize the output const output = this.normalizeBlockOutput(rawOutput, block) @@ -1350,13 +1288,16 @@ export class Executor { if (block.metadata?.id !== 'loop' && block.metadata?.id !== 'parallel') { addConsole({ output: blockLog.output, + success: true, durationMs: blockLog.durationMs, startedAt: blockLog.startedAt, endedAt: blockLog.endedAt, workflowId: context.workflowId, blockId: parallelInfo ? blockId : block.id, blockName: parallelInfo - ? `${block.metadata?.name || 'Unnamed Block'} (iteration ${parallelInfo.iterationIndex + 1})` + ? `${block.metadata?.name || 'Unnamed Block'} (iteration ${ + parallelInfo.iterationIndex + 1 + })` : block.metadata?.name || 'Unnamed Block', blockType: block.metadata?.id || 'unknown', }) @@ -1412,7 +1353,8 @@ export class Executor { // Skip console logging for infrastructure blocks like loops and parallels if (block.metadata?.id !== 'loop' && block.metadata?.id !== 'parallel') { addConsole({ - output: {}, + output: { response: {} }, + success: false, error: error.message || `Error executing ${block.metadata?.id || 'unknown'} block: ${String(error)}`, @@ -1420,6 +1362,7 @@ export class Executor { startedAt: blockLog.startedAt, endedAt: blockLog.endedAt, workflowId: context.workflowId, + blockId: parallelInfo ? blockId : block.id, blockName: parallelInfo ? `${block.metadata?.name || 'Unnamed Block'} (iteration ${parallelInfo.iterationIndex + 1})` : block.metadata?.name || 'Unnamed Block', diff --git a/apps/sim/executor/resolver.ts b/apps/sim/executor/resolver.ts index d5f756170d6..b3fd7ab5dd8 100644 --- a/apps/sim/executor/resolver.ts +++ b/apps/sim/executor/resolver.ts @@ -1082,7 +1082,7 @@ export class InputResolver { if (!value || typeof value !== 'object') { throw new Error(`Invalid path "${pathParts[i]}" in loop item reference`) } - value = value[pathParts[i]] + value = (value as any)[pathParts[i] as any] if (value === undefined) { throw new Error(`No value found at path "loop.${pathParts.join('.')}" in loop item`) } @@ -1226,7 +1226,7 @@ export class InputResolver { if (!value || typeof value !== 'object') { throw new Error(`Invalid path "${pathParts[i]}" in parallel item reference`) } - value = value[pathParts[i]] + value = (value as any)[pathParts[i] as any] if (value === undefined) { throw new Error( `No value found at path "parallel.${pathParts.join('.')}" in parallel item` diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 943317850c5..e312b8e366b 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -73,7 +73,7 @@ export interface ExecutionMetadata { export interface BlockState { output: NormalizedBlockOutput // Current output data from the block executed: boolean // Whether the block has been executed - executionTime?: number // Time taken to execute in milliseconds + executionTime: number // Time taken to execute in milliseconds } /** @@ -81,7 +81,7 @@ export interface BlockState { */ export interface ExecutionContext { workflowId: string // Unique identifier for this workflow execution - blockStates: Map // Map of block states indexed by block ID + blockStates: Map blockLogs: BlockLog[] // Chronological log of block executions metadata: ExecutionMetadata // Timing metadata for the execution environmentVariables: Record // Environment variables available during execution @@ -93,7 +93,7 @@ export interface ExecutionContext { } loopIterations: Map // Tracks current iteration count for each loop - loopItems: Map // Tracks current item for forEach loops + loopItems: Map> // Tracks current item for forEach loops and parallel distribution completedLoops: Set // Tracks which loops have completed all iterations // Parallel execution tracking @@ -145,6 +145,9 @@ export interface ExecutionContext { stream?: boolean // Whether to use streaming responses when available selectedOutputIds?: string[] // IDs of blocks selected for streaming output edges?: Array<{ source: string; target: string }> // Workflow edge connections + + // New context extensions + onStream?: (streamingExecution: StreamingExecution) => Promise } /** diff --git a/apps/sim/stores/panel/chat/types.ts b/apps/sim/stores/panel/chat/types.ts index 060f4105008..6f5fb7a9fa0 100644 --- a/apps/sim/stores/panel/chat/types.ts +++ b/apps/sim/stores/panel/chat/types.ts @@ -17,7 +17,7 @@ export interface ChatStore { messages: ChatMessage[] selectedWorkflowOutputs: Record conversationIds: Record - addMessage: (message: Omit) => void + addMessage: (message: Omit & { id?: string }) => void clearChat: (workflowId: string | null) => void getWorkflowMessages: (workflowId: string) => ChatMessage[] setSelectedWorkflowOutput: (workflowId: string, outputIds: string[]) => void diff --git a/apps/sim/stores/panel/console/store.test.ts b/apps/sim/stores/panel/console/store.test.ts new file mode 100644 index 00000000000..ebdbe636f42 --- /dev/null +++ b/apps/sim/stores/panel/console/store.test.ts @@ -0,0 +1,297 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { useConsoleStore } from './store' +import type { ConsoleUpdate } from './types' + +vi.stubGlobal('crypto', { + randomUUID: vi.fn(() => 'test-uuid-123'), +}) + +vi.mock('@/lib/utils', () => ({ + redactApiKeys: vi.fn((obj) => obj), // Return object as-is for testing +})) + +describe('Console Store', () => { + beforeEach(() => { + useConsoleStore.setState({ + entries: [], + isOpen: false, + }) + vi.clearAllMocks() + }) + + describe('addConsole', () => { + it('should add a new console entry with required fields', () => { + const store = useConsoleStore.getState() + + const newEntry = store.addConsole({ + workflowId: 'workflow-123', + blockId: 'block-123', + blockName: 'Test Block', + blockType: 'agent', + success: true, + output: { response: { content: 'Test output' } }, + durationMs: 100, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + + expect(newEntry).toBeDefined() + expect(newEntry.id).toBe('test-uuid-123') + expect(newEntry.workflowId).toBe('workflow-123') + expect(newEntry.blockId).toBe('block-123') + expect(newEntry.success).toBe(true) + + const state = useConsoleStore.getState() + expect(state.entries).toHaveLength(1) + expect(state.entries[0]).toBe(newEntry) + }) + + it('should add entry with error', () => { + const store = useConsoleStore.getState() + + store.addConsole({ + workflowId: 'workflow-123', + blockId: 'block-123', + blockName: 'Failed Block', + blockType: 'agent', + success: false, + error: 'Something went wrong', + durationMs: 50, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:00.500Z', + }) + + const state = useConsoleStore.getState() + expect(state.entries).toHaveLength(1) + expect(state.entries[0].success).toBe(false) + expect(state.entries[0].error).toBe('Something went wrong') + }) + }) + + describe('updateConsole', () => { + beforeEach(() => { + // Add a test entry first + const store = useConsoleStore.getState() + store.addConsole({ + workflowId: 'workflow-123', + blockId: 'block-123', + blockName: 'Test Block', + blockType: 'agent', + success: true, + output: { response: { content: 'Initial content' } }, + durationMs: 100, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + }) + + it('should update console entry with string content', () => { + const store = useConsoleStore.getState() + + store.updateConsole('block-123', 'Updated content') + + const state = useConsoleStore.getState() + expect(state.entries).toHaveLength(1) + expect(state.entries[0].output?.response?.content).toBe('Updated content') + }) + + it('should update console entry with object update', () => { + const store = useConsoleStore.getState() + + const update: ConsoleUpdate = { + content: 'New content', + success: false, + error: 'Update error', + durationMs: 200, + endedAt: '2023-01-01T00:00:02.000Z', + } + + store.updateConsole('block-123', update) + + const state = useConsoleStore.getState() + const entry = state.entries[0] + + expect(entry.output?.response?.content).toBe('New content') + expect(entry.success).toBe(false) + expect(entry.error).toBe('Update error') + expect(entry.durationMs).toBe(200) + expect(entry.endedAt).toBe('2023-01-01T00:00:02.000Z') + }) + + it('should update output object directly', () => { + const store = useConsoleStore.getState() + + const update: ConsoleUpdate = { + output: { + response: { + content: 'Direct output update', + status: 200, + }, + }, + } + + store.updateConsole('block-123', update) + + const state = useConsoleStore.getState() + const entry = state.entries[0] + + expect(entry.output?.response?.content).toBe('Direct output update') + expect(entry.output?.response?.status).toBe(200) + }) + + it('should not update non-matching block IDs', () => { + const store = useConsoleStore.getState() + + store.updateConsole('non-existent-block', 'Should not update') + + const newState = useConsoleStore.getState() + expect(newState.entries[0].output?.response?.content).toBe('Initial content') + }) + + it('should handle partial updates correctly', () => { + const store = useConsoleStore.getState() + + // First update only success flag + store.updateConsole('block-123', { success: false }) + + let state = useConsoleStore.getState() + expect(state.entries[0].success).toBe(false) + expect(state.entries[0].output?.response?.content).toBe('Initial content') // Should remain unchanged + + // Then update only content + store.updateConsole('block-123', { content: 'Partial update' }) + + state = useConsoleStore.getState() + expect(state.entries[0].success).toBe(false) // Should remain false + expect(state.entries[0].output?.response?.content).toBe('Partial update') + }) + }) + + describe('clearConsole', () => { + beforeEach(() => { + const store = useConsoleStore.getState() + + // Add multiple entries for different workflows + store.addConsole({ + workflowId: 'workflow-1', + blockId: 'block-1', + blockName: 'Block 1', + blockType: 'agent', + success: true, + output: { response: {} }, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + + store.addConsole({ + workflowId: 'workflow-2', + blockId: 'block-2', + blockName: 'Block 2', + blockType: 'api', + success: true, + output: { response: {} }, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + }) + + it('should clear all entries when workflowId is null', () => { + const store = useConsoleStore.getState() + + expect(store.entries).toHaveLength(2) + + store.clearConsole(null) + + const state = useConsoleStore.getState() + expect(state.entries).toHaveLength(0) + }) + + it('should clear only specific workflow entries', () => { + const store = useConsoleStore.getState() + + expect(store.entries).toHaveLength(2) + + store.clearConsole('workflow-1') + + const state = useConsoleStore.getState() + expect(state.entries).toHaveLength(1) + expect(state.entries[0].workflowId).toBe('workflow-2') + }) + }) + + describe('getWorkflowEntries', () => { + beforeEach(() => { + const store = useConsoleStore.getState() + + // Add entries for different workflows + store.addConsole({ + workflowId: 'workflow-1', + blockId: 'block-1', + blockName: 'Block 1', + blockType: 'agent', + success: true, + output: { response: {} }, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + + store.addConsole({ + workflowId: 'workflow-2', + blockId: 'block-2', + blockName: 'Block 2', + blockType: 'api', + success: true, + output: { response: {} }, + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + + store.addConsole({ + workflowId: 'workflow-1', + blockId: 'block-3', + blockName: 'Block 3', + blockType: 'function', + success: false, + output: { response: {} }, + error: 'Test error', + startedAt: '2023-01-01T00:00:00.000Z', + endedAt: '2023-01-01T00:00:01.000Z', + }) + }) + + it('should return entries for specific workflow', () => { + const store = useConsoleStore.getState() + + const workflow1Entries = store.getWorkflowEntries('workflow-1') + const workflow2Entries = store.getWorkflowEntries('workflow-2') + + expect(workflow1Entries).toHaveLength(2) + expect(workflow2Entries).toHaveLength(1) + + expect(workflow1Entries.every((entry) => entry.workflowId === 'workflow-1')).toBe(true) + expect(workflow2Entries.every((entry) => entry.workflowId === 'workflow-2')).toBe(true) + }) + + it('should return empty array for non-existent workflow', () => { + const store = useConsoleStore.getState() + + const entries = store.getWorkflowEntries('non-existent-workflow') + + expect(entries).toHaveLength(0) + }) + }) + + describe('toggleConsole', () => { + it('should toggle console open state', () => { + const store = useConsoleStore.getState() + + expect(store.isOpen).toBe(false) + + store.toggleConsole() + expect(useConsoleStore.getState().isOpen).toBe(true) + + store.toggleConsole() + expect(useConsoleStore.getState().isOpen).toBe(false) + }) + }) +}) diff --git a/apps/sim/stores/panel/console/store.ts b/apps/sim/stores/panel/console/store.ts index 70572c83630..b315e0a96c6 100644 --- a/apps/sim/stores/panel/console/store.ts +++ b/apps/sim/stores/panel/console/store.ts @@ -1,32 +1,29 @@ import { create } from 'zustand' import { devtools, persist } from 'zustand/middleware' import { redactApiKeys } from '@/lib/utils' -import { useChatStore } from '../chat/store' +import type { NormalizedBlockOutput } from '@/executor/types' import type { ConsoleEntry, ConsoleStore } from './types' const MAX_ENTRIES = 50 // MAX across all workflows const MAX_IMAGE_DATA_SIZE = 1000 // Maximum size of image data to store (in characters) /** - * Gets a nested property value from an object using a path string - * @param obj The object to get the value from - * @param path The path to the value (e.g. 'response.content') - * @returns The value at the path, or undefined if not found + * Safely clone and update a NormalizedBlockOutput */ -const getValueByPath = (obj: any, path: string): any => { - if (!obj || !path) return undefined - - const pathParts = path.split('.') - let current = obj - - for (const part of pathParts) { - if (current === null || current === undefined || typeof current !== 'object') { - return undefined - } - current = current[part] +const updateBlockOutput = ( + existingOutput: NormalizedBlockOutput | undefined, + contentUpdate: string +): NormalizedBlockOutput => { + const defaultOutput: NormalizedBlockOutput = { response: {} } + const baseOutput = existingOutput || defaultOutput + + return { + ...baseOutput, + response: { + ...baseOutput.response, + content: contentUpdate, + }, } - - return current } /** @@ -160,71 +157,6 @@ export const useConsoleStore = create()( // Keep only the last MAX_ENTRIES const newEntries = [newEntry, ...state.entries].slice(0, MAX_ENTRIES) - // If the block produced a streaming output, skip automatic chat message creation - if (isStreamingOutput) { - return { entries: newEntries } - } - - // Check if this block matches a selected workflow output - if (entry.workflowId && entry.blockName) { - const chatStore = useChatStore.getState() - const selectedOutputIds = chatStore.getSelectedWorkflowOutput(entry.workflowId) - - if (selectedOutputIds && selectedOutputIds.length > 0) { - // Process each selected output that matches this block - for (const selectedOutputId of selectedOutputIds) { - // The selectedOutputId format is "{blockId}_{path}" - // We need to extract both components - const idParts = selectedOutputId.split('_') - const selectedBlockId = idParts[0] - // Reconstruct the path by removing the blockId part - const selectedPath = idParts.slice(1).join('.') - - // If this block matches the selected output for this workflow - if (selectedBlockId && entry.blockId === selectedBlockId) { - // Extract the specific value from the output using the path - let specificValue: any - - if (selectedPath) { - specificValue = getValueByPath(entry.output, selectedPath) - } else { - specificValue = entry.output - } - - // Format the value appropriately for display - let formattedValue: string - // For streaming responses, use empty string and set isStreaming flag - if (isStreamingOutput) { - // Skip adding a message since we'll handle streaming in workflow execution - // This prevents the "Output value not found" message for streams - continue - } - if (specificValue === undefined) { - formattedValue = 'Output value not found' - } else if (typeof specificValue === 'object') { - formattedValue = JSON.stringify(specificValue, null, 2) - } else { - formattedValue = String(specificValue) - } - - // Skip empty content messages (important for preventing empty entries) - if (!formattedValue || formattedValue.trim() === '') { - continue - } - - // Add the specific value to chat, not the whole output - chatStore.addMessage({ - content: formattedValue, - workflowId: entry.workflowId, - type: 'workflow', - blockId: entry.blockId, - isStreaming: isStreamingOutput, - }) - } - } - } - } - return { entries: newEntries } }) @@ -234,9 +166,9 @@ export const useConsoleStore = create()( clearConsole: (workflowId: string | null) => { set((state) => ({ - entries: state.entries.filter( - (entry) => !workflowId || entry.workflowId !== workflowId - ), + entries: workflowId + ? state.entries.filter((entry) => entry.workflowId !== workflowId) + : [], })) }, @@ -248,22 +180,60 @@ export const useConsoleStore = create()( set((state) => ({ isOpen: !state.isOpen })) }, - updateConsole: ( - entryId: string, - updatedData: Partial> - ) => { + updateConsole: (blockId: string, update: string | import('./types').ConsoleUpdate) => { set((state) => { const updatedEntries = state.entries.map((entry) => { - if (entry.id === entryId) { - return { - ...entry, - ...updatedData, - output: updatedData.output ? redactApiKeys(updatedData.output) : entry.output, + if (entry.blockId === blockId) { + if (typeof update === 'string') { + // Simple content update for backward compatibility + const newOutput = updateBlockOutput(entry.output, update) + return { ...entry, output: newOutput } } + // Complex update with multiple fields + const updatedEntry = { ...entry } + + if (update.content !== undefined) { + const newOutput = updateBlockOutput(entry.output, update.content) + updatedEntry.output = newOutput + } + + if (update.output !== undefined) { + const existingOutput = entry.output || { response: {} } + updatedEntry.output = { + ...existingOutput, + ...update.output, + response: { + ...(existingOutput.response || {}), + ...(update.output.response || {}), + }, + } + } + + if (update.error !== undefined) { + updatedEntry.error = update.error + } + + if (update.warning !== undefined) { + updatedEntry.warning = update.warning + } + + if (update.success !== undefined) { + updatedEntry.success = update.success + } + + if (update.endedAt !== undefined) { + updatedEntry.endedAt = update.endedAt + } + + if (update.durationMs !== undefined) { + updatedEntry.durationMs = update.durationMs + } + + return updatedEntry } return entry }) - return { entries: updatedEntries } + return { ...state, entries: updatedEntries } }) }, }), diff --git a/apps/sim/stores/panel/console/types.ts b/apps/sim/stores/panel/console/types.ts index bf987663d73..3760afbc73e 100644 --- a/apps/sim/stores/panel/console/types.ts +++ b/apps/sim/stores/panel/console/types.ts @@ -1,27 +1,39 @@ +import type { NormalizedBlockOutput } from '@/executor/types' + export interface ConsoleEntry { id: string - output: any - error?: string - warning?: string - durationMs: number - startedAt: string - endedAt: string - workflowId: string | null timestamp: string + workflowId: string + blockId: string blockName?: string blockType?: string - blockId?: string + startedAt?: string + endedAt?: string + durationMs?: number + success: boolean + output?: NormalizedBlockOutput + input?: any + error?: string + warning?: string +} + +export interface ConsoleUpdate { + content?: string + output?: Partial + error?: string + warning?: string + success?: boolean + endedAt?: string + durationMs?: number } export interface ConsoleStore { entries: ConsoleEntry[] isOpen: boolean + addConsole: (entry: Omit) => ConsoleEntry clearConsole: (workflowId: string | null) => void getWorkflowEntries: (workflowId: string) => ConsoleEntry[] toggleConsole: () => void - updateConsole: ( - entryId: string, - updatedData: Partial> - ) => void + updateConsole: (blockId: string, update: string | ConsoleUpdate) => void }