diff --git a/apps/sim/app/api/mcp/oauth/callback/route.ts b/apps/sim/app/api/mcp/oauth/callback/route.ts index 15b115c0b7c..08171fbc97c 100644 --- a/apps/sim/app/api/mcp/oauth/callback/route.ts +++ b/apps/sim/app/api/mcp/oauth/callback/route.ts @@ -167,9 +167,8 @@ export const GET = withRouteHandler(async (request: NextRequest) => { } try { - // discoverServerTools writes the result to this server's cache so the UI's - // immediate refetch hits it instead of re-fetching live. - await mcpService.discoverServerTools(session.user.id, server.id, server.workspaceId) + // forceRefresh: skip any stale cache from before re-auth. + await mcpService.discoverServerTools(session.user.id, server.id, server.workspaceId, true) } catch (e) { logger.warn('Post-auth tools refresh failed', toError(e).message) } diff --git a/apps/sim/app/api/mcp/servers/[id]/refresh/route.ts b/apps/sim/app/api/mcp/servers/[id]/refresh/route.ts index 7bab3fade1f..9f216ebf959 100644 --- a/apps/sim/app/api/mcp/servers/[id]/refresh/route.ts +++ b/apps/sim/app/api/mcp/servers/[id]/refresh/route.ts @@ -197,7 +197,12 @@ export const POST = withRouteHandler( } try { - discoveredTools = await mcpService.discoverServerTools(userId, serverId, workspaceId) + discoveredTools = await mcpService.discoverServerTools( + userId, + serverId, + workspaceId, + true + ) connectionStatus = 'connected' toolCount = discoveredTools.length logger.info(`[${requestId}] Discovered ${toolCount} tools from server ${serverId}`) diff --git a/apps/sim/app/api/mcp/tools/discover/route.ts b/apps/sim/app/api/mcp/tools/discover/route.ts index b125fa7ff2b..612788b4875 100644 --- a/apps/sim/app/api/mcp/tools/discover/route.ts +++ b/apps/sim/app/api/mcp/tools/discover/route.ts @@ -28,7 +28,7 @@ export const GET = withRouteHandler( logger.info(`[${requestId}] Discovering MCP tools`, { serverId, workspaceId, forceRefresh }) const tools = serverId - ? await mcpService.discoverServerTools(userId, serverId, workspaceId) + ? await mcpService.discoverServerTools(userId, serverId, workspaceId, forceRefresh) : await mcpService.discoverTools(userId, workspaceId, forceRefresh) const byServer: Record = {} @@ -76,7 +76,7 @@ export const POST = withRouteHandler( const results = await Promise.allSettled( serverIds.map(async (serverId: string) => { - const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId) + const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId, true) return { serverId, toolCount: tools.length } }) ) diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index cd326d6220a..5f79d8ad05c 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -524,7 +524,7 @@ export function HubspotIcon(props: SVGProps) { xmlns='http://www.w3.org/2000/svg' fill='currentColor' > - + ) } @@ -2284,23 +2284,26 @@ export function ElevenLabsIcon(props: SVGProps) { } export function FindymailIcon(props: SVGProps) { + const id = useId() + const gradient0 = `findymail_paint0_${id}` + const gradient1 = `findymail_paint1_${id}` return ( ) { ) => ( ) export const ResendIcon = (props: SVGProps) => ( - + Promise + refreshTools: () => Promise getToolsByServer: (serverId: string) => McpToolForUI[] } export function useMcpTools(workspaceId: string): UseMcpToolsResult { const queryClient = useQueryClient() - const { data: mcpToolsData = [], isLoading, error: queryError } = useMcpToolsQuery(workspaceId) + const { data: mcpToolsData, isLoading, error: queryError } = useMcpToolsQuery(workspaceId) const mcpTools = useMemo(() => { return mcpToolsData.map((tool) => ({ @@ -55,22 +55,17 @@ export function useMcpTools(workspaceId: string): UseMcpToolsResult { })) }, [mcpToolsData]) - const refreshTools = useCallback( - async (forceRefresh = false) => { - if (!workspaceId) { - logger.warn('Cannot refresh tools: no workspaceId provided') - return - } + // Soft refresh — invalidate per-server entries. For cache-bypass, use `useForceRefreshMcpTools`. + const refreshTools = useCallback(async () => { + if (!workspaceId) { + logger.warn('Cannot refresh tools: no workspaceId provided') + return + } - logger.info('Refreshing MCP tools', { forceRefresh, workspaceId }) - - await queryClient.invalidateQueries({ - queryKey: mcpKeys.toolsList(workspaceId), - refetchType: forceRefresh ? 'active' : 'all', - }) - }, - [workspaceId, queryClient] - ) + await queryClient.invalidateQueries({ + queryKey: mcpKeys.serverToolsWorkspace(workspaceId), + }) + }, [workspaceId, queryClient]) const getToolsByServer = useCallback( (serverId: string): McpToolForUI[] => { diff --git a/apps/sim/hooks/queries/mcp.ts b/apps/sim/hooks/queries/mcp.ts index 7652df84e75..9f483a4fef7 100644 --- a/apps/sim/hooks/queries/mcp.ts +++ b/apps/sim/hooks/queries/mcp.ts @@ -1,7 +1,13 @@ -import { useEffect } from 'react' +import { useEffect, useMemo } from 'react' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' -import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query' +import { + keepPreviousData, + useMutation, + useQueries, + useQuery, + useQueryClient, +} from '@tanstack/react-query' import { ApiClientError } from '@/lib/api/client/errors' import { requestJson } from '@/lib/api/client/request' import { @@ -39,8 +45,11 @@ export const mcpKeys = { all: ['mcp'] as const, servers: () => [...mcpKeys.all, 'servers'] as const, serversList: (workspaceId?: string) => [...mcpKeys.servers(), workspaceId ?? ''] as const, - tools: () => [...mcpKeys.all, 'tools'] as const, - toolsList: (workspaceId?: string) => [...mcpKeys.tools(), workspaceId ?? ''] as const, + serverTools: () => [...mcpKeys.all, 'serverTools'] as const, + serverToolsWorkspace: (workspaceId?: string) => + [...mcpKeys.serverTools(), workspaceId ?? ''] as const, + serverToolsList: (workspaceId?: string, serverId?: string) => + [...mcpKeys.serverToolsWorkspace(workspaceId), serverId ?? ''] as const, storedTools: () => [...mcpKeys.all, 'storedTools'] as const, storedToolsList: (workspaceId?: string) => [...mcpKeys.storedTools(), workspaceId ?? ''] as const, allowedDomains: () => [...mcpKeys.all, 'allowedDomains'] as const, @@ -92,11 +101,16 @@ export function useMcpServers(workspaceId: string) { async function fetchMcpTools( workspaceId: string, forceRefresh = false, - signal?: AbortSignal + signal?: AbortSignal, + serverId?: string ): Promise { try { const data = await requestJson(discoverMcpToolsContract, { - query: { workspaceId, refresh: forceRefresh || undefined }, + query: { + workspaceId, + refresh: forceRefresh || undefined, + ...(serverId ? { serverId } : {}), + }, signal, }) return data.data.tools @@ -108,24 +122,93 @@ async function fetchMcpTools( } } +/** + * Workspace aggregate derived from N parallel per-server queries via + * `useQueries`. One slow server cannot block the others. + */ export function useMcpToolsQuery(workspaceId: string) { - return useQuery({ - queryKey: mcpKeys.toolsList(workspaceId), - queryFn: ({ signal }) => fetchMcpTools(workspaceId, false, signal), - enabled: !!workspaceId, - retry: false, - staleTime: 30 * 1000, - placeholderData: keepPreviousData, + const { data: servers, isLoading: serversLoading } = useMcpServers(workspaceId) + + // Skip disabled rows (would 404 → negative-cache) and rows from a previous + // workspace (keepPreviousData on useMcpServers). + const serverIds = useMemo( + () => + servers + ? servers + .filter((s) => s.enabled && s.workspaceId === workspaceId) + .map((s) => s.id) + .sort() + : [], + [servers, workspaceId] + ) + + const results = useQueries({ + queries: serverIds.map((serverId) => ({ + queryKey: mcpKeys.serverToolsList(workspaceId, serverId), + queryFn: ({ signal }: { signal?: AbortSignal }) => + fetchMcpTools(workspaceId, false, signal, serverId), + enabled: !!workspaceId, + retry: false, + staleTime: 30 * 1000, + refetchOnWindowFocus: false, + })), }) + + return useMemo(() => { + const tools: McpTool[] = [] + let hasData = false + let anyServerLoading = false + let firstError: Error | null = null + for (const result of results) { + // Drop stale data from servers whose latest refetch errored. + if (result.data && !result.isError) { + tools.push(...result.data) + hasData = true + } + if (result.isLoading) anyServerLoading = true + if (!firstError && result.error instanceof Error) firstError = result.error + } + return { + data: tools, + isLoading: (serversLoading || anyServerLoading) && !hasData, + isFetching: serversLoading || results.some((r) => r.isFetching), + // Suppress when any healthy server rendered; per-server errors live in `perServer`. + error: hasData ? null : firstError, + perServer: results, + } + }, [results, serversLoading]) } export function useForceRefreshMcpTools() { const queryClient = useQueryClient() return useMutation({ - mutationFn: (workspaceId: string) => fetchMcpTools(workspaceId, true), + mutationFn: async (workspaceId: string) => { + const allServers = + queryClient.getQueryData(mcpKeys.serversList(workspaceId)) ?? [] + const servers = allServers.filter((s) => s.enabled && s.workspaceId === workspaceId) + const results = await Promise.allSettled( + servers.map(async (server) => { + const tools = await fetchMcpTools(workspaceId, true, undefined, server.id) + queryClient.setQueryData(mcpKeys.serverToolsList(workspaceId, server.id), tools) + return tools + }) + ) + results.forEach((result, index) => { + if (result.status === 'rejected') { + const failedServer = servers[index] + if (failedServer) { + queryClient.invalidateQueries({ + queryKey: mcpKeys.serverToolsList(workspaceId, failedServer.id), + }) + } + } + }) + return results + .filter((r): r is PromiseFulfilledResult => r.status === 'fulfilled') + .flatMap((r) => r.value) + }, onSettled: (_data, _error, workspaceId) => { - queryClient.invalidateQueries({ queryKey: mcpKeys.toolsList(workspaceId) }) queryClient.invalidateQueries({ queryKey: mcpKeys.serversList(workspaceId) }) queryClient.invalidateQueries({ queryKey: mcpKeys.storedToolsList(workspaceId) }) }, @@ -175,7 +258,9 @@ export function useCreateMcpServer() { }, onSettled: (_data, _error, variables) => { queryClient.invalidateQueries({ queryKey: mcpKeys.serversList(variables.workspaceId) }) - queryClient.invalidateQueries({ queryKey: mcpKeys.toolsList(variables.workspaceId) }) + queryClient.invalidateQueries({ + queryKey: mcpKeys.serverToolsWorkspace(variables.workspaceId), + }) }, }) } @@ -237,7 +322,9 @@ export function useDeleteMcpServer() { }, onSettled: (_data, _error, variables) => { queryClient.invalidateQueries({ queryKey: mcpKeys.serversList(variables.workspaceId) }) - queryClient.invalidateQueries({ queryKey: mcpKeys.toolsList(variables.workspaceId) }) + queryClient.removeQueries({ + queryKey: mcpKeys.serverToolsList(variables.workspaceId, variables.serverId), + }) queryClient.invalidateQueries({ queryKey: mcpKeys.storedToolsList(variables.workspaceId) }) }, }) @@ -304,7 +391,9 @@ export function useUpdateMcpServer() { }, onSettled: (_data, _error, variables) => { queryClient.invalidateQueries({ queryKey: mcpKeys.serversList(variables.workspaceId) }) - queryClient.invalidateQueries({ queryKey: mcpKeys.toolsList(variables.workspaceId) }) + queryClient.invalidateQueries({ + queryKey: mcpKeys.serverToolsList(variables.workspaceId, variables.serverId), + }) }, }) } @@ -334,7 +423,9 @@ export function useRefreshMcpServer() { }, onSettled: (_data, _error, variables) => { queryClient.invalidateQueries({ queryKey: mcpKeys.serversList(variables.workspaceId) }) - queryClient.invalidateQueries({ queryKey: mcpKeys.toolsList(variables.workspaceId) }) + queryClient.invalidateQueries({ + queryKey: mcpKeys.serverToolsList(variables.workspaceId, variables.serverId), + }) queryClient.invalidateQueries({ queryKey: mcpKeys.storedToolsList(variables.workspaceId) }) }, }) @@ -386,8 +477,14 @@ export function useMcpToolsEvents(workspaceId: string) { useEffect(() => { if (!workspaceId) return - const invalidate = () => { - queryClient.invalidateQueries({ queryKey: mcpKeys.toolsList(workspaceId) }) + const invalidate = (serverId?: string) => { + if (serverId) { + queryClient.invalidateQueries({ + queryKey: mcpKeys.serverToolsList(workspaceId, serverId), + }) + } else { + queryClient.invalidateQueries({ queryKey: mcpKeys.serverToolsWorkspace(workspaceId) }) + } queryClient.invalidateQueries({ queryKey: mcpKeys.serversList(workspaceId) }) queryClient.invalidateQueries({ queryKey: mcpKeys.storedToolsList(workspaceId) }) queryClient.invalidateQueries({ queryKey: workflowMcpServerKeys.all }) @@ -398,8 +495,15 @@ export function useMcpToolsEvents(workspaceId: string) { if (!entry) { const source = new EventSource(`/api/mcp/events?workspaceId=${workspaceId}`) - source.addEventListener('tools_changed', () => { - invalidate() + source.addEventListener('tools_changed', (e) => { + let serverId: string | undefined + try { + const parsed = JSON.parse((e as MessageEvent).data) as { serverId?: string } + serverId = parsed.serverId + } catch { + // Non-JSON payload → workspace-wide fallback. + } + invalidate(serverId) }) source.onerror = () => { diff --git a/apps/sim/lib/mcp/service.test.ts b/apps/sim/lib/mcp/service.test.ts index 42d111640d2..11a2631d34e 100644 --- a/apps/sim/lib/mcp/service.test.ts +++ b/apps/sim/lib/mcp/service.test.ts @@ -124,6 +124,9 @@ function tool(name: string, serverId: string) { describe('McpService.discoverTools per-server caching', () => { beforeEach(async () => { vi.clearAllMocks() + // `clearAllMocks` does not drain `.mockResolvedValueOnce` queues; reset + // listTools so a previous test's unconsumed mock doesn't leak into the next. + mockListTools.mockReset() mockIsDomainAllowed.mockReturnValue(true) mockValidateSsrf.mockResolvedValue('1.2.3.4') mockValidateDomain.mockImplementation(() => undefined) @@ -162,11 +165,14 @@ describe('McpService.discoverTools per-server caching', () => { expect(first.map((t) => t.name)).toEqual(['a1']) mockListTools.mockClear() - mockListTools.mockResolvedValueOnce([tool('b1', 'mcp-b')]) + // a1's positive cache is intact (the failure didn't poison it). b is now + // negative-cached so it's skipped instead of re-blocking — see + // "negative-caches a failed server so the next discoverTools skips it" + // below for the full assertion. const second = await mcpService.discoverTools(USER_ID, WORKSPACE_ID) - expect(second.map((t) => t.name).sort()).toEqual(['a1', 'b1']) - expect(mockListTools).toHaveBeenCalledTimes(1) + expect(second.map((t) => t.name)).toEqual(['a1']) + expect(mockListTools).not.toHaveBeenCalled() }) it("forceRefresh bypasses every server's cache", async () => { @@ -259,4 +265,67 @@ describe('McpService.discoverTools per-server caching', () => { expect(second.map((t) => t.name)).toEqual(['a1']) expect(mockListTools).not.toHaveBeenCalled() }) + + it('negative-caches a failed server so the next discoverTools skips it', async () => { + mockGetWorkspaceServersRows.mockResolvedValue([dbRow('mcp-a', 'A'), dbRow('mcp-b', 'B')]) + mockListTools + .mockResolvedValueOnce([tool('a1', 'mcp-a')]) + .mockRejectedValueOnce(new Error('Request timed out')) + + await mcpService.discoverTools(USER_ID, WORKSPACE_ID) + expect(mockListTools).toHaveBeenCalledTimes(2) + + mockListTools.mockClear() + // Second call: a1 is success-cached, b is failure-cached. Neither should + // hit the live transport — the slow server no longer blocks the response. + const second = await mcpService.discoverTools(USER_ID, WORKSPACE_ID) + expect(second.map((t) => t.name)).toEqual(['a1']) + expect(mockListTools).not.toHaveBeenCalled() + }) + + it('successful discoverServerTools clears the negative cache', async () => { + mockGetWorkspaceServersRows.mockResolvedValue([dbRow('mcp-a', 'A')]) + mockListTools.mockRejectedValueOnce(new Error('Request timed out')) + + await expect(mcpService.discoverServerTools(USER_ID, 'mcp-a', WORKSPACE_ID)).rejects.toThrow( + 'Request timed out' + ) + + // After the failure the negative cache is set, so the next default call + // short-circuits without re-paying the listTools timeout. + mockListTools.mockClear() + await expect(mcpService.discoverServerTools(USER_ID, 'mcp-a', WORKSPACE_ID)).rejects.toThrow( + 'cooldown' + ) + expect(mockListTools).not.toHaveBeenCalled() + + // Reconnecting via the explicit-refresh path (refresh button / OAuth + // callback) bypasses both caches and brings the server back to live. + mockListTools.mockResolvedValueOnce([tool('a1', 'mcp-a')]) + const tools = await mcpService.discoverServerTools(USER_ID, 'mcp-a', WORKSPACE_ID, true) + expect(tools.map((t) => t.name)).toEqual(['a1']) + + // discoverTools now sees the cleared negative cache + primed positive cache. + mockListTools.mockClear() + const after = await mcpService.discoverTools(USER_ID, WORKSPACE_ID) + expect(after.map((t) => t.name)).toEqual(['a1']) + expect(mockListTools).not.toHaveBeenCalled() + }) + + it('does not negative-cache OAuth-required errors', async () => { + mockGetWorkspaceServersRows.mockResolvedValue([dbRow('mcp-a', 'A')]) + mockListTools.mockRejectedValueOnce(new McpOauthAuthorizationRequiredError('mcp-a', 'A')) + + await mcpService.discoverTools(USER_ID, WORKSPACE_ID) + expect(mockListTools).toHaveBeenCalledTimes(1) + + // Second call must still attempt the live transport — OAuth re-auth has + // its own pathway and a stale negative cache would make reconnects + // silently fail until the TTL expired. + mockListTools.mockClear() + mockListTools.mockResolvedValueOnce([tool('a1', 'mcp-a')]) + const after = await mcpService.discoverTools(USER_ID, WORKSPACE_ID) + expect(after.map((t) => t.name)).toEqual(['a1']) + expect(mockListTools).toHaveBeenCalledTimes(1) + }) }) diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index b024c179300..113998c61aa 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -32,6 +32,7 @@ import { type McpCacheStorageAdapter, } from '@/lib/mcp/storage' import { + McpConnectionError, McpOauthAuthorizationRequiredError, type McpServerConfig, type McpServerStatusConfig, @@ -41,15 +42,20 @@ import { type McpToolResult, type McpTransport, } from '@/lib/mcp/types' -import { MCP_CONSTANTS } from '@/lib/mcp/utils' +import { MCP_CLIENT_CONSTANTS, MCP_CONSTANTS } from '@/lib/mcp/utils' const logger = createLogger('McpService') -// Per-server keys so one slow server can't invalidate another's cached tools. function serverCacheKey(workspaceId: string, serverId: string): string { return `workspace:${workspaceId}:server:${serverId}` } +function failureCacheKey(workspaceId: string, serverId: string): string { + return `workspace:${workspaceId}:server:${serverId}:failure` +} + +const FAILURE_CACHE_SENTINEL: McpTool[] = [] + type DiscoveryOutcome = | { kind: 'cached'; tools: McpTool[] } | { @@ -59,12 +65,17 @@ type DiscoveryOutcome = resolvedIP: string | null } | { kind: 'oauth-pending' } - | { kind: 'error'; message: string } + | { kind: 'unhealthy' } + // originalError preserves the type so markServerUnhealthy's instanceof + // exemption survives the getErrorMessage call. + | { kind: 'error'; message: string; originalError: unknown } class McpService { private cacheAdapter: McpCacheStorageAdapter private readonly cacheTimeout = MCP_CONSTANTS.CACHE_TIMEOUT private unsubscribeConnectionManager?: () => void + // Keyed on (workspaceId, serverId, userId) — OAuth-scoped tokens vary per user. + private inflightServerDiscovery = new Map>() constructor() { this.cacheAdapter = createMcpCacheAdapter() @@ -77,6 +88,14 @@ class McpService { .catch((err) => logger.warn(`Failed to invalidate cache for ${event.serverName} on listChanged:`, err) ) + this.cacheAdapter + .delete(failureCacheKey(event.workspaceId, event.serverId)) + .catch((err) => + logger.warn( + `Failed to invalidate failure cache for ${event.serverName} on listChanged:`, + err + ) + ) }) } } @@ -389,6 +408,46 @@ class McpService { } } + /** + * Negative-cache a discovery failure. OAuth-required errors are exempt so + * reconnects retry immediately. + */ + private async markServerUnhealthy( + workspaceId: string, + serverId: string, + error: unknown + ): Promise { + if (error instanceof McpOauthAuthorizationRequiredError || error instanceof UnauthorizedError) { + return + } + try { + await this.cacheAdapter.set( + failureCacheKey(workspaceId, serverId), + FAILURE_CACHE_SENTINEL, + MCP_CLIENT_CONSTANTS.FAILURE_CACHE_TTL_MS + ) + } catch (err) { + logger.warn(`Failed to write failure cache for server ${serverId}:`, err) + } + } + + private async isServerUnhealthy(workspaceId: string, serverId: string): Promise { + try { + const entry = await this.cacheAdapter.get(failureCacheKey(workspaceId, serverId)) + return entry !== null + } catch { + return false + } + } + + private async clearServerFailure(workspaceId: string, serverId: string): Promise { + try { + await this.cacheAdapter.delete(failureCacheKey(workspaceId, serverId)) + } catch (err) { + logger.warn(`Failed to clear failure cache for server ${serverId}:`, err) + } + } + /** * Discover tools from all workspace servers */ @@ -423,6 +482,12 @@ class McpService { error ) } + if (await this.isServerUnhealthy(workspaceId, config.id)) { + logger.info( + `[${requestId}] Skipping recently-failed server ${config.name} (negative-cache hit)` + ) + return { kind: 'unhealthy' } + } } try { @@ -448,7 +513,11 @@ class McpService { ) { return { kind: 'oauth-pending' } } - return { kind: 'error', message: getErrorMessage(error, 'Unknown error') } + return { + kind: 'error', + message: getErrorMessage(error, 'Unknown error'), + originalError: error, + } } }) ) @@ -484,6 +553,7 @@ class McpService { logger.warn(`[${requestId}] Cache write failed for ${server.name}:`, err) ) ) + deferredSideEffects.push(this.clearServerFailure(workspaceId, server.id)) liveConnections.push({ resolvedConfig: outcome.resolvedConfig, resolvedIP: outcome.resolvedIP, @@ -509,12 +579,23 @@ class McpService { ) return } + if (outcome.kind === 'unhealthy') { + // Status was persisted on the original failure; nothing to re-write. + failedCount++ + return + } failedCount++ logger.warn( `[${requestId}] Failed to discover tools from server ${server.name}: ${outcome.message}` ) deferredSideEffects.push( - this.updateServerStatus(server.id, workspaceId, false, outcome.message) + this.updateServerStatus(server.id, workspaceId, false, outcome.message), + this.markServerUnhealthy(workspaceId, server.id, outcome.originalError), + this.cacheAdapter + .delete(serverCacheKey(workspaceId, server.id)) + .catch((err) => + logger.warn(`[${requestId}] Cache delete failed for ${server.name}:`, err) + ) ) }) @@ -548,17 +629,62 @@ class McpService { } /** - * Discover tools from a specific server with retry logic for session errors. - * Retries once on session-related errors (400, 404, session ID issues). + * Discover tools from one server. Cache-aside by default; pass + * `forceRefresh: true` from explicit-refresh paths (refresh button, OAuth + * callback) to bypass both positive and negative caches. Concurrent callers + * for the same `(workspaceId, serverId, userId, forceRefresh)` share one + * upstream request. */ async discoverServerTools( userId: string, serverId: string, - workspaceId: string + workspaceId: string, + forceRefresh = false + ): Promise { + const inflightKey = `${workspaceId}:${serverId}:${userId}:${forceRefresh ? 'force' : 'cache'}` + const existing = this.inflightServerDiscovery.get(inflightKey) + if (existing) return existing + + const promise = this.discoverServerToolsImpl( + userId, + serverId, + workspaceId, + forceRefresh + ).finally(() => { + this.inflightServerDiscovery.delete(inflightKey) + }) + this.inflightServerDiscovery.set(inflightKey, promise) + return promise + } + + private async discoverServerToolsImpl( + userId: string, + serverId: string, + workspaceId: string, + forceRefresh: boolean ): Promise { const requestId = generateRequestId() const maxRetries = 2 + if (!forceRefresh) { + try { + const cached = await this.cacheAdapter.get(serverCacheKey(workspaceId, serverId)) + if (cached) { + logger.debug(`[${requestId}] Cache hit for server ${serverId}`) + return cached.tools + } + } catch (error) { + logger.warn(`[${requestId}] Cache read failed for server ${serverId}:`, error) + } + if (await this.isServerUnhealthy(workspaceId, serverId)) { + logger.info(`[${requestId}] Skipping recently-failed server ${serverId} (negative-cache)`) + throw new McpConnectionError( + 'Server recently failed and is in cooldown — try again shortly.', + serverId + ) + } + } + for (let attempt = 0; attempt < maxRetries; attempt++) { try { logger.info( @@ -580,15 +706,13 @@ class McpService { try { const tools = await client.listTools() logger.info(`[${requestId}] Discovered ${tools.length} tools from server ${config.name}`) - // Prime the per-server cache and reflect the successful connection on - // the row so the UI doesn't keep showing "Connect with OAuth" or stale - // disconnected/error state. await Promise.allSettled([ this.cacheAdapter .set(serverCacheKey(workspaceId, serverId), tools, this.cacheTimeout) .catch((err) => logger.warn(`[${requestId}] Cache write failed for ${config.name}:`, err) ), + this.clearServerFailure(workspaceId, serverId), this.updateServerStatus(serverId, workspaceId, true, undefined, tools.length), ]) return tools @@ -604,6 +728,15 @@ class McpService { await sleep(100) continue } + // Drop positive cache so a follow-up doesn't return stale tools. + await Promise.allSettled([ + this.cacheAdapter + .delete(serverCacheKey(workspaceId, serverId)) + .catch((err) => + logger.warn(`[${requestId}] Cache delete failed for ${serverId}:`, err) + ), + this.markServerUnhealthy(workspaceId, serverId, error), + ]) throw error } } @@ -692,7 +825,10 @@ class McpService { .from(mcpServers) .where(eq(mcpServers.workspaceId, workspaceId)) await Promise.allSettled( - rows.map((r) => this.cacheAdapter.delete(serverCacheKey(workspaceId, r.id))) + rows.flatMap((r) => [ + this.cacheAdapter.delete(serverCacheKey(workspaceId, r.id)), + this.cacheAdapter.delete(failureCacheKey(workspaceId, r.id)), + ]) ) logger.debug(`Cleared MCP tool cache for workspace ${workspaceId} (${rows.length} servers)`) } else { diff --git a/apps/sim/lib/mcp/utils.test.ts b/apps/sim/lib/mcp/utils.test.ts index 3a23c5bce26..29aded1358e 100644 --- a/apps/sim/lib/mcp/utils.test.ts +++ b/apps/sim/lib/mcp/utils.test.ts @@ -284,6 +284,13 @@ describe('categorizeError', () => { expect(result.message).toBe('Invalid request parameters') }) + it.concurrent('returns 503 for cooldown errors', () => { + const error = new Error('Server recently failed and is in cooldown — try again shortly.') + const result = categorizeError(error) + expect(result.status).toBe(503) + expect(result.message).toBe('Server temporarily unavailable') + }) + it.concurrent('returns 500 for generic errors', () => { const error = new Error('Something went wrong') const result = categorizeError(error) diff --git a/apps/sim/lib/mcp/utils.ts b/apps/sim/lib/mcp/utils.ts index af16621567e..6364cafb111 100644 --- a/apps/sim/lib/mcp/utils.ts +++ b/apps/sim/lib/mcp/utils.ts @@ -46,8 +46,8 @@ export function sanitizeHeaders( export const MCP_CLIENT_CONSTANTS = { CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS, AUTO_REFRESH_INTERVAL: 5 * 60 * 1000, - // Cap metadata calls so a slow upstream can't hang the UI for 60s+. - LIST_TOOLS_TIMEOUT_MS: 30_000, + LIST_TOOLS_TIMEOUT_MS: 10_000, + FAILURE_CACHE_TTL_MS: 120_000, } as const /** @@ -143,6 +143,10 @@ export function categorizeError(error: unknown): { message: string; status: numb return { message: 'Request timed out', status: 408 } } + if (msg.includes('cooldown')) { + return { message: 'Server temporarily unavailable', status: 503 } + } + if (msg.includes('not found') || msg.includes('not accessible')) { return { message: 'Resource not found', status: 404 } }