diff --git a/apps/sim/app/api/copilot/chat/abort/route.ts b/apps/sim/app/api/copilot/chat/abort/route.ts index c18e62548e..6a81e6e1d8 100644 --- a/apps/sim/app/api/copilot/chat/abort/route.ts +++ b/apps/sim/app/api/copilot/chat/abort/route.ts @@ -10,7 +10,11 @@ import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1' import { fetchGo } from '@/lib/copilot/request/go/fetch' import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http' import { withCopilotSpan, withIncomingGoSpan } from '@/lib/copilot/request/otel' -import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/request/session' +import { + abortActiveStream, + releasePendingChatStream, + waitForPendingChatStream, +} from '@/lib/copilot/request/session' import { getMothershipBaseURL, getMothershipSourceEnvHeaders } from '@/lib/copilot/server/agent-url' import { env } from '@/lib/core/config/env' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' @@ -139,11 +143,21 @@ export const POST = withRouteHandler((request: NextRequest) => } ) if (!settled) { - rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.SettleTimeout) - return NextResponse.json( - { error: 'Previous response is still shutting down', aborted, settled: false }, - { status: 409 } - ) + // The holder didn't settle within the grace window even though the + // user explicitly stopped it and abort markers are written on both + // sides (local + Go). Don't leave the chat hostage to a wedged + // handler: break its stream lock. This is safe by construction — + // releaseLock only deletes when the value still matches this + // streamId (never clobbers a newer stream), and the old handler's + // heartbeat uses extendLock-if-owner, so it observes the loss and + // stops heartbeating rather than re-asserting. + await releasePendingChatStream(chatId, streamId) + logger.warn('Stream did not settle after abort; force-released chat stream lock', { + chatId, + streamId, + }) + rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.ForceReleased) + return NextResponse.json({ aborted, settled: false, forceReleased: true }) } rootSpan.setAttribute(TraceAttr.CopilotAbortOutcome, CopilotAbortOutcome.Settled) return NextResponse.json({ aborted, settled: true }) diff --git a/apps/sim/lib/api/contracts/copilot.ts b/apps/sim/lib/api/contracts/copilot.ts index 3671177492..3da7fdd6d0 100644 --- a/apps/sim/lib/api/contracts/copilot.ts +++ b/apps/sim/lib/api/contracts/copilot.ts @@ -713,6 +713,9 @@ export const copilotChatAbortContract = defineRouteContract({ schema: z.object({ aborted: z.boolean(), settled: z.boolean().optional(), + // True when the stream did not settle within the grace window and the + // chat stream lock was force-broken so the chat is immediately usable. + forceReleased: z.boolean().optional(), }), }, }) diff --git a/apps/sim/lib/copilot/constants.ts b/apps/sim/lib/copilot/constants.ts index 4dd1935ce0..3028222f12 100644 --- a/apps/sim/lib/copilot/constants.ts +++ b/apps/sim/lib/copilot/constants.ts @@ -13,6 +13,26 @@ export const SIM_AGENT_API_URL = /** Default timeout for the copilot orchestration stream loop (60 min). */ export const ORCHESTRATION_TIMEOUT_MS = 3_600_000 +/** + * Watchdog cap for a single sim-executed copilot tool. A tool that neither + * resolves nor rejects within its cap is failed with a timeout error so the + * checkpoint loop can resume Go with an error result instead of wedging the + * chat (and its pending-stream lock) behind a hung await forever. + */ +export const TOOL_WATCHDOG_DEFAULT_MS = 60_000 + +/** + * Watchdog cap for tool classes with legitimately long runtimes (workflow + * executions, media/image generation, sandboxed code, deep research). Those + * tools carry their own inner budgets (plan execution timeouts, sandbox + * timeouts), so this cap only backstops a true hang and sits above all of + * them — matching ORCHESTRATION_TIMEOUT_MS so it never undercuts a legal run. + */ +export const TOOL_WATCHDOG_LONG_RUNNING_MS = ORCHESTRATION_TIMEOUT_MS + +/** Extra slack the resume gate allows past the slowest pending tool's watchdog. */ +export const TOOL_WATCHDOG_RESUME_GRACE_MS = 30_000 + /** Timeout for the client-side streaming response handler (60 min). */ export const STREAM_TIMEOUT_MS = 3_600_000 diff --git a/apps/sim/lib/copilot/generated/trace-attribute-values-v1.ts b/apps/sim/lib/copilot/generated/trace-attribute-values-v1.ts index a343f9e091..a6fe28e8fd 100644 --- a/apps/sim/lib/copilot/generated/trace-attribute-values-v1.ts +++ b/apps/sim/lib/copilot/generated/trace-attribute-values-v1.ts @@ -72,6 +72,7 @@ export type BillingRouteOutcomeValue = (typeof BillingRouteOutcome)[BillingRoute export const CopilotAbortOutcome = { BadRequest: 'bad_request', FallbackPersistFailed: 'fallback_persist_failed', + ForceReleased: 'force_released', MissingMessageId: 'missing_message_id', MissingStreamId: 'missing_stream_id', NoChatId: 'no_chat_id', diff --git a/apps/sim/lib/copilot/generated/trace-attributes-v1.ts b/apps/sim/lib/copilot/generated/trace-attributes-v1.ts index d3f930bd45..441ec59d16 100644 --- a/apps/sim/lib/copilot/generated/trace-attributes-v1.ts +++ b/apps/sim/lib/copilot/generated/trace-attributes-v1.ts @@ -270,6 +270,8 @@ export const TraceAttr = { CopilotVfsInputMediaTypeClaimed: 'copilot.vfs.input.media_type_claimed', CopilotVfsInputMediaTypeDetected: 'copilot.vfs.input.media_type_detected', CopilotVfsInputWidth: 'copilot.vfs.input.width', + CopilotVfsMaterializeFileCount: 'copilot.vfs.materialize.file_count', + CopilotVfsMaterializePhaseMs: 'copilot.vfs.materialize.phase_ms', CopilotVfsMetadataFailed: 'copilot.vfs.metadata.failed', CopilotVfsOutcome: 'copilot.vfs.outcome', CopilotVfsOutputBytes: 'copilot.vfs.output.bytes', @@ -877,6 +879,8 @@ export const TraceAttrValues: readonly TraceAttrValue[] = [ 'copilot.vfs.input.media_type_claimed', 'copilot.vfs.input.media_type_detected', 'copilot.vfs.input.width', + 'copilot.vfs.materialize.file_count', + 'copilot.vfs.materialize.phase_ms', 'copilot.vfs.metadata.failed', 'copilot.vfs.outcome', 'copilot.vfs.output.bytes', diff --git a/apps/sim/lib/copilot/generated/trace-spans-v1.ts b/apps/sim/lib/copilot/generated/trace-spans-v1.ts index eb5920a4dc..9ade1eabbf 100644 --- a/apps/sim/lib/copilot/generated/trace-spans-v1.ts +++ b/apps/sim/lib/copilot/generated/trace-spans-v1.ts @@ -66,6 +66,7 @@ export const TraceSpan = { CopilotToolsWriteCsvToTable: 'copilot.tools.write_csv_to_table', CopilotToolsWriteOutputFile: 'copilot.tools.write_output_file', CopilotToolsWriteOutputTable: 'copilot.tools.write_output_table', + CopilotVfsMaterialize: 'copilot.vfs.materialize', CopilotVfsPrepareImage: 'copilot.vfs.prepare_image', CopilotVfsReadFile: 'copilot.vfs.read_file', GenAiAgentExecute: 'gen_ai.agent.execute', @@ -140,6 +141,7 @@ export const TraceSpanValues: readonly TraceSpanValue[] = [ 'copilot.tools.write_csv_to_table', 'copilot.tools.write_output_file', 'copilot.tools.write_output_table', + 'copilot.vfs.materialize', 'copilot.vfs.prepare_image', 'copilot.vfs.read_file', 'gen_ai.agent.execute', diff --git a/apps/sim/lib/copilot/request/lifecycle/run.test.ts b/apps/sim/lib/copilot/request/lifecycle/run.test.ts index 95bd73c0c6..f49c6b1c9a 100644 --- a/apps/sim/lib/copilot/request/lifecycle/run.test.ts +++ b/apps/sim/lib/copilot/request/lifecycle/run.test.ts @@ -7,19 +7,23 @@ import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/t const { mockCreateRunSegment, + mockForceFailHungToolCall, mockGetEffectiveDecryptedEnv, mockGetMothershipBaseURL, mockGetMothershipSourceEnvHeaders, mockPrepareExecutionContext, mockRunStreamLoop, + mockToolWatchdogTimeoutMs, mockUpdateRunStatus, } = vi.hoisted(() => ({ mockCreateRunSegment: vi.fn(), + mockForceFailHungToolCall: vi.fn(), mockGetEffectiveDecryptedEnv: vi.fn(), mockGetMothershipBaseURL: vi.fn(), mockGetMothershipSourceEnvHeaders: vi.fn(), mockPrepareExecutionContext: vi.fn(), mockRunStreamLoop: vi.fn(), + mockToolWatchdogTimeoutMs: vi.fn(() => 60_000), mockUpdateRunStatus: vi.fn(), })) @@ -84,6 +88,8 @@ vi.mock('@/lib/copilot/request/tools/billing', () => ({ vi.mock('@/lib/copilot/request/tools/executor', () => ({ executeToolAndReport: vi.fn(), + forceFailHungToolCall: mockForceFailHungToolCall, + toolWatchdogTimeoutMs: mockToolWatchdogTimeoutMs, })) import { MothershipStreamV1ToolOutcome } from '@/lib/copilot/generated/mothership-stream-v1' @@ -583,4 +589,102 @@ describe('runCopilotLifecycle', () => { // Final attempt (2) is terminal → not flagged, so Go bills + surfaces it. expect(bodies[3].willRetryOnStreamError).toBeUndefined() }) + + it('force-fails a hung tool promise and resumes with an error result instead of wedging', async () => { + vi.useFakeTimers() + try { + const fetchUrls: string[] = [] + const bodies: Record[] = [] + const executionContext: ExecutionContext = { + userId: 'user-1', + workflowId: '', + workspaceId: 'ws-1', + chatId: 'chat-1', + decryptedEnvVars: {}, + } + + // Mirror the real helper: settle the tool call into a terminal error + // state so the resume loop can serialize an error result for it. + mockForceFailHungToolCall.mockImplementation( + async (toolCallId: string, context: StreamingContext, message: string) => { + const tool = context.toolCalls.get(toolCallId) + if (!tool) return + tool.status = MothershipStreamV1ToolOutcome.error + tool.endTime = Date.now() + tool.result = { success: false } + tool.error = message + } + ) + + // Initial leg checkpoints on an async tool whose promise NEVER settles — + // the exact shape of the prod incident (claimed, marked running, hung). + mockRunStreamLoop.mockImplementationOnce( + async ( + fetchUrl: string, + fetchOptions: RequestInit, + context: StreamingContext + ): Promise => { + fetchUrls.push(fetchUrl) + bodies.push(JSON.parse(String(fetchOptions.body))) + context.toolCalls.set('tool-hung', { + id: 'tool-hung', + name: 'read', + status: 'executing', + }) + context.pendingToolPromises.set('tool-hung', new Promise(() => {})) + context.awaitingAsyncContinuation = { + checkpointId: 'ckpt-1', + pendingToolCallIds: ['tool-hung'], + } + } + ) + + // Resume leg completes normally with the error result delivered. + mockRunStreamLoop.mockImplementationOnce( + async ( + fetchUrl: string, + fetchOptions: RequestInit, + context: StreamingContext + ): Promise => { + fetchUrls.push(fetchUrl) + bodies.push(JSON.parse(String(fetchOptions.body))) + context.accumulatedContent = 'The file read failed, but here is what I know.' + } + ) + + const lifecycle = runCopilotLifecycle( + { message: 'hello', messageId: 'stream-1' }, + { + userId: 'user-1', + workspaceId: 'ws-1', + chatId: 'chat-1', + executionId: 'exec-1', + runId: 'run-1', + executionContext, + } + ) + + // Wait budget = watchdog (60s, mocked) + resume grace (30s). Advance past it. + await vi.advanceTimersByTimeAsync(91_000) + const result = await lifecycle + + expect(mockForceFailHungToolCall).toHaveBeenCalledWith( + 'tool-hung', + expect.anything(), + expect.stringContaining('hung') + ) + expect(fetchUrls[1]).toBe('http://mothership.test/api/tools/resume') + expect(bodies[1].results).toEqual([ + expect.objectContaining({ + callId: 'tool-hung', + name: 'read', + success: false, + data: { error: expect.stringContaining('hung') }, + }), + ]) + expect(result.success).toBe(true) + } finally { + vi.useRealTimers() + } + }) }) diff --git a/apps/sim/lib/copilot/request/lifecycle/run.ts b/apps/sim/lib/copilot/request/lifecycle/run.ts index bf72de725b..01c597c635 100644 --- a/apps/sim/lib/copilot/request/lifecycle/run.ts +++ b/apps/sim/lib/copilot/request/lifecycle/run.ts @@ -5,7 +5,7 @@ import { sleep } from '@sim/utils/helpers' import { generateId } from '@sim/utils/id' import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription' import { createRunSegment, updateRunStatus } from '@/lib/copilot/async-runs/repository' -import { SIM_AGENT_VERSION } from '@/lib/copilot/constants' +import { SIM_AGENT_VERSION, TOOL_WATCHDOG_RESUME_GRACE_MS } from '@/lib/copilot/constants' import { MothershipStreamV1EventType, MothershipStreamV1RunKind, @@ -24,7 +24,11 @@ import { setTerminalToolCallState, } from '@/lib/copilot/request/tool-call-state' import { handleBillingLimitResponse } from '@/lib/copilot/request/tools/billing' -import { executeToolAndReport } from '@/lib/copilot/request/tools/executor' +import { + executeToolAndReport, + forceFailHungToolCall, + toolWatchdogTimeoutMs, +} from '@/lib/copilot/request/tools/executor' import type { TraceCollector } from '@/lib/copilot/request/trace' import { RequestTraceV1SpanStatus } from '@/lib/copilot/request/trace' import type { @@ -405,15 +409,48 @@ async function runCheckpointLoop( if (!continuation) break if (context.pendingToolPromises.size > 0) { + // Bounded by the slowest pending tool's watchdog plus grace. The + // per-tool watchdog already guarantees each promise settles; this gate + // is the structural backstop so that no tool failure mode — known or + // unknown — can park the checkpoint loop (and the chat's pending-stream + // lock) forever. + const waitBudgetMs = + Array.from(context.pendingToolPromises.keys()).reduce( + (max, toolCallId) => + Math.max(max, toolWatchdogTimeoutMs(context.toolCalls.get(toolCallId)?.name)), + 0 + ) + TOOL_WATCHDOG_RESUME_GRACE_MS const waitSpan = context.trace.startSpan('Wait for Tools', 'lifecycle.wait_tools', { checkpointId: continuation.checkpointId, pendingCount: context.pendingToolPromises.size, + waitBudgetMs, }) logger.info('Waiting for in-flight tool executions before resume', { checkpointId: continuation.checkpointId, pendingCount: context.pendingToolPromises.size, + waitBudgetMs, }) - await Promise.allSettled(context.pendingToolPromises.values()) + const settledInTime = await Promise.race([ + Promise.allSettled(context.pendingToolPromises.values()).then(() => true), + sleep(waitBudgetMs).then(() => false), + ]) + if (!settledInTime) { + const hungToolCallIds = Array.from(context.pendingToolPromises.keys()) + logger.error('Pending tool executions exceeded the resume wait budget; force-failing', { + checkpointId: continuation.checkpointId, + waitBudgetMs, + hungToolCallIds, + }) + for (const toolCallId of hungToolCallIds) { + await forceFailHungToolCall( + toolCallId, + context, + 'Tool execution hung on the Sim executor and was abandoned so the conversation could continue.' + ) + context.pendingToolPromises.delete(toolCallId) + } + } + waitSpan.attributes = { ...waitSpan.attributes, settledInTime } context.trace.endSpan(waitSpan) } diff --git a/apps/sim/lib/copilot/request/tools/executor.ts b/apps/sim/lib/copilot/request/tools/executor.ts index c5cb4141a0..d50705558a 100644 --- a/apps/sim/lib/copilot/request/tools/executor.ts +++ b/apps/sim/lib/copilot/request/tools/executor.ts @@ -9,6 +9,7 @@ import { markAsyncToolRunning, upsertAsyncToolCall, } from '@/lib/copilot/async-runs/repository' +import { TOOL_WATCHDOG_DEFAULT_MS, TOOL_WATCHDOG_LONG_RUNNING_MS } from '@/lib/copilot/constants' import { MothershipStreamV1AsyncToolRecordStatus, MothershipStreamV1EventType, @@ -17,7 +18,28 @@ import { MothershipStreamV1ToolOutcome, MothershipStreamV1ToolPhase, } from '@/lib/copilot/generated/mothership-stream-v1' -import { CreateWorkflow } from '@/lib/copilot/generated/tool-catalog-v1' +import { + CrawlWebsite, + CreateFile, + CreateWorkflow, + DownloadToWorkspaceFile, + EditContent, + Ffmpeg, + FunctionExecute, + GenerateAudio, + GenerateImage, + GenerateVideo, + KnowledgeBase, + MaterializeFile, + Media, + Research, + Run, + RunBlock, + RunFromBlock, + RunWorkflow, + RunWorkflowUntilBlock, + WorkspaceFile, +} from '@/lib/copilot/generated/tool-catalog-v1' import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' import { publishToolConfirmation } from '@/lib/copilot/persistence/tool-confirm' import { withCopilotToolSpan } from '@/lib/copilot/request/otel' @@ -189,6 +211,124 @@ function abortRequested( ) } +/** + * Tool classes whose legitimate runtime can far exceed the default watchdog: + * workflow executions, sandboxed code, media/image/audio generation, deep + * research, large downloads, knowledge-base indexing, and file-content + * producers (create/edit/materialize hit the E2B doc compile/recalc/render + * pipeline on doc-backed files). They get the long watchdog cap; everything + * else (read/glob/grep/metadata CRUD/...) must settle within the strict + * default or be failed so the run can continue. + */ +const LONG_RUNNING_TOOL_IDS: ReadonlySet = new Set([ + Run.id, + RunBlock.id, + RunFromBlock.id, + RunWorkflow.id, + RunWorkflowUntilBlock.id, + FunctionExecute.id, + GenerateImage.id, + GenerateAudio.id, + GenerateVideo.id, + Ffmpeg.id, + Media.id, + Research.id, + CrawlWebsite.id, + KnowledgeBase.id, + DownloadToWorkspaceFile.id, + CreateFile.id, + EditContent.id, + MaterializeFile.id, + WorkspaceFile.id, +]) + +export function toolWatchdogTimeoutMs(toolName: string | undefined): number { + return toolName && LONG_RUNNING_TOOL_IDS.has(toolName) + ? TOOL_WATCHDOG_LONG_RUNNING_MS + : TOOL_WATCHDOG_DEFAULT_MS +} + +class ToolExecutionTimeoutError extends Error { + constructor(toolName: string, timeoutMs: number) { + super( + `Tool '${toolName}' timed out after ${Math.round(timeoutMs / 1000)}s on the Sim executor and was abandoned.` + ) + this.name = 'ToolExecutionTimeoutError' + } +} + +/** + * Execute a tool with a hard settlement guarantee. If the handler neither + * resolves nor rejects within the tool's watchdog cap, throw a timeout error + * so the standard failure path (persist failed row, publish terminal + * confirmation, resume Go with an error result) runs and the chat never + * wedges behind a hung await. The losing promise keeps running detached; its + * eventual settlement is ignored. + */ +async function executeToolWithWatchdog(toolCall: ToolCallState, execContext: ExecutionContext) { + const timeoutMs = toolWatchdogTimeoutMs(toolCall.name) + const execution = executeTool(toolCall.name, toolCall.params || {}, execContext) + let timer: ReturnType | undefined + try { + return await Promise.race([ + execution, + new Promise((_, reject) => { + timer = setTimeout( + () => reject(new ToolExecutionTimeoutError(toolCall.name, timeoutMs)), + timeoutMs + ) + }), + ]) + } finally { + if (timer) clearTimeout(timer) + // Swallow the abandoned promise's eventual rejection so it can't surface + // as an unhandled rejection after a watchdog loss. + execution.catch(() => {}) + } +} + +/** + * Last-resort settlement for a tool whose promise never settled (a hang the + * per-tool watchdog could not see, e.g. in post-processing or persistence). + * Records a terminal error state + failed async row so the checkpoint loop + * can resume Go with an error result instead of waiting forever. + */ +export async function forceFailHungToolCall( + toolCallId: string, + context: StreamingContext, + message: string +): Promise { + const toolCall = context.toolCalls.get(toolCallId) + if (!toolCall || toolCall.endTime || isTerminalToolCallStatus(toolCall.status)) return + setTerminalToolCallState(toolCall, { + status: MothershipStreamV1ToolOutcome.error, + error: message, + }) + logger.error('Force-failed hung tool call', { + toolCallId, + toolName: toolCall.name, + message, + }) + markToolResultSeen(toolCallId) + await completeAsyncToolCall({ + toolCallId, + status: MothershipStreamV1AsyncToolRecordStatus.failed, + result: { error: message }, + error: message, + }).catch((err) => { + logger.warn('Failed to persist force-failed async tool status', { + toolCallId, + error: toError(err).message, + }) + }) + publishTerminalToolConfirmation({ + toolCallId, + status: MothershipStreamV1ToolOutcome.error, + message, + data: { error: message }, + }) +} + function cancelledCompletion(message: string): AsyncToolCompletion { return buildCompletionSignal({ status: MothershipStreamV1ToolOutcome.cancelled, @@ -389,7 +529,7 @@ async function executeToolAndReportInner( try { ensureHandlersRegistered() - let result = await executeTool(toolCall.name, toolCall.params || {}, execContext) + let result = await executeToolWithWatchdog(toolCall, execContext) if (toolCall.endTime || isTerminalToolCallStatus(toolCall.status)) { endToolSpanFromTerminalState() return terminalCompletionFromToolCall(toolCall) diff --git a/apps/sim/lib/copilot/tool-executor/register-handlers.ts b/apps/sim/lib/copilot/tool-executor/register-handlers.ts index 789e5e16b1..41e31df57c 100644 --- a/apps/sim/lib/copilot/tool-executor/register-handlers.ts +++ b/apps/sim/lib/copilot/tool-executor/register-handlers.ts @@ -24,6 +24,7 @@ import { Glob as GlobTool, Grep as GrepTool, ListFolders, + ListIntegrationTools, ListUserWorkspaces, ListWorkspaceMcpServers, LoadDeployment, @@ -74,6 +75,7 @@ import { executeUpdateWorkspaceMcpServer, } from '../tools/handlers/deployment/manage' import { executeFunctionExecute } from '../tools/handlers/function-execute' +import { executeListIntegrationTools } from '../tools/handlers/integration-tools' import { executeCompleteJob, executeManageJob, @@ -192,6 +194,7 @@ function buildHandlerMap(): Record { [OpenResource.id]: h(executeOpenResource), [RestoreResource.id]: h(executeRestoreResource), [GetPlatformActions.id]: h(executeGetPlatformActions), + [ListIntegrationTools.id]: h(executeListIntegrationTools), [MaterializeFile.id]: h(executeMaterializeFile), [FunctionExecute.id]: h(executeFunctionExecute), diff --git a/apps/sim/lib/copilot/tools/handlers/integration-tools.ts b/apps/sim/lib/copilot/tools/handlers/integration-tools.ts new file mode 100644 index 0000000000..a3e307bac8 --- /dev/null +++ b/apps/sim/lib/copilot/tools/handlers/integration-tools.ts @@ -0,0 +1,39 @@ +import { getExposedIntegrationTools } from '@/lib/copilot/integration-tools' +import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types' +import { stripVersionSuffix } from '@/tools/utils' + +export async function executeListIntegrationTools( + params: Record, + _context: ExecutionContext +): Promise { + const raw = typeof params.integration === 'string' ? params.integration.trim() : '' + if (!raw) { + return { success: false, error: "Missing required parameter 'integration'" } + } + + const all = getExposedIntegrationTools() + const service = stripVersionSuffix(raw.toLowerCase()) + const matches = all.filter((tool) => tool.service === service) + + if (matches.length === 0) { + const services = Array.from(new Set(all.map((tool) => tool.service))).sort() + return { + success: false, + error: `Unknown integration "${raw}". Available integrations: ${services.join(', ')}`, + } + } + + return { + success: true, + output: { + integration: service, + note: 'Call load_integration_tool({tool_ids: [""]}) with the exact id before invoking an operation.', + tools: matches.map((tool) => ({ + id: tool.toolId, + operation: tool.operation, + name: tool.config.name, + description: tool.config.description, + })), + }, + } +} diff --git a/apps/sim/lib/copilot/vfs/workflow-alias-backing.ts b/apps/sim/lib/copilot/vfs/workflow-alias-backing.ts index 05082355b3..b57ea3a72c 100644 --- a/apps/sim/lib/copilot/vfs/workflow-alias-backing.ts +++ b/apps/sim/lib/copilot/vfs/workflow-alias-backing.ts @@ -1,7 +1,5 @@ import { db } from '@sim/db' import { workspaceFileFolder, workspaceFiles } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { toError } from '@sim/utils/errors' import { and, eq, inArray, isNull } from 'drizzle-orm' import { WORKFLOW_CHANGELOG_BACKING_FOLDER, @@ -19,8 +17,6 @@ import { type WorkspaceFileRecord, } from '@/lib/uploads/contexts/workspace/workspace-file-manager' -const logger = createLogger('WorkflowAliasBacking') - export interface WorkflowAliasBacking { changelogFolderId: string plansRootFolderId: string @@ -97,24 +93,6 @@ export async function ensureWorkflowAliasBacking(args: { } } -export async function ensureWorkflowAliasBackingQuietly(args: { - workspaceId: string - userId: string - workflowId: string - workflowName?: string -}): Promise { - try { - return await ensureWorkflowAliasBacking(args) - } catch (error) { - logger.warn('Failed to ensure workflow alias backing', { - workspaceId: args.workspaceId, - workflowId: args.workflowId, - error: toError(error).message, - }) - return null - } -} - export async function ensureWorkspacePlanBacking(args: { workspaceId: string userId: string diff --git a/apps/sim/lib/copilot/vfs/workspace-vfs.ts b/apps/sim/lib/copilot/vfs/workspace-vfs.ts index b3a12fc671..8f710b7513 100644 --- a/apps/sim/lib/copilot/vfs/workspace-vfs.ts +++ b/apps/sim/lib/copilot/vfs/workspace-vfs.ts @@ -1,3 +1,4 @@ +import { trace } from '@opentelemetry/api' import { db } from '@sim/db' import { a2aAgent, @@ -23,7 +24,10 @@ import { buildWorkspaceMd, type WorkspaceMdData, } from '@/lib/copilot/chat/workspace-context' +import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' +import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1' import { getExposedIntegrationTools } from '@/lib/copilot/integration-tools' +import { markSpanForError } from '@/lib/copilot/request/otel' import { compileDoc, getE2BDocFormat } from '@/lib/copilot/tools/server/files/doc-compile' import { extractDocText, isExtractableDocExt } from '@/lib/copilot/tools/server/files/doc-extract' import { runE2BCompiledCheck } from '@/lib/copilot/tools/server/files/doc-recalc' @@ -32,10 +36,7 @@ import { collectWorkflowFieldIssues, lintEditedWorkflowState, } from '@/lib/copilot/tools/server/workflow/edit-workflow/lint' -import { - collectUnresolvedReferences, - UNRESOLVABLE_AT_LINT_NOTE, -} from '@/lib/copilot/tools/server/workflow/edit-workflow/validation' +import { UNRESOLVABLE_AT_LINT_NOTE } from '@/lib/copilot/tools/server/workflow/edit-workflow/validation' import { extractDocumentStyle } from '@/lib/copilot/vfs/document-style' import { type FileReadResult, readFileRecord } from '@/lib/copilot/vfs/file-reader' import { normalizeVfsSegment } from '@/lib/copilot/vfs/normalize-segment' @@ -75,7 +76,6 @@ import { serializeVersions, serializeWorkflowMeta, } from '@/lib/copilot/vfs/serializers' -import { ensureWorkflowAliasBackingQuietly } from '@/lib/copilot/vfs/workflow-alias-backing' import { buildWorkflowAliasLinks, isWorkflowAliasBackingPath, @@ -393,63 +393,95 @@ export class WorkspaceVFS { this.files = new Map() this._workspaceId = workspaceId - const [ - wfSummary, - kbSummary, - tblSummary, - fileSummary, - envSummary, - toolsSummary, - mcpServersSummary, - skillsSummary, - taskSummary, - jobsSummary, - wsRow, - members, - ] = await Promise.all([ - this.materializeWorkflows(workspaceId, userId), - this.materializeKnowledgeBases(workspaceId, userId), - this.materializeTables(workspaceId), - this.materializeFiles(workspaceId), - this.materializeEnvironment(workspaceId, userId), - this.materializeCustomTools(workspaceId, userId), - this.materializeMcpServers(workspaceId), - this.materializeSkills(workspaceId), - this.materializeTasks(workspaceId, userId), - this.materializeJobs(workspaceId), - getWorkspaceWithOwner(workspaceId), - getUsersWithPermissions(workspaceId), - ]) - - const workspaceMdData = { - workspace: wsRow, - members, - workflows: wfSummary, - knowledgeBases: kbSummary, - tables: tblSummary, - files: fileSummary, - oauthIntegrations: envSummary.oauthIntegrations, - envVariables: envSummary.envVariables, - tasks: taskSummary, - customTools: toolsSummary, - mcpServers: mcpServersSummary, - skills: skillsSummary, - jobs: jobsSummary, + // Per-phase wall-clock, stamped on the span so a slow materialize in a + // trace names its bottleneck instead of showing up as unattributed dead + // time inside read/glob/grep (how the v0.7 lint.json regression hid). + const phaseMs: Record = {} + const timed = (phase: string, promise: Promise): Promise => { + const t0 = Date.now() + return promise.finally(() => { + phaseMs[phase] = Date.now() - t0 + }) } - this.files.set('WORKSPACE.md', buildWorkspaceMd(workspaceMdData)) - this.files.set('WORKSPACE_CONTEXT.md', buildWorkspaceContextMd(workspaceMdData)) + await trace + .getTracer('sim-copilot-vfs', '1.0.0') + .startActiveSpan( + TraceSpan.CopilotVfsMaterialize, + { attributes: { [TraceAttr.WorkspaceId]: workspaceId } }, + async (span) => { + try { + const [ + wfSummary, + kbSummary, + tblSummary, + fileSummary, + envSummary, + toolsSummary, + mcpServersSummary, + skillsSummary, + taskSummary, + jobsSummary, + wsRow, + members, + ] = await Promise.all([ + timed('workflows', this.materializeWorkflows(workspaceId)), + timed('knowledge_bases', this.materializeKnowledgeBases(workspaceId, userId)), + timed('tables', this.materializeTables(workspaceId)), + timed('files', this.materializeFiles(workspaceId)), + timed('environment', this.materializeEnvironment(workspaceId, userId)), + timed('custom_tools', this.materializeCustomTools(workspaceId, userId)), + timed('mcp_servers', this.materializeMcpServers(workspaceId)), + timed('skills', this.materializeSkills(workspaceId)), + timed('tasks', this.materializeTasks(workspaceId, userId)), + timed('jobs', this.materializeJobs(workspaceId)), + timed('workspace_row', getWorkspaceWithOwner(workspaceId)), + timed('members', getUsersWithPermissions(workspaceId)), + ]) + + const workspaceMdData = { + workspace: wsRow, + members, + workflows: wfSummary, + knowledgeBases: kbSummary, + tables: tblSummary, + files: fileSummary, + oauthIntegrations: envSummary.oauthIntegrations, + envVariables: envSummary.envVariables, + tasks: taskSummary, + customTools: toolsSummary, + mcpServers: mcpServersSummary, + skills: skillsSummary, + jobs: jobsSummary, + } - await this.materializeRecentlyDeleted(workspaceId, userId) + this.files.set('WORKSPACE.md', buildWorkspaceMd(workspaceMdData)) + this.files.set('WORKSPACE_CONTEXT.md', buildWorkspaceContextMd(workspaceMdData)) - for (const [path, content] of getStaticComponentFiles()) { - this.files.set(path, content) - } + await timed('recently_deleted', this.materializeRecentlyDeleted(workspaceId, userId)) + + for (const [path, content] of getStaticComponentFiles()) { + this.files.set(path, content) + } + + span.setAttributes({ + [TraceAttr.CopilotVfsMaterializeFileCount]: this.files.size, + [TraceAttr.CopilotVfsMaterializePhaseMs]: JSON.stringify(phaseMs), + }) + } catch (err) { + markSpanForError(span, err) + throw err + } finally { + span.end() + } + } + ) logger.info('VFS materialized', { workspaceId, fileCount: this.files.size, durationMs: Date.now() - start, + phaseMs, }) } @@ -972,10 +1004,7 @@ export class WorkspaceVFS { * workflows/{name}/ (if at workspace root) * Returns a summary for WORKSPACE.md generation. */ - private async materializeWorkflows( - workspaceId: string, - userId: string - ): Promise { + private async materializeWorkflows(workspaceId: string): Promise { const workflowArtifactsEnabled = isMothershipBetaFeaturesEnabled const [workflowRows, folderRows] = await Promise.all([ listWorkflows(workspaceId), @@ -984,18 +1013,12 @@ export class WorkspaceVFS { const folderPaths = this.buildFolderPaths(folderRows) - if (workflowArtifactsEnabled) { - await Promise.all( - workflowRows.map((wf) => - ensureWorkflowAliasBackingQuietly({ - workspaceId, - userId, - workflowId: wf.id, - workflowName: wf.name, - }) - ) - ) - } + // NOTE: materialization is a pure READ. Alias backing (changelog/plan + // folders + files) is ensured at write time — workflow create/rename + // (lib/workflows/utils) and alias writes (vfs/resource-writer, + // tools/server/files/workspace-file) — never here. Ensuring per workflow + // on every materialize meant N storage/DB writes per read tool call, and + // concurrent materializations contending on the same rows. const workspaceFiles = workflowArtifactsEnabled ? await listWorkspaceFiles(workspaceId, { includeReservedSystemFiles: true }) : [] @@ -1099,31 +1122,26 @@ export class WorkspaceVFS { // Dynamically-computed validation state (lint.json), derived from // the raw normalized state so subBlock values, advancedMode, // canonicalModes, and subflow edges are all available. + // + // CPU-only by design: tier-2 reference resolution + // (collectUnresolvedReferences) runs DB queries per selector field + // and is validated where it matters — at edit_workflow apply time. + // Running it here meant workflows × selectors sequential DB queries + // on every read/glob/grep call, which is what made `files/` reads + // take ~40s in large workspaces. try { const graphLint = lintEditedWorkflowState(normalized as any) const fieldIssues = collectWorkflowFieldIssues(normalized.blocks as any) - let unresolvedReferences: Awaited> = [] - try { - unresolvedReferences = await collectUnresolvedReferences(normalized as any, { - userId, - workspaceId, - }) - } catch (resolveErr) { - // Tier-2 resolution is best-effort; degrade to graph + config lint. - logger.warn('Failed to resolve workflow references for lint.json', { - workflowId: wf.id, - error: toError(resolveErr).message, - }) - } - this.files.set( `${prefix}lint.json`, JSON.stringify( { ...graphLint, fieldIssues, - unresolvedReferences, - notes: [UNRESOLVABLE_AT_LINT_NOTE], + notes: [ + UNRESOLVABLE_AT_LINT_NOTE, + 'Credential/resource reference resolution is validated when editing the workflow, not in this snapshot.', + ], }, null, 2