diff --git a/agent/agent.go b/agent/agent.go index 804efda255a2f..4d9feb807e677 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -495,7 +495,7 @@ func (a *agent) init() { } return "" }, a.contextConfig) - a.mcpAPI = agentmcp.NewAPI(a.logger.Named("mcp"), a.mcpManager, a.contextConfigAPI.MCPConfigFiles) + a.mcpAPI = agentmcp.NewAPI(a.mcpManager) // agentcontext.Manager is the new consolidated resolver, // watcher, and pusher. It coexists with contextConfigAPI @@ -513,16 +513,19 @@ func (a *agent) init() { Clock: a.clock, WorkingDir: workingDirFn, InitialSources: initialContextSources(a.contextConfig, workingDirFn), - // The manager runs its own self-contained MCP runner: it - // connects to the .mcp.json servers it discovers, lists - // their tools, and pushes them to coderd as KindMCPServer - // resources. This is independent of a.mcpManager, which - // serves the agent's MCP HTTP API; the two MCP paths share - // no state during the rollout. - MCPExecer: a.execer, - MCPUpdateEnv: a.updateCommandEnv, + // The manager surfaces MCP servers and their tools as + // KindMCPServer resources by reading the shared MCP engine's + // catalog (a.mcpManager). That engine owns the single set of + // MCP server connections used for both discovery and tool-call + // execution, so each declared server is launched once. + MCPCatalog: func() []agentcontext.MCPServerStatus { + return mcpCatalogToContext(a.mcpManager.Catalog()) + }, }) a.contextAPI = agentcontext.NewAPI(a.contextManager) + // Re-resolve and re-push KindMCPServer resources whenever the MCP + // engine's catalog changes (startup connect, .mcp.json edits). + a.mcpManager.SetOnReload(a.contextManager.Trigger) a.reconnectingPTYServer = reconnectingpty.NewServer( a.logger.Named("reconnecting-pty"), a.sshServer, @@ -1553,7 +1556,6 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, // lifecycle transition to avoid delaying Ready. // This runs inside the tracked goroutine so it // is properly awaited on shutdown. - a.mcpManager.MarkStartupSettled() if mcpErr := a.mcpManager.Reload(a.gracefulCtx, a.contextConfigAPI.MCPConfigFiles()); mcpErr != nil { a.logger.Warn(ctx, "failed to reload workspace MCP servers", slog.Error(mcpErr)) } diff --git a/agent/agentcontext/doc.go b/agent/agentcontext/doc.go index 5ed7ebf043af5..70fb9fc549f5e 100644 --- a/agent/agentcontext/doc.go +++ b/agent/agentcontext/doc.go @@ -18,12 +18,12 @@ // to coderd without coupling this package to any particular // drpc client version. // -// Live MCP server tool lists are produced by this package's own -// self-contained MCP runner: it connects to the MCP servers declared in -// the .mcp.json files the resolver discovers, lists their tools, and -// surfaces them as KindMCPServer resources so MCP servers and their -// tools are pushed to coderd alongside instruction files and skills. -// This runs independently of agent/x/agentmcp, which owns the agent's -// MCP HTTP proxy; the two MCP paths share no state and both continue to -// operate unchanged during the rollout. +// Live MCP server tool lists come from the shared MCP engine in +// agent/x/agentmcp, which owns the single set of MCP server connections +// used for both tool discovery and tool-call execution. This package +// reads that engine's catalog through the injected MCPCatalog option and +// surfaces the servers and their tools as KindMCPServer resources, so +// MCP servers are pushed to coderd alongside instruction files and +// skills. The engine notifies this package through the Manager's Trigger +// when its catalog changes, driving a re-resolve and re-push. package agentcontext diff --git a/agent/agentcontext/manager.go b/agent/agentcontext/manager.go index e61367bdc7178..7523417788235 100644 --- a/agent/agentcontext/manager.go +++ b/agent/agentcontext/manager.go @@ -9,7 +9,6 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog/v3" - "github.com/coder/coder/v2/agent/agentexec" "github.com/coder/quartz" ) @@ -39,18 +38,13 @@ type ManagerOptions struct { // Tests use this to inject MCP resources (via // Resolver.MCPResources) and tighten caps. Resolver *Resolver - // MCPExecer, when non-nil, enables the self-contained MCP - // runner: the Manager connects to the MCP servers declared - // in the .mcp.json files it discovers, lists their tools, - // and surfaces them as KindMCPServer resources in every - // snapshot. The runner uses this Execer to launch stdio MCP - // servers. It is ignored when the resolver already has an - // MCP provider (e.g. a test injecting one via Resolver). - MCPExecer agentexec.Execer - // MCPUpdateEnv optionally enriches the environment handed to - // stdio MCP servers (typically the agent's per-command env). - // Used only when MCPExecer is set; may be nil. - MCPUpdateEnv func([]string) ([]string, error) + // MCPCatalog, when non-nil, supplies the per-server MCP snapshot + // the Manager surfaces as KindMCPServer resources on every + // resolve. The agent injects the shared MCP engine's catalog here + // so discovery and execution use one set of server connections. + // It is ignored when the resolver already has an MCP provider + // (e.g. a test injecting one via Resolver). + MCPCatalog func() []MCPServerStatus // Debounce overrides the watcher's debounce window. Debounce time.Duration } @@ -73,11 +67,7 @@ type Manager struct { workingDir func() string allowedRoots []string resolver *Resolver - // mcpRunner, when non-nil, owns the agent's self-contained - // MCP connection lifecycle and feeds the resolver's MCP - // provider. runMCPSync (started by Run) drives its reloads. - mcpRunner *mcpRunner - debounce time.Duration + debounce time.Duration mu sync.Mutex sources []Source @@ -152,17 +142,16 @@ func NewManager(opts ManagerOptions) *Manager { runStartedCh: make(chan struct{}), } - // Enable the self-contained MCP runner unless the resolver - // already has a provider (tests inject one via Resolver). The - // runner connects to the .mcp.json servers the resolver - // discovers and surfaces their tools as KindMCPServer - // resources; runMCPSync (started in Run) drives its reloads. - // The provider must be wired before the eager first resolve - // below so the seam is present from the first snapshot. - if resolver.MCPResources == nil && opts.MCPExecer != nil { - m.mcpRunner = newMCPRunner(m.logger.Named("mcp"), opts.MCPExecer, opts.MCPUpdateEnv, m.Trigger) + // Surface the shared MCP engine's catalog as KindMCPServer + // resources unless the resolver already has a provider (tests + // inject one via Resolver). The engine owns the connection + // lifecycle and notifies this Manager via Trigger when its + // catalog changes (see agent wiring). The provider must be wired + // before the eager first resolve below so the seam is present + // from the first snapshot. + if resolver.MCPResources == nil && opts.MCPCatalog != nil { resolver.MCPResources = func() []Resource { - return buildMCPServerResources(m.mcpRunner.Servers()) + return buildMCPServerResources(opts.MCPCatalog()) } } @@ -236,14 +225,6 @@ func (m *Manager) Run(ctx context.Context) error { defer watcher.Close() - // Drive MCP server reloads from discovered .mcp.json files for - // the lifetime of Run. Started here (not in NewManager) so it - // runs alongside the trigger loop that consumes its re-resolve - // signals. - if m.mcpRunner != nil { - go m.runMCPSync(ctx) - } - for { select { case <-ctx.Done(): diff --git a/agent/agentcontext/manager_test.go b/agent/agentcontext/manager_test.go index 08ce803ead9e9..43c32e3c50e1e 100644 --- a/agent/agentcontext/manager_test.go +++ b/agent/agentcontext/manager_test.go @@ -25,13 +25,6 @@ import ( // Claude config files into snapshots and breaks every // Len(Resources, N) assertion. func TestMain(m *testing.M) { - // The MCP runner re-execs this test binary as a fake stdio MCP - // server (TEST_MCP_FAKE_SERVER=1). Serve and exit before any test - // setup runs. - if maybeServeFakeMCPServer() { - os.Exit(0) - } - home, err := os.MkdirTemp("", "agentcontext-test-home-") if err != nil { panic(err) diff --git a/agent/agentcontext/mcpcatalog_test.go b/agent/agentcontext/mcpcatalog_test.go new file mode 100644 index 0000000000000..f6a68b3aa2b85 --- /dev/null +++ b/agent/agentcontext/mcpcatalog_test.go @@ -0,0 +1,77 @@ +package agentcontext_test + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/coder/coder/v2/agent/agentcontext" + "github.com/coder/coder/v2/testutil" +) + +// TestManager_MCPCatalogSurfacesResources verifies the injected MCP +// catalog is surfaced as KindMCPServer resources, and that a catalog +// change picked up on the next Trigger re-resolves the snapshot. In +// production the shared MCP engine wires SetOnReload to the Manager's +// Trigger so a reload re-publishes the updated tools. +func TestManager_MCPCatalogSurfacesResources(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + var mu sync.Mutex + servers := []agentcontext.MCPServerStatus{{ + Name: "srv", + Connected: true, + Tools: []agentcontext.MCPTool{{Name: "echo", Description: "echoes input"}}, + }} + + m := newTestManager(t, agentcontext.ManagerOptions{ + WorkingDir: func() string { return dir }, + MCPCatalog: func() []agentcontext.MCPServerStatus { + mu.Lock() + defer mu.Unlock() + return append([]agentcontext.MCPServerStatus(nil), servers...) + }, + }) + + // The eager first snapshot already reflects the injected catalog. + got := findMCPServerResource(m.Snapshot(), "srv") + require.NotNil(t, got) + require.Equal(t, agentcontext.StatusOK, got.Status) + require.Len(t, got.Tools, 1) + require.Equal(t, "echo", got.Tools[0].Name) + + ctx := testutil.Context(t, testutil.WaitLong) + go func() { _ = m.Run(ctx) }() + + // A catalog change re-resolves on the next Trigger. + mu.Lock() + servers = []agentcontext.MCPServerStatus{{ + Name: "srv", + Connected: true, + Tools: []agentcontext.MCPTool{ + {Name: "echo"}, + {Name: "ping"}, + }, + }} + mu.Unlock() + m.Trigger() + + require.Eventually(t, func() bool { + got := findMCPServerResource(m.Snapshot(), "srv") + return got != nil && len(got.Tools) == 2 + }, testutil.WaitShort, testutil.IntervalMedium, + "catalog change should re-resolve into the snapshot") +} + +// findMCPServerResource returns the KindMCPServer resource for the named +// server, or nil if absent. +func findMCPServerResource(snap agentcontext.Snapshot, name string) *agentcontext.Resource { + for i := range snap.Resources { + if r := snap.Resources[i]; r.Kind == agentcontext.KindMCPServer && r.Source == name { + return &snap.Resources[i] + } + } + return nil +} diff --git a/agent/agentcontext/mcpexec_test.go b/agent/agentcontext/mcpexec_test.go deleted file mode 100644 index 3b57790668355..0000000000000 --- a/agent/agentcontext/mcpexec_test.go +++ /dev/null @@ -1,189 +0,0 @@ -package agentcontext_test - -import ( - "bufio" - "encoding/json" - "fmt" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/coder/coder/v2/agent/agentcontext" - "github.com/coder/coder/v2/agent/agentexec" - "github.com/coder/coder/v2/testutil" -) - -// TestManager_MCPServerToolsInSnapshot exercises the MCP runner end to -// end against a real subprocess: a .mcp.json in the working directory is -// discovered by the resolver, the runner connects the declared stdio -// server, lists its tools, and they surface as a KindMCPServer resource -// in the manager's snapshot (the same snapshot that is pushed to coderd). -func TestManager_MCPServerToolsInSnapshot(t *testing.T) { - t.Parallel() - dir := t.TempDir() - writeMCPConfig(t, dir, "fake", map[string]string{"TEST_MCP_FAKE_SERVER": "1"}) - - m := newTestManager(t, agentcontext.ManagerOptions{ - WorkingDir: func() string { return dir }, - MCPExecer: agentexec.DefaultExecer, - }) - ctx := testutil.Context(t, testutil.WaitLong) - go func() { _ = m.Run(ctx) }() - - require.Eventually(t, func() bool { - return findMCPServer(m.Snapshot(), "fake") != nil - }, testutil.WaitLong, testutil.IntervalMedium, - "the connected MCP server's tools should surface in the snapshot") - - got := findMCPServer(m.Snapshot(), "fake") - require.NotNil(t, got) - require.Equal(t, agentcontext.StatusOK, got.Status) - require.Len(t, got.Tools, 1) - require.Equal(t, "echo", got.Tools[0].Name) - require.Equal(t, "echoes input", got.Tools[0].Description) -} - -// TestManager_MCPServerHangingCloseDoesNotStall is a regression test for -// a server that ignores stdin-close. mcp-go's stdio Close() closes stdin -// and then blocks on cmd.Wait(); without a force-kill the runner's -// per-server connect (and thus the whole reload) would hang and the -// tools would never be published. The runner force-kills the subprocess, -// so the tool still surfaces in the snapshot. -func TestManager_MCPServerHangingCloseDoesNotStall(t *testing.T) { - t.Parallel() - dir := t.TempDir() - writeMCPConfig(t, dir, "hang", map[string]string{ - "TEST_MCP_FAKE_SERVER": "1", - "TEST_MCP_HANG_AFTER_LIST": "1", - }) - - m := newTestManager(t, agentcontext.ManagerOptions{ - WorkingDir: func() string { return dir }, - MCPExecer: agentexec.DefaultExecer, - }) - ctx := testutil.Context(t, testutil.WaitLong) - go func() { _ = m.Run(ctx) }() - - require.Eventually(t, func() bool { - got := findMCPServer(m.Snapshot(), "hang") - return got != nil && got.Status == agentcontext.StatusOK - }, testutil.WaitLong, testutil.IntervalMedium, - "a hanging MCP server must not stall the reload; its tool should still surface") -} - -// findMCPServer returns the KindMCPServer resource for the named server, -// or nil if absent. -func findMCPServer(snap agentcontext.Snapshot, name string) *agentcontext.Resource { - for i := range snap.Resources { - if r := snap.Resources[i]; r.Kind == agentcontext.KindMCPServer && r.Source == name { - return &r - } - } - return nil -} - -// writeMCPConfig writes a .mcp.json into dir declaring a single stdio MCP -// server that re-execs this test binary into serveFakeMCPServer (via the -// TEST_MCP_FAKE_SERVER env, which TestMain handles). -func writeMCPConfig(t *testing.T, dir, name string, env map[string]string) { - t.Helper() - testBin, err := os.Executable() - require.NoError(t, err) - cfg := map[string]any{ - "mcpServers": map[string]any{ - name: map[string]any{ - "command": testBin, - "env": env, - }, - }, - } - data, err := json.Marshal(cfg) - require.NoError(t, err) - require.NoError(t, os.WriteFile(filepath.Join(dir, ".mcp.json"), data, 0o600)) -} - -// maybeServeFakeMCPServer serves the fake stdio MCP server when -// TEST_MCP_FAKE_SERVER=1 and reports whether it handled the process so -// the caller (TestMain) can exit. The runner re-execs the test binary -// into this, so it must run at the very top of TestMain. When -// TEST_MCP_HANG_AFTER_LIST=1 the server blocks after serving instead of -// returning, simulating a server that ignores stdin-close so a test can -// exercise the runner's force-kill (the process is then killed by the -// parent and never returns here). -func maybeServeFakeMCPServer() (served bool) { - if os.Getenv("TEST_MCP_FAKE_SERVER") != "1" { - return false - } - serveFakeMCPServer() - if os.Getenv("TEST_MCP_HANG_AFTER_LIST") == "1" { - select {} - } - return true -} - -// serveFakeMCPServer serves a minimal MCP protocol over stdin/stdout: it -// answers initialize and advertises a single "echo" tool, then returns -// when the client closes stdin (EOF). -func serveFakeMCPServer() { - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - line := scanner.Bytes() - - var req struct { - JSONRPC string `json:"jsonrpc"` - ID json.RawMessage `json:"id"` - Method string `json:"method"` - } - if err := json.Unmarshal(line, &req); err != nil { - continue - } - - var resp any - switch req.Method { - case "initialize": - resp = map[string]any{ - "jsonrpc": "2.0", - "id": req.ID, - "result": map[string]any{ - "protocolVersion": "2025-03-26", - "capabilities": map[string]any{"tools": map[string]any{}}, - "serverInfo": map[string]any{"name": "fake-server", "version": "0.0.1"}, - }, - } - case "notifications/initialized": - // Notifications take no response. - continue - case "tools/list": - resp = map[string]any{ - "jsonrpc": "2.0", - "id": req.ID, - "result": map[string]any{ - "tools": []map[string]any{ - { - "name": "echo", - "description": "echoes input", - "inputSchema": map[string]any{ - "type": "object", - "properties": map[string]any{}, - }, - }, - }, - }, - } - default: - resp = map[string]any{ - "jsonrpc": "2.0", - "id": req.ID, - "error": map[string]any{"code": -32601, "message": "method not found"}, - } - } - - out, err := json.Marshal(resp) - if err != nil { - continue - } - _, _ = fmt.Fprintf(os.Stdout, "%s\n", out) - } -} diff --git a/agent/agentcontext/mcprunner.go b/agent/agentcontext/mcprunner.go deleted file mode 100644 index 1469adc7b51dd..0000000000000 --- a/agent/agentcontext/mcprunner.go +++ /dev/null @@ -1,502 +0,0 @@ -package agentcontext - -import ( - "context" - "encoding/hex" - "encoding/json" - "errors" - "io/fs" - "os" - "os/exec" - "reflect" - "slices" - "strings" - "sync" - "time" - - "github.com/mark3labs/mcp-go/client" - "github.com/mark3labs/mcp-go/client/transport" - "github.com/mark3labs/mcp-go/mcp" - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "cdr.dev/slog/v3" - "github.com/coder/coder/v2/agent/agentexec" - "github.com/coder/coder/v2/agent/usershell" - "github.com/coder/coder/v2/buildinfo" -) - -// mcpConnectTimeout bounds how long the runner waits for a single MCP -// server to start its transport, initialize, and report its tools. -const mcpConnectTimeout = 30 * time.Second - -// mcpConnectConcurrency bounds how many MCP servers the runner connects -// to at once. .mcp.json files rarely declare many servers, but the cap -// keeps a pathological config from spawning an unbounded number of -// subprocesses simultaneously. -const mcpConnectConcurrency = 8 - -// mcpServerConfig is a single MCP server declaration parsed from a -// .mcp.json file. It is the runner's self-contained equivalent of the -// agent/x/agentmcp ServerConfig: agentcontext deliberately does not -// import that package so the two MCP paths stay completely separate. -type mcpServerConfig struct { - Name string - Transport string - Command string - Args []string - Env map[string]string - URL string - Headers map[string]string -} - -// mcpConfigFile mirrors the on-disk .mcp.json schema. -type mcpConfigFile struct { - MCPServers map[string]json.RawMessage `json:"mcpServers"` -} - -// mcpServerEntry is a single server block inside mcpServers. -type mcpServerEntry struct { - Command string `json:"command"` - Args []string `json:"args"` - Env map[string]string `json:"env"` - Type string `json:"type"` - URL string `json:"url"` - Headers map[string]string `json:"headers"` -} - -// parseMCPConfig reads a .mcp.json file at path and returns the declared -// MCP servers sorted by name. It returns an empty slice when the -// mcpServers key is missing or empty. It is a self-contained copy of the -// agent/x/agentmcp parser so agentcontext can discover and start its own -// MCP servers without importing that package. -func parseMCPConfig(path string) ([]mcpServerConfig, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, xerrors.Errorf("read mcp config %q: %w", path, err) - } - - var cfg mcpConfigFile - if err := json.Unmarshal(data, &cfg); err != nil { - return nil, xerrors.Errorf("parse mcp config %q: %w", path, err) - } - - if len(cfg.MCPServers) == 0 { - return []mcpServerConfig{}, nil - } - - servers := make([]mcpServerConfig, 0, len(cfg.MCPServers)) - for name, raw := range cfg.MCPServers { - var entry mcpServerEntry - if err := json.Unmarshal(raw, &entry); err != nil { - return nil, xerrors.Errorf("parse server %q in %q: %w", name, path, err) - } - - tr := inferMCPTransport(entry) - if tr == "" { - return nil, xerrors.Errorf("server %q in %q has no command or url", name, path) - } - - resolveMCPEnvVars(entry.Env) - - servers = append(servers, mcpServerConfig{ - Name: name, - Transport: tr, - Command: entry.Command, - Args: entry.Args, - Env: entry.Env, - URL: entry.URL, - Headers: entry.Headers, - }) - } - - slices.SortFunc(servers, func(a, b mcpServerConfig) int { - return strings.Compare(a.Name, b.Name) - }) - - return servers, nil -} - -// inferMCPTransport determines the transport type for a server entry. -// An explicit "type" field takes priority; otherwise the presence of -// "command" implies stdio and "url" implies http. -func inferMCPTransport(e mcpServerEntry) string { - if e.Type != "" { - return e.Type - } - if e.Command != "" { - return "stdio" - } - if e.URL != "" { - return "http" - } - return "" -} - -// resolveMCPEnvVars expands ${VAR} references in env map values using -// the current process environment. -func resolveMCPEnvVars(env map[string]string) { - for k, v := range env { - env[k] = os.Expand(v, os.Getenv) - } -} - -// mcpRunner connects to the MCP servers declared in the .mcp.json files -// the context resolver discovers, lists each server's tools, and caches -// a non-blocking per-server snapshot that buildMCPServerResources turns -// into KindMCPServer resources. It owns its own connection lifecycle and -// does not share state with agent/x/agentmcp: the two MCP paths run -// independently during the rollout. Connections are one-shot -// (connect, initialize, list tools, close) because the runner only needs -// each server's tool list to push to coderd, not a live tool-call proxy. -type mcpRunner struct { - logger slog.Logger - execer agentexec.Execer - updateEnv func([]string) ([]string, error) - onChange func() - - // reloadMu serializes Reload so a slow reload cannot interleave - // with a newer one and publish a stale cache. The sole production - // caller (runMCPSync) already calls Reload sequentially; the mutex - // is defensive. - reloadMu sync.Mutex - - mu sync.Mutex - cache []MCPServerStatus -} - -// newMCPRunner constructs a runner. onChange is invoked (outside the -// cache lock) after a Reload that changes the per-server snapshot, so -// the manager can re-resolve and push the updated KindMCPServer -// resources. updateEnv may be nil. -func newMCPRunner(logger slog.Logger, execer agentexec.Execer, updateEnv func([]string) ([]string, error), onChange func()) *mcpRunner { - return &mcpRunner{ - logger: logger, - execer: execer, - updateEnv: updateEnv, - onChange: onChange, - } -} - -// Servers returns a deep copy of the current per-server MCP snapshot. It -// never blocks on I/O: the resolver calls it on every re-resolve. -func (r *mcpRunner) Servers() []MCPServerStatus { - r.mu.Lock() - defer r.mu.Unlock() - return cloneMCPServers(r.cache) -} - -// Reload reparses the supplied .mcp.json paths, connects to every -// declared server in parallel, lists its tools, and replaces the cached -// snapshot with the fresh result (including per-server failures). It -// fires onChange when the snapshot changed. Reload is best-effort: a -// server that fails to connect or list tools is recorded as a -// disconnected entry rather than aborting the whole reload. -func (r *mcpRunner) Reload(ctx context.Context, paths []string) { - r.reloadMu.Lock() - defer r.reloadMu.Unlock() - - configs := r.parseConfigs(ctx, paths) - statuses := r.connectAll(ctx, configs) - - r.mu.Lock() - changed := !reflect.DeepEqual(r.cache, statuses) - r.cache = statuses - r.mu.Unlock() - - if changed && r.onChange != nil { - r.onChange() - } -} - -// parseConfigs parses every path and returns the union of declared -// servers, deduplicated by name (first occurrence wins). Missing files -// are skipped silently; other parse errors are logged and skipped so one -// broken .mcp.json does not drop the servers declared in sibling files. -func (r *mcpRunner) parseConfigs(ctx context.Context, paths []string) []mcpServerConfig { - var all []mcpServerConfig - for _, path := range paths { - configs, err := parseMCPConfig(path) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - continue - } - r.logger.Warn(ctx, "failed to parse MCP config", - slog.F("path", path), - slog.Error(err), - ) - continue - } - all = append(all, configs...) - } - - seen := make(map[string]struct{}, len(all)) - deduped := make([]mcpServerConfig, 0, len(all)) - for _, cfg := range all { - if _, ok := seen[cfg.Name]; ok { - continue - } - seen[cfg.Name] = struct{}{} - deduped = append(deduped, cfg) - } - return deduped -} - -// connectAll connects to each server in parallel (bounded) and returns -// one status per server in name order. Per-server failures are isolated: -// a failed connect or list becomes a disconnected status carrying the -// error instead of failing the batch. -func (r *mcpRunner) connectAll(ctx context.Context, configs []mcpServerConfig) []MCPServerStatus { - if len(configs) == 0 { - return nil - } - statuses := make([]MCPServerStatus, len(configs)) - var eg errgroup.Group - eg.SetLimit(mcpConnectConcurrency) - for i, cfg := range configs { - eg.Go(func() error { - st := MCPServerStatus{Name: cfg.Name} - tools, err := r.connectAndList(ctx, cfg) - if err != nil { - r.logger.Warn(ctx, "failed to connect MCP server", - slog.F("server", cfg.Name), - slog.Error(err), - ) - st.Err = err.Error() - } else { - st.Connected = true - st.Tools = tools - } - statuses[i] = st - return nil - }) - } - _ = eg.Wait() - return statuses -} - -// connectAndList starts a single MCP server, completes the initialize -// handshake, lists its tools, and closes the connection. Tool names are -// returned exactly as the server reported them; the resource carries the -// server name separately, so any flattening into a single namespace is -// left to the control plane. -func (r *mcpRunner) connectAndList(ctx context.Context, cfg mcpServerConfig) ([]MCPTool, error) { - tr, err := r.createTransport(ctx, cfg) - if err != nil { - return nil, xerrors.Errorf("create transport for %q: %w", cfg.Name, err) - } - - c := client.NewClient(tr) - - // Tie the subprocess to cmdCtx. mcp-go's stdio Close() closes stdin - // and then blocks on cmd.Wait() with no kill: a server that ignores - // stdin-close would stall this reload indefinitely (the deferred - // Close runs before connectAndList returns, so it would block the - // errgroup and hold the reload lock). Canceling cmdCtx force-kills - // the process via exec.CommandContext, so Close's Wait returns. The - // deferred cleanup cancels before closing, and runs before the - // connectCtx cancel because defers are LIFO. - cmdCtx, cmdCancel := context.WithCancel(ctx) - connectCtx, cancel := context.WithTimeout(cmdCtx, mcpConnectTimeout) - defer cancel() - - if err := c.Start(cmdCtx); err != nil { - cmdCancel() - _ = c.Close() - return nil, xerrors.Errorf("start %q: %w", cfg.Name, err) - } - defer func() { - cmdCancel() - _ = c.Close() - }() - - if _, err := c.Initialize(connectCtx, mcp.InitializeRequest{ - Params: mcp.InitializeParams{ - ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, - ClientInfo: mcp.Implementation{ - Name: "coder-agent", - Version: buildinfo.Version(), - }, - }, - }); err != nil { - return nil, xerrors.Errorf("initialize %q: %w", cfg.Name, err) - } - - result, err := c.ListTools(connectCtx, mcp.ListToolsRequest{}) - if err != nil { - return nil, xerrors.Errorf("list tools from %q: %w", cfg.Name, err) - } - - tools := make([]MCPTool, 0, len(result.Tools)) - for _, tool := range result.Tools { - tools = append(tools, MCPTool{ - Name: tool.Name, - Description: tool.Description, - InputSchema: toolInputSchema(tool.InputSchema), - }) - } - return tools, nil -} - -// createTransport builds the mcp-go transport for a server config. -func (r *mcpRunner) createTransport(ctx context.Context, cfg mcpServerConfig) (transport.Interface, error) { - switch cfg.Transport { - case "stdio": - env := r.buildEnv(ctx, cfg.Env) - return transport.NewStdioWithOptions( - cfg.Command, - env, - cfg.Args, - transport.WithCommandFunc(func(ctx context.Context, command string, cmdEnv []string, args []string) (*exec.Cmd, error) { - cmd := r.execer.CommandContext(ctx, command, args...) - cmd.Env = cmdEnv - return cmd, nil - }), - ), nil - case "http", "": - return transport.NewStreamableHTTP(cfg.URL, transport.WithHTTPHeaders(cfg.Headers)) - case "sse": - return transport.NewSSE(cfg.URL, transport.WithHeaders(cfg.Headers)) - default: - return nil, xerrors.Errorf("unsupported transport %q", cfg.Transport) - } -} - -// buildEnv enriches the process environment via the agent's updateEnv -// callback, then merges explicit overrides from the server config on -// top. Note: env enrichment is captured per Reload; an env change alone -// (without a .mcp.json change) does not trigger a re-list. -func (r *mcpRunner) buildEnv(ctx context.Context, explicit map[string]string) []string { - env := usershell.SystemEnvInfo{}.Environ() - if r.updateEnv != nil { - updated, err := r.updateEnv(env) - if err != nil { - r.logger.Warn(ctx, "failed to enrich MCP server environment", slog.Error(err)) - env = usershell.SystemEnvInfo{}.Environ() - } else { - env = updated - } - } - if len(explicit) == 0 { - return env - } - - existing := make(map[string]int, len(env)) - for i, kv := range env { - if k, _, ok := strings.Cut(kv, "="); ok { - existing[k] = i - } - } - for k, v := range explicit { - entry := k + "=" + v - if idx, ok := existing[k]; ok { - env[idx] = entry - } else { - env = append(env, entry) - } - } - return env -} - -// toolInputSchema converts an mcp-go tool input schema into the -// JSON-Schema-shaped map MCPTool carries. Required is converted to -// []any (not []string) so the downstream structpb encoding accepts it. -// An empty schema yields nil so the tool ships with InputSchema unset. -func toolInputSchema(s mcp.ToolInputSchema) map[string]any { - out := map[string]any{} - if s.Type != "" { - out["type"] = s.Type - } - if len(s.Properties) > 0 { - out["properties"] = s.Properties - } - if len(s.Required) > 0 { - required := make([]any, len(s.Required)) - for i, req := range s.Required { - required[i] = req - } - out["required"] = required - } - if len(out) == 0 { - return nil - } - return out -} - -// cloneMCPServers deep-copies a per-server snapshot so callers cannot -// mutate the runner's cache. Tool input schemas are treated as immutable -// and shared by reference. -func cloneMCPServers(in []MCPServerStatus) []MCPServerStatus { - if len(in) == 0 { - return nil - } - out := make([]MCPServerStatus, len(in)) - for i, s := range in { - s.Tools = slices.Clone(s.Tools) - out[i] = s - } - return out -} - -// runMCPSync keeps the runner's connected servers in sync with the -// .mcp.json files the resolver discovers. It subscribes to snapshot -// changes, extracts the set of KindMCPConfig paths (keyed by content -// hash so in-place edits are detected), and reloads the runner only when -// that set changes. A reload fires the runner's onChange, which -// re-resolves and surfaces the updated KindMCPServer resources; that -// re-resolve does not change the config set, so it does not loop. -func (m *Manager) runMCPSync(ctx context.Context) { - changes, unsubscribe := m.SubscribeChanges() - defer unsubscribe() - - var lastKey string - reload := func() { - paths, key := mcpConfigSet(m.Snapshot()) - if key == lastKey { - return - } - lastKey = key - m.mcpRunner.Reload(ctx, paths) - } - - // Pick up any .mcp.json discovered before we subscribed. - reload() - for { - select { - case <-ctx.Done(): - return - case <-m.closedCh: - return - case <-changes: - reload() - } - } -} - -// mcpConfigSet extracts the .mcp.json config files from a snapshot's -// KindMCPConfig resources. It returns the sorted unique source paths -// plus a key encoding path:contenthash pairs, so callers detect both -// path-set changes and in-place content edits. An empty set yields an -// empty key. -func mcpConfigSet(snap Snapshot) (paths []string, key string) { - hashes := make(map[string]string, len(snap.Resources)) - for _, r := range snap.Resources { - if r.Kind != KindMCPConfig || r.Source == "" { - continue - } - hashes[r.Source] = hex.EncodeToString(r.ContentHash[:]) - } - if len(hashes) == 0 { - return nil, "" - } - paths = make([]string, 0, len(hashes)) - for p := range hashes { - paths = append(paths, p) - } - slices.Sort(paths) - parts := make([]string, len(paths)) - for i, p := range paths { - parts[i] = p + ":" + hashes[p] - } - return paths, strings.Join(parts, "\n") -} diff --git a/agent/agentcontext/mcprunner_internal_test.go b/agent/agentcontext/mcprunner_internal_test.go deleted file mode 100644 index eba6d88a79145..0000000000000 --- a/agent/agentcontext/mcprunner_internal_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package agentcontext - -import ( - "os" - "path/filepath" - "testing" - - "github.com/mark3labs/mcp-go/mcp" - "github.com/stretchr/testify/require" -) - -func TestParseMCPConfig(t *testing.T) { - t.Parallel() - - write := func(t *testing.T, body string) string { - t.Helper() - dir := t.TempDir() - path := filepath.Join(dir, ".mcp.json") - require.NoError(t, os.WriteFile(path, []byte(body), 0o600)) - return path - } - - t.Run("InfersTransportAndSorts", func(t *testing.T) { - t.Parallel() - path := write(t, `{"mcpServers": { - "zebra": {"command": "zebra-bin", "args": ["--flag"]}, - "alpha": {"url": "https://example.com/mcp"} - }}`) - got, err := parseMCPConfig(path) - require.NoError(t, err) - require.Len(t, got, 2) - // Sorted by name. - require.Equal(t, "alpha", got[0].Name) - require.Equal(t, "http", got[0].Transport) - require.Equal(t, "https://example.com/mcp", got[0].URL) - require.Equal(t, "zebra", got[1].Name) - require.Equal(t, "stdio", got[1].Transport) - require.Equal(t, "zebra-bin", got[1].Command) - require.Equal(t, []string{"--flag"}, got[1].Args) - }) - - t.Run("ExplicitTypeWins", func(t *testing.T) { - t.Parallel() - path := write(t, `{"mcpServers": {"s": {"type": "sse", "url": "https://x"}}}`) - got, err := parseMCPConfig(path) - require.NoError(t, err) - require.Len(t, got, 1) - require.Equal(t, "sse", got[0].Transport) - }) - - t.Run("EmptyServers", func(t *testing.T) { - t.Parallel() - path := write(t, `{"mcpServers": {}}`) - got, err := parseMCPConfig(path) - require.NoError(t, err) - require.Empty(t, got) - }) - - t.Run("RejectsServerWithoutCommandOrURL", func(t *testing.T) { - t.Parallel() - path := write(t, `{"mcpServers": {"s": {}}}`) - _, err := parseMCPConfig(path) - require.Error(t, err) - }) - - t.Run("InvalidJSON", func(t *testing.T) { - t.Parallel() - path := write(t, `{not json`) - _, err := parseMCPConfig(path) - require.Error(t, err) - }) -} - -// TestParseMCPConfig_ExpandsEnv is a standalone (non-parallel) test -// because t.Setenv cannot be used under a parallel parent test. -func TestParseMCPConfig_ExpandsEnv(t *testing.T) { - t.Setenv("AGENTCONTEXT_MCP_TEST_TOKEN", "secret") - dir := t.TempDir() - path := filepath.Join(dir, ".mcp.json") - require.NoError(t, os.WriteFile(path, - []byte(`{"mcpServers": {"s": {"command": "x", "env": {"TOKEN": "${AGENTCONTEXT_MCP_TEST_TOKEN}"}}}}`), 0o600)) - got, err := parseMCPConfig(path) - require.NoError(t, err) - require.Len(t, got, 1) - require.Equal(t, "secret", got[0].Env["TOKEN"]) -} - -func TestToolInputSchema(t *testing.T) { - t.Parallel() - - t.Run("FullSchema", func(t *testing.T) { - t.Parallel() - got := toolInputSchema(mcp.ToolInputSchema{ - Type: "object", - Properties: map[string]any{"q": map[string]any{"type": "string"}}, - Required: []string{"q"}, - }) - require.Equal(t, "object", got["type"]) - require.Equal(t, map[string]any{"q": map[string]any{"type": "string"}}, got["properties"]) - // Required is converted to []any so structpb.NewStruct accepts it. - require.Equal(t, []any{"q"}, got["required"]) - }) - - t.Run("EmptyYieldsNil", func(t *testing.T) { - t.Parallel() - require.Nil(t, toolInputSchema(mcp.ToolInputSchema{})) - }) - - t.Run("TypeOnly", func(t *testing.T) { - t.Parallel() - got := toolInputSchema(mcp.ToolInputSchema{Type: "object"}) - require.Equal(t, map[string]any{"type": "object"}, got) - }) -} - -func TestMCPConfigSet(t *testing.T) { - t.Parallel() - - t.Run("Empty", func(t *testing.T) { - t.Parallel() - paths, key := mcpConfigSet(Snapshot{}) - require.Empty(t, paths) - require.Empty(t, key) - }) - - t.Run("SortedAndKeyedByContentHash", func(t *testing.T) { - t.Parallel() - snap := Snapshot{Resources: []Resource{ - {Kind: KindMCPConfig, Source: "/b/.mcp.json", ContentHash: [32]byte{0x01}}, - {Kind: KindMCPConfig, Source: "/a/.mcp.json", ContentHash: [32]byte{0x02}}, - // Non-config and empty-source resources are ignored. - {Kind: KindInstructionFile, Source: "/a/AGENTS.md"}, - {Kind: KindMCPServer, Source: "fs"}, - {Kind: KindMCPConfig, Source: ""}, - }} - paths, key := mcpConfigSet(snap) - require.Equal(t, []string{"/a/.mcp.json", "/b/.mcp.json"}, paths) - require.NotEmpty(t, key) - - // An in-place content edit (same path, new hash) changes the key. - snap2 := Snapshot{Resources: []Resource{ - {Kind: KindMCPConfig, Source: "/b/.mcp.json", ContentHash: [32]byte{0x01}}, - {Kind: KindMCPConfig, Source: "/a/.mcp.json", ContentHash: [32]byte{0x09}}, - }} - _, key2 := mcpConfigSet(snap2) - require.NotEqual(t, key, key2) - }) -} diff --git a/agent/agentsocket/proto/agentsocket.pb.go b/agent/agentsocket/proto/agentsocket.pb.go index 52a70e8a8cfd5..42f2a252292b6 100644 --- a/agent/agentsocket/proto/agentsocket.pb.go +++ b/agent/agentsocket/proto/agentsocket.pb.go @@ -1206,7 +1206,8 @@ func (*RemoveContextSourceResponse) Descriptor() ([]byte, []int) { // ContextResource is the on-wire form of a resolved context resource. // Payload bytes are never sent over the socket; they ship to coderd via // the drpc PushContextState path. Mirrors agentcontext.Resource minus -// the payload and the tool list, which reach consumers via ListMCPTools. +// the payload and the per-server MCP tool list, which ship to coderd via +// the same PushContextState path. type ContextResource struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/agent/agentsocket/proto/agentsocket.proto b/agent/agentsocket/proto/agentsocket.proto index 60349fac813b0..d9139e880bd6b 100644 --- a/agent/agentsocket/proto/agentsocket.proto +++ b/agent/agentsocket/proto/agentsocket.proto @@ -108,7 +108,8 @@ message RemoveContextSourceResponse {} // ContextResource is the on-wire form of a resolved context resource. // Payload bytes are never sent over the socket; they ship to coderd via // the drpc PushContextState path. Mirrors agentcontext.Resource minus -// the payload and the tool list, which reach consumers via ListMCPTools. +// the payload and the per-server MCP tool list, which ship to coderd via +// the same PushContextState path. message ContextResource { string id = 1; string kind = 2; diff --git a/agent/mcpcatalog.go b/agent/mcpcatalog.go new file mode 100644 index 0000000000000..b2231d4126186 --- /dev/null +++ b/agent/mcpcatalog.go @@ -0,0 +1,36 @@ +package agent + +import ( + "github.com/coder/coder/v2/agent/agentcontext" + "github.com/coder/coder/v2/agent/x/agentmcp" +) + +// mcpCatalogToContext adapts the shared MCP engine's catalog into the +// agentcontext per-server snapshot the resolver turns into KindMCPServer +// resources. The two types are kept separate so agentcontext does not +// import agent/x/agentmcp. +func mcpCatalogToContext(servers []agentmcp.ServerStatus) []agentcontext.MCPServerStatus { + if len(servers) == 0 { + return nil + } + out := make([]agentcontext.MCPServerStatus, 0, len(servers)) + for _, s := range servers { + cs := agentcontext.MCPServerStatus{ + Name: s.Name, + Connected: s.Connected, + Err: s.Err, + } + if len(s.Tools) > 0 { + cs.Tools = make([]agentcontext.MCPTool, 0, len(s.Tools)) + for _, t := range s.Tools { + cs.Tools = append(cs.Tools, agentcontext.MCPTool{ + Name: t.Name, + Description: t.Description, + InputSchema: t.InputSchema, + }) + } + } + out = append(out, cs) + } + return out +} diff --git a/agent/x/agentmcp/api.go b/agent/x/agentmcp/api.go index c600210cd6e53..ef7f7205641cf 100644 --- a/agent/x/agentmcp/api.go +++ b/agent/x/agentmcp/api.go @@ -1,67 +1,36 @@ package agentmcp import ( - "context" "errors" "net/http" "github.com/go-chi/chi/v5" - "cdr.dev/slog/v3" - "github.com/coder/coder/v2/agent/agentchat" "github.com/coder/coder/v2/coderd/httpapi" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/workspacesdk" ) -// API exposes MCP tool discovery and call proxying through the -// agent. +// API exposes MCP tool-call proxying through the agent. Tool discovery +// is handled in-process by the agentcontext manager, which reads the +// shared Manager's catalog and pushes it to coderd as pinned context +// resources; this API serves only execution. type API struct { - logger slog.Logger - manager *Manager - mcpConfigFiles func() []string + manager *Manager } -// NewAPI creates a new MCP API handler. mcpConfigFiles returns -// the resolved .mcp.json paths and is called on every tool-list -// request to detect config changes. -func NewAPI(logger slog.Logger, m *Manager, mcpConfigFiles func() []string) *API { - return &API{logger: logger, manager: m, mcpConfigFiles: mcpConfigFiles} +// NewAPI creates a new MCP API handler. +func NewAPI(m *Manager) *API { + return &API{manager: m} } // Routes returns the HTTP handler for MCP-related routes. func (api *API) Routes() http.Handler { r := chi.NewRouter() - r.Get("/tools", api.handleListTools) r.Post("/call-tool", api.handleCallTool) return r } -// handleListTools returns the current MCP tool cache after the -// manager performs startup-safe config synchronization. -func (api *API) handleListTools(rw http.ResponseWriter, r *http.Request) { - ctx := r.Context() - logger := api.logger.With(agentchat.Fields(ctx)...) - - tools, err := api.manager.Tools(ctx, api.mcpConfigFiles()) - if err != nil { - switch { - case errors.Is(err, context.Canceled): - logger.Warn(ctx, "mcp tool list canceled by caller", slog.Error(err)) - case errors.Is(err, context.DeadlineExceeded): - logger.Warn(ctx, "mcp tool list timed out", slog.Error(err)) - default: - logger.Warn(ctx, "mcp tool list failed", slog.Error(err)) - } - } - if tools == nil { - tools = []workspacesdk.MCPToolInfo{} - } - httpapi.Write(ctx, rw, http.StatusOK, workspacesdk.ListMCPToolsResponse{ - Tools: tools, - }) -} - // handleCallTool proxies a tool invocation to the appropriate // MCP server based on the tool name prefix. func (api *API) handleCallTool(rw http.ResponseWriter, r *http.Request) { diff --git a/agent/x/agentmcp/api_internal_test.go b/agent/x/agentmcp/api_internal_test.go index 42689475119b1..11f677752af01 100644 --- a/agent/x/agentmcp/api_internal_test.go +++ b/agent/x/agentmcp/api_internal_test.go @@ -1,321 +1,55 @@ package agentmcp import ( - "context" + "bytes" "encoding/json" "net/http" "net/http/httptest" - "os" - "path/filepath" - "sync" "testing" - "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/slogtest" "github.com/coder/coder/v2/agent/agentexec" "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/testutil" ) -func TestHandleListTools_ReloadOnChange(t *testing.T) { +// TestHandleCallTool_ErrorMapping verifies the call-tool handler maps +// Manager errors to the right HTTP status codes. Tool discovery is no +// longer served over HTTP (the agentcontext manager reads the catalog +// in-process), so only the execution endpoint is exercised here. +func TestHandleCallTool_ErrorMapping(t *testing.T) { t.Parallel() - if os.Getenv("TEST_MCP_FAKE_SERVER") == "1" { - runFakeMCPServer() - return - } - - // Cases that share the single-request-and-check pattern. - type singleRequestCase struct { - name string - entries func(t *testing.T) map[string]mcpServerEntry - reloadManager bool - closeManager bool - expectedTools int - toolNameContains string - } - - cases := []singleRequestCase{ - { - name: "InitialRequestNoReload", - entries: func(t *testing.T) map[string]mcpServerEntry { - t.Helper() - _, entry := fakeMCPServerConfig(t, "srv") - return map[string]mcpServerEntry{"srv": entry} - }, - reloadManager: true, - expectedTools: 1, - toolNameContains: "echo", - }, - { - name: "ManagerClosedReturnsEmpty", - entries: func(_ *testing.T) map[string]mcpServerEntry { - return map[string]mcpServerEntry{} - }, - closeManager: true, - expectedTools: 0, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - - configPath := writeMCPConfig(t, dir, tc.entries(t)) - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - if tc.closeManager { - require.NoError(t, m.Close()) - } else { - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - } - - if tc.reloadManager { - err := m.Reload(ctx, []string{configPath}) - require.NoError(t, err) - } - - api := NewAPI(logger, m, func() []string { - return []string{configPath} - }) - - req := httptest.NewRequest(http.MethodGet, "/tools", nil) - rec := httptest.NewRecorder() - api.Routes().ServeHTTP(rec, req) - - require.Equal(t, http.StatusOK, rec.Code) - var resp workspacesdk.ListMCPToolsResponse - require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp)) - require.Len(t, resp.Tools, tc.expectedTools) - if tc.toolNameContains != "" { - assert.Contains(t, resp.Tools[0].Name, tc.toolNameContains) - } - }) - } - - // ConfigChangeTriggersReload has a mutate-then-re-request flow - // that does not fit the single-request table pattern. - t.Run("ConfigChangeTriggersReload", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - - _, entry1 := fakeMCPServerConfig(t, "srv1") - configPath := writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv1": entry1}) - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - err := m.Reload(ctx, []string{configPath}) - require.NoError(t, err) - - api := NewAPI(logger, m, func() []string { - return []string{configPath} - }) - - // Verify initial tools. - req := httptest.NewRequest(http.MethodGet, "/tools", nil) - rec := httptest.NewRecorder() - api.Routes().ServeHTTP(rec, req) - require.Equal(t, http.StatusOK, rec.Code) - - var resp1 workspacesdk.ListMCPToolsResponse - require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp1)) - require.Len(t, resp1.Tools, 1) - assert.Contains(t, resp1.Tools[0].Name, "srv1") - - // Mutate the config file. - _, entry2 := fakeMCPServerConfig(t, "srv2") - writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv2": entry2}) - - // Next request should trigger a reload and return new tools. - req2 := httptest.NewRequest(http.MethodGet, "/tools", nil) - rec2 := httptest.NewRecorder() - api.Routes().ServeHTTP(rec2, req2) - require.Equal(t, http.StatusOK, rec2.Code) - - var resp2 workspacesdk.ListMCPToolsResponse - require.NoError(t, json.NewDecoder(rec2.Body).Decode(&resp2)) - require.Len(t, resp2.Tools, 1) - assert.Contains(t, resp2.Tools[0].Name, "srv2") - }) -} - -// TestHandleListTools_ReloadsAfterStartupSettled exercises the -// cold-start path end-to-end against a real *Manager. Startup has -// settled, so the handler may drive the first safe reload. -func TestHandleListTools_ReloadsAfterStartupSettled(t *testing.T) { - t.Parallel() - - if os.Getenv("TEST_MCP_FAKE_SERVER") == "1" { - runFakeMCPServer() - return - } - - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - - _, entry := fakeMCPServerConfig(t, "srv") - configPath := writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - // No prior m.Reload: snapshot empty and tools unset. - require.Empty(t, m.cachedTools(), "manager should start with no tools") - - api := NewAPI(logger, m, func() []string { - return []string{configPath} - }) - - req := httptest.NewRequest(http.MethodGet, "/tools", nil).WithContext(ctx) - rec := httptest.NewRecorder() - api.Routes().ServeHTTP(rec, req) - - require.Equal(t, http.StatusOK, rec.Code) - var resp workspacesdk.ListMCPToolsResponse - require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp)) - require.Len(t, resp.Tools, 1) - assert.Contains(t, resp.Tools[0].Name, "echo") -} - -func TestHandleListTools_WaitsForStartupSettled(t *testing.T) { - t.Parallel() - - if os.Getenv("TEST_MCP_FAKE_SERVER") == "1" { - runFakeMCPServer() - return - } - - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - + ctx := testutil.Context(t, testutil.WaitShort) + logger := slogtest.Make(t, nil) m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) t.Cleanup(func() { _ = m.Close() }) - pathsRequested := make(chan struct{}) - var pathsOnce sync.Once - api := NewAPI(logger, m, func() []string { - pathsOnce.Do(func() { close(pathsRequested) }) - return []string{configPath} - }) - - req := httptest.NewRequest(http.MethodGet, "/tools", nil).WithContext(ctx) - rec := httptest.NewRecorder() - done := make(chan struct{}) - go func() { - api.Routes().ServeHTTP(rec, req) - close(done) - }() - - select { - case <-pathsRequested: - case <-ctx.Done(): - t.Fatalf("handler did not request paths: %v", ctx.Err()) - } - - select { - case <-done: - t.Fatal("handler returned before startup settled") - default: - } - - _, entry := fakeMCPServerConfig(t, "srv") - writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - m.MarkStartupSettled() - - select { - case <-done: - case <-ctx.Done(): - t.Fatalf("handler did not return after startup settled: %v", ctx.Err()) - } - - require.Equal(t, http.StatusOK, rec.Code) - var resp workspacesdk.ListMCPToolsResponse - require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp)) - require.Len(t, resp.Tools, 1) - assert.Contains(t, resp.Tools[0].Name, "echo") -} - -func TestHandleListTools_LogsListErrors(t *testing.T) { - t.Parallel() + api := NewAPI(m) cases := []struct { - name string - ctx func() context.Context - closeManager bool - message string + name string + toolName string + wantCode int }{ - { - name: "Canceled", - ctx: func() context.Context { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - return ctx - }, - message: "mcp tool list canceled by caller", - }, - { - name: "DeadlineExceeded", - ctx: func() context.Context { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second)) - cancel() - return ctx - }, - message: "mcp tool list timed out", - }, - { - name: "ManagerClosed", - ctx: context.Background, - closeManager: true, - message: "mcp tool list failed", - }, + {name: "InvalidToolName", toolName: "noseparator", wantCode: http.StatusBadRequest}, + {name: "UnknownServer", toolName: "ghost__echo", wantCode: http.StatusNotFound}, } - for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { t.Parallel() - ctx := tc.ctx() - sink := testutil.NewFakeSink(t) - logger := sink.Logger(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(context.Background(), logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - if tc.closeManager { - require.NoError(t, m.Close()) - } - - api := NewAPI(logger, m, func() []string { - return []string{configPath} - }) - - req := httptest.NewRequest(http.MethodGet, "/tools", nil).WithContext(ctx) + body, err := json.Marshal(workspacesdk.CallMCPToolRequest{ToolName: tc.toolName}) + require.NoError(t, err) + req := httptest.NewRequest(http.MethodPost, "/call-tool", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() + api.Routes().ServeHTTP(rec, req) - require.Equal(t, http.StatusOK, rec.Code) - entries := sink.Entries(func(e slog.SinkEntry) bool { - return e.Message == tc.message - }) - require.Len(t, entries, 1) + require.Equal(t, tc.wantCode, rec.Code) }) } } diff --git a/agent/x/agentmcp/configwatcher_internal_test.go b/agent/x/agentmcp/configwatcher_internal_test.go index 4b93242ed35af..037f591da71f4 100644 --- a/agent/x/agentmcp/configwatcher_internal_test.go +++ b/agent/x/agentmcp/configwatcher_internal_test.go @@ -3,8 +3,6 @@ package agentmcp import ( "context" "encoding/json" - "net/http" - "net/http/httptest" "os" "path/filepath" "sync" @@ -18,7 +16,6 @@ import ( "cdr.dev/slog/v3" "cdr.dev/slog/v3/sloggers/slogtest" "github.com/coder/coder/v2/agent/agentexec" - "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/testutil" "github.com/coder/quartz" ) @@ -32,13 +29,13 @@ import ( // fsnotify-backed watcher, the manager picks up the late file // without external prompting. -// awaitTools polls cachedTools until the predicate succeeds or +// awaitTools polls connectedTools until the predicate succeeds or // the context expires. It avoids time.Sleep loops in callers. -func awaitTools(ctx context.Context, t *testing.T, m *Manager, pred func([]workspacesdk.MCPToolInfo) bool) []workspacesdk.MCPToolInfo { +func awaitTools(ctx context.Context, t *testing.T, m *Manager, pred func([]catalogTool) bool) []catalogTool { t.Helper() - var final []workspacesdk.MCPToolInfo + var final []catalogTool testutil.Eventually(ctx, t, func(context.Context) bool { - final = m.cachedTools() + final = m.connectedTools() return pred(final) }, testutil.IntervalFast) return final @@ -69,12 +66,11 @@ func TestWatcher_LateFileTriggersReload(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() t.Cleanup(func() { _ = m.Close() }) // First Reload arms the watcher but finds nothing on disk. require.NoError(t, m.Reload(ctx, []string{configPath})) - require.Empty(t, m.cachedTools(), "manager should start with no tools") + require.Empty(t, m.connectedTools(), "manager should start with no tools") // Write the file after the manager has already settled. The // watcher must observe the Create event, debounce it, and @@ -82,11 +78,11 @@ func TestWatcher_LateFileTriggersReload(t *testing.T) { _, entry := fakeMCPServerConfig(t, "srv") writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - tools := awaitTools(ctx, t, m, func(tools []workspacesdk.MCPToolInfo) bool { + tools := awaitTools(ctx, t, m, func(tools []catalogTool) bool { return len(tools) == 1 }) require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "echo") + assert.Equal(t, "echo", tools[0].tool) // The snapshot must now reflect the on-disk file so the // next Reload short-circuits. @@ -110,13 +106,12 @@ func TestWatcher_RewriteTriggersReload(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() t.Cleanup(func() { _ = m.Close() }) require.NoError(t, m.Reload(ctx, []string{configPath})) - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "srv") + assert.Equal(t, "srv", tools[0].server) // Overwrite the config with a different server name. The // watcher should fire and the cache should reflect the new @@ -124,12 +119,11 @@ func TestWatcher_RewriteTriggersReload(t *testing.T) { _, entry2 := fakeMCPServerConfig(t, "srv2") writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv2": entry2}) - tools = awaitTools(ctx, t, m, func(tools []workspacesdk.MCPToolInfo) bool { - return len(tools) == 1 && len(tools[0].Name) > 0 && - (tools[0].ServerName == "srv2") + tools = awaitTools(ctx, t, m, func(tools []catalogTool) bool { + return len(tools) == 1 && tools[0].server == "srv2" }) require.Len(t, tools, 1) - assert.Equal(t, "srv2", tools[0].ServerName) + assert.Equal(t, "srv2", tools[0].server) } func TestWatcher_RemovalTransitionsToEmpty(t *testing.T) { @@ -149,18 +143,17 @@ func TestWatcher_RemovalTransitionsToEmpty(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() t.Cleanup(func() { _ = m.Close() }) require.NoError(t, m.Reload(ctx, []string{configPath})) - require.Len(t, m.cachedTools(), 1) + require.Len(t, m.connectedTools(), 1) require.NoError(t, os.Remove(configPath)) - awaitTools(ctx, t, m, func(tools []workspacesdk.MCPToolInfo) bool { + awaitTools(ctx, t, m, func(tools []catalogTool) bool { return len(tools) == 0 }) - assert.Empty(t, m.cachedTools()) + assert.Empty(t, m.connectedTools()) } // TestWatcher_DebouncesBurst uses the quartz mock clock to @@ -249,7 +242,6 @@ func TestWatcher_CloseStopsGoroutine(t *testing.T) { for range 5 { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() require.NoError(t, m.Reload(ctx, []string{configPath})) require.NoError(t, m.Close()) @@ -262,13 +254,14 @@ func TestWatcher_CloseStopsGoroutine(t *testing.T) { } } -// TestWatcher_DualAgentHTTPNoStall mimics the dual-agent +// TestWatcher_DualAgentLateConfigWarmsCatalog mimics the dual-agent // workspace scenario from workspace-otto-aa16: the inner sandbox -// agent calls MarkStartupSettled and Reload while the host agent -// has not yet written ~/.mcp.json. Once the file appears, an -// HTTP request to /tools must return the MCP tools quickly -// instead of triggering a multi-second "reload canceled" stall. -func TestWatcher_DualAgentHTTPNoStall(t *testing.T) { +// agent Reloads while the host agent has not yet written +// ~/.mcp.json. Once the file appears, the config watcher must pick +// it up and warm the catalog so the tools surface without a +// multi-second "reload canceled" stall, and reading the catalog +// must never block on an in-flight reload. +func TestWatcher_DualAgentLateConfigWarmsCatalog(t *testing.T) { t.Parallel() if os.Getenv("TEST_MCP_FAKE_SERVER") == "1" { @@ -283,40 +276,28 @@ func TestWatcher_DualAgentHTTPNoStall(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() t.Cleanup(func() { _ = m.Close() }) // First Reload races ahead of the host agent: empty config. require.NoError(t, m.Reload(ctx, []string{configPath})) - require.Empty(t, m.cachedTools()) + require.Empty(t, m.connectedTools()) - api := NewAPI(logger, m, func() []string { return []string{configPath} }) - - // Host agent writes the file later. + // Host agent writes the file later. The watcher must pick it up + // and warm the catalog. _, entry := fakeMCPServerConfig(t, "srv") writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - // Wait for the watcher to pick up the file so we know the - // cache is warm before issuing the HTTP request. - awaitTools(ctx, t, m, func(tools []workspacesdk.MCPToolInfo) bool { + tools := awaitTools(ctx, t, m, func(tools []catalogTool) bool { return len(tools) == 1 }) + require.Len(t, tools, 1) + assert.Equal(t, "echo", tools[0].tool) - req := httptest.NewRequest(http.MethodGet, "/tools", nil).WithContext(ctx) - rec := httptest.NewRecorder() - + // Reading the catalog never blocks on a reload. start := time.Now() - api.Routes().ServeHTTP(rec, req) - elapsed := time.Since(start) - - require.Equal(t, http.StatusOK, rec.Code) - require.Less(t, elapsed, testutil.WaitShort, - "warm HTTP request should not stall on watcher reload; took %s", elapsed) - - var resp workspacesdk.ListMCPToolsResponse - require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp)) - require.Len(t, resp.Tools, 1) - assert.Contains(t, resp.Tools[0].Name, "echo") + _ = m.Catalog() + require.Less(t, time.Since(start), testutil.WaitShort, + "reading the catalog must not block on watcher reload") } // TestWatcher_LateParentDirTriggersReload exercises the @@ -343,11 +324,10 @@ func TestWatcher_LateParentDirTriggersReload(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() t.Cleanup(func() { _ = m.Close() }) require.NoError(t, m.Reload(ctx, []string{configPath})) - require.Empty(t, m.cachedTools()) + require.Empty(t, m.connectedTools()) // Create the missing parent directory. fsnotify will deliver // a Create event on root; handleEvent must release the root @@ -357,11 +337,11 @@ func TestWatcher_LateParentDirTriggersReload(t *testing.T) { _, entry := fakeMCPServerConfig(t, "srv") writeMCPConfig(t, missing, map[string]mcpServerEntry{"srv": entry}) - tools := awaitTools(ctx, t, m, func(tools []workspacesdk.MCPToolInfo) bool { + tools := awaitTools(ctx, t, m, func(tools []catalogTool) bool { return len(tools) == 1 }) require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "echo") + assert.Equal(t, "echo", tools[0].tool) } // TestWatcher_SharedParentRefcount covers the multi-path @@ -389,7 +369,6 @@ func TestWatcher_SharedParentRefcount(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() t.Cleanup(func() { _ = m.Close() }) // First Reload arms both paths, sharing the dir watch. @@ -426,7 +405,7 @@ func TestWatcher_SharedParentRefcount(t *testing.T) { require.NoError(t, err) require.NoError(t, os.WriteFile(pathA, data, 0o600)) - tools := awaitTools(ctx, t, m, func(tools []workspacesdk.MCPToolInfo) bool { + tools := awaitTools(ctx, t, m, func(tools []catalogTool) bool { return len(tools) == 1 }) require.Len(t, tools, 1) @@ -464,7 +443,6 @@ func TestWatcher_CloseDoesNotStallOnInFlightReload(t *testing.T) { m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) useFastDebounce(t, m) - m.MarkStartupSettled() // Arm the watcher with an initial empty Reload. We install the // hook after this so the first connectAll (with empty diff --git a/agent/x/agentmcp/manager.go b/agent/x/agentmcp/manager.go index cd6a7051515fd..363f61a3dfb3c 100644 --- a/agent/x/agentmcp/manager.go +++ b/agent/x/agentmcp/manager.go @@ -43,10 +43,6 @@ const connectTimeout = 30 * time.Second // take before being canceled. const toolCallTimeout = 60 * time.Second -// toolsReloadTimeout bounds how long Tools waits for a -// post-startup reload to settle. -const toolsReloadTimeout = 35 * time.Second - var ( // ErrInvalidToolName is returned when the tool name format // is not "server__tool". @@ -85,20 +81,20 @@ type Manager struct { clock quartz.Clock closed bool servers map[string]*serverEntry - tools []workspacesdk.MCPToolInfo + catalog []ServerStatus snapshot map[string]fileSnapshot serverGen uint64 sf tailscalesingleflight.Group[string, struct{}] - // startupSettled is closed once startup scripts reach a terminal - // state. Before that, missing MCP config files are unknown - // because startup scripts may still create them. - startupSettled chan struct{} - startupOnce sync.Once + // onChange, when non-nil, is invoked (outside the cache lock) + // after a reload changes the per-server catalog, so the + // agentcontext manager can re-resolve and re-push the updated + // KindMCPServer resources. + onChange func() // firstSyncSettled records that a reload body reached a - // terminal result, successful or not. It gates whether callers - // may receive cached tools after reload errors. + // terminal result, successful or not. It gates whether the + // SnapshotChanged short-circuit may skip a reload. firstSyncSettled bool // closedCh is closed by Close to unblock waiters that do not @@ -148,17 +144,16 @@ func NewManager( ) *Manager { managerCtx, cancel := context.WithCancel(ctx) return &Manager{ - ctx: managerCtx, - cancel: cancel, - logger: logger, - clock: quartz.NewReal(), - execer: execer, - updateEnv: updateEnv, - servers: make(map[string]*serverEntry), - snapshot: make(map[string]fileSnapshot), - startupSettled: make(chan struct{}), - closedCh: make(chan struct{}), - watchDebounce: defaultWatchDebounce, + ctx: managerCtx, + cancel: cancel, + logger: logger, + clock: quartz.NewReal(), + execer: execer, + updateEnv: updateEnv, + servers: make(map[string]*serverEntry), + snapshot: make(map[string]fileSnapshot), + closedCh: make(chan struct{}), + watchDebounce: defaultWatchDebounce, } } @@ -182,80 +177,15 @@ func (m *Manager) Reload(ctx context.Context, paths []string) error { return m.waitReload(ctx, ch, 0) } -// MarkStartupSettled marks startup scripts as terminal for MCP -// config purposes. Missing config files after this point are a real -// empty config, not an unknown startup state. -func (m *Manager) MarkStartupSettled() { - m.startupOnce.Do(func() { close(m.startupSettled) }) -} - -// Tools returns the current MCP tool cache after startup-safe config -// synchronization. -// -// Before startup has settled via MarkStartupSettled, Tools blocks until -// settlement or ctx cancels. After settlement, it drives a config reload -// bounded by toolsReloadTimeout. -// -// On error before the first sync settles, Tools returns nil tools and -// the error. On error after a prior sync, it returns cached tools and -// the error so callers can degrade gracefully. -func (m *Manager) Tools(ctx context.Context, paths []string) ([]workspacesdk.MCPToolInfo, error) { - if err := m.waitForStartupSettled(ctx); err != nil { - return nil, err - } - - ch, started, err := m.startReloadIfNeeded(paths) - if err != nil { - return m.toolsAfterReloadError(err) - } - if !started { - return normalizeTools(m.cachedTools()), nil - } - - if err := m.waitReload(ctx, ch, toolsReloadTimeout); err != nil { - return m.toolsAfterReloadError(err) - } - return normalizeTools(m.cachedTools()), nil -} - -func (m *Manager) waitForStartupSettled(ctx context.Context) error { - select { - case <-m.startupSettled: - return nil - default: - } - - select { - case <-m.startupSettled: - return nil - case <-ctx.Done(): - return ctx.Err() - case <-m.ctx.Done(): - if err := m.closeErr(); err != nil { - return err - } - return m.ctx.Err() - case <-m.closedCh: - return ErrManagerClosed - } -} - -func (m *Manager) toolsAfterReloadError(err error) ([]workspacesdk.MCPToolInfo, error) { - m.mu.RLock() - firstSyncSettled := m.firstSyncSettled - tools := slices.Clone(m.tools) - m.mu.RUnlock() - if !firstSyncSettled { - return nil, err - } - return normalizeTools(tools), err -} - -func normalizeTools(tools []workspacesdk.MCPToolInfo) []workspacesdk.MCPToolInfo { - if tools == nil { - return []workspacesdk.MCPToolInfo{} - } - return tools +// SetOnReload registers a callback fired (outside the cache lock) after +// a reload changes the per-server catalog. The agent wires this to the +// agentcontext manager's Trigger so discovery re-resolves and re-pushes +// the updated KindMCPServer resources. It must be called before the +// first Reload. +func (m *Manager) SetOnReload(fn func()) { + m.mu.Lock() + m.onChange = fn + m.mu.Unlock() } // startReloadIfNeeded registers the reload with the singleflight group @@ -528,11 +458,12 @@ func (m *Manager) doReload(ctx context.Context, mcpConfigFiles []string) error { _ = entry.client.Close() } - // Refresh tools outside the lock to avoid blocking - // concurrent reads during network I/O. - if err := m.RefreshTools(ctx); err != nil { - logger := m.logger.With(agentchat.Fields(ctx)...) - logger.Warn(ctx, "failed to refresh MCP tools after connect", slog.Error(err)) + // Rebuild the per-server catalog outside the lock to avoid + // blocking concurrent reads during network I/O, then notify the + // agentcontext manager when it changed so it re-resolves and + // re-pushes the KindMCPServer resources. + if m.refreshCatalog(ctx, wanted) { + m.fireOnChange() } return nil } @@ -728,12 +659,24 @@ func captureSnapshot(paths []string) map[string]fileSnapshot { return snap } -// cachedTools returns the cached tool list. Thread-safe. -func (m *Manager) cachedTools() []workspacesdk.MCPToolInfo { +// Catalog returns a deep copy of the current per-server MCP snapshot. It +// never blocks on I/O: the agentcontext resolver calls it on every +// re-resolve to build KindMCPServer resources. +func (m *Manager) Catalog() []ServerStatus { m.mu.RLock() defer m.mu.RUnlock() + return cloneServerStatuses(m.catalog) +} - return slices.Clone(m.tools) +// fireOnChange invokes the registered reload callback, if any, without +// holding the cache lock. +func (m *Manager) fireOnChange() { + m.mu.RLock() + fn := m.onChange + m.mu.RUnlock() + if fn != nil { + fn() + } } // CallTool proxies a tool call to the appropriate MCP server. @@ -767,15 +710,17 @@ func (m *Manager) CallTool(ctx context.Context, req workspacesdk.CallMCPToolRequ return convertResult(result), nil } -// RefreshTools re-fetches tool lists from all connected servers -// in parallel and rebuilds the cache. On partial failure, tools -// from servers that responded successfully are merged with the -// existing cached tools for servers that failed, so a single -// dead server doesn't block updates from healthy ones. -func (m *Manager) RefreshTools(ctx context.Context) error { +// refreshCatalog re-lists tools from the connected servers and rebuilds +// the per-server catalog the agentcontext resolver consumes. Every +// declared server in wanted appears in the result: a server with a live +// client contributes its listed tools (or its list error), and a server +// that never connected appears as an unreadable entry so it surfaces in +// the snapshot instead of vanishing. It returns whether the catalog +// changed so the caller can fire the reload callback. +func (m *Manager) refreshCatalog(ctx context.Context, wanted map[string]ServerConfig) bool { logger := m.logger.With(agentchat.Fields(ctx)...) - // Snapshot servers under read lock. + // Snapshot the connected servers under the read lock. m.mu.RLock() servers := make(map[string]*serverEntry, len(m.servers)) for k, v := range m.servers { @@ -784,16 +729,15 @@ func (m *Manager) RefreshTools(ctx context.Context) error { gen := m.serverGen m.mu.RUnlock() - // Fetch tool lists in parallel without holding any lock. - type serverTools struct { - name string - tools []workspacesdk.MCPToolInfo + // List tools from every connected server in parallel, without + // holding any lock. + type listResult struct { + tools []ToolInfo + err error } var ( mu sync.Mutex - results []serverTools - failed []string - errs []error + results = make(map[string]listResult, len(servers)) ) var eg errgroup.Group for name, entry := range servers { @@ -807,63 +751,58 @@ func (m *Manager) RefreshTools(ctx context.Context) error { slog.Error(err), ) mu.Lock() - errs = append(errs, xerrors.Errorf("list tools from %q: %w", name, err)) - failed = append(failed, name) + results[name] = listResult{err: err} mu.Unlock() return nil } - var tools []workspacesdk.MCPToolInfo + tools := make([]ToolInfo, 0, len(result.Tools)) for _, tool := range result.Tools { - tools = append(tools, workspacesdk.MCPToolInfo{ - ServerName: name, - Name: name + ToolNameSep + tool.Name, + tools = append(tools, ToolInfo{ + Name: tool.Name, Description: tool.Description, - Schema: tool.InputSchema.Properties, - Required: tool.InputSchema.Required, + InputSchema: toolInputSchemaMap(tool.InputSchema), }) } mu.Lock() - results = append(results, serverTools{name: name, tools: tools}) + results[name] = listResult{tools: tools} mu.Unlock() return nil }) } _ = eg.Wait() - // Build the new tool list. For servers that failed, preserve - // their tools from the existing cache so a single dead server - // doesn't remove healthy tools. - var merged []workspacesdk.MCPToolInfo - for _, st := range results { - merged = append(merged, st.tools...) - } - if len(failed) > 0 { - failedSet := make(map[string]struct{}, len(failed)) - for _, f := range failed { - failedSet[f] = struct{}{} - } - m.mu.RLock() - for _, t := range m.tools { - if _, ok := failedSet[t.ServerName]; ok { - merged = append(merged, t) - } + // Build one status per declared server so a server that never + // connected surfaces as an unreadable entry rather than vanishing. + catalog := make([]ServerStatus, 0, len(wanted)) + for name := range wanted { + st := ServerStatus{Name: name} + switch res, ok := results[name]; { + case ok && res.err == nil: + st.Connected = true + st.Tools = res.tools + case ok: + st.Err = res.err.Error() + default: + st.Err = "failed to connect" } - m.mu.RUnlock() + catalog = append(catalog, st) } - slices.SortFunc(merged, func(a, b workspacesdk.MCPToolInfo) int { + slices.SortFunc(catalog, func(a, b ServerStatus) int { return strings.Compare(a.Name, b.Name) }) m.mu.Lock() - // Skip the write if the server map changed since the - // snapshot. A doReload that bumped the generation will - // produce a correct tool list; this write would be stale. - if m.serverGen == gen { - m.tools = merged + defer m.mu.Unlock() + // Skip the write if the server map changed since the snapshot. A + // doReload that bumped the generation will rebuild the catalog. + if m.serverGen != gen { + return false } - m.mu.Unlock() - - return errors.Join(errs...) + if reflect.DeepEqual(m.catalog, catalog) { + return false + } + m.catalog = catalog + return true } // Close terminates all MCP server connections and child @@ -908,10 +847,10 @@ func (m *Manager) Close() error { } } m.servers = make(map[string]*serverEntry) - // Prevent an in-flight RefreshTools from repopulating tools - // after Close clears the cache. + // Prevent an in-flight refreshCatalog from repopulating the + // catalog after Close clears it. m.serverGen++ - m.tools = nil + m.catalog = nil // Cancel while holding the lock so waiters that observe // m.ctx.Done also observe m.closed when checking closeErr. @@ -1094,3 +1033,64 @@ func convertResult(result *mcp.CallToolResult) workspacesdk.CallMCPToolResponse IsError: result.IsError, } } + +// ServerStatus is a point-in-time view of one MCP server's connection +// state and tools, used by the agentcontext resolver to build +// KindMCPServer resources. Tool names are exactly as the server +// reported them (no server prefix); the resource carries the server +// name separately. +type ServerStatus struct { + Name string + Connected bool + Err string + Tools []ToolInfo +} + +// ToolInfo is one tool exposed by an MCP server. InputSchema is the +// JSON-Schema-shaped object the server reported for the tool's +// arguments, or nil when the schema is empty. +type ToolInfo struct { + Name string + Description string + InputSchema map[string]any +} + +// toolInputSchemaMap converts an mcp-go tool input schema into the +// JSON-Schema-shaped map ToolInfo carries. Required is converted to +// []any so the downstream protobuf/structpb encoding accepts it. An +// empty schema yields nil so the tool ships with InputSchema unset. +func toolInputSchemaMap(s mcp.ToolInputSchema) map[string]any { + out := map[string]any{} + if s.Type != "" { + out["type"] = s.Type + } + if len(s.Properties) > 0 { + out["properties"] = s.Properties + } + if len(s.Required) > 0 { + required := make([]any, len(s.Required)) + for i, req := range s.Required { + required[i] = req + } + out["required"] = required + } + if len(out) == 0 { + return nil + } + return out +} + +// cloneServerStatuses deep-copies a catalog so callers cannot mutate the +// Manager's cache. Tool input schemas are treated as immutable and +// shared by reference. +func cloneServerStatuses(in []ServerStatus) []ServerStatus { + if len(in) == 0 { + return nil + } + out := make([]ServerStatus, len(in)) + for i, s := range in { + s.Tools = slices.Clone(s.Tools) + out[i] = s + } + return out +} diff --git a/agent/x/agentmcp/manager_internal_test.go b/agent/x/agentmcp/manager_internal_test.go index 16d9faf6463bc..8ec8bc77e83f7 100644 --- a/agent/x/agentmcp/manager_internal_test.go +++ b/agent/x/agentmcp/manager_internal_test.go @@ -6,8 +6,6 @@ import ( "encoding/json" "fmt" "os" - "path/filepath" - "sync" "testing" "time" @@ -279,286 +277,6 @@ func TestManager_WaitReloadTimeout(t *testing.T) { assert.Contains(t, err.Error(), "tools reload timed out after 1m0s") } -func TestManager_ToolsStartupGate(t *testing.T) { - t.Parallel() - - if os.Getenv("TEST_MCP_FAKE_SERVER") == "1" { - runFakeMCPServer() - return - } - - t.Run("MissingBeforeStartupCanAppearBeforeSettlement", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - t.Cleanup(func() { _ = m.Close() }) - - type result struct { - tools []workspacesdk.MCPToolInfo - err error - } - done := make(chan result, 1) - go func() { - tools, err := m.Tools(ctx, []string{configPath}) - done <- result{tools: tools, err: err} - }() - - _, entry := fakeMCPServerConfig(t, "srv") - writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - m.MarkStartupSettled() - - select { - case got := <-done: - require.NoError(t, got.err) - require.Len(t, got.tools, 1) - assert.Contains(t, got.tools[0].Name, "echo") - case <-ctx.Done(): - t.Fatalf("Tools did not return after startup settled: %v", ctx.Err()) - } - }) - - t.Run("MissingAfterStartupReturnsEmptyAndMarksFirstSync", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - tools, err := m.Tools(ctx, []string{configPath}) - require.NoError(t, err) - assert.Empty(t, tools) - - m.mu.RLock() - firstSyncSettled := m.firstSyncSettled - m.mu.RUnlock() - assert.True(t, firstSyncSettled) - }) - - t.Run("ConfigAppearsAfterEmptySyncReloads", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - tools, err := m.Tools(ctx, []string{configPath}) - require.NoError(t, err) - require.Empty(t, tools) - - _, entry := fakeMCPServerConfig(t, "srv") - writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - - tools, err = m.Tools(ctx, []string{configPath}) - require.NoError(t, err) - require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "echo") - }) - - t.Run("ConcurrentFirstListToolsCallsAllSucceed", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - _, entry := fakeMCPServerConfig(t, "srv") - configPath := writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - const callers = 5 - var wg sync.WaitGroup - errs := make([]error, callers) - toolCounts := make([]int, callers) - for i := range callers { - wg.Go(func() { - tools, err := m.Tools(ctx, []string{configPath}) - errs[i] = err - toolCounts[i] = len(tools) - }) - } - wg.Wait() - - for i := range callers { - assert.NoError(t, errs[i], "caller %d should not fail", i) - assert.Equal(t, 1, toolCounts[i], "caller %d should see tools", i) - } - }) - - t.Run("CloseUnblocksStartupWait", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - - done := make(chan error, 1) - go func() { - _, err := m.Tools(ctx, []string{configPath}) - done <- err - }() - require.NoError(t, m.Close()) - - select { - case err := <-done: - require.Error(t, err) - assert.ErrorIs(t, err, ErrManagerClosed) - case <-ctx.Done(): - t.Fatalf("Tools did not return after Close: %v", ctx.Err()) - } - }) - - t.Run("CallerCanceledBeforeStartupReturnsNoTools", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - t.Cleanup(func() { _ = m.Close() }) - - callerCtx, cancel := context.WithCancel(ctx) - cancel() - tools, err := m.Tools(callerCtx, []string{configPath}) - require.Error(t, err) - assert.ErrorIs(t, err, context.Canceled) - assert.Nil(t, tools) - }) - - t.Run("ManagerCanceledBeforeStartupReturnsNoTools", func(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong)) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - t.Cleanup(func() { _ = m.Close() }) - - cancel() - tools, err := m.Tools(testutil.Context(t, testutil.WaitLong), []string{configPath}) - require.Error(t, err) - assert.ErrorIs(t, err, context.Canceled) - assert.Nil(t, tools) - }) - - t.Run("ClosedBeforeFirstSyncReturnsNoTools", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - require.NoError(t, m.Close()) - - tools, err := m.Tools(ctx, []string{configPath}) - require.Error(t, err) - assert.ErrorIs(t, err, ErrManagerClosed) - assert.Nil(t, tools) - }) - - t.Run("CanceledBeforeFirstSyncStillStartsReload", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - configPath := filepath.Join(dir, ".mcp.json") - paths := []string{configPath} - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - callerCtx, cancel := context.WithCancel(ctx) - cancel() - tools, err := m.Tools(callerCtx, paths) - require.Error(t, err) - assert.ErrorIs(t, err, context.Canceled) - assert.Empty(t, tools) - - testutil.Eventually(ctx, t, func(context.Context) bool { - m.mu.RLock() - firstSyncSettled := m.firstSyncSettled - m.mu.RUnlock() - return firstSyncSettled && !m.SnapshotChanged(paths) - }, testutil.IntervalFast) - - tools, err = m.Tools(ctx, paths) - require.NoError(t, err) - assert.Empty(t, tools) - }) - - t.Run("CanceledAfterFirstSyncNoopReturnsCachedTools", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - _, entry := fakeMCPServerConfig(t, "srv") - configPath := writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - tools, err := m.Tools(ctx, []string{configPath}) - require.NoError(t, err) - require.Len(t, tools, 1) - - callerCtx, cancel := context.WithCancel(ctx) - cancel() - tools, err = m.Tools(callerCtx, []string{configPath}) - require.NoError(t, err) - require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "echo") - }) - - t.Run("ManagerCanceledAfterFirstSyncReturnsCachedTools", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitLong) - logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug) - dir := t.TempDir() - _, entry := fakeMCPServerConfig(t, "srv") - configPath := writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv": entry}) - paths := []string{configPath} - - m := NewManager(ctx, logger, agentexec.DefaultExecer, nil) - m.MarkStartupSettled() - t.Cleanup(func() { _ = m.Close() }) - - tools, err := m.Tools(ctx, paths) - require.NoError(t, err) - require.Len(t, tools, 1) - - _, nextEntry := fakeMCPServerConfig(t, "srv2") - writeMCPConfig(t, dir, map[string]mcpServerEntry{"srv2": nextEntry}) - require.True(t, m.SnapshotChanged(paths)) - - m.cancel() - tools, err = m.Tools(ctx, paths) - require.Error(t, err) - assert.ErrorIs(t, err, context.Canceled) - require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "echo") - }) -} - // runFakeMCPServer implements a minimal JSON-RPC / MCP server over // stdin/stdout, just enough for initialize + tools/list. func runFakeMCPServer() { diff --git a/agent/x/agentmcp/reload_internal_test.go b/agent/x/agentmcp/reload_internal_test.go index 1557b336e8fee..192fef21fe659 100644 --- a/agent/x/agentmcp/reload_internal_test.go +++ b/agent/x/agentmcp/reload_internal_test.go @@ -5,6 +5,8 @@ import ( "encoding/json" "os" "path/filepath" + "slices" + "strings" "sync" "testing" @@ -19,6 +21,38 @@ import ( "github.com/coder/coder/v2/testutil" ) +// catalogTool is a flattened (server, tool) pair taken from the +// Manager's per-server catalog. Tests assert on these pairs instead of +// the control plane's flattened "server__tool" form: the agent exposes +// raw per-server tool names, and joining them into a single namespace +// is chatd's concern. +type catalogTool struct { + server string + tool string +} + +// connectedTools flattens the Manager's catalog into (server, tool) +// pairs for connected servers, sorted by server then tool, so tests can +// assert on the count and identity of the live tool set. +func (m *Manager) connectedTools() []catalogTool { + var out []catalogTool + for _, s := range m.Catalog() { + if !s.Connected { + continue + } + for _, tl := range s.Tools { + out = append(out, catalogTool{server: s.Name, tool: tl.Name}) + } + } + slices.SortFunc(out, func(a, b catalogTool) int { + if a.server != b.server { + return strings.Compare(a.server, b.server) + } + return strings.Compare(a.tool, b.tool) + }) + return out +} + // writeMCPConfig writes a .mcp.json file with the given server // entries. Each entry maps a server name to its config. func writeMCPConfig(t *testing.T, dir string, servers map[string]mcpServerEntry) string { @@ -220,11 +254,11 @@ func TestSnapshotChanged_MultipleConfigFiles(t *testing.T) { require.NoError(t, err) // Tools from both files should be present. - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 2, "should have tools from both config files") - assert.Contains(t, tools[0].Name, "srv1", + assert.Equal(t, "srv1", tools[0].server, "first tool should be from first config") - assert.Contains(t, tools[1].Name, "srv2b", + assert.Equal(t, "srv2b", tools[1].server, "second tool should be from second config") } @@ -246,9 +280,9 @@ func TestReload(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1, "should have one tool from the fake server") - assert.Contains(t, tools[0].Name, "echo") + assert.Equal(t, "echo", tools[0].tool) // Snapshot should be fresh. assert.False(t, m.SnapshotChanged([]string{configPath})) @@ -293,7 +327,7 @@ func TestReload(t *testing.T) { assert.NoError(t, err, "caller %d should not fail", i) } - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1) }) @@ -340,9 +374,9 @@ func TestReload(t *testing.T) { // First reload. err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - tools1 := m.cachedTools() + tools1 := m.connectedTools() require.Len(t, tools1, 1) - assert.Contains(t, tools1[0].Name, "srv1") + assert.Equal(t, "srv1", tools1[0].server) // Rewrite config with a different server. _, entry2 := fakeMCPServerConfig(t, "srv2") @@ -352,9 +386,9 @@ func TestReload(t *testing.T) { assert.True(t, m.SnapshotChanged([]string{configPath})) err = m.Reload(ctx, []string{configPath}) require.NoError(t, err) - tools2 := m.cachedTools() + tools2 := m.connectedTools() require.Len(t, tools2, 1) - assert.Contains(t, tools2[0].Name, "srv2") + assert.Equal(t, "srv2", tools2[0].server) }) t.Run("PerServerConnectFailureUpdatesSnapshot", func(t *testing.T) { @@ -393,14 +427,14 @@ func TestReload(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - require.Len(t, m.cachedTools(), 1) + require.Len(t, m.connectedTools(), 1) // Delete config file. require.NoError(t, os.Remove(configPath)) err = m.Reload(ctx, []string{configPath}) require.NoError(t, err) - assert.Empty(t, m.cachedTools(), "tools should be empty after config deleted") + assert.Empty(t, m.connectedTools(), "tools should be empty after config deleted") // Subsequent reload finds snapshot unchanged. assert.False(t, m.SnapshotChanged([]string{configPath})) @@ -451,7 +485,7 @@ func TestDifferentialReload(t *testing.T) { "unchanged server should reuse client pointer") // Both servers should have tools. - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 2) }) @@ -505,7 +539,7 @@ func TestDifferentialReload(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - require.Len(t, m.cachedTools(), 2) + require.Len(t, m.connectedTools(), 2) // Capture srvB's client before removal. m.mu.RLock() @@ -519,9 +553,9 @@ func TestDifferentialReload(t *testing.T) { err = m.Reload(ctx, []string{configPath}) require.NoError(t, err) - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "srvA") + assert.Equal(t, "srvA", tools[0].server) // The old client for srvB should be closed. // ListTools on a closed client returns an error. @@ -545,7 +579,7 @@ func TestDifferentialReload(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - require.Len(t, m.cachedTools(), 1) + require.Len(t, m.connectedTools(), 1) m.mu.RLock() origClient := m.servers["srv"].client @@ -568,7 +602,7 @@ func TestDifferentialReload(t *testing.T) { "failed connect should retain old client") // Tools should still work. - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1) }) @@ -586,9 +620,9 @@ func TestDifferentialReload(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1) - toolName := tools[0].Name + toolName := tools[0].server + ToolNameSep + tools[0].tool // Add a second server (srv unchanged, so client is reused). _, entry2 := fakeMCPServerConfig(t, "srv2") @@ -637,9 +671,9 @@ func TestReload_FirstBootPath(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - tools := m.cachedTools() + tools := m.connectedTools() require.Len(t, tools, 1) - assert.Contains(t, tools[0].Name, "echo") + assert.Equal(t, "echo", tools[0].tool) } // TestReload_NoopWhenUnchanged verifies that Reload returns @@ -709,7 +743,7 @@ func TestClose_SuppressesSubprocessExitError(t *testing.T) { err := m.Reload(ctx, []string{configPath}) require.NoError(t, err) - require.Len(t, m.cachedTools(), 1, "server should be connected") + require.Len(t, m.connectedTools(), 1, "server should be connected") // Close kills the subprocess. The ExitError guard should // suppress the "signal: killed" error. diff --git a/coderd/x/chatd/chatd_test.go b/coderd/x/chatd/chatd_test.go index 101a869031ffe..125db2b4851e4 100644 --- a/coderd/x/chatd/chatd_test.go +++ b/coderd/x/chatd/chatd_test.go @@ -262,8 +262,6 @@ func newWorkspaceToolTestServer( mockConn.EXPECT().SetExtraHeaders(gomock.Any()).AnyTimes() mockConn.EXPECT().ContextConfig(gomock.Any()). Return(workspacesdk.ContextConfigResponse{}, xerrors.New("not supported")).AnyTimes() - mockConn.EXPECT().ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{}, nil).AnyTimes() mockConn.EXPECT().LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{AbsolutePathString: "/home/coder"}, nil).AnyTimes() mockConn.EXPECT().ReadFile(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). @@ -898,17 +896,6 @@ func TestExploreChatUsesPersistedMCPSnapshot(t *testing.T) { mockConn.EXPECT().ContextConfig(gomock.Any()). Return(workspacesdk.ContextConfigResponse{}, xerrors.New("not supported")).AnyTimes() workspaceToolName := "workspace-snapshot-mcp__echo" - mockConn.EXPECT().ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{Tools: []workspacesdk.MCPToolInfo{{ - ServerName: "workspace-snapshot-mcp", - Name: workspaceToolName, - Description: "Workspace echo tool", - Schema: map[string]any{ - "input": map[string]any{"type": "string"}, - }, - Required: []string{"input"}, - }}}, nil). - AnyTimes() mockConn.EXPECT().LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{AbsolutePathString: "/home/coder"}, nil).AnyTimes() mockConn.EXPECT().ReadFile(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). @@ -3124,10 +3111,6 @@ func TestPersistToolResultWithBinaryData(t *testing.T) { ContextConfig(gomock.Any()). Return(workspacesdk.ContextConfigResponse{}, xerrors.New("not supported")). AnyTimes() - mockConn.EXPECT(). - ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{}, nil). - AnyTimes() mockConn.EXPECT(). LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{}, nil). @@ -6478,14 +6461,11 @@ func toolMessageForTest( func setupToolExecutionAgentConn( t *testing.T, mockConn *agentconnmock.MockAgentConn, - mcpTools ...workspacesdk.MCPToolInfo, ) { t.Helper() mockConn.EXPECT().SetExtraHeaders(gomock.Any()).AnyTimes() mockConn.EXPECT().ContextConfig(gomock.Any()). Return(workspacesdk.ContextConfigResponse{}, xerrors.New("not supported")).AnyTimes() - mockConn.EXPECT().ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{Tools: mcpTools}, nil).AnyTimes() mockConn.EXPECT().LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{AbsolutePathString: "/home/coder"}, nil).AnyTimes() mockConn.EXPECT().ReadFile(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). @@ -9438,10 +9418,6 @@ func TestComputerUseSubagentToolsAndModel(t *testing.T) { // for the initial screenshot check in the computer use path. ctrl := gomock.NewController(t) mockConn := agentconnmock.NewMockAgentConn(ctrl) - mockConn.EXPECT(). - ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{}, nil). - AnyTimes() mockConn.EXPECT(). ExecuteDesktopAction(gomock.Any(), gomock.Any()). Return(workspacesdk.DesktopActionResponse{ @@ -10205,8 +10181,6 @@ func TestMCPServerToolInvocation(t *testing.T) { mockConn.EXPECT().SetExtraHeaders(gomock.Any()).AnyTimes() mockConn.EXPECT().ContextConfig(gomock.Any()). Return(workspacesdk.ContextConfigResponse{}, xerrors.New("not supported")).AnyTimes() - mockConn.EXPECT().ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{}, nil).AnyTimes() mockConn.EXPECT().LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{}, nil).AnyTimes() mockConn.EXPECT().ReadFile(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). @@ -10714,8 +10688,6 @@ func TestMCPServerOAuth2TokenRefresh(t *testing.T) { mockConn.EXPECT().SetExtraHeaders(gomock.Any()).AnyTimes() mockConn.EXPECT().ContextConfig(gomock.Any()). Return(workspacesdk.ContextConfigResponse{}, xerrors.New("not supported")).AnyTimes() - mockConn.EXPECT().ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{}, nil).AnyTimes() mockConn.EXPECT().LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{}, nil).AnyTimes() mockConn.EXPECT().ReadFile(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). @@ -12670,8 +12642,6 @@ func setupWorkspaceContextAgentConn( }, nil }, ).AnyTimes() - mockConn.EXPECT().ListMCPTools(gomock.Any()). - Return(workspacesdk.ListMCPToolsResponse{}, nil).AnyTimes() mockConn.EXPECT().LS(gomock.Any(), gomock.Any(), gomock.Any()). Return(workspacesdk.LSResponse{AbsolutePathString: "/home/coder"}, nil).AnyTimes() mockConn.EXPECT().ReadFile(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). diff --git a/codersdk/workspacesdk/agentconn.go b/codersdk/workspacesdk/agentconn.go index 2b4ab3384bc8c..934ae051c878b 100644 --- a/codersdk/workspacesdk/agentconn.go +++ b/codersdk/workspacesdk/agentconn.go @@ -104,7 +104,6 @@ type AgentConn interface { DialContext(ctx context.Context, network string, addr string) (net.Conn, error) GetPeerDiagnostics() tailnet.PeerDiagnostics ListContainers(ctx context.Context) (codersdk.WorkspaceAgentListContainersResponse, error) - ListMCPTools(ctx context.Context) (ListMCPToolsResponse, error) ListProcesses(ctx context.Context) (ListProcessesResponse, error) ListeningPorts(ctx context.Context) (codersdk.WorkspaceAgentListeningPortsResponse, error) Netcheck(ctx context.Context) (healthsdk.AgentNetcheckReport, error) @@ -1132,12 +1131,6 @@ type FileEditResult struct { Diff string `json:"diff"` } -// ListMCPToolsResponse is the response from the agent's -// MCP tool discovery endpoint. -type ListMCPToolsResponse struct { - Tools []MCPToolInfo `json:"tools"` -} - // MCPToolInfo describes a single tool discovered from an MCP // server configured in the workspace's .mcp.json file. type MCPToolInfo struct { @@ -1215,23 +1208,6 @@ func (c *agentConn) ListProcesses(ctx context.Context) (ListProcessesResponse, e return resp, json.NewDecoder(res.Body).Decode(&resp) } -// ListMCPTools returns tools discovered from MCP servers configured -// in the workspace. -func (c *agentConn) ListMCPTools(ctx context.Context) (ListMCPToolsResponse, error) { - ctx, span := tracing.StartSpan(ctx) - defer span.End() - res, err := c.apiRequest(ctx, http.MethodGet, "/api/v0/mcp/tools", nil) - if err != nil { - return ListMCPToolsResponse{}, xerrors.Errorf("do request: %w", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return ListMCPToolsResponse{}, codersdk.ReadBodyAsError(res) - } - var resp ListMCPToolsResponse - return resp, json.NewDecoder(res.Body).Decode(&resp) -} - // ContextConfig returns the resolved context configuration from // the workspace agent. func (c *agentConn) ContextConfig(ctx context.Context) (ContextConfigResponse, error) { diff --git a/codersdk/workspacesdk/agentconnmock/agentconnmock.go b/codersdk/workspacesdk/agentconnmock/agentconnmock.go index 7e91f2e681a52..bfde170d0177e 100644 --- a/codersdk/workspacesdk/agentconnmock/agentconnmock.go +++ b/codersdk/workspacesdk/agentconnmock/agentconnmock.go @@ -282,21 +282,6 @@ func (mr *MockAgentConnMockRecorder) ListContainers(ctx any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListContainers", reflect.TypeOf((*MockAgentConn)(nil).ListContainers), ctx) } -// ListMCPTools mocks base method. -func (m *MockAgentConn) ListMCPTools(ctx context.Context) (workspacesdk.ListMCPToolsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListMCPTools", ctx) - ret0, _ := ret[0].(workspacesdk.ListMCPToolsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListMCPTools indicates an expected call of ListMCPTools. -func (mr *MockAgentConnMockRecorder) ListMCPTools(ctx any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListMCPTools", reflect.TypeOf((*MockAgentConn)(nil).ListMCPTools), ctx) -} - // ListProcesses mocks base method. func (m *MockAgentConn) ListProcesses(ctx context.Context) (workspacesdk.ListProcessesResponse, error) { m.ctrl.T.Helper()