diff --git a/apps/sim/app/api/mothership/execute/route.ts b/apps/sim/app/api/mothership/execute/route.ts index 1e1c0ee0bc9..1f65c9a6e22 100644 --- a/apps/sim/app/api/mothership/execute/route.ts +++ b/apps/sim/app/api/mothership/execute/route.ts @@ -7,8 +7,13 @@ import { parseRequest } from '@/lib/api/server' import { checkInternalAuth } from '@/lib/auth/hybrid' import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload' import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context' +import { + MothershipStreamV1EventType, + MothershipStreamV1TextChannel, +} from '@/lib/copilot/generated/mothership-stream-v1' import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless' import { requestExplicitStreamAbort } from '@/lib/copilot/request/session/explicit-abort' +import type { StreamEvent } from '@/lib/copilot/request/types' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { buildMothershipToolsForRequest } from '@/lib/mothership/settings/runtime' import { @@ -19,17 +24,60 @@ import { export const maxDuration = 3600 const logger = createLogger('MothershipExecuteAPI') +const MOTHERSHIP_EXECUTE_STREAM_HEADER = 'x-mothership-execute-stream' +const MOTHERSHIP_EXECUTE_STREAM_VALUE = 'ndjson' +const MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE = 'application/x-ndjson' +const MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS = 15_000 +const ndjsonEncoder = new TextEncoder() function isAbortError(error: unknown): boolean { return error instanceof Error && error.name === 'AbortError' } +function wantsStreamedExecuteResponse(req: NextRequest): boolean { + return ( + req.headers.get(MOTHERSHIP_EXECUTE_STREAM_HEADER) === MOTHERSHIP_EXECUTE_STREAM_VALUE || + req.headers.get('accept')?.includes(MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE) === true + ) +} + +function encodeNdjson(value: unknown): Uint8Array { + return ndjsonEncoder.encode(`${JSON.stringify(value)}\n`) +} + +function buildExecuteResponsePayload( + result: Awaited>, + effectiveChatId: string, + integrationTools: Array<{ name: string }> +) { + const clientToolNames = new Set(integrationTools.map((t) => t.name)) + const clientToolCalls = (result.toolCalls || []).filter( + (tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-') + ) + + return { + content: result.content, + model: 'mothership', + conversationId: effectiveChatId, + tokens: result.usage + ? { + prompt: result.usage.prompt, + completion: result.usage.completion, + total: (result.usage.prompt || 0) + (result.usage.completion || 0), + } + : {}, + cost: result.cost || undefined, + toolCalls: clientToolCalls, + } +} + /** * POST /api/mothership/execute * - * Non-streaming endpoint for Mothership block execution within workflows. - * Called by the executor via internal JWT auth, not by the browser directly. - * Consumes the Go SSE stream internally and returns a single JSON response. + * Endpoint for Mothership block execution within workflows. Called by the + * executor via internal JWT auth, not by the browser directly. JSON callers get + * a single final response; NDJSON callers get heartbeats followed by a final + * event so long-running headless requests do not look idle to HTTP stacks. */ export const POST = withRouteHandler(async (req: NextRequest) => { let messageId: string | undefined @@ -100,7 +148,8 @@ export const POST = withRouteHandler(async (req: NextRequest) => { let allowExplicitAbort = true let explicitAbortRequest: Promise | undefined - const onAbort = () => { + const lifecycleAbortController = new AbortController() + const requestExplicitAbortOnce = () => { if (!allowExplicitAbort || explicitAbortRequest || !messageId) { return } @@ -115,6 +164,15 @@ export const POST = withRouteHandler(async (req: NextRequest) => { }) }) } + const abortLifecycle = (reason?: unknown) => { + if (!lifecycleAbortController.signal.aborted) { + lifecycleAbortController.abort(reason ?? 'mothership_execute_aborted') + } + requestExplicitAbortOnce() + } + const onAbort = () => { + abortLifecycle(req.signal.reason ?? 'request_aborted') + } if (req.signal.aborted) { onAbort() @@ -122,8 +180,8 @@ export const POST = withRouteHandler(async (req: NextRequest) => { req.signal.addEventListener('abort', onAbort, { once: true }) } - try { - const result = await runHeadlessCopilotLifecycle(requestPayload, { + const runLifecycle = (onEvent?: (event: StreamEvent) => Promise) => + runHeadlessCopilotLifecycle(requestPayload, { userId, workspaceId, chatId: effectiveChatId, @@ -133,12 +191,145 @@ export const POST = withRouteHandler(async (req: NextRequest) => { goRoute: '/api/mothership/execute', autoExecuteTools: true, interactive: false, - abortSignal: req.signal, + abortSignal: lifecycleAbortController.signal, + onEvent, }) + if (wantsStreamedExecuteResponse(req)) { + let cancelled = false + let heartbeatId: ReturnType | undefined + + const stream = new ReadableStream({ + start(controller) { + let forwardedAssistantContent = '' + const send = (event: unknown) => { + if (!cancelled) { + controller.enqueue(encodeNdjson(event)) + } + } + + // Flush response headers promptly and keep long headless runs from + // looking idle to worker/proxy HTTP stacks. + send({ type: 'heartbeat', timestamp: new Date().toISOString() }) + heartbeatId = setInterval(() => { + send({ type: 'heartbeat', timestamp: new Date().toISOString() }) + }, MOTHERSHIP_EXECUTE_HEARTBEAT_INTERVAL_MS) + + void (async () => { + try { + const result = await runLifecycle(async (event) => { + if ( + event.type === MothershipStreamV1EventType.text && + event.payload.channel === MothershipStreamV1TextChannel.assistant && + event.payload.text + ) { + const text = event.payload.text + const content = text.startsWith(forwardedAssistantContent) + ? text.slice(forwardedAssistantContent.length) + : text + if (content) { + forwardedAssistantContent += content + send({ type: 'chunk', content }) + } + } + }) + allowExplicitAbort = false + + if (lifecycleAbortController.signal.aborted) { + send({ type: 'error', error: 'Mothership execution aborted' }) + return + } + + if (!result.success) { + logger.error( + messageId + ? `Mothership execute failed [messageId:${messageId}]` + : 'Mothership execute failed', + { + requestId, + workflowId, + executionId, + error: result.error, + errors: result.errors, + } + ) + send({ + type: 'error', + error: result.error || 'Mothership execution failed', + content: result.content || '', + }) + return + } + + send({ + type: 'final', + data: buildExecuteResponsePayload(result, effectiveChatId, integrationTools), + }) + } catch (error) { + if ( + lifecycleAbortController.signal.aborted || + req.signal.aborted || + isAbortError(error) + ) { + logger.info( + messageId + ? `Mothership execute aborted [messageId:${messageId}]` + : 'Mothership execute aborted', + { requestId } + ) + send({ type: 'error', error: 'Mothership execution aborted' }) + return + } + + logger.error( + messageId + ? `Mothership execute error [messageId:${messageId}]` + : 'Mothership execute error', + { + requestId, + error: error instanceof Error ? error.message : 'Unknown error', + } + ) + send({ + type: 'error', + error: error instanceof Error ? error.message : 'Internal server error', + }) + } finally { + allowExplicitAbort = false + if (heartbeatId) { + clearInterval(heartbeatId) + } + req.signal.removeEventListener('abort', onAbort) + await explicitAbortRequest + if (!cancelled) { + controller.close() + } + } + })() + }, + cancel(reason) { + cancelled = true + if (heartbeatId) { + clearInterval(heartbeatId) + } + abortLifecycle(reason ?? 'mothership_execute_stream_cancelled') + }, + }) + + return new Response(stream, { + headers: { + 'Content-Type': `${MOTHERSHIP_EXECUTE_STREAM_CONTENT_TYPE}; charset=utf-8`, + 'Cache-Control': 'no-cache, no-transform', + }, + }) + } + + try { + const result = await runLifecycle() + allowExplicitAbort = false - if (req.signal.aborted) { + if (lifecycleAbortController.signal.aborted || req.signal.aborted) { reqLogger.info('Mothership execute aborted after lifecycle completion') return NextResponse.json({ error: 'Mothership execution aborted' }, { status: 499 }) } @@ -165,25 +356,9 @@ export const POST = withRouteHandler(async (req: NextRequest) => { ) } - const clientToolNames = new Set(integrationTools.map((t) => t.name)) - const clientToolCalls = (result.toolCalls || []).filter( - (tc: { name: string }) => clientToolNames.has(tc.name) || tc.name.startsWith('mcp-') + return NextResponse.json( + buildExecuteResponsePayload(result, effectiveChatId, integrationTools) ) - - return NextResponse.json({ - content: result.content, - model: 'mothership', - conversationId: effectiveChatId, - tokens: result.usage - ? { - prompt: result.usage.prompt, - completion: result.usage.completion, - total: (result.usage.prompt || 0) + (result.usage.completion || 0), - } - : {}, - cost: result.cost || undefined, - toolCalls: clientToolCalls, - }) } finally { allowExplicitAbort = false req.signal.removeEventListener('abort', onAbort) diff --git a/apps/sim/executor/handlers/mothership/mothership-handler.test.ts b/apps/sim/executor/handlers/mothership/mothership-handler.test.ts index 5aaeab83dfd..a646ce6b4e5 100644 --- a/apps/sim/executor/handlers/mothership/mothership-handler.test.ts +++ b/apps/sim/executor/handlers/mothership/mothership-handler.test.ts @@ -3,7 +3,7 @@ import '@sim/testing/mocks/executor' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { BlockType } from '@/executor/constants' import { MothershipBlockHandler } from '@/executor/handlers/mothership/mothership-handler' -import type { ExecutionContext } from '@/executor/types' +import type { ExecutionContext, StreamingExecution } from '@/executor/types' import type { SerializedBlock } from '@/serializer/types' const { @@ -66,6 +66,22 @@ function createAbortableFetchPromise(signal?: AbortSignal): Promise { }) } +async function readStreamText(stream: ReadableStream): Promise { + const reader = stream.getReader() + const decoder = new TextDecoder() + let text = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + text += decoder.decode(value, { stream: true }) + } + + text += decoder.decode() + reader.releaseLock() + return text +} + describe('MothershipBlockHandler', () => { let handler: MothershipBlockHandler let block: SerializedBlock @@ -119,6 +135,24 @@ describe('MothershipBlockHandler', () => { vi.unstubAllGlobals() }) + function createNdjsonResponse(events: unknown[]): Response { + const encoder = new TextEncoder() + return new Response( + new ReadableStream({ + start(controller) { + for (const event of events) { + controller.enqueue(encoder.encode(`${JSON.stringify(event)}\n`)) + } + controller.close() + }, + }), + { + status: 200, + headers: { 'Content-Type': 'application/x-ndjson; charset=utf-8' }, + } + ) + } + it('forwards workflow and execution metadata with generated UUID ids', async () => { mockGenerateId.mockReturnValueOnce('chat-uuid') mockGenerateId.mockReturnValueOnce('message-uuid') @@ -156,6 +190,10 @@ describe('MothershipBlockHandler', () => { expect(url).toBe('http://localhost:3000/api/mothership/execute') expect(options.method).toBe('POST') expect(options.signal).toBeInstanceOf(AbortSignal) + expect(options.headers).toMatchObject({ + Accept: 'application/x-ndjson', + 'X-Mothership-Execute-Stream': 'ndjson', + }) const body = JSON.parse(String(options.body)) expect(body).toEqual({ @@ -219,6 +257,144 @@ describe('MothershipBlockHandler', () => { expect(mockGenerateId).toHaveBeenCalledTimes(2) }) + it('consumes mothership execute heartbeat streams until the final result', async () => { + mockGenerateId.mockReturnValueOnce('chat-uuid') + mockGenerateId.mockReturnValueOnce('message-uuid') + mockGenerateId.mockReturnValueOnce('request-uuid') + + fetchMock.mockResolvedValue( + createNdjsonResponse([ + { type: 'heartbeat', timestamp: '2026-05-15T18:13:48.000Z' }, + { + type: 'final', + data: { + content: 'streamed done', + model: 'mothership', + conversationId: 'chat-uuid', + tokens: { total: 7 }, + toolCalls: [{ name: 'tool_a', params: { a: 1 }, result: 'ok', durationMs: 42 }], + cost: { total: 0.1 }, + }, + }, + ]) + ) + + const result = await handler.execute(context, block, { prompt: 'Hello from workflow' }) + + expect(result).toEqual({ + content: 'streamed done', + model: 'mothership', + conversationId: 'chat-uuid', + tokens: { total: 7 }, + toolCalls: { + list: [ + { + name: 'tool_a', + arguments: { a: 1 }, + result: 'ok', + error: undefined, + duration: 42, + }, + ], + count: 1, + }, + cost: { total: 0.1 }, + }) + }) + + it('surfaces mothership execute stream errors', async () => { + mockGenerateId.mockReturnValueOnce('chat-uuid') + mockGenerateId.mockReturnValueOnce('message-uuid') + mockGenerateId.mockReturnValueOnce('request-uuid') + + fetchMock.mockResolvedValue( + createNdjsonResponse([ + { type: 'heartbeat', timestamp: '2026-05-15T18:13:48.000Z' }, + { type: 'error', error: 'Mothership execution aborted' }, + ]) + ) + + await expect( + handler.execute(context, block, { prompt: 'Hello from workflow' }) + ).rejects.toThrow('Mothership execution failed: Mothership execution aborted') + }) + + it('streams mothership assistant chunks and preserves final metadata', async () => { + context.stream = true + context.selectedOutputs = [`${block.id}_content`] + mockGenerateId.mockReturnValueOnce('chat-uuid') + mockGenerateId.mockReturnValueOnce('message-uuid') + mockGenerateId.mockReturnValueOnce('request-uuid') + + fetchMock.mockResolvedValue( + createNdjsonResponse([ + { type: 'heartbeat', timestamp: '2026-05-15T18:13:48.000Z' }, + { type: 'chunk', content: 'Hello' }, + { type: 'heartbeat', timestamp: '2026-05-15T18:14:03.000Z' }, + { type: 'chunk', content: ' world' }, + { + type: 'final', + data: { + content: 'Hello world', + model: 'mothership', + conversationId: 'chat-uuid', + tokens: { total: 7 }, + toolCalls: [{ name: 'tool_a', params: { a: 1 }, result: 'ok', durationMs: 42 }], + cost: { total: 0.1 }, + }, + }, + ]) + ) + + const result = await handler.execute(context, block, { prompt: 'Hello from workflow' }) + expect(result).toHaveProperty('stream') + + const streamingExecution = result as StreamingExecution + await expect(readStreamText(streamingExecution.stream)).resolves.toBe('Hello world') + expect(streamingExecution.execution.output).toEqual({ + content: 'Hello world', + model: 'mothership', + conversationId: 'chat-uuid', + tokens: { total: 7 }, + toolCalls: { + list: [ + { + name: 'tool_a', + arguments: { a: 1 }, + result: 'ok', + error: undefined, + duration: 42, + }, + ], + count: 1, + }, + cost: { total: 0.1 }, + }) + }) + + it('surfaces mothership streaming errors while streaming selected content', async () => { + context.stream = true + context.selectedOutputs = [`${block.id}_content`] + mockGenerateId.mockReturnValueOnce('chat-uuid') + mockGenerateId.mockReturnValueOnce('message-uuid') + mockGenerateId.mockReturnValueOnce('request-uuid') + + fetchMock.mockResolvedValue( + createNdjsonResponse([ + { type: 'chunk', content: 'partial' }, + { type: 'error', error: 'Mothership execution aborted' }, + ]) + ) + + const result = (await handler.execute(context, block, { + prompt: 'Hello from workflow', + })) as StreamingExecution + + await expect(readStreamText(result.stream)).rejects.toThrow( + 'Mothership execution failed: Mothership execution aborted' + ) + }) + it('embeds attached files for the mothership execute request', async () => { const fileContent = Buffer.from('hello mothership', 'utf8').toString('base64') mockGenerateId.mockReturnValueOnce('chat-uuid') @@ -334,4 +510,37 @@ describe('MothershipBlockHandler', () => { await expect(abortedExecution).resolves.toMatchObject({ name: 'AbortError' }) expect(mockIsExecutionCancelled).toHaveBeenCalledWith('execution-1') }) + + it('aborts the mothership request when selected-output streaming is cancelled', async () => { + context.stream = true + context.selectedOutputs = [`${block.id}_content`] + + mockGenerateId.mockReturnValueOnce('chat-uuid') + mockGenerateId.mockReturnValueOnce('message-uuid') + mockGenerateId.mockReturnValueOnce('request-uuid') + + let fetchSignal: AbortSignal | undefined + fetchMock.mockImplementation((_url: string, options?: RequestInit) => { + fetchSignal = options?.signal as AbortSignal | undefined + return Promise.resolve( + new Response( + new ReadableStream({ + start() {}, + }), + { + status: 200, + headers: { 'Content-Type': 'application/x-ndjson; charset=utf-8' }, + } + ) + ) + }) + + const result = (await handler.execute(context, block, { prompt: 'Cancel stream' })) as + | StreamingExecution + | undefined + + await result?.stream.cancel('client_cancelled') + + expect(fetchSignal?.aborted).toBe(true) + }) }) diff --git a/apps/sim/executor/handlers/mothership/mothership-handler.ts b/apps/sim/executor/handlers/mothership/mothership-handler.ts index e677680ecd7..4fc5b58e0d0 100644 --- a/apps/sim/executor/handlers/mothership/mothership-handler.ts +++ b/apps/sim/executor/handlers/mothership/mothership-handler.ts @@ -12,18 +12,268 @@ import { import type { BlockOutput } from '@/blocks/types' import { normalizeFileInput } from '@/blocks/utils' import { BlockType } from '@/executor/constants' -import type { BlockHandler, ExecutionContext } from '@/executor/types' +import type { + BlockHandler, + ExecutionContext, + NormalizedBlockOutput, + StreamingExecution, +} from '@/executor/types' import { buildAPIUrl, buildAuthHeaders, extractAPIErrorMessage } from '@/executor/utils/http' import type { SerializedBlock } from '@/serializer/types' const logger = createLogger('MothershipBlockHandler') const CANCELLATION_CHECK_INTERVAL_MS = 500 const MAX_MOTHERSHIP_ATTACHMENT_BYTES = 10 * 1024 * 1024 +const MOTHERSHIP_EXECUTE_STREAM_HEADER = 'X-Mothership-Execute-Stream' +const MOTHERSHIP_EXECUTE_STREAM_VALUE = 'ndjson' type MothershipFileAttachment = MessageContent & { filename?: string } +type MothershipExecuteResult = { + content?: string + model?: string + conversationId?: string + tokens?: Record + toolCalls?: Array> + cost?: unknown +} + +type MothershipExecuteStreamEvent = + | { type: 'heartbeat'; timestamp?: string } + | { type: 'chunk'; content?: string } + | { type: 'final'; data: MothershipExecuteResult } + | { type: 'error'; error?: string } + +function parseMothershipExecuteStreamLine(line: string): MothershipExecuteStreamEvent | undefined { + const trimmed = line.trim() + if (!trimmed) return undefined + + try { + return JSON.parse(trimmed) as MothershipExecuteStreamEvent + } catch { + throw new Error('Mothership execution stream returned malformed data') + } +} + +function formatMothershipBlockOutput( + result: MothershipExecuteResult, + fallbackChatId: string +): NormalizedBlockOutput { + const formattedList = (result.toolCalls || []).map((tc: Record) => ({ + name: typeof tc.name === 'string' ? tc.name : String(tc.name ?? ''), + arguments: (tc.params && typeof tc.params === 'object' ? tc.params : {}) as Record< + string, + unknown + >, + result: tc.result as any, + error: typeof tc.error === 'string' ? tc.error : undefined, + duration: typeof tc.durationMs === 'number' ? tc.durationMs : 0, + })) + const toolCalls: NormalizedBlockOutput['toolCalls'] = { + list: formattedList, + count: formattedList.length, + } + + return { + content: result.content || '', + model: result.model || 'mothership', + conversationId: result.conversationId || fallbackChatId, + tokens: (result.tokens || {}) as NormalizedBlockOutput['tokens'], + toolCalls, + cost: result.cost as NormalizedBlockOutput['cost'] | undefined, + } +} + +function isContentSelectedForStreaming(ctx: ExecutionContext, block: SerializedBlock): boolean { + if (!ctx.stream) return false + + return ( + ctx.selectedOutputs?.some((outputId) => { + if (outputId === block.id) return true + return outputId === `${block.id}.content` || outputId === `${block.id}_content` + }) ?? false + ) +} + +async function readMothershipExecuteResponse(response: Response): Promise { + const contentType = response.headers.get('content-type') || '' + if (!contentType.includes('application/x-ndjson')) { + return response.json() + } + + if (!response.body) { + throw new Error('Mothership execution stream ended without a response body') + } + + const reader = response.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + let finalResult: MothershipExecuteResult | undefined + + const processLine = (line: string) => { + const event = parseMothershipExecuteStreamLine(line) + if (!event) return + + if (event.type === 'heartbeat' || event.type === 'chunk') { + return + } + + if (event.type === 'error') { + throw new Error(`Mothership execution failed: ${event.error || 'Unknown error'}`) + } + + if (event.type === 'final') { + finalResult = event.data + return + } + + throw new Error('Mothership execution stream returned an unknown event') + } + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + for (const line of lines) { + processLine(line) + } + } + + buffer += decoder.decode() + processLine(buffer) + + if (!finalResult) { + throw new Error('Mothership execution stream ended without a final result') + } + + return finalResult + } finally { + reader.releaseLock() + } +} + +function createMothershipStreamingExecution( + response: Response, + fallbackChatId: string, + blockId: string, + options: { + onCancel?: (reason?: unknown) => void + onDone?: () => void + } = {} +): StreamingExecution { + if (!response.body) { + throw new Error('Mothership execution stream ended without a response body') + } + + const output = formatMothershipBlockOutput({}, fallbackChatId) + let reader: ReadableStreamDefaultReader | undefined + let cancelled = false + let cleanedUp = false + const cleanup = () => { + if (cleanedUp) return + cleanedUp = true + options.onDone?.() + } + + const stream = new ReadableStream({ + async start(controller) { + reader = response.body!.getReader() + const decoder = new TextDecoder() + const encoder = new TextEncoder() + let buffer = '' + let sawFinal = false + + const processLine = (line: string) => { + const event = parseMothershipExecuteStreamLine(line) + if (!event) return + + if (event.type === 'heartbeat') { + return + } + + if (event.type === 'chunk') { + if (event.content) { + controller.enqueue(encoder.encode(event.content)) + } + return + } + + if (event.type === 'error') { + throw new Error(`Mothership execution failed: ${event.error || 'Unknown error'}`) + } + + if (event.type === 'final') { + sawFinal = true + Object.assign(output, formatMothershipBlockOutput(event.data, fallbackChatId)) + return + } + + throw new Error('Mothership execution stream returned an unknown event') + } + + try { + while (true) { + const { done, value } = await reader.read() + if (cancelled) return + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() ?? '' + for (const line of lines) { + processLine(line) + } + } + + buffer += decoder.decode() + processLine(buffer) + + if (!sawFinal) { + throw new Error('Mothership execution stream ended without a final result') + } + + if (!cancelled) { + controller.close() + } + } catch (error) { + if (!cancelled) { + controller.error(error) + } + } finally { + cleanup() + reader?.releaseLock() + } + }, + cancel(reason) { + cancelled = true + options.onCancel?.(reason) + cleanup() + return reader?.cancel(reason) + }, + }) + + return { + stream, + execution: { + success: true, + output, + blockId, + logs: [], + metadata: { + duration: 0, + startTime: new Date().toISOString(), + }, + isStreaming: true, + } as StreamingExecution['execution'] & { blockId: string }, + } +} + async function buildMothershipFileAttachments( filesInput: unknown, ctx: ExecutionContext, @@ -82,7 +332,7 @@ export class MothershipBlockHandler implements BlockHandler { ctx: ExecutionContext, block: SerializedBlock, inputs: Record - ): Promise { + ): Promise { const prompt = inputs.prompt if (!prompt || typeof prompt !== 'string') { throw new Error('Prompt input is required') @@ -97,6 +347,8 @@ export class MothershipBlockHandler implements BlockHandler { const url = buildAPIUrl('/api/mothership/execute') const headers = await buildAuthHeaders(ctx.userId) + headers.Accept = 'application/x-ndjson' + headers[MOTHERSHIP_EXECUTE_STREAM_HEADER] = MOTHERSHIP_EXECUTE_STREAM_VALUE const body: Record = { messages, @@ -161,8 +413,15 @@ export class MothershipBlockHandler implements BlockHandler { }) }, CANCELLATION_CHECK_INTERVAL_MS) : undefined + const cleanupAbortListeners = () => { + if (cancellationPoller) { + clearInterval(cancellationPoller) + } + ctx.abortSignal?.removeEventListener('abort', onAbort) + } let response: Response + let cleanupImmediately = true try { response = await fetch(url.toString(), { method: 'POST', @@ -170,36 +429,31 @@ export class MothershipBlockHandler implements BlockHandler { body: JSON.stringify(body), signal: abortController.signal, }) - } finally { - if (cancellationPoller) { - clearInterval(cancellationPoller) + + if (!response.ok) { + const errorMsg = await extractAPIErrorMessage(response) + throw new Error(`Mothership execution failed: ${errorMsg}`) } - ctx.abortSignal?.removeEventListener('abort', onAbort) - } - if (!response.ok) { - const errorMsg = await extractAPIErrorMessage(response) - throw new Error(`Mothership execution failed: ${errorMsg}`) - } + if (isContentSelectedForStreaming(ctx, block)) { + const streamingExecution = createMothershipStreamingExecution(response, chatId, block.id, { + onCancel: (reason) => { + if (!abortController.signal.aborted) { + abortController.abort(reason ?? 'mothership_stream_cancelled') + } + }, + onDone: cleanupAbortListeners, + }) + cleanupImmediately = false + return streamingExecution + } - const result = await response.json() - - const formattedList = (result.toolCalls || []).map((tc: Record) => ({ - name: tc.name, - arguments: tc.params || {}, - result: tc.result, - error: tc.error, - duration: tc.durationMs || 0, - })) - const toolCalls = { list: formattedList, count: formattedList.length } - - return { - content: result.content || '', - model: result.model || 'mothership', - conversationId: result.conversationId || chatId, - tokens: result.tokens || {}, - toolCalls, - cost: result.cost || undefined, + const result = await readMothershipExecuteResponse(response) + return formatMothershipBlockOutput(result, chatId) + } finally { + if (cleanupImmediately) { + cleanupAbortListeners() + } } } } diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts index 4ded8fda4ec..2b2fff6e703 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts @@ -1521,6 +1521,55 @@ describe('errorHandled - handled errors should not bubble up', () => { expect(span.status).toBe('success') expect(span.errorHandled).toBeUndefined() }) + + it.concurrent('successful mothership blocks do not bubble failed child tool spans', () => { + const result: ExecutionResult = { + success: true, + output: { content: 'Mothership recovered from the failed tool' }, + metadata: { duration: 3000, startTime: '2024-01-01T10:00:00.000Z' }, + logs: [ + { + blockId: 'mothership-1', + blockName: 'Mothership', + blockType: 'mothership', + startedAt: '2024-01-01T10:00:00.000Z', + endedAt: '2024-01-01T10:00:03.000Z', + durationMs: 3000, + success: true, + output: { + content: 'Mothership recovered from the failed tool', + model: 'mothership', + toolCalls: { + list: [ + { + name: 'failing_tool', + arguments: { query: 'test' }, + error: 'Tool execution failed', + duration: 1000, + startTime: '2024-01-01T10:00:01.000Z', + endTime: '2024-01-01T10:00:02.000Z', + }, + ], + count: 1, + }, + }, + executionOrder: 1, + }, + ], + } + + const { traceSpans } = buildTraceSpans(result) + + const workflowSpan = traceSpans[0] + expect(workflowSpan.status).toBe('success') + + const mothershipSpan = workflowSpan.children![0] + expect(mothershipSpan.status).toBe('success') + + const toolSpan = mothershipSpan.children![0] + expect(toolSpan.status).toBe('error') + expect(toolSpan.output).toEqual({ error: 'Tool execution failed' }) + }) }) describe('stripCustomToolPrefix', () => { diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts index 815c5159351..ba37918435d 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts @@ -8,6 +8,7 @@ import type { BlockLog, ExecutionResult } from '@/executor/types' * These are internal fields used for execution tracking that shouldn't be shown to users. */ const HIDDEN_OUTPUT_KEYS = new Set(['childTraceSpans']) +const SUCCESSFUL_CHILD_ERROR_BOUNDARY_BLOCK_TYPES = new Set(['mothership']) /** * Recursively filters hidden keys from nested objects for cleaner display. @@ -126,5 +127,8 @@ function addRelativeTimestamps(spans: TraceSpan[], workflowStartMs: number): voi /** True if this span (or any descendant) has an unhandled error. */ function hasUnhandledError(span: TraceSpan): boolean { if (span.status === 'error' && !span.errorHandled) return true + if (span.status === 'success' && SUCCESSFUL_CHILD_ERROR_BOUNDARY_BLOCK_TYPES.has(span.type)) { + return false + } return span.children?.some(hasUnhandledError) ?? false }