diff --git a/docs/migration.md b/docs/migration.md index 9850f74cd4..0f5fc91c3d 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -105,6 +105,46 @@ The `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters have been re Note: `sse_client` retains its `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters — only the streamable HTTP transport changed. +### `terminate_windows_process` removed + +The deprecated `mcp.os.win32.utilities.terminate_windows_process` function has been +removed. Process termination is handled internally by the `stdio_client` context +manager; there is no replacement API. The Windows tree-termination helper +`terminate_windows_process_tree` no longer accepts a `timeout_seconds` argument — +the value was never used (Job Object termination is immediate). + +### `stdio_client` no longer kills children of a gracefully-exited server on POSIX + +When a server exits on its own after `stdio_client` closes its stdin, background +child processes the server leaves behind are no longer killed on POSIX — their +lifetime is the server's business. The old behavior was a side effect of a shutdown +wait gated on the stdio pipes closing rather than on process exit: a child holding +an inherited pipe made a well-behaved server look hung, so its whole process tree +was killed. (That gating is an asyncio behavior specific to Python 3.11+ — on +Python 3.10 and the trio backend the old wait already resolved on process exit, so +the spurious kill never fired there.) A server that does not exit within the grace +period (`PROCESS_TERMINATION_TIMEOUT`, 2 seconds) is still terminated +along with its entire process group. On Windows, children stay in the server's Job +Object and are still killed at shutdown — now deterministically when the job handle +is closed, rather than whenever the handle happened to be garbage-collected. 
+ +If you relied on `stdio_client` killing everything the server spawned, make the +server terminate its own children on shutdown (its stdin reaching EOF is the +shutdown signal), or clean up the process tree from the host application after +`stdio_client` exits. + +Two related shutdown refinements: `stdio_client` now closes its end of the pipes +deterministically at shutdown, so a surviving child that keeps writing to an +inherited stdout receives `EPIPE`/`SIGPIPE` once the client is gone (previously the +pipe lingered until garbage collection); and a failed write to a server that is +still running now surfaces as a closed connection (`CONNECTION_CLOSED`) on the read +side instead of leaving requests waiting indefinitely. + +`terminate_posix_process_tree` now requires the process to lead its own process +group (spawned with `start_new_session=True`); the `getpgid()` lookup and the +per-process terminate/kill fallback are gone. The win32 utilities logger is now +named `mcp.os.win32.utilities` (was `client.stdio.win32`). + ### Removed type aliases and classes The following deprecated type aliases and classes have been removed from `mcp.types`: diff --git a/src/mcp/client/stdio.py b/src/mcp/client/stdio.py index 902dc8576c..baf7ad1ca1 100644 --- a/src/mcp/client/stdio.py +++ b/src/mcp/client/stdio.py @@ -1,21 +1,33 @@ +"""stdio client transport. + +Runs an MCP server as a subprocess and exchanges newline-delimited JSON-RPC +messages with it over stdin/stdout. Two pipe tasks bridge the server's pipes +to the session's in-memory streams; shutdown follows the MCP spec sequence +(close stdin, wait, then kill the process tree) inside a cancellation shield +with every wait bounded, so a cancelled caller can neither leak a live server +process nor hang on one. 
+""" + import logging import os import sys -from contextlib import asynccontextmanager +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager, suppress from pathlib import Path from typing import Literal, TextIO import anyio import anyio.lowlevel -from anyio.abc import Process -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from anyio.abc import AsyncResource, Process from anyio.streams.text import TextReceiveStream from pydantic import BaseModel, Field from mcp import types +from mcp.client._transport import TransportStreams from mcp.os.posix.utilities import terminate_posix_process_tree from mcp.os.win32.utilities import ( - FallbackProcess, + ServerProcess, + close_process_job, create_windows_process, get_windows_executable_command, terminate_windows_process_tree, @@ -44,14 +56,24 @@ else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"] ) -# Timeout for process termination before falling back to force kill +# Grace period for the server to exit on its own after its stdin closes. PROCESS_TERMINATION_TIMEOUT = 2.0 +# Extra time after SIGTERM before SIGKILL; POSIX only (Windows kills hard). +FORCE_KILL_TIMEOUT = 2.0 + +# Time for the event loop to observe a kill; only an unkillable process runs this out. +_KILL_REAP_TIMEOUT = 2.0 + +# Time for the writer to flush accepted messages before stdin closes. +_WRITER_FLUSH_TIMEOUT = 0.5 + +# How often to poll returncode while waiting for the process to die. +_EXIT_POLL_INTERVAL = 0.01 + def get_default_environment() -> dict[str, str]: - """Returns a default environment object including only environment variables deemed - safe to inherit. 
- """ + """Returns only the environment variables that are safe to inherit.""" env: dict[str, str] = {} for key in DEFAULT_INHERITED_ENV_VARS: @@ -76,150 +98,227 @@ class StdioServerParameters(BaseModel): """Command line arguments to pass to the executable.""" env: dict[str, str] | None = None - """ - The environment to use when spawning the process. - - If not specified, the result of get_default_environment() will be used. - """ + """Extra environment variables, merged over get_default_environment().""" cwd: str | Path | None = None """The working directory to use when spawning the process.""" encoding: str = "utf-8" - """ - The text encoding used when sending/receiving messages to the server. - - Defaults to utf-8. - """ + """Text encoding for messages to and from the server.""" encoding_error_handler: Literal["strict", "ignore", "replace"] = "strict" - """ - The text encoding error handler. - - See https://docs.python.org/3/library/codecs.html#codec-base-classes for - explanations of possible values. - """ + """Encoding error handler; see https://docs.python.org/3/library/codecs.html#error-handlers.""" @asynccontextmanager -async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr): - """Client transport for stdio: this will connect to a server by spawning a - process and communicating with it over stdin/stdout. +async def stdio_client( + server: StdioServerParameters, errlog: TextIO = sys.stderr +) -> AsyncGenerator[TransportStreams, None]: + """Spawns an MCP server subprocess and connects to it over stdin/stdout. + + Raises: + OSError: If the server process cannot be spawned. + ValueError: If the spawn parameters are invalid (embedded NUL bytes). 
""" - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] - read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] + command = _get_executable_command(server.command) - write_stream: MemoryObjectSendStream[SessionMessage] - write_stream_reader: MemoryObjectReceiveStream[SessionMessage] + process = await _create_platform_compatible_process( + command=command, + args=server.args, + env=get_default_environment() | (server.env or {}), + errlog=errlog, + cwd=server.cwd, + ) - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + # The spawn succeeded; no awaits until the task group is entered, or a + # cancellation delivered in the gap would leak the live process. + read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0) + write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0) - try: - command = _get_executable_command(server.command) - - # Open process with stderr piped for capture - process = await _create_platform_compatible_process( - command=command, - args=server.args, - env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()), - errlog=errlog, - cwd=server.cwd, - ) - except OSError: - # Clean up streams if process creation fails - await read_stream.aclose() - await write_stream.aclose() - await read_stream_writer.aclose() - await write_stream_reader.aclose() - raise - - async def stdout_reader(): + shutting_down = False + writer_done = anyio.Event() + + async def stdout_reader() -> None: assert process.stdout, "Opened process is missing stdout" + stdout = TextReceiveStream(process.stdout, encoding=server.encoding, errors=server.encoding_error_handler) try: async with read_stream_writer: - buffer = "" - async for chunk in TextReceiveStream( - process.stdout, - encoding=server.encoding, - 
errors=server.encoding_error_handler, - ): - lines = (buffer + chunk).split("\n") - buffer = lines.pop() - - for line in lines: - try: - message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) - except Exception as exc: # pragma: no cover - logger.exception("Failed to parse JSONRPC message from server") - await read_stream_writer.send(exc) - continue - - session_message = SessionMessage(message) - await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: lax no cover - await anyio.lowlevel.checkpoint() - - async def stdin_writer(): + try: + # One line at a time; no read-ahead while a delivery is blocked. + buffer = "" + async for chunk in stdout: + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + for line in lines: + try: + await read_stream_writer.send(_parse_line(line)) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + return # the session is gone; only the drain below remains + finally: + await _drain_stdout(process) + except anyio.ClosedResourceError: + pass # our own shutdown closed the stdout stream under the read + except (anyio.BrokenResourceError, ConnectionError): + # Teardown noise during shutdown, a real failure otherwise; either way + # the session sees clean closure when the read stream closes. 
+ if not shutting_down: + logger.exception("Reading from the MCP server's stdout failed mid-session") + + async def stdin_writer() -> None: assert process.stdin, "Opened process is missing stdin" try: async with write_stream_reader: async for session_message in write_stream_reader: json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True) - await process.stdin.send( - (json + "\n").encode( - encoding=server.encoding, - errors=server.encoding_error_handler, - ) - ) - except anyio.ClosedResourceError: # pragma: no cover - await anyio.lowlevel.checkpoint() - - async with anyio.create_task_group() as tg, process: + data = (json + "\n").encode(encoding=server.encoding, errors=server.encoding_error_handler) + await process.stdin.send(data) + except (anyio.ClosedResourceError, anyio.BrokenResourceError, OSError): + # The server may still be alive: close the read stream so the session + # sees the connection end instead of a request hanging forever. + await read_stream_writer.aclose() + finally: + writer_done.set() + + async def shutdown() -> None: + """Winds the transport down: stop traffic, flush, stop the server, release the streams.""" + # Unblock the reader into its drain: a server stuck writing stdout cannot + # read its stdin, so draining is what lets the flush below complete. + read_stream.close() + # Bounded window for the writer to flush already-accepted messages. + write_stream.close() + with anyio.move_on_after(_WRITER_FLUSH_TIMEOUT) as flush_scope: + await writer_done.wait() + if flush_scope.cancelled_caught: + await anyio.lowlevel.cancel_shielded_checkpoint() # resync coverage on 3.11 (gh-106749) + await _stop_server_process(process) + await _aclose_all(read_stream, write_stream, read_stream_writer, write_stream_reader) + # One pass so unblocked tasks exit via their except paths before the cancel. 
+ await anyio.lowlevel.checkpoint() + + async with anyio.create_task_group() as tg: tg.start_soon(stdout_reader) tg.start_soon(stdin_writer) try: yield read_stream, write_stream finally: - # MCP spec: stdio shutdown sequence - # 1. Close input stream to server - # 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time - # 3. Send SIGKILL if still not exited - if process.stdin: # pragma: no branch - try: - await process.stdin.aclose() - except Exception: # pragma: no cover - # stdin might already be closed, which is fine - pass - - try: - # Give the process time to exit gracefully after stdin closes - with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT): - await process.wait() - except TimeoutError: - # Process didn't exit from stdin closure, use platform-specific termination - # which handles SIGTERM -> SIGKILL escalation - await _terminate_process_tree(process) - except ProcessLookupError: # pragma: no cover - # Process already exited, which is fine - pass - await read_stream.aclose() - await write_stream.aclose() - await read_stream_writer.aclose() - await write_stream_reader.aclose() + shutting_down = True + # Shutdown must finish even under caller cancellation, or the server + # process would leak; every wait inside is bounded. (Native + # task.cancel() and the fallback's worker threads can still defeat it.) + with anyio.CancelScope(shield=True): + await shutdown() + # Unstick pipe tasks a kill survivor's open pipe end could still block. + tg.cancel_scope.cancel() + # The cancel lands via throw(); one yield resyncs 3.11 coverage (gh-106749). 
+ await anyio.lowlevel.cancel_shielded_checkpoint() + + +def _parse_line(line: str) -> SessionMessage | Exception: + """Parses one stdout line, returning parse errors as values for the session to surface.""" + try: + message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) + except ValueError as exc: + logger.exception("Failed to parse JSONRPC message from server") + return exc + return SessionMessage(message) -def _get_executable_command(command: str) -> str: - """Get the correct executable command normalized for the current platform. +async def _drain_stdout(process: ServerProcess) -> None: + """Consumes and discards the server's remaining stdout. - Args: - command: Base command (e.g., 'uvx', 'npx') + Keeps a server flushing buffered output from blocking on a full pipe and + missing its chance to exit; shielded, raw bytes, ends when shutdown closes + the pipe. + """ + assert process.stdout + with anyio.CancelScope(shield=True): + with suppress( + anyio.EndOfStream, + anyio.ClosedResourceError, + anyio.BrokenResourceError, + ConnectionError, + OSError, + ): + while True: + await process.stdout.receive() + + +async def _stop_server_process(process: ServerProcess) -> None: + """Closes stdin, waits out the grace period, then kills the whole tree. + + The escalation order is spec text; timeouts and tree-wide scope are SDK policy: + https://modelcontextprotocol.io/specification/2025-11-25/basic/lifecycle#shutdown + """ + assert process.stdin and process.stdout, "server process is spawned with pipes" - Returns: - str: Platform-appropriate command + await _close_pipe(process.stdin) + if not await _wait_for_process_exit(process, PROCESS_TERMINATION_TIMEOUT): + await _terminate_process_tree(process) + # Until the event loop observes the death, the transport cannot close. 
+ if not await _wait_for_process_exit(process, _KILL_REAP_TIMEOUT): + logger.warning("MCP server process %d is still alive after the kill escalation; abandoning it", process.pid) + + # Reaps surviving Windows job members now, not at GC; no-op on POSIX. + close_process_job(process) + # A kill survivor can hold the stdout pipe open; poison the reader anyway. + await _close_pipe(process.stdout) + _close_subprocess_transport(process) + + +async def _close_pipe(stream: AsyncResource) -> None: + """Closes a pipe stream, tolerating one already closed, broken, or contended.""" + with suppress(OSError, anyio.BrokenResourceError, anyio.ClosedResourceError): + await stream.aclose() + + +async def _wait_for_process_exit(process: ServerProcess, timeout: float) -> bool: + """Returns whether the process died within the timeout, by polling returncode. + + Not process.wait(): on asyncio 3.11+ it also waits for pipe EOF, and a + child that inherited the pipes makes an exited server look hung. + """ + deadline = anyio.current_time() + timeout + while process.returncode is None: + if anyio.current_time() >= deadline: + return False + await anyio.sleep(_EXIT_POLL_INTERVAL) + return True + + +async def _terminate_process_tree(process: ServerProcess) -> None: + """Kills the process and all its descendants. + + POSIX: SIGTERM to the process group, SIGKILL after FORCE_KILL_TIMEOUT. + Windows: immediate Job Object termination (already a hard kill). """ + if sys.platform == "win32": # pragma: no cover + await terminate_windows_process_tree(process) + else: # pragma: lax no cover + # The Windows-only FallbackProcess never reaches the POSIX path. + assert isinstance(process, Process) + await terminate_posix_process_tree(process, FORCE_KILL_TIMEOUT) + + +def _close_subprocess_transport(process: ServerProcess) -> None: + """Closes the asyncio subprocess transport, if there is one. 
+ + The transport otherwise stays open (and warns at GC) while a surviving + descendant holds a pipe end; nothing public exposes it, hence the attribute + walk. No-op on trio and the Windows fallback. + """ + transport = getattr(getattr(process, "_process", None), "_transport", None) + # Duck-typed: uvloop's UVProcessTransport is not an asyncio.SubprocessTransport. + close = getattr(transport, "close", None) + if callable(close): + # close() on <=3.12 can raise PermissionError re-killing a setuid child. + with suppress(PermissionError): + close() + + +def _get_executable_command(command: str) -> str: + """Normalizes the command for the current platform.""" if sys.platform == "win32": # pragma: no cover return get_windows_executable_command(command) else: # pragma: lax no cover @@ -232,16 +331,15 @@ async def _create_platform_compatible_process( env: dict[str, str] | None = None, errlog: TextIO = sys.stderr, cwd: Path | str | None = None, -): - """Creates a subprocess in a platform-compatible way. +) -> ServerProcess: + """Spawns the server in its own kill scope. - Unix: Creates process in a new session/process group for killpg support - Windows: Creates process in a Job Object for reliable child termination + A new session/process group on POSIX, a Job Object on Windows. """ if sys.platform == "win32": # pragma: no cover - process = await create_windows_process(command, args, env, errlog, cwd) + return await create_windows_process(command, args, env, errlog, cwd) else: # pragma: lax no cover - process = await anyio.open_process( + return await anyio.open_process( [command, *args], env=env, stderr=errlog, @@ -249,22 +347,8 @@ async def _create_platform_compatible_process( start_new_session=True, ) - return process - -async def _terminate_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None: - """Terminate a process and all its children using platform-specific methods. 
- - Unix: Uses os.killpg() for atomic process group termination - Windows: Uses Job Objects via pywin32 for reliable child process cleanup - - Args: - process: The process to terminate - timeout_seconds: Timeout in seconds before force killing (default: 2.0) - """ - if sys.platform == "win32": # pragma: no cover - await terminate_windows_process_tree(process, timeout_seconds) - else: # pragma: lax no cover - # FallbackProcess should only be used for Windows compatibility - assert isinstance(process, Process) - await terminate_posix_process_tree(process, timeout_seconds) +async def _aclose_all(*streams: AsyncResource) -> None: + """Closes every given stream.""" + for stream in streams: + await stream.aclose() diff --git a/src/mcp/os/posix/utilities.py b/src/mcp/os/posix/utilities.py index 0e9d74cf3c..d15be17194 100644 --- a/src/mcp/os/posix/utilities.py +++ b/src/mcp/os/posix/utilities.py @@ -3,55 +3,61 @@ import logging import os import signal +from contextlib import suppress import anyio from anyio.abc import Process logger = logging.getLogger(__name__) +# How often to probe for surviving group members between SIGTERM and SIGKILL. +_GROUP_POLL_INTERVAL = 0.01 -async def terminate_posix_process_tree(process: Process, timeout_seconds: float = 2.0) -> None: - """Terminate a process and all its children on POSIX systems. - - Uses os.killpg() for atomic process group termination. - Args: - process: The process to terminate - timeout_seconds: Timeout in seconds before force killing (default: 2.0) +async def terminate_posix_process_tree(process: Process, timeout_seconds: float = 2.0) -> None: + """Terminates a process and all its descendants on POSIX. + + SIGTERMs the process group, waits up to timeout_seconds for it to + disappear, then SIGKILLs whatever remains. killpg reaches every descendant + atomically, even ones whose parent already exited; daemonizers that left + the group escape by design. 
A group only disappears once every member is + dead and reaped, so a client running as PID 1 should reap orphans (e.g. + docker run --init) or the wait below runs its full timeout. """ - pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None) - if not pid: - # No PID means there's no process to terminate - it either never started, - # already exited, or we have an invalid process object - return + # The leader's pid is the pgid (start_new_session). Never use getpgid(): + # it fails once the leader is reaped, even with live members left. + pgid = process.pid try: - pgid = os.getpgid(pid) os.killpg(pgid, signal.SIGTERM) + except ProcessLookupError: + return # the whole group is already gone + except PermissionError: + # EPERM never proves the group is gone (macOS raises it for zombie or + # foreign-euid members), so keep waiting and escalating. + logger.warning( + "No permission to signal some of process group %d; waiting for it to exit anyway", pgid, exc_info=True + ) + + with anyio.move_on_after(timeout_seconds): + while _group_alive(pgid): + # Reading returncode reaps the leader on trio; a zombie leader would + # otherwise keep the group alive for the full timeout. + _ = process.returncode + await anyio.sleep(_GROUP_POLL_INTERVAL) + return + + # ESRCH: died since the last probe. EPERM: we killed what we were allowed to. 
+ with suppress(ProcessLookupError, PermissionError): + os.killpg(pgid, signal.SIGKILL) - with anyio.move_on_after(timeout_seconds): - while True: - try: - # Check if process group still exists (signal 0 = check only) - os.killpg(pgid, 0) - await anyio.sleep(0.1) - except ProcessLookupError: - return - - try: - os.killpg(pgid, signal.SIGKILL) - except ProcessLookupError: - pass - - except (ProcessLookupError, PermissionError, OSError) as e: - logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate") - try: - process.terminate() - with anyio.fail_after(timeout_seconds): - await process.wait() - except Exception: - logger.warning(f"Process termination failed for PID {pid}, attempting force kill") - try: - process.kill() - except Exception: - logger.exception(f"Failed to kill process {pid}") + +def _group_alive(pgid: int) -> bool: + """Probes the group with signal 0; only ESRCH proves it is gone.""" + try: + os.killpg(pgid, 0) + except ProcessLookupError: + return False + except PermissionError: + pass # unsignalable survivors or unreaped zombies; EPERM is ambiguous + return True diff --git a/src/mcp/os/win32/utilities.py b/src/mcp/os/win32/utilities.py index 6f68405f78..1cc867d4fa 100644 --- a/src/mcp/os/win32/utilities.py +++ b/src/mcp/os/win32/utilities.py @@ -4,16 +4,16 @@ import shutil import subprocess import sys +import weakref +from contextlib import suppress from pathlib import Path -from typing import BinaryIO, TextIO, cast +from typing import BinaryIO, TextIO, TypeAlias, cast import anyio -from anyio import to_thread from anyio.abc import Process from anyio.streams.file import FileReadStream, FileWriteStream -from typing_extensions import deprecated -logger = logging.getLogger("client.stdio.win32") +logger = logging.getLogger(__name__) # Windows-specific imports for Job Objects if sys.platform == "win32": @@ -28,110 +28,86 @@ win32job = None pywintypes = None -JobHandle = int +# How often FallbackProcess polls 
the underlying Popen for exit. +_EXIT_POLL_INTERVAL = 0.01 +# Job Object handle per spawned process, for tree termination at shutdown. +# Values stay pywin32 PyHANDLEs: if no pop site ever runs, the dying weak entry +# drops the last reference and the PyHANDLE destructor closes the handle, which +# is what makes KILL_ON_JOB_CLOSE reap an abandoned tree. +_process_jobs: "weakref.WeakKeyDictionary[Process | FallbackProcess, object]" = weakref.WeakKeyDictionary() -def get_windows_executable_command(command: str) -> str: - """Get the correct executable command normalized for Windows. - - On Windows, commands might exist with specific extensions (.exe, .cmd, etc.) - that need to be located for proper execution. - Args: - command: Base command (e.g., 'uvx', 'npx') +def get_windows_executable_command(command: str) -> str: + """Resolves the command to a Windows executable path. - Returns: - str: Windows-appropriate command path + Tries the bare name first, then the common script extensions (.cmd, .bat, + .exe, .ps1). """ try: - # First check if command exists in PATH as-is if command_path := shutil.which(command): return command_path - # Check for Windows-specific extensions for ext in [".cmd", ".bat", ".exe", ".ps1"]: ext_version = f"{command}{ext}" if ext_path := shutil.which(ext_version): return ext_path - # For regular commands or if we couldn't find special versions return command except OSError: - # Handle file system errors during path resolution - # (permissions, broken symlinks, etc.) - return command + return command # path probing failed (permissions, broken symlinks) class FallbackProcess: - """A fallback process wrapper for Windows to handle async I/O - when using subprocess.Popen, which provides sync-only FileIO objects. + """Async wrapper around subprocess.Popen for SelectorEventLoop. - This wraps stdin and stdout into async-compatible - streams (FileReadStream, FileWriteStream), - so that MCP clients expecting async streams can work properly. 
+ Windows event loops without async subprocess support get this Popen-backed + fallback, with anyio file streams wrapping the pipes. """ - def __init__(self, popen_obj: subprocess.Popen[bytes]): + def __init__(self, popen_obj: subprocess.Popen[bytes]) -> None: self.popen: subprocess.Popen[bytes] = popen_obj - self.stdin_raw = popen_obj.stdin # type: ignore[assignment] - self.stdout_raw = popen_obj.stdout # type: ignore[assignment] - self.stderr = popen_obj.stderr # type: ignore[assignment] - - self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None - self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None - - async def __aenter__(self): - """Support async context manager entry.""" - return self - - async def __aexit__( - self, - exc_type: BaseException | None, - exc_val: BaseException | None, - exc_tb: object | None, - ) -> None: - """Terminate and wait on process exit inside a thread.""" + stdin = popen_obj.stdin + stdout = popen_obj.stdout + + self.stdin = FileWriteStream(cast(BinaryIO, stdin)) if stdin else None + self.stdout = FileReadStream(cast(BinaryIO, stdout)) if stdout else None + + async def wait(self) -> int: + """Waits for exit by polling the Popen. + + A thread blocked in Popen.wait() cannot be cancelled by anyio, which + would defeat every timeout placed around this call. 
+ """ + while (returncode := self.popen.poll()) is None: + await anyio.sleep(_EXIT_POLL_INTERVAL) + return returncode + + def terminate(self) -> None: + """Terminates the subprocess.""" self.popen.terminate() - await to_thread.run_sync(self.popen.wait) - - # Close the file handles to prevent ResourceWarning - if self.stdin: - await self.stdin.aclose() - if self.stdout: - await self.stdout.aclose() - if self.stdin_raw: - self.stdin_raw.close() - if self.stdout_raw: - self.stdout_raw.close() - if self.stderr: - self.stderr.close() - - async def wait(self): - """Async wait for process completion.""" - return await to_thread.run_sync(self.popen.wait) - - def terminate(self): - """Terminate the subprocess immediately.""" - return self.popen.terminate() def kill(self) -> None: - """Kill the subprocess immediately (alias for terminate).""" - self.terminate() + """Kills the subprocess (on Windows the same hard kill as terminate).""" + self.popen.kill() @property def pid(self) -> int: - """Return the process ID.""" + """Returns the process ID.""" return self.popen.pid @property def returncode(self) -> int | None: - """Return the exit code, or ``None`` if the process has not yet terminated.""" - return self.popen.returncode + """The exit code, or None while the process is still running. + + Polls the Popen so death is observable without anyone calling wait(). + """ + return self.popen.poll() -# ------------------------ -# Updated function -# ------------------------ +# The process handle stdio_client drives: anyio's Process, or the Popen-backed +# fallback used on Windows event loops without async subprocess support. +ServerProcess: TypeAlias = Process | FallbackProcess async def create_windows_process( @@ -141,53 +117,35 @@ async def create_windows_process( errlog: TextIO | None = sys.stderr, cwd: Path | str | None = None, ) -> Process | FallbackProcess: - """Creates a subprocess in a Windows-compatible way with Job Object support. 
+ """Creates a subprocess with Job Object support for tree termination. - Attempts to use anyio's open_process for async subprocess creation. - In some cases this will throw NotImplementedError on Windows, e.g., - when using the SelectorEventLoop, which does not support async subprocesses. - In that case, we fall back to using subprocess.Popen. - - The process is automatically added to a Job Object to ensure all child - processes are terminated when the parent is terminated. - - Args: - command (str): The executable to run - args (list[str]): List of command line arguments - env (dict[str, str] | None): Environment variables - errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr) - cwd (Path | str | None): Working directory for the subprocess + Spawns via anyio's open_process; event loops without async subprocess + support (notably the SelectorEventLoop) raise NotImplementedError, in which + case the spawn falls back to a Popen-backed FallbackProcess. Either way the + process is then assigned to a Job Object so its children can be terminated + with it; children spawned before the assignment completes are not captured + (see the inline note below). Returns: - Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams + Process | FallbackProcess: The spawned process with async stdin/stdout streams. 
""" - job = _create_job_object() - process = None - try: - # First try using anyio with Windows-specific flags to hide console window process = await anyio.open_process( [command, *args], env=env, # Ensure we don't create console windows for each process - creationflags=subprocess.CREATE_NO_WINDOW # type: ignore - if hasattr(subprocess, "CREATE_NO_WINDOW") - else 0, + creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0), stderr=errlog, cwd=cwd, ) except NotImplementedError: - # If Windows doesn't support async subprocess creation, use fallback + # Windows event loops without async subprocess support (SelectorEventLoop) process = await _create_windows_fallback_process(command, args, env, errlog, cwd) - except Exception: - # Try again without creation flags - process = await anyio.open_process( - [command, *args], - env=env, - stderr=errlog, - cwd=cwd, - ) + # Children spawned before the assignment completes land outside the job + # (membership is inherited at CreateProcess, never acquired retroactively); + # if that ever bites, the fix is a CREATE_SUSPENDED spawn -> assign -> resume. + job = _create_job_object() _maybe_assign_process_to_job(process, job) return process @@ -199,41 +157,26 @@ async def _create_windows_fallback_process( errlog: TextIO | None = sys.stderr, cwd: Path | str | None = None, ) -> FallbackProcess: - """Create a subprocess using subprocess.Popen as a fallback when anyio fails. - - This function wraps the sync subprocess.Popen in an async-compatible interface. 
- """ - try: - # Try launching with creationflags to avoid opening a new console window - popen_obj = subprocess.Popen( - [command, *args], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=errlog, - env=env, - cwd=cwd, - bufsize=0, # Unbuffered output - creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0), - ) - except Exception: - # If creationflags failed, fallback without them - popen_obj = subprocess.Popen( - [command, *args], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=errlog, - env=env, - cwd=cwd, - bufsize=0, - ) + """Spawns via subprocess.Popen and wraps it in FallbackProcess.""" + popen_obj = subprocess.Popen( + [command, *args], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=errlog, + env=env, + cwd=cwd, + bufsize=0, # Unbuffered output + creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0), + ) return FallbackProcess(popen_obj) -def _create_job_object() -> int | None: - """Create a Windows Job Object configured to terminate all processes when closed.""" - if sys.platform != "win32" or not win32job: +def _create_job_object() -> object | None: + """Creates a Windows Job Object configured to terminate all its processes when closed.""" + if sys.platform != "win32" or not win32api or not win32job: return None + job = None try: job = win32job.CreateJobObject(None, "") extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation) @@ -241,17 +184,20 @@ def _create_job_object() -> int | None: extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info) return job - except Exception as e: - logger.warning(f"Failed to create Job Object for process tree management: {e}") + except pywintypes.error: + logger.warning("Failed to create Job Object for process tree management", exc_info=True) + # If creation succeeded but configuration failed, close the 
handle now. + if job is not None: + _close_job_handle(job) return None -def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None: - """Try to assign a process to a job object. +def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: object | None) -> None: + """Assigns the process to the job and records it for tree termination. - If assignment fails for any reason, the job handle is closed. + On any failure the job handle is closed instead. """ - if not job: + if job is None: return if sys.platform != "win32" or not win32api or not win32con or not win32job: @@ -262,72 +208,62 @@ def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHan win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE, False, process.pid ) if not process_handle: - raise Exception("Failed to open process handle") + raise pywintypes.error(0, "OpenProcess", "Failed to open process handle") try: win32job.AssignProcessToJobObject(job, process_handle) - process._job_object = job finally: win32api.CloseHandle(process_handle) - except Exception as e: - logger.warning(f"Failed to assign process {process.pid} to Job Object: {e}") - if win32api: - win32api.CloseHandle(job) + # Record only after the CloseHandle above succeeded: had it failed, the + # except below would close the job and KILL_ON_JOB_CLOSE takes the server. + _process_jobs[process] = job + except pywintypes.error: + logger.warning("Failed to assign process %d to Job Object", process.pid, exc_info=True) + _close_job_handle(job) -async def terminate_windows_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None: - """Terminate a process and all its children on Windows. +def close_process_job(process: Process | FallbackProcess) -> None: + """Closes the process's Job Object handle, if it still has one. - If the process has an associated job object, it will be terminated. - Otherwise, falls back to basic process termination. 
+ KILL_ON_JOB_CLOSE makes the close also kill any members still alive, + deterministically rather than at GC time; a deliberate divergence from + POSIX, where a graceful server's children are left alive. + """ + if sys.platform != "win32": + return + + job = _process_jobs.pop(process, None) + if job is not None: + _close_job_handle(job) - Args: - process: The process to terminate - timeout_seconds: Timeout in seconds before force killing (default: 2.0) + +async def terminate_windows_process_tree(process: Process | FallbackProcess) -> None: + """Terminates the process's job, or just the process if it has no job. + + Job termination is an immediate hard kill of every member. Windows has no + tree-wide SIGTERM; the stdin-close grace period is the server's chance to + exit cleanly. """ if sys.platform != "win32": return - job = getattr(process, "_job_object", None) - if job and win32job: + job = _process_jobs.pop(process, None) + if job is not None and win32job: try: - win32job.TerminateJobObject(job, 1) - except Exception: - # Job might already be terminated - pass + with suppress(pywintypes.error): # the job might already be terminated + win32job.TerminateJobObject(job, 1) finally: - if win32api: - try: - win32api.CloseHandle(job) - except Exception: - pass + _close_job_handle(job) - # Always try to terminate the process itself as well + # The process may have no job (creation or assignment failed); kill it directly too. try: process.terminate() - except Exception: + except OSError: pass -@deprecated( - "terminate_windows_process is deprecated and will be removed in a future version. " - "Process termination is now handled internally by the stdio_client context manager." -) -async def terminate_windows_process(process: Process | FallbackProcess): - """Terminate a Windows process. - - Note: On Windows, terminating a process with process.terminate() doesn't - always guarantee immediate process termination. 
- If the process does not exit within 2 seconds, process.kill() is called - to send a SIGKILL-equivalent signal. - - Args: - process: The process to terminate - """ - try: - process.terminate() - with anyio.fail_after(2.0): - await process.wait() - except TimeoutError: - # Force kill if it doesn't terminate - process.kill() +def _close_job_handle(job: object) -> None: + """Closes a Job Object handle, tolerating one that is already closed.""" + if win32api and pywintypes: + with suppress(pywintypes.error): + win32api.CloseHandle(job) diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 06e2cba4b1..f3cb88dc9c 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -1,253 +1,1120 @@ +"""Tests for the stdio client transport. + +Transport logic (framing, parse errors, shutdown escalation decisions) is tested in +process against a fake process injected through the spawn seam; only real OS behaviour +(process-group kill semantics, SIGKILL after an ignored SIGTERM, exec failure) uses +real subprocesses, synchronized only by kernel-level liveness sockets. The full +client<->server round trip is pinned by tests/interaction/transports/test_stdio.py. 
+""" + import errno -import shutil +import gc +import logging +import math +import os +import signal import sys -import textwrap -import time +from collections.abc import Callable from contextlib import AsyncExitStack, suppress +from pathlib import Path +from typing import TextIO, cast import anyio import anyio.abc +import anyio.lowlevel import pytest +import trio +import trio.testing +from anyio.streams.memory import MemoryObjectReceiveStream +from mcp.client import stdio +from mcp.client._transport import ReadStream from mcp.client.session import ClientSession from mcp.client.stdio import ( + _EXIT_POLL_INTERVAL, StdioServerParameters, _create_platform_compatible_process, _terminate_process_tree, stdio_client, ) +from mcp.os.posix import utilities as posix_utilities +from mcp.os.posix.utilities import terminate_posix_process_tree from mcp.os.win32.utilities import FallbackProcess from mcp.shared.exceptions import MCPError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse -# Timeout for cleanup of processes that ignore SIGTERM -# This timeout ensures the test fails quickly if the cleanup logic doesn't have -# proper fallback mechanisms (SIGINT/SIGKILL) for processes that ignore SIGTERM -SIGTERM_IGNORING_PROCESS_TIMEOUT = 5.0 +# --------------------------------------------------------------------------- +# In-process fake of the spawned server process +# --------------------------------------------------------------------------- +# +# Everything between the spawn and the OS kill is pure SDK logic, so it is tested +# against this fake by monkeypatching the spawn and terminate seams. The OS half +# is tested separately below with real processes. 
+ + +class _FakeStdin: + """The fake process's stdin: records what the client writes, signals closure.""" + + def __init__(self, process: "FakeProcess") -> None: + self._process = process + + async def send(self, data: bytes) -> None: + if self._process.stdin_send_gate is not None: + # A full pipe whose reader is busy elsewhere: the write completes + # only once the test's gate opens. + await self._process.stdin_send_gate.wait() + if self._process.stdin_send_blocks: + # A pipe whose reader stopped reading: the write never completes. + await anyio.sleep_forever() + if self._process.stdin_send_error is not None: + raise self._process.stdin_send_error + if self._process.returncode is not None: + # What the asyncio backend surfaces when writing to a dead child's pipe. + raise ConnectionResetError("Connection lost") + self._process.written.append(data) + + async def aclose(self) -> None: + self._process.stdin_closed.set() + if self._process.on_stdin_close is not None: + self._process.on_stdin_close() + if self._process.stdin_aclose_error is not None: + raise self._process.stdin_aclose_error + + +class _FakeStdout: + """The fake process's stdout: delegates to the in-memory stream. + + Optionally surfaces the abrupt-death or close-time errors a real pipe can. + """ + + def __init__( + self, + inner: MemoryObjectReceiveStream[bytes], + *, + eof_error: Exception | None = None, + aclose_error: Exception | None = None, + on_receive: Callable[[], None], + ) -> None: + self._inner = inner + self._eof_error = eof_error + self._aclose_error = aclose_error + self._on_receive = on_receive + + async def receive(self) -> bytes: + try: + chunk = await self._inner.receive() + except anyio.EndOfStream: + if self._eof_error is not None: + # A hard-killed pipe surfaces a reset, not EOF, on the proactor loop. 
+ raise self._eof_error from None + raise + self._on_receive() + return chunk + + async def aclose(self) -> None: + await self._inner.aclose() + if self._aclose_error is not None: + raise self._aclose_error + # Real async closes yield; keeps the fake honest and shutdown scheduling realistic. + await anyio.lowlevel.checkpoint() + + +class FakeProcess: + """In-memory stand-in for the spawned server process. + + `feed`/`close_stdout` drive its stdout, `written` records client writes, `exit` + and the error knobs replay death and pipe failure modes. + """ + + def __init__( + self, + on_stdin_close: Callable[[], None] | None = None, + stdin_aclose_error: Exception | None = None, + stdin_send_error: Exception | None = None, + stdin_send_blocks: bool = False, + stdin_send_gate: anyio.Event | None = None, + stdout_eof_error: Exception | None = None, + stdout_aclose_error: Exception | None = None, + on_stdout_receive: Callable[[], None] | None = None, + ) -> None: + self._stdout_send, stdout_receive = anyio.create_memory_object_stream[bytes](math.inf) + self.stdout = _FakeStdout( + stdout_receive, + eof_error=stdout_eof_error, + aclose_error=stdout_aclose_error, + on_receive=self._dispatch_stdout_receive, + ) + self.pid = 424242 + self.written: list[bytes] = [] + self.stdin_closed = anyio.Event() + self.returncode: int | None = None + self.on_stdin_close = on_stdin_close + self.stdin_aclose_error = stdin_aclose_error + self.stdin_send_error = stdin_send_error + self.stdin_send_blocks = stdin_send_blocks + self.stdin_send_gate = stdin_send_gate + self.on_stdout_receive = on_stdout_receive + self.stdin = _FakeStdin(self) + + def _dispatch_stdout_receive(self) -> None: + # Late-bound so a test can assign `on_stdout_receive` after construction. 
+ if self.on_stdout_receive is not None: + self.on_stdout_receive() + + async def feed(self, data: bytes) -> None: + """Make `data` readable on the fake process's stdout.""" + await self._stdout_send.send(data) + + def close_stdout(self) -> None: + """End the fake process's stdout, as the kernel does when it dies.""" + self._stdout_send.close() + + def exit(self, code: int = 0) -> None: + """Die: set the exit code and EOF stdout, as the kernel does.""" + self.returncode = code + self.close_stdout() + + def pending_stdout_chunks(self) -> int: + """How many fed chunks the client has not yet pulled off the fake stdout.""" + return self._stdout_send.statistics().current_buffer_used + + +def install_fake_process( + monkeypatch: pytest.MonkeyPatch, process: FakeProcess, *, grace_period: float | None = 0.2 +) -> list[FakeProcess]: + """Route stdio_client's spawn and terminate seams to `process`. + + Returns the list of processes the (fake) tree termination was invoked on. + `grace_period=None` keeps the production stdin-close grace (affordable only on a + virtual clock). 
+ """ + terminated: list[FakeProcess] = [] + + async def fake_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + return process + + async def fake_terminate_tree(proc: FakeProcess) -> None: + terminated.append(proc) + proc.exit(-15) -tee = shutil.which("tee") + monkeypatch.setattr(stdio, "_create_platform_compatible_process", fake_spawn) + monkeypatch.setattr(stdio, "_terminate_process_tree", fake_terminate_tree) + if grace_period is not None: + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", grace_period) + return terminated + + +FAKE_PARAMS = StdioServerParameters(command="fake-server") + + +def _line(message: JSONRPCMessage) -> bytes: + """The wire form of `message`: one JSON document on its own line.""" + return (message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode() + + +async def _next_message(read_stream: ReadStream[SessionMessage | Exception]) -> JSONRPCMessage: + received = await read_stream.receive() + assert isinstance(received, SessionMessage) + return received.message + + +@pytest.mark.anyio +async def test_messages_split_and_packed_across_chunks_are_reframed(monkeypatch: pytest.MonkeyPatch) -> None: + """Framing survives arbitrary chunk boundaries. + + Split, packed, and CRLF-terminated messages are each delivered exactly once, and a + trailing line without a newline is not delivered. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + ping2 = JSONRPCRequest(jsonrpc="2.0", id=2, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, _): + # First message split mid-bytes; its tail packed with the second, a + # CRLF-framed third (the SDK's own server emits \r\n on Windows; jiter + # treats the \r as JSON whitespace), and a partial fourth. + wire = _line(ping) + crlf_wire = ping2.model_dump_json(by_alias=True, exclude_unset=True).encode() + b"\r\n" + await process.feed(wire[:7]) + await process.feed(wire[7:] + _line(pong) + crlf_wire + b'{"jsonrpc": "2.0", "id": 99') + + assert await _next_message(read_stream) == ping + assert await _next_message(read_stream) == pong + assert await _next_message(read_stream) == ping2 + + # The partial trailing message is dropped at EOF, not delivered broken. + # (no branch: coverage mis-traces the exit arc of a `with` whose body + # raises inside a nested async context.) + with pytest.raises(anyio.EndOfStream): # pragma: no branch + process.close_stdout() + await read_stream.receive() @pytest.mark.anyio -@pytest.mark.skipif(tee is None, reason="could not find tee command") -async def test_stdio_context_manager_exiting(): - assert tee is not None - async with stdio_client(StdioServerParameters(command=tee)) as (_, _): - pass +async def test_each_outgoing_message_is_written_as_exactly_one_line(monkeypatch: pytest.MonkeyPatch) -> None: + """Client -> server framing writes one line per message. + + Every sent message reaches the server's stdin as exactly one newline-terminated + JSON document. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + await write_stream.send(SessionMessage(ping)) + await write_stream.send(SessionMessage(pong)) + # The zero-buffer handoff resumes this task before the writer has + # necessarily written; once all tasks block again, both writes have landed. + await anyio.wait_all_tasks_blocked() + assert process.written == [_line(ping), _line(pong)] @pytest.mark.anyio -@pytest.mark.skipif(tee is None, reason="could not find tee command") -async def test_stdio_client(): - assert tee is not None - server_parameters = StdioServerParameters(command=tee) - - async with stdio_client(server_parameters) as (read_stream, write_stream): - # Test sending and receiving messages - messages = [ - JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"), - JSONRPCResponse(jsonrpc="2.0", id=2, result={}), - ] - - async with write_stream: - for message in messages: - session_message = SessionMessage(message) - await write_stream.send(session_message) - - read_messages: list[JSONRPCMessage] = [] - async with read_stream: - async for message in read_stream: - if isinstance(message, Exception): # pragma: no cover - raise message - - read_messages.append(message.message) - if len(read_messages) == 2: - break - - assert len(read_messages) == 2 - assert read_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - assert read_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) +async def test_invalid_json_from_the_server_surfaces_as_an_in_stream_exception( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A line failing JSON-RPC validation is delivered as an Exception on the read stream. + + The messages after it still come through. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, _): + await process.feed(b"not json\n" + _line(ping)) + + error = await read_stream.receive() + # The transport surfaces parse failures as the underlying validation error. + assert isinstance(error, ValueError) + assert await _next_message(read_stream) == ping @pytest.mark.anyio -async def test_stdio_client_bad_path(): - """Check that the connection doesn't hang if process errors.""" - server_params = StdioServerParameters(command=sys.executable, args=["-c", "non-existent-file.py"]) - async with stdio_client(server_params) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - # The session should raise an error when the connection closes +async def test_a_server_that_dies_before_responding_fails_initialize_with_connection_closed( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Server death (stdout EOF) is reported to the session as a closed connection. + + The in-flight initialize fails instead of hanging. 
+ """ + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + process.exit(1) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with ( + stdio_client(FAKE_PARAMS) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): with pytest.raises(MCPError) as exc_info: await session.initialize() - # Check that we got a connection closed error assert exc_info.value.error.code == CONNECTION_CLOSED - assert "Connection closed" in exc_info.value.error.message + assert exc_info.value.error.message == "Connection closed" + + +@pytest.mark.anyio +async def test_a_server_that_exits_on_stdin_close_is_never_terminated(monkeypatch: pytest.MonkeyPatch) -> None: + """Closing stdin (shutdown's first step) suffices for a well-behaved server. + + The escalation is never invoked. The fake's stdin also raises on close, which the + shutdown must tolerate. + """ + + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdin_aclose_error=anyio.ClosedResourceError(), + ) + terminated = install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass + + assert terminated == [] + assert process.stdin_closed.is_set() + + +def test_escalation_fires_once_and_only_after_the_grace_period(monkeypatch: pytest.MonkeyPatch) -> None: + """A server that ignores stdin closure is terminated at the grace deadline exactly. + + The kill lands no earlier than the production `PROCESS_TERMINATION_TIMEOUT` on the + runtime clock, and by the first `returncode` poll after it. + + The suite's only direct trio use: anyio's pytest plugin cannot hand the backend a + clock, so the test calls `trio.run` itself with an autojumping `MockClock`. Every + time primitive rides that one virtual clock, so the production grace elapses + instantly and the bound can be two-sided (a wall-clock upper bound flakes under + load). 
That virtual seconds match wall seconds is the runtime clock's contract, + deliberately not re-tested here. + """ + + class ClockedFakeProcess(FakeProcess): + """Records the virtual time of each death. + + Only the (fake) tree termination calls `exit` here, so these are the + escalation timestamps. + """ + + def __init__(self) -> None: + super().__init__() + self.exit_times: list[float] = [] + + def exit(self, code: int = 0) -> None: + self.exit_times.append(trio.current_time()) + super().exit(code) + + process = ClockedFakeProcess() + terminated = install_fake_process(monkeypatch, process, grace_period=None) + + async def run_client() -> float: + with anyio.fail_after(stdio.PROCESS_TERMINATION_TIMEOUT + 5): # virtual seconds + async with stdio_client(FAKE_PARAMS): + # Evaluated just before the context exits: the moment cleanup begins. + return trio.current_time() + + cleanup_started = trio.run(run_client, clock=trio.testing.MockClock(autojump_threshold=0)) + + assert terminated == [process] + virtual_elapsed = process.exit_times[0] - cleanup_started + # Two-sided: never before the grace deadline, and within one poll interval past it + # (shutdown's writer-flush poll); the epsilon absorbs virtual-sleep float drift. + assert ( + stdio.PROCESS_TERMINATION_TIMEOUT + <= virtual_elapsed + <= stdio.PROCESS_TERMINATION_TIMEOUT + _EXIT_POLL_INTERVAL + 1e-9 + ), virtual_elapsed + + +def test_a_server_dying_in_the_final_poll_interval_is_not_escalated(monkeypatch: pytest.MonkeyPatch) -> None: + """A server exiting in the poll interval the grace deadline cuts short is not escalated. + + Such a server is dead, not hung: the timed-out grace wait must re-check `returncode` + before deciding to escalate, so this server is never terminated. + + Runs on trio's MockClock (see the escalation-bound test above). 
The grace is + set to end mid-interval (0.105 with 0.01 polls) and the fake dies at 0.102 + after its stdin closes, strictly between the last in-window poll (0.10) and + the deadline (0.105), so no two timers collide. + """ + process = FakeProcess() + terminated = install_fake_process(monkeypatch, process, grace_period=0.105) + + async def run_client() -> None: + with anyio.fail_after(5): # virtual seconds + async with anyio.create_task_group() as tg: + + async def die_late() -> None: + await anyio.sleep(0.102) + process.exit(0) + + # The grace wait starts when stdin closes; anchor the death there. + process.on_stdin_close = lambda: tg.start_soon(die_late) + # no branch: the tracer drops this nested async-with's arcs under + # trio's MockClock even though the body runs. + async with stdio_client(FAKE_PARAMS): # pragma: no branch + pass + + trio.run(run_client, clock=trio.testing.MockClock(autojump_threshold=0)) + + assert terminated == [] + assert process.returncode == 0 + + +@pytest.mark.anyio +async def test_cancelling_the_client_still_runs_the_full_shutdown(monkeypatch: pytest.MonkeyPatch) -> None: + """Cancellation (a client timeout, app shutdown) must not skip the shutdown sequence. + + Stdin is still closed and a server ignoring it is still terminated. Without the + shielded shutdown this leaks the process and can deadlock. + """ + process = FakeProcess() + terminated = install_fake_process(monkeypatch, process, grace_period=0.05) + entered = anyio.Event() + # Cancel a scope owned by the client's task, not the test's task group: a host + # self-cancel is delivered by throwing through this test function's suspended + # frames, and Python 3.11's tracer loses coverage events after such a throw() + # traversal (python/cpython#106749). 
+ cancel_scope = anyio.CancelScope() + + async def run_client_until_cancelled() -> None: + with cancel_scope: + async with stdio_client(FAKE_PARAMS): + entered.set() + await anyio.sleep_forever() + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client_until_cancelled) + await entered.wait() + cancel_scope.cancel() + + assert process.stdin_closed.is_set() + assert terminated == [process] @pytest.mark.anyio -async def test_stdio_client_nonexistent_command(): - """Test that stdio_client raises an error for non-existent commands.""" - # Create a server with a non-existent command +async def test_writing_after_the_server_dies_reports_clean_closure(monkeypatch: pytest.MonkeyPatch) -> None: + """A send racing the server's death must not surface a raw backend exception. + + The exception (ConnectionResetError in an exception group) must not escape the + context manager; the transport still shuts down cleanly. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + process.exit(1) + # The fake's stdin now raises ConnectionResetError, as a dead child's pipe does. + await write_stream.send(SessionMessage(ping)) + + assert process.written == [] + + +@pytest.mark.anyio +async def test_exiting_with_an_unconsumed_server_message_does_not_raise(monkeypatch: pytest.MonkeyPatch) -> None: + """Exiting while a server message is still undelivered must be a clean exit. + + Shutdown closes the read stream under the blocked reader task, and that closure + must not escape the caller as a BrokenResourceError in an exception group. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + # Feed a message and never receive it: the reader parses it and blocks + # delivering into the zero-buffer read stream until shutdown breaks the send. + await process.feed(_line(ping)) + # Wait until the reader task is genuinely parked on its blocked send + # before shutdown closes the stream out from under it. + await anyio.wait_all_tasks_blocked() + + +@pytest.mark.anyio +async def test_spawn_failure_propagates_the_error_and_leaks_no_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """When the spawn itself fails, the OSError reaches the caller and no streams leak. + + The transport's internal streams are all closed; an unclosed stream would fail the + test through its GC-time ResourceWarning under filterwarnings=error. + """ + + async def failing_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + raise OSError(errno.EACCES, "Permission denied") + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", failing_spawn) + + with pytest.raises(OSError) as exc_info: + async with stdio_client(FAKE_PARAMS): + pass # pragma: no cover + + assert exc_info.value.errno == errno.EACCES + # Drop the ExceptionInfo before collecting: its traceback references the suspended + # stdio_client frame, which would keep leaked streams alive across the collect. 
+ del exc_info + gc.collect() + + +@pytest.mark.anyio +async def test_a_command_that_cannot_be_execed_raises_enoent() -> None: + """A command that cannot be exec'd raises OSError(ENOENT) out of stdio_client.""" server_params = StdioServerParameters( command="/path/to/nonexistent/command", args=["--help"], ) - # Should raise an error when trying to start the process with pytest.raises(OSError) as exc_info: - async with stdio_client(server_params) as (_, _): + async with stdio_client(server_params): pass # pragma: no cover - # The error should indicate the command was not found (ENOENT: No such file or directory) assert exc_info.value.errno == errno.ENOENT @pytest.mark.anyio -async def test_stdio_client_universal_cleanup(): - """Test that stdio_client completes cleanup within reasonable time - even when connected to processes that exit slowly. +async def test_cancellation_during_spawn_leaks_no_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """Cancellation while the spawn is still in flight must not leak the internal streams. + + A caller timeout can fire mid-spawn (interpreter cold start); an unclosed stream + would fail the test through its GC-time ResourceWarning under filterwarnings=error. 
""" + spawn_started = anyio.Event() - # Use a Python script that simulates a long-running process - # This ensures consistent behavior across platforms - long_running_script = textwrap.dedent( - """ - import time - import sys - - # Simulate a long-running process - for i in range(100): - time.sleep(0.1) - # Flush to ensure output is visible - sys.stdout.flush() - sys.stderr.flush() - """ + async def hanging_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + spawn_started.set() + await anyio.sleep_forever() + raise NotImplementedError("unreachable: the spawn is cancelled while parked") + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", hanging_spawn) + + # Cancel a scope owned by the client's task, not the test's task group: a host + # self-cancel is delivered by throwing through this test function's suspended + # frames, and Python 3.11's tracer loses coverage events after such a throw() + # traversal (python/cpython#106749). + cancel_scope = anyio.CancelScope() + + async def run_client() -> None: + with cancel_scope: + async with stdio_client(FAKE_PARAMS): + pass # pragma: no cover + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client) + await spawn_started.wait() + cancel_scope.cancel() + + gc.collect() + + +@pytest.mark.anyio +async def test_a_non_oserror_spawn_failure_propagates_and_leaks_no_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """A non-OSError spawn failure also propagates and leaks no streams. + + Spawning can fail with more than OSError (e.g. ValueError for a NUL byte in the + command); the error reaches the caller and the transport's internal streams are + still all closed (checked through GC-time ResourceWarnings, as above). 
+ """ + + async def failing_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + raise ValueError("embedded null byte") + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", failing_spawn) + + with pytest.raises(ValueError, match="embedded null byte"): + async with stdio_client(FAKE_PARAMS): + pass # pragma: no cover + + gc.collect() + + +@pytest.mark.anyio +async def test_a_message_sent_just_before_exit_is_flushed_to_the_server(monkeypatch: pytest.MonkeyPatch) -> None: + """A message the transport accepted must reach the server even on immediate exit. + + The caller exits right after sending. Once the writer is parked waiting, a send is + a pure handoff that returns before the write lands, so the second message here is + the one shutdown must let the writer flush before closing the server's stdin. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + await write_stream.send(SessionMessage(ping)) + await write_stream.send(SessionMessage(pong)) + + assert process.written == [_line(ping), _line(pong)] + + +@pytest.mark.anyio +async def test_a_failed_write_to_a_live_server_closes_the_read_stream_instead_of_hanging( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A failed write to a live server ends the read stream instead of hanging the session. + + When a write fails but the server is still alive (stdout never EOFs), the transport + must end the read stream so a session maps the loss to CONNECTION_CLOSED instead of + waiting forever. EIO pins that plain OSError, not just ConnectionError, is handled. + + Steps: + 1. 
A send fails with EIO while the server is alive; the read stream ends. + 2. Output the server produces afterwards is still drained, so it cannot wedge + on a full pipe. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdin_send_error=OSError(errno.EIO, "I/O error"), ) + terminated = install_fake_process(monkeypatch, process) - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", long_running_script], + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, write_stream): + await write_stream.send(SessionMessage(ping)) + + with pytest.raises(anyio.EndOfStream): + await read_stream.receive() + + await process.feed(_line(pong)) + await anyio.wait_all_tasks_blocked() + assert process.pending_stdout_chunks() == 0 + + assert process.written == [] + assert terminated == [] + + +@pytest.mark.anyio +async def test_exit_completes_when_a_write_is_wedged_in_a_pipe_no_one_reads( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Exiting stays bounded even when the writer is parked in a write that cannot complete. + + A kill-surviving descendant can hold the read end without reading; the flush window + expires and the post-shutdown cancellation unparks the writer. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0), stdin_send_blocks=True) + terminated = install_fake_process(monkeypatch, process) + monkeypatch.setattr(stdio, "_WRITER_FLUSH_TIMEOUT", 0.05) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + await write_stream.send(SessionMessage(ping)) + # Wait until the writer task is genuinely parked inside the wedged send. 
+ await anyio.wait_all_tasks_blocked() + + assert process.written == [] + assert terminated == [] + assert process.stdin_closed.is_set() + + +@pytest.mark.anyio +async def test_undelivered_server_output_is_drained_at_shutdown_so_the_server_can_exit( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Output the caller never received is consumed during the stdin-close grace period. + + A real server flushing its remaining output on the way out would otherwise block on + a full pipe, never reach its stdin read, and be killed despite being well-behaved. + The fake ignores stdin closure (so it is ultimately terminated); the pin is that its + backlog was drained during the grace window. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess() + terminated = install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + # Three separate chunks: the reader parks delivering the first; the other + # two sit unconsumed in the pipe when shutdown begins. + await process.feed(_line(ping)) + await process.feed(_line(pong)) + await process.feed(_line(ping)) + await anyio.wait_all_tasks_blocked() + assert process.pending_stdout_chunks() == 2 + + assert terminated == [process] + assert process.pending_stdout_chunks() == 0 + + +@pytest.mark.anyio +async def test_shutdown_drains_stdout_first_so_a_wedged_writers_flush_can_complete( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Shutdown unblocks the reader's drain before waiting out the writer flush. + + A server wedged writing its stdout cannot get to reading its stdin, so a client + write can sit in a full pipe; the drain is what unwedges the server and lets the + flush complete. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + + received = 0 + stdin_gate = anyio.Event() + + def unwedge_once_drained() -> None: + # Accept the client's write only once all three output chunks are consumed, + # like a real server whose blocked stdout write gates its stdin read. + nonlocal received + received += 1 + if received == 3: + stdin_gate.set() + + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdin_send_gate=stdin_gate, + on_stdout_receive=unwedge_once_drained, ) + terminated = install_fake_process(monkeypatch, process) + # A flush wait that never gets unwedged would outlast the whole test budget. + monkeypatch.setattr(stdio, "_WRITER_FLUSH_TIMEOUT", 30.0) - start_time = time.time() + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_read_stream, write_stream): + # The reader parks delivering a message nobody receives, with more + # chunks backed up behind it; the writer parks in the gated send. + await process.feed(_line(ping)) + await process.feed(_line(pong)) + await process.feed(_line(ping)) + await write_stream.send(SessionMessage(ping)) + await anyio.wait_all_tasks_blocked() - with anyio.move_on_after(8.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Immediately exit - this triggers cleanup while process is still running - pass + assert terminated == [] + assert len(process.written) == 1 + assert process.pending_stdout_chunks() == 0 - end_time = time.time() - elapsed = end_time - start_time - # On Windows: 2s (stdin wait) + 2s (terminate wait) + overhead = ~5s expected - assert elapsed < 6.0, ( - f"stdio_client cleanup took {elapsed:.1f} seconds, expected < 6.0 seconds. " - f"This suggests the timeout mechanism may not be working properly." 
- ) +@pytest.mark.anyio +async def test_cancellation_with_undelivered_backlog_still_drains_and_spares_the_server( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Cancellation must not skip the shutdown drain. + + A well-behaved server that can only exit once its remaining output is consumed (a + real one blocks on a full stdout pipe) still exits within the grace period and is + never terminated. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess() + terminated = install_fake_process(monkeypatch, process) + + def exit_when_flushed() -> None: + # The fake exits only once its stdin has closed AND its output backlog + # has been consumed, like a real server wedged writing its stdout. + if process.stdin_closed.is_set() and process.pending_stdout_chunks() == 0: + process.exit(0) + + process.on_stdin_close = exit_when_flushed + process.on_stdout_receive = exit_when_flushed + + entered = anyio.Event() + # Cancel a scope owned by the client's task, not the test's task group (see + # test_cancelling_the_client_still_runs_the_full_shutdown). + cancel_scope = anyio.CancelScope() + + async def run_client_until_cancelled() -> None: + with cancel_scope: + async with stdio_client(FAKE_PARAMS): + await process.feed(_line(ping)) + await process.feed(_line(pong)) + await process.feed(_line(ping)) + entered.set() + await anyio.sleep_forever() + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client_until_cancelled) + await entered.wait() + cancel_scope.cancel() + + assert process.pending_stdout_chunks() == 0 + assert terminated == [] - # Check if we timed out - if cancel_scope.cancelled_caught: # pragma: no cover - pytest.fail( - "stdio_client cleanup timed out after 8.0 seconds. " - "This indicates the cleanup mechanism is hanging and needs fixing." 
- ) + +@pytest.mark.anyio +async def test_invalid_utf8_flushed_by_a_dying_server_does_not_break_shutdown( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The shutdown drain consumes raw bytes. + + A server flushing non-UTF-8 output (a crash dump, say) on its way out must not + abort the drain or surface a UnicodeDecodeError out of the context manager. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + terminated = install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + # Park the reader delivering a message nobody receives, then queue + # bytes that are not valid UTF-8 behind it. + await process.feed(_line(ping)) + await anyio.wait_all_tasks_blocked() + await process.feed(b"\xff\xfe not utf-8\n") + + assert terminated == [] + assert process.pending_stdout_chunks() == 0 @pytest.mark.anyio -@pytest.mark.skipif(sys.platform == "win32", reason="Windows signal handling is different") -async def test_stdio_client_sigint_only_process(): # pragma: lax no cover - """Test cleanup with a process that ignores SIGTERM but responds to SIGINT.""" - # Create a Python script that ignores SIGTERM but handles SIGINT - script_content = textwrap.dedent( - """ - import signal - import sys - import time +async def test_a_kill_racing_a_pending_stdout_read_is_swallowed_during_shutdown( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """A hard kill during a pending stdout read must not escape the context manager. + + The read surfaces ConnectionResetError on the proactor backend; being expected + teardown noise, it is not logged as an error either. 
+ """ + process = FakeProcess(stdout_eof_error=ConnectionResetError("read torn down by kill")) + terminated = install_fake_process(monkeypatch, process) - # Ignore SIGTERM (what process.terminate() sends) - signal.signal(signal.SIGTERM, signal.SIG_IGN) + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass # the fake ignores stdin closure, so shutdown must escalate - # Handle SIGINT (Ctrl+C signal) by exiting cleanly - def sigint_handler(signum, frame): - sys.exit(0) + assert terminated == [process] + assert not [record for record in caplog.records if record.levelno >= logging.ERROR] - signal.signal(signal.SIGINT, sigint_handler) - # Keep running until SIGINT received - while True: - time.sleep(0.1) - """ +@pytest.mark.anyio +async def test_a_mid_session_stdout_failure_is_logged_and_surfaces_as_clean_closure( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """A mid-session stdout read failure ends the read stream cleanly and is logged. + + A failure outside shutdown surfaces no raw exception out of the context manager and + leaves an error log identifying the failure, unlike the silent shutdown case. + """ + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdout_eof_error=ConnectionResetError("pipe failed mid-session"), ) + install_fake_process(monkeypatch, process) - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", script_content], + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, _): + process.exit(1) + # (no branch: coverage mis-traces the exit arc of a `with` whose body + # raises inside a nested async context.) 
+ with pytest.raises(anyio.EndOfStream): # pragma: no branch + await read_stream.receive() + + assert "stdout failed mid-session" in caplog.text + + +@pytest.mark.anyio +async def test_a_failing_stdout_close_still_closes_the_transport_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """A close-time error on the process's stdout must not abort the rest of the shutdown. + + Such an error (a contended pipe handle on the Windows fallback) still leaves the + context exiting cleanly and the internal streams all closed (checked via GC-time + ResourceWarnings). + """ + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdout_aclose_error=OSError(errno.EBADF, "Bad file descriptor"), ) + terminated = install_fake_process(monkeypatch, process) - start_time = time.time() - - try: - # Use anyio timeout to prevent test from hanging forever - with anyio.move_on_after(5.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Let the process start and begin ignoring SIGTERM - await anyio.sleep(0.5) - # Exit context triggers cleanup - this should not hang - pass - - if cancel_scope.cancelled_caught: # pragma: no cover - raise TimeoutError("Test timed out") - - end_time = time.time() - elapsed = end_time - start_time - - # Should complete quickly even with SIGTERM-ignoring process - # This will fail if cleanup only uses process.terminate() without fallback - assert elapsed < SIGTERM_IGNORING_PROCESS_TIMEOUT, ( - f"stdio_client cleanup took {elapsed:.1f} seconds with SIGTERM-ignoring process. " - f"Expected < {SIGTERM_IGNORING_PROCESS_TIMEOUT} seconds. " - "This suggests the cleanup needs SIGINT/SIGKILL fallback." - ) - except (TimeoutError, Exception) as e: # pragma: no cover - if isinstance(e, TimeoutError) or "timed out" in str(e): - pytest.fail( - f"stdio_client cleanup timed out after {SIGTERM_IGNORING_PROCESS_TIMEOUT} seconds " - "with SIGTERM-ignoring process. 
" - "This confirms the cleanup needs SIGINT/SIGKILL fallback for processes that ignore SIGTERM." - ) - else: - raise + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass + + assert terminated == [] + gc.collect() + + +@pytest.mark.anyio +async def test_a_process_surviving_the_kill_escalation_is_logged_and_abandoned( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """A process surviving the whole kill escalation is logged and abandoned. + + If the process is still alive after the escalation (D-state, an unsignalable + survivor), shutdown still completes, bounded, and leaves a warning instead of + silently leaking a live process. + """ + process = FakeProcess() # ignores stdin closure and survives "termination" + install_fake_process(monkeypatch, process, grace_period=0.05) + + stubborn: list[FakeProcess] = [] + + async def stubborn_terminate(proc: FakeProcess) -> None: + stubborn.append(proc) # the kill has no effect + + monkeypatch.setattr(stdio, "_terminate_process_tree", stubborn_terminate) + monkeypatch.setattr(stdio, "_KILL_REAP_TIMEOUT", 0.05) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass + + assert stubborn == [process] + assert process.returncode is None + assert "still alive after the kill escalation" in caplog.text + # The fake "survived", so nothing ever EOF'd its stdout pipe; release it here + # or its GC-time ResourceWarning would fail a later test. 
+ process.close_stdout() # --------------------------------------------------------------------------- -# TestChildProcessCleanup — socket-based deterministic child liveness probe +# POSIX tree-termination policy, tested through the sanctioned killpg seam # --------------------------------------------------------------------------- # -# These tests verify that `_terminate_process_tree()` kills the *entire* process -# tree (not just the immediate child), which is critical for cleaning up tools -# like `npx` that spawn their own subprocesses. -# -# Mechanism: each subprocess in the tree connects a TCP socket back to a -# listener owned by the test. We then use two kernel-guaranteed blocking-I/O -# signals — neither requires any `sleep()` or polling loop: -# -# 1. `await listener.accept()` blocks until the subprocess connects, -# proving it is running. -# 2. After `_terminate_process_tree()`, `await stream.receive(1)` raises -# `EndOfStream` (clean close / FIN) or `BrokenResourceError` (abrupt -# close / RST — typical on Windows after TerminateJobObject) because the -# kernel closes all file descriptors when a process terminates. Either -# is the direct, OS-level proof that the child is dead. +# `mcp.os.posix.utilities` is coverage-omitted and the sanctioned place to monkeypatch +# OS calls. These pin the EPERM policy without a foreign-euid process: macOS killpg +# raises EPERM when *any* group member cannot be signalled, even if others were. + + +class _StubPosixProcess: + """The two attributes `terminate_posix_process_tree` touches. + + They are the pgid source and the reap-progress probe. + """ + + pid = 54321 + returncode: int | None = None + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics") +# lax no cover: Windows CI jobs enforce 100% coverage per job and skip this test. 
+async def test_an_eperm_group_that_dies_during_the_grace_period_is_not_sigkilled( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """EPERM from the SIGTERM killpg no longer short-circuits termination. + + The grace wait still runs, and a group observed to be gone during it is never + SIGKILLed. + """ + calls: list[tuple[int, int]] = [] + probes = 0 + + def fake_killpg(pgid: int, sig: int) -> None: + nonlocal probes + calls.append((pgid, sig)) + if sig == signal.SIGTERM: + raise PermissionError("one group member has a foreign euid") + if sig == 0: + probes += 1 + if probes == 1: + raise PermissionError("survivors we may not signal") + raise ProcessLookupError("group is gone") + raise NotImplementedError("no other signal should be sent") + + monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg) + stub = _StubPosixProcess() + + with anyio.fail_after(5): + await terminate_posix_process_tree(cast(anyio.abc.Process, stub)) + + assert calls == [(stub.pid, signal.SIGTERM), (stub.pid, 0), (stub.pid, 0)] + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics") +# lax no cover: same Windows-runner coverage reason as above. +async def test_an_eperm_group_that_outlives_the_grace_period_is_still_sigkilled( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Even when every probe reports EPERM, the SIGKILL escalation still fires. + + It fires after the grace period, and its own EPERM is tolerated. Pre-fix, EPERM at + SIGTERM abandoned the group escalation for a leader-only kill, leaking every other + group member. The tiny timeout is the time-based grace period under test. 
+ """ + calls: list[tuple[int, int]] = [] + + def fake_killpg(pgid: int, sig: int) -> None: + calls.append((pgid, sig)) + if sig in (signal.SIGTERM, 0, signal.SIGKILL): + raise PermissionError("a foreign-euid member never goes away") + raise NotImplementedError("no other signal should be sent") + + monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg) + stub = _StubPosixProcess() + + with anyio.fail_after(5): + await terminate_posix_process_tree(cast(anyio.abc.Process, stub), timeout_seconds=0.05) + + assert calls[0] == (stub.pid, signal.SIGTERM) + assert calls[-1] == (stub.pid, signal.SIGKILL) + assert set(calls[1:-1]) == {(stub.pid, 0)} + + +@pytest.mark.anyio +@pytest.mark.parametrize("anyio_backend", ["asyncio", "trio"]) +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics") +# lax no cover: same Windows-runner coverage reason as above. +async def test_the_grace_wait_reads_returncode_so_trio_can_reap_the_leaders_zombie( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The wait between SIGTERM and SIGKILL reads `process.returncode` while it polls. + + On trio that property calls `Popen.poll()`, whose reap stops the leader's zombie + from keeping the group alive for the full timeout (see terminate_posix_process_tree). + Regression pin for the read itself, on both backends; the reaping side effect is + trio's documented behaviour, deliberately not re-tested here. + """ + + calls: list[tuple[int, int]] = [] + + def fake_killpg(pgid: int, sig: int) -> None: + # SIGTERM is accepted and every liveness probe reports survivors, so the + # grace wait runs to its (tiny) timeout and the SIGKILL escalation fires. 
+ calls.append((pgid, sig)) + + class _ReadCountingProcess: + """A live-forever leader whose `returncode` property counts its reads.""" + + pid = 54321 + + def __init__(self) -> None: + self.returncode_reads = 0 + + @property + def returncode(self) -> int | None: + self.returncode_reads += 1 + return None + + monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg) + stub = _ReadCountingProcess() + + with anyio.fail_after(5): + await terminate_posix_process_tree(cast(anyio.abc.Process, stub), timeout_seconds=0.05) + + # The wait ran to its deadline (the escalation fired)... + assert calls[0] == (stub.pid, signal.SIGTERM) + assert calls[-1] == (stub.pid, signal.SIGKILL) + # ...and `returncode` was read while it polled, the read that reaps on trio. + assert stub.returncode_reads >= 1 + + +# --------------------------------------------------------------------------- +# Real-process tests: the OS facts no fake can certify +# --------------------------------------------------------------------------- # -# This replaces an older file-growth-watching approach whose fixed `sleep()` -# durations raced against slow Python interpreter startup on loaded CI runners. +# These pin kernel behaviour (process-group kill semantics, SIGKILL delivery) via a +# socket liveness probe, no sleeps or polls: `accept()` blocks until the subprocess +# connects, proving it runs (and its pre-connect setup ran); after cleanup, `receive(1)` +# raises EndOfStream (FIN) or BrokenResourceError (RST, typical of SIGKILL and Windows +# job termination) because the kernel closes a dead process's file descriptors. def _connect_back_script(port: int) -> str: - """Return a ``python -c`` script body that connects to the given port, - sends ``b'alive'``, then blocks forever. 
Used by TestChildProcessCleanup - subprocesses as a liveness probe.""" + """Return a ``python -c`` liveness-probe body: connect to `port`, send `b'alive'`, block forever.""" return ( f"import socket, time\n" f"s = socket.create_connection(('127.0.0.1', {port}))\n" @@ -256,15 +1123,6 @@ def _connect_back_script(port: int) -> str: ) -def _spawn_then_block(child_script: str) -> str: - """Return a ``python -c`` script body that spawns ``child_script`` as a - subprocess, then blocks forever. The ``!r`` injection avoids nested-quote - escaping for arbitrary child script content.""" - return ( - f"import subprocess, sys, time\nsubprocess.Popen([sys.executable, '-c', {child_script!r}])\ntime.sleep(3600)\n" - ) - - async def _open_liveness_listener() -> tuple[anyio.abc.SocketListener, int]: """Open a TCP listener on localhost and return it along with its port.""" multi = await anyio.create_tcp_listener(local_host="127.0.0.1") @@ -279,9 +1137,8 @@ async def _open_liveness_listener() -> tuple[anyio.abc.SocketListener, int]: async def _accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStream: """Accept one connection and assert the peer sent ``b'alive'``. - Blocks deterministically until a subprocess connects (no polling). The - outer test bounds this with ``anyio.fail_after`` to catch the case where - the subprocess chain failed to start. + Blocks until a subprocess connects (the outer test bounds this with + ``anyio.fail_after``). """ stream = await sock.accept() msg = await stream.receive(5) @@ -290,53 +1147,31 @@ async def _accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStrea async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: - """Assert the peer holding the other end of ``stream`` has terminated. - - When a process dies, the kernel closes its file descriptors including - sockets. 
The next ``receive()`` on the peer socket unblocks with one of: - - - ``anyio.EndOfStream`` — clean close (FIN), typical after graceful exit - or POSIX ``SIGTERM``. - - ``anyio.BrokenResourceError`` — abrupt close (RST), typical after - Windows ``TerminateJobObject`` or POSIX ``SIGKILL``. - - Either is a deterministic, kernel-level signal that the process is dead — - no sleeps or polling required. - """ + """Assert the peer holding the other end of `stream` has terminated.""" with anyio.fail_after(5.0), pytest.raises((anyio.EndOfStream, anyio.BrokenResourceError)): await stream.receive(1) -async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None: - """Terminate the process tree, reap, and tear down pipe transports. +# lax no cover: only called by win32-skipped tests; Windows CI jobs enforce 100% +# coverage per job, where these helpers never execute. +async def _wait_until_exited(proc: anyio.abc.Process) -> None: # pragma: lax no cover + """Poll `returncode` until the process itself dies. - ``_terminate_process_tree`` kills the OS process group / Job Object but does - not call ``process.wait()`` or clean up the asyncio pipe transports. On - Windows those transports leak and emit ``ResourceWarning`` when GC'd in a - later test, causing ``PytestUnraisableExceptionWarning`` knock-on failures. + Not `proc.wait()`: on asyncio that also waits for the pipes to close, conflating + process death with pipe state. + """ + while proc.returncode is None: + await anyio.sleep(0.01) - Production ``stdio.py`` avoids this via its ``stdout_reader`` task which - reads stdout to EOF (triggering ``_ProactorReadPipeTransport._eof_received`` - → ``close()``) plus ``async with process:`` which waits and closes stdin. - These tests call ``_terminate_process_tree`` directly, so they replicate - both parts here: ``wait()`` + close stdin + drain stdout to EOF. 
- The stdout drain is the non-obvious part: anyio's ``StreamReaderWrapper.aclose()`` - only marks the Python-level reader closed — it never touches the underlying - ``_ProactorReadPipeTransport``. That transport starts paused and only detects - pipe EOF when someone reads, so without a drain it lives until ``__del__``. +async def _reap(proc: anyio.abc.Process) -> None: # pragma: lax no cover + """Reap an already-killed process and release its pipe transports. - Idempotent: the ``returncode`` guard skips termination if already reaped - (avoids spurious WARNING/ERROR logs from ``terminate_posix_process_tree``'s - fallback path, visible because ``log_cli = true``); ``wait()`` and stream - ``aclose()`` no-op on subsequent calls; the drain raises ``ClosedResourceError`` - on the second call, caught by the suppress. The tests call this explicitly - as the action under test and ``AsyncExitStack`` calls it again on exit as a - safety net. Bounded by ``move_on_after`` to prevent hangs. + Draining stdout to EOF lets the asyncio pipe transport observe the closure instead + of warning at GC. The bound swallows a hung cleanup on purpose; reaping is just a + safety net. """ with anyio.move_on_after(5.0): - if proc.returncode is None: - await _terminate_process_tree(proc) await proc.wait() assert proc.stdin is not None assert proc.stdout is not None @@ -346,215 +1181,220 @@ async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None await proc.stdout.aclose() -class TestChildProcessCleanup: - """Integration tests for ``_terminate_process_tree`` covering basic, - nested, and early-parent-exit process tree scenarios. See module-level - comment above for the socket-based liveness probe mechanism. 
- """ - - @pytest.mark.anyio - async def test_basic_child_process_cleanup(self): - """Parent spawns one child; terminating the tree kills both.""" - async with AsyncExitStack() as stack: - sock, port = await _open_liveness_listener() - stack.push_async_callback(sock.aclose) - - # Parent spawns a child; the child connects back to us. - parent_script = _spawn_then_block(_connect_back_script(port)) - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - stack.push_async_callback(_terminate_and_reap, proc) - - # Deterministic: accept() blocks until the child connects. No sleep. - with anyio.fail_after(10.0): - stream = await _accept_alive(sock) - stack.push_async_callback(stream.aclose) +def _record_spawned_processes(monkeypatch: pytest.MonkeyPatch) -> list[anyio.abc.Process | FallbackProcess]: + """Record every process `stdio_client` spawns (the real spawn still runs). - # Terminate, reap and close transports (wraps _terminate_process_tree, - # the behavior under test). - await _terminate_and_reap(proc) + A test can inspect each process afterwards and tear its process group down on + failure. + """ + spawned: list[anyio.abc.Process | FallbackProcess] = [] + + async def recording_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> anyio.abc.Process | FallbackProcess: + process = await _create_platform_compatible_process(command, args, env, errlog, cwd) + spawned.append(process) + return process + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", recording_spawn) + return spawned + + +# lax no cover: registered on every platform but a no-op on Windows, whose runners +# enforce 100% coverage per job. +def _kill_spawn_groups(spawned: list[anyio.abc.Process | FallbackProcess]) -> None: # pragma: lax no cover + """Failure-path safety net: SIGKILL each spawn-time process group. 
+ + This stops a test failing mid-body from orphaning its sleep-forever descendants. + A no-op when the test passed, and on Windows (no process group to signal; the Job + Object covers strays). + """ + if sys.platform == "win32": + return + for process in spawned: + # macOS killpg raises EPERM for a group holding only unreaped zombies. + with suppress(ProcessLookupError, PermissionError): + os.killpg(process.pid, signal.SIGKILL) - # Deterministic: kernel closed child's socket when it died. - await _assert_stream_closed(stream) - @pytest.mark.anyio - async def test_nested_process_tree(self): - """Parent → child → grandchild; terminating the tree kills all three.""" - async with AsyncExitStack() as stack: - sock, port = await _open_liveness_listener() - stack.push_async_callback(sock.aclose) - - # Build a three-level chain: parent spawns child, child spawns - # grandchild. Every level connects back to our socket. - grandchild = _connect_back_script(port) - child = ( - f"import subprocess, sys\n" - f"subprocess.Popen([sys.executable, '-c', {grandchild!r}])\n" + _connect_back_script(port) - ) - parent_script = ( - f"import subprocess, sys\n" - f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + _connect_back_script(port) - ) - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - stack.push_async_callback(_terminate_and_reap, proc) - - # Deterministic: three blocking accepts, one per tree level. - streams: list[anyio.abc.SocketStream] = [] - with anyio.fail_after(10.0): - for _ in range(3): - stream = await _accept_alive(sock) +@pytest.mark.anyio +async def test_exiting_the_context_terminates_the_entire_process_tree(monkeypatch: pytest.MonkeyPatch) -> None: + """Exiting `stdio_client` kills the server's whole process tree. + + The tree is a parent that exits instantly on SIGTERM (so the group must outlive its + leader), a child, and a grandchild, each death observed through its liveness socket + closing. 
The escalation timing is pinned in process by + test_escalation_fires_once_and_only_after_the_grace_period; the production grace + constant's value is deliberately unpinned. + """ + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 0.2) + spawned = _record_spawned_processes(monkeypatch) + + async with AsyncExitStack() as stack: + stack.callback(_kill_spawn_groups, spawned) + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + grandchild = _connect_back_script(port) + child = ( + f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {grandchild!r}])\n" + + _connect_back_script(port) + ) + # The parent exits immediately on SIGTERM and never reads stdin, so cleanup + # must escalate, and the group kill must work even as its leader dies first. + parent = ( + f"import signal, subprocess, sys, time\n" + f"signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))\n" + f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + _connect_back_script(port) + ) + server_params = StdioServerParameters(command=sys.executable, args=["-c", parent]) + + # The bound covers three Python interpreter cold starts on a loaded runner; + # a healthy run takes well under a second. + with anyio.fail_after(15.0): + async with stdio_client(server_params): + streams = [await _accept_alive(sock) for _ in range(3)] + for stream in streams: stack.push_async_callback(stream.aclose) - streams.append(stream) - # Terminate the entire tree (wraps _terminate_process_tree). - await _terminate_and_reap(proc) - - # Every level of the tree must be dead: three kernel-level EOFs. - for stream in streams: - await _assert_stream_closed(stream) - - @pytest.mark.anyio - async def test_early_parent_exit(self): - """Parent exits immediately on SIGTERM; process-group termination still - catches the child (exercises the race where the parent dies mid-cleanup). 
- """ - async with AsyncExitStack() as stack: - sock, port = await _open_liveness_listener() - stack.push_async_callback(sock.aclose) - - # Parent installs a SIGTERM handler that exits immediately, spawns a - # child that connects back to us, then blocks. - child = _connect_back_script(port) - parent_script = ( - f"import signal, subprocess, sys, time\n" - f"signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))\n" - f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" - f"time.sleep(3600)\n" - ) - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - stack.push_async_callback(_terminate_and_reap, proc) - - # Deterministic: child connected means both parent and child are up. - with anyio.fail_after(10.0): - stream = await _accept_alive(sock) - stack.push_async_callback(stream.aclose) - - # Parent will sys.exit(0) on SIGTERM, but the process-group kill - # (POSIX killpg / Windows Job Object) must still terminate the child. - await _terminate_and_reap(proc) - - # Child must be dead despite parent's early exit. + for stream in streams: await _assert_stream_closed(stream) @pytest.mark.anyio -async def test_stdio_client_graceful_stdin_exit(): - """Test that a process exits gracefully when stdin is closed, - without needing SIGTERM or SIGKILL. +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX process-group semantics") +# lax no cover: Windows CI jobs enforce 100% coverage per job and skip this test. +async def test_tree_kill_reaches_children_after_the_leader_has_already_exited() -> None: # pragma: lax no cover + """Killing the tree of an already-exited process still reaches its surviving children. + + The process group outlives its leader, and the group ID is the leader's pid by + construction (start_new_session), not something to look up from the (reaped) + leader. 
""" - # Create a Python script that exits when stdin is closed - script_content = textwrap.dedent( - """ - import sys - - # Read from stdin until it's closed - try: - while True: - line = sys.stdin.readline() - if not line: # EOF/stdin closed - break - except: - pass - - # Exit gracefully - sys.exit(0) - """ - ) + async with AsyncExitStack() as stack: + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + child = _connect_back_script(port) + # The parent spawns the child and exits immediately: the group leader is dead + # (and reaped) by the time the tree is terminated. + parent = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\n" + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent]) + assert isinstance(proc, anyio.abc.Process) + stack.callback(_kill_spawn_groups, [proc]) + stack.push_async_callback(_reap, proc) + + # Two interpreter cold starts on a loaded runner; healthy runs take ~0.2s. + with anyio.fail_after(10.0): + stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) + # The child connecting proves the parent ran; wait for the leader itself + # to be gone so the kill exercises the dead-leader path. + await _wait_until_exited(proc) - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", script_content], - ) + await _terminate_process_tree(proc) - start_time = time.time() + await _assert_stream_closed(stream) - # Use anyio timeout to prevent test from hanging forever - with anyio.move_on_after(5.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Let the process start and begin reading stdin - await anyio.sleep(0.2) - # Exit context triggers cleanup - process should exit from stdin closure - pass - if cancel_scope.cancelled_caught: - pytest.fail( - "stdio_client cleanup timed out after 5.0 seconds. " - "Process should have exited gracefully when stdin was closed." 
- ) # pragma: no cover +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX process-group semantics") +# lax no cover: same Windows-runner coverage reason as above. +async def test_terminating_an_already_exited_process_is_a_no_op() -> None: # pragma: lax no cover + """Once the whole group is gone, tree termination returns without error. - end_time = time.time() - elapsed = end_time - start_time + It does not fall back to signalling a reaped pid. + """ + proc = await _create_platform_compatible_process(sys.executable, ["-c", "pass"]) + assert isinstance(proc, anyio.abc.Process) - # Should complete quickly with just stdin closure (no signals needed) - assert elapsed < 3.0, ( - f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-aware process. " - f"Expected < 3.0 seconds since process should exit on stdin closure." - ) + # The bound covers one interpreter cold start on a loaded runner; a healthy run + # takes well under a second. + with anyio.fail_after(10.0): + await _wait_until_exited(proc) + await _terminate_process_tree(proc) + await _reap(proc) @pytest.mark.anyio -async def test_stdio_client_stdin_close_ignored(): - """Test that when a process ignores stdin closure, the shutdown sequence - properly escalates to SIGTERM. +@pytest.mark.skipif(sys.platform == "win32", reason="Windows signal handling is different") +# lax no cover: Windows CI jobs enforce 100% coverage per job and skip this test. +async def test_escalation_kills_a_process_that_ignores_sigterm( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Cleanup escalates past SIGTERM and kills a process that ignores it. + + The child installs SIG_IGN *before* connecting to the liveness socket, so the + ignore is guaranteed in place; SIGKILL delivery is proven by the kernel closing + the socket. The only test of the SIGTERM-then-SIGKILL escalation itself; the + production constants' values are deliberately unpinned. 
""" - # Create a Python script that ignores stdin closure but responds to SIGTERM - script_content = textwrap.dedent( - """ - import signal - import sys - import time - - # Set up SIGTERM handler to exit cleanly - def sigterm_handler(signum, frame): - sys.exit(0) - - signal.signal(signal.SIGTERM, sigterm_handler) - - # Close stdin immediately to simulate ignoring it - sys.stdin.close() + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 0.2) + monkeypatch.setattr(stdio, "FORCE_KILL_TIMEOUT", 0.2) + spawned = _record_spawned_processes(monkeypatch) + + async with AsyncExitStack() as stack: + stack.callback(_kill_spawn_groups, spawned) + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + script = "import signal\nsignal.signal(signal.SIGTERM, signal.SIG_IGN)\n" + _connect_back_script(port) + server_params = StdioServerParameters(command=sys.executable, args=["-c", script]) + + # The bound covers an interpreter cold start on a loaded runner plus the two + # shortened escalation waits; a healthy run takes well under a second. + with anyio.fail_after(15.0): + async with stdio_client(server_params): + stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) - # Keep running until SIGTERM - while True: - time.sleep(0.1) - """ - ) + await _assert_stream_closed(stream) - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", script_content], - ) - start_time = time.time() +@pytest.mark.anyio +@pytest.mark.skipif(not Path("/proc/self/fd").is_dir(), reason="needs procfs to enumerate open file descriptors") +# lax no cover: Windows CI jobs enforce 100% coverage per job, have no procfs, and skip this. +async def test_a_graceful_exit_with_a_surviving_child_leaks_no_pipe_fds( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A graceful exit with a surviving child must not leak the client's pipe fds. 
+ + A server may exit cleanly on stdin closure while leaving a child holding the + inherited pipe ends (the POSIX policy: survivors are the server's business). The + client must still release its own pipe fds and subprocess transport at shutdown + (on asyncio nothing else ever closes them while the orphan holds the pipe) instead + of leaking them for the orphan's lifetime. + """ + spawned = _record_spawned_processes(monkeypatch) - # Use anyio timeout to prevent test from hanging forever - with anyio.move_on_after(7.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Let the process start - await anyio.sleep(0.2) - # Exit context triggers cleanup - pass + async with AsyncExitStack() as stack: + stack.callback(_kill_spawn_groups, spawned) + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) - if cancel_scope.cancelled_caught: - pytest.fail( - "stdio_client cleanup timed out after 7.0 seconds. " - "Process should have been terminated via SIGTERM escalation." - ) # pragma: no cover + child = _connect_back_script(port) + # The server hands its inherited pipes to a child, then exits as soon as its + # stdin closes: the well-behaved graceful path, so no kill ever happens. + server = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\nsys.stdin.read()\n" + server_params = StdioServerParameters(command=sys.executable, args=["-c", server]) - end_time = time.time() - elapsed = end_time - start_time + gc.collect() # settle earlier garbage so its collection cannot close fds mid-test + baseline = set(os.listdir("/proc/self/fd")) - # Should take ~2 seconds (stdin close timeout) before SIGTERM is sent - # Total time should be between 2-4 seconds - assert 1.5 < elapsed < 4.5, ( - f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. " - f"Expected between 2-4 seconds (2s stdin timeout + termination time)." 
- ) + # Two interpreter cold starts on a loaded runner; healthy runs take ~0.3s. + with anyio.fail_after(15.0): + async with stdio_client(server_params): + stream = await _accept_alive(sock) + await stream.aclose() + + leader = spawned[0] + assert isinstance(leader, anyio.abc.Process) + # The graceful path: exited on stdin closure, no termination involved. + assert leader.returncode == 0 + # Subset, not equality: other machinery may close fds, but never open new + # ones; a leaked pipe fd would show up as an extra entry. + assert set(os.listdir("/proc/self/fd")) <= baseline diff --git a/tests/interaction/transports/test_stdio.py b/tests/interaction/transports/test_stdio.py index 27cc65de42..1f65996aa3 100644 --- a/tests/interaction/transports/test_stdio.py +++ b/tests/interaction/transports/test_stdio.py @@ -1,18 +1,13 @@ """The stdio transport: one subprocess end-to-end test and one in-process framing test. -Everything else in the suite runs in a single process; the subprocess test exists to prove the same -client↔server round trip works over the stdio transport's real boundary (a child process whose -stdin/stdout carry one newline-delimited JSON-RPC message per line). The server lives in -`_stdio_server.py` and is launched via `python -m` so subprocess coverage measurement applies. - -The framing test drives `stdio_server` in-process by passing it injected text streams instead of the -real stdin/stdout, so the raw lines the transport writes can be asserted directly without a process -boundary. - -stdio is deliberately not a leg of the `connect`-fixture matrix: spawning a subprocess per test -would be slow, and the matrix already proves transport-agnosticism over three in-process -transports. Process-lifecycle edge cases (escalation to terminate/kill, parse errors) are covered by -`tests/client/test_stdio.py` and stay deferred here. 
+The subprocess test proves the client-server round trip over the transport's real process +boundary; its server lives in `_stdio_server.py` and is launched via `python -m` so subprocess +coverage measurement applies. The framing test drives `stdio_server` over injected in-process +streams instead. + +stdio is deliberately not a leg of the `connect`-fixture matrix: a subprocess per test would be +slow, and the matrix already proves transport-agnosticism in-process. Process-lifecycle edge +cases (terminate/kill escalation, parse errors) stay in `tests/client/test_stdio.py`. """ import io @@ -26,6 +21,7 @@ import pytest from inline_snapshot import snapshot +from mcp.client import stdio from mcp.client.client import Client from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.server.stdio import stdio_server @@ -51,10 +47,21 @@ @requirement("transport:stdio") @requirement("transport:stdio:clean-shutdown") @requirement("transport:stdio:stderr-passthrough") -async def test_tool_call_and_notification_round_trip_over_a_stdio_subprocess() -> None: - """A Client connected over stdio initializes, calls a tool with arguments, receives the - server's log notification before the call returns, and the server exits when the transport - closes its stdin.""" +async def test_tool_call_and_notification_round_trip_over_a_stdio_subprocess( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A stdio-subprocess Client round-trips a tool call, a notification, and a clean exit. + + The Client initializes, calls a tool with arguments, and receives the server's log + notification before the call returns; the server exits when the transport closes its + stdin. + """ + # After stdin closes, the child must unwind, write the clean-exit line, and let coverage's + # atexit hook persist its subprocess data file before escalation. 
The production 2s default + # was too tight on slow Windows runners: the child was killed mid-atexit (test stayed green) + # and the silently missing data file tripped the 100% coverage gate. Not under test. + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 10.0) + received: list[LoggingMessageNotificationParams] = [] async def collect(params: LoggingMessageNotificationParams) -> None: @@ -66,15 +73,18 @@ async def collect(params: LoggingMessageNotificationParams) -> None: command=sys.executable, args=["-m", _stdio_server.__name__], cwd=str(_REPO_ROOT), - # stdio_client deliberately filters the inherited environment to a safe minimum, - # which drops the variables coverage.py's subprocess support uses; pass them through - # so the server module is measured. Empty when not running under coverage. - env={key: value for key, value in os.environ.items() if key.startswith("COVERAGE_")}, + # stdio_client filters the inherited environment, dropping the variables + # coverage.py's subprocess support uses; pass them through so the server module is + # measured. PYTHONWARNINGS: the child recompiles anyio (pytest's pyc tag differs), + # and on 3.14 anyio's return-in-finally SyntaxWarning would land on the snapshot stderr. + env={key: value for key, value in os.environ.items() if key.startswith("COVERAGE_")} + | {"PYTHONWARNINGS": "ignore::SyntaxWarning"}, ), errlog=errlog, ) - with anyio.fail_after(10): + # Must exceed session time plus the patched PROCESS_TERMINATION_TIMEOUT (10s). 
+ with anyio.fail_after(20): async with Client(transport, logging_callback=collect) as client: assert client.initialize_result.server_info.name == "stdio-echo" result = await client.call_tool("echo", {"text": "across\nprocesses"}) @@ -83,28 +93,23 @@ async def collect(params: LoggingMessageNotificationParams) -> None: captured_stderr = errlog.read() assert result == snapshot(CallToolResult(content=[TextContent(text="across\nprocesses")])) - # stdio carries one ordered server→client stream, so the same notification-before-response + # stdio carries one ordered server-to-client stream, so the same notification-before-response # guarantee holds here as for the in-memory transport. assert received == snapshot( [LoggingMessageNotificationParams(level="info", logger="echo", data="echoing across\nprocesses")] ) - # The server writes this line only after its run loop returns, which happens when stdin closes: - # seeing it proves the process exited on its own rather than via the transport's terminate - # escalation, without a timing-based assertion. The capture itself proves stderr passthrough: - # the transport routes the child's stderr to the caller's `errlog` without consuming it. + # The server writes this line only after its run loop returns on stdin close: seeing it proves + # a self-exit, not the terminate escalation. The capture itself proves stderr passthrough. assert captured_stderr == snapshot("stdio-echo: clean exit\n") @requirement("transport:stdio:stream-purity") @requirement("transport:stdio:no-embedded-newlines") async def test_stdio_server_writes_one_jsonrpc_message_per_line() -> None: - """Everything `stdio_server` writes is a valid JSON-RPC message on its own line, and nothing else. + """Every `stdio_server` write is one valid JSON-RPC message on its own line. 
- The transport's stdin/stdout parameters are public, so the test injects in-process text streams - instead of the real process handles and drives the read/write streams directly: a JSON-RPC line on - stdin is parsed and delivered, and every message sent on the write stream appears as exactly one - newline-terminated line whose payload newlines are JSON-escaped. This proves the transport's own - framing; it does not guard `sys.stdout` against handler code that prints to it directly (see the + Each line is newline-terminated with payload newlines JSON-escaped. This proves the + transport's own framing; it does not guard `sys.stdout` against handler code (see the divergence on `transport:stdio:stream-purity`). """ captured = io.StringIO() diff --git a/tests/issues/test_1027_win_unreachable_cleanup.py b/tests/issues/test_1027_win_unreachable_cleanup.py deleted file mode 100644 index c59c5aecae..0000000000 --- a/tests/issues/test_1027_win_unreachable_cleanup.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Regression test for issue #1027: Ensure cleanup procedures run properly during shutdown - -Issue #1027 reported that cleanup code after "yield" in lifespan was unreachable when -processes were terminated. This has been fixed by implementing the MCP spec-compliant -stdio shutdown sequence that closes stdin first, allowing graceful exit. - -These tests verify the fix continues to work correctly across all platforms. -""" - -import sys -import tempfile -import textwrap -from pathlib import Path - -import anyio -import pytest - -from mcp import ClientSession, StdioServerParameters -from mcp.client.stdio import _create_platform_compatible_process, stdio_client -from tests.shared.test_win32_utils import escape_path_for_python - - -@pytest.mark.anyio -async def test_lifespan_cleanup_executed(): - """Regression test ensuring MCP server cleanup code runs during shutdown. - - This test verifies that the fix for issue #1027 works correctly by: - 1. 
Starting an MCP server that writes a marker file on startup - 2. Shutting down the server normally via stdio_client - 3. Verifying the cleanup code (after yield) executed and wrote its marker file - - The fix implements proper stdin closure before termination, giving servers - time to run their cleanup handlers. - """ - - # Create marker files to track server lifecycle - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - startup_marker = f.name - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - cleanup_marker = f.name - - # Remove the files so we can detect when they're created - Path(startup_marker).unlink() - Path(cleanup_marker).unlink() - - # Create a minimal MCP server using MCPServer that tracks lifecycle - server_code = textwrap.dedent(f""" - import asyncio - import sys - from pathlib import Path - from contextlib import asynccontextmanager - from mcp.server.mcpserver import MCPServer - - STARTUP_MARKER = {escape_path_for_python(startup_marker)} - CLEANUP_MARKER = {escape_path_for_python(cleanup_marker)} - - @asynccontextmanager - async def lifespan(server): - # Write startup marker - Path(STARTUP_MARKER).write_text("started") - try: - yield {{"started": True}} - finally: - # This cleanup code now runs properly during shutdown - Path(CLEANUP_MARKER).write_text("cleaned up") - - mcp = MCPServer("test-server", lifespan=lifespan) - - @mcp.tool() - def echo(text: str) -> str: - return text - - if __name__ == "__main__": - mcp.run() - """) - - # Write the server script to a temporary file - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".py") as f: - server_script = f.name - f.write(server_code) - - try: - # Launch the MCP server - params = StdioServerParameters(command=sys.executable, args=[server_script]) - - async with stdio_client(params) as (read, write): - async with ClientSession(read, write) as session: - # Initialize the session - result = await session.initialize() - assert 
result.protocol_version in ["2024-11-05", "2025-06-18", "2025-11-25"] - - # Verify startup marker was created - assert Path(startup_marker).exists(), "Server startup marker not created" - assert Path(startup_marker).read_text() == "started" - - # Make a test request to ensure server is working - response = await session.call_tool("echo", {"text": "hello"}) - assert response.content[0].type == "text" - assert getattr(response.content[0], "text") == "hello" - - # Session will be closed when exiting the context manager - - # Give server a moment to complete cleanup - with anyio.move_on_after(5.0): - while not Path(cleanup_marker).exists(): # pragma: lax no cover - await anyio.sleep(0.1) - - # Verify cleanup marker was created - this works now that stdio_client - # properly closes stdin before termination, allowing graceful shutdown - assert Path(cleanup_marker).exists(), "Server cleanup marker not created - regression in issue #1027 fix" - assert Path(cleanup_marker).read_text() == "cleaned up" - - finally: - # Clean up files - for path in [server_script, startup_marker, cleanup_marker]: - try: # pragma: lax no cover - Path(path).unlink() - except FileNotFoundError: # pragma: lax no cover - pass - - -@pytest.mark.anyio -@pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") -async def test_stdin_close_triggers_cleanup(): - """Regression test verifying the stdin-based graceful shutdown mechanism. - - This test ensures the core fix for issue #1027 continues to work by: - 1. Manually managing a server process - 2. Closing stdin to trigger graceful shutdown - 3. Verifying cleanup handlers run before the process exits - - This mimics the behavior now implemented in stdio_client's shutdown sequence. - - Note on Windows ResourceWarning: - On Windows, we may see ResourceWarning about unclosed file descriptors. 
- This is expected behavior because: - - We're manually managing the process lifecycle - - Windows file handle cleanup works differently than Unix - - The warning doesn't indicate a real issue - cleanup still works - We filter this warning on Windows only to avoid test noise. - """ - - # Create marker files to track server lifecycle - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - startup_marker = f.name - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - cleanup_marker = f.name - - # Remove the files so we can detect when they're created - Path(startup_marker).unlink() - Path(cleanup_marker).unlink() - - # Create an MCP server that handles stdin closure gracefully - server_code = textwrap.dedent(f""" - import asyncio - import sys - from pathlib import Path - from contextlib import asynccontextmanager - from mcp.server.mcpserver import MCPServer - - STARTUP_MARKER = {escape_path_for_python(startup_marker)} - CLEANUP_MARKER = {escape_path_for_python(cleanup_marker)} - - @asynccontextmanager - async def lifespan(server): - # Write startup marker - Path(STARTUP_MARKER).write_text("started") - try: - yield {{"started": True}} - finally: - # This cleanup code runs when stdin closes, enabling graceful shutdown - Path(CLEANUP_MARKER).write_text("cleaned up") - - mcp = MCPServer("test-server", lifespan=lifespan) - - @mcp.tool() - def echo(text: str) -> str: - return text - - if __name__ == "__main__": - # The server should exit gracefully when stdin closes - try: - mcp.run() - except Exception: - # Server might get EOF or other errors when stdin closes - pass - """) - - # Write the server script to a temporary file - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".py") as f: - server_script = f.name - f.write(server_code) - - try: - # This test manually manages the process to verify stdin-based shutdown - # Start the server process - process = await _create_platform_compatible_process( - 
command=sys.executable, args=[server_script], env=None, errlog=sys.stderr, cwd=None - ) - - # Wait for server to start - with anyio.move_on_after(10.0): - while not Path(startup_marker).exists(): - await anyio.sleep(0.1) - - # Check if process is still running - if hasattr(process, "returncode") and process.returncode is not None: # pragma: lax no cover - pytest.fail(f"Server process exited with code {process.returncode}") - - assert Path(startup_marker).exists(), "Server startup marker not created" - - # Close stdin to signal shutdown - if process.stdin: # pragma: no branch - await process.stdin.aclose() - - # Wait for process to exit gracefully - try: - with anyio.fail_after(5.0): # Increased from 2.0 to 5.0 - await process.wait() - except TimeoutError: # pragma: lax no cover - # If it doesn't exit after stdin close, terminate it - process.terminate() - await process.wait() - - # Check if cleanup ran - with anyio.move_on_after(5.0): - while not Path(cleanup_marker).exists(): # pragma: lax no cover - await anyio.sleep(0.1) - - # Verify the cleanup ran - stdin closure enables graceful shutdown - assert Path(cleanup_marker).exists(), "Server cleanup marker not created - stdin-based shutdown failed" - assert Path(cleanup_marker).read_text() == "cleaned up" - - finally: - # Clean up files - for path in [server_script, startup_marker, cleanup_marker]: - try: # pragma: lax no cover - Path(path).unlink() - except FileNotFoundError: # pragma: lax no cover - pass diff --git a/tests/issues/test_552_windows_hang.py b/tests/issues/test_552_windows_hang.py index 1adb5d80cb..371d033c2b 100644 --- a/tests/issues/test_552_windows_hang.py +++ b/tests/issues/test_552_windows_hang.py @@ -1,5 +1,6 @@ """Test for issue #552: stdio_client hangs on Windows.""" +import json import sys from textwrap import dedent @@ -8,41 +9,36 @@ from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client +from mcp.types import LATEST_PROTOCOL_VERSION, InitializeResult 
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific test") # pragma: no cover @pytest.mark.anyio -async def test_windows_stdio_client_with_session(): - """Test the exact scenario from issue #552: Using ClientSession with stdio_client. +async def test_initialize_succeeds_and_shutdown_returns_after_the_server_exits_mid_session(): + """Initialize completes and shutdown returns when the server exits mid-session. - This reproduces the original bug report where stdio_client hangs on Windows 11 - when used with ClientSession. + This is the proactor pipe scenario that hung on Windows 11 (issue #552). The positive + assertion matters: a session that errors quickly would also "not hang". """ - # Create a minimal MCP server that responds to initialization - server_script = dedent(""" + # A minimal server: answer initialize correctly, then exit. + server_script = dedent(f""" import json import sys - # Read initialization request line = sys.stdin.readline() + request = json.loads(line) - # Send initialization response - response = { + response = {{ "jsonrpc": "2.0", - "id": 1, - "result": { - "protocolVersion": "1.0", - "capabilities": {}, - "serverInfo": {"name": "test-server", "version": "1.0"} - } - } + "id": request["id"], + "result": {{ + "protocolVersion": {json.dumps(LATEST_PROTOCOL_VERSION)}, + "capabilities": {{}}, + "serverInfo": {{"name": "test-server", "version": "1.0"}} + }} + }} print(json.dumps(response)) sys.stdout.flush() - - # Exit after a short delay - import time - time.sleep(0.1) - sys.exit(0) """).strip() params = StdioServerParameters( @@ -50,14 +46,11 @@ async def test_windows_stdio_client_with_session(): args=["-c", server_script], ) - # This is the exact pattern from the bug report with anyio.fail_after(10): - try: - async with stdio_client(params) as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - # Should exit ClientSession without hanging - # Should exit stdio_client without hanging 
- except Exception: - # Connection errors are expected when process exits - pass + async with stdio_client(params) as (read, write): + async with ClientSession(read, write) as session: + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert result.server_info.name == "test-server" + # Exiting ClientSession and stdio_client must not hang even though the + # server process is already gone. diff --git a/tests/server/mcpserver/test_elicitation.py b/tests/server/mcpserver/test_elicitation.py index 679fb848f5..9292586b32 100644 --- a/tests/server/mcpserver/test_elicitation.py +++ b/tests/server/mcpserver/test_elicitation.py @@ -1,4 +1,4 @@ -"""Test the elicitation feature using stdio transport.""" +"""Test the elicitation feature over the in-memory client transport.""" from typing import Any @@ -58,9 +58,9 @@ async def call_tool_and_assert( @pytest.mark.anyio -async def test_stdio_elicitation(): - """Test the elicitation feature using stdio transport.""" - mcp = MCPServer(name="StdioElicitationServer") +async def test_elicitation_accept_returns_the_users_answer_to_the_tool(): + """An accepted elicitation delivers the user's content back to the requesting tool.""" + mcp = MCPServer(name="ElicitationServer") create_ask_user_tool(mcp) # Create a custom handler for elicitation requests @@ -76,9 +76,9 @@ async def elicitation_callback(context: RequestContext[ClientSession], params: E @pytest.mark.anyio -async def test_stdio_elicitation_decline(): - """Test elicitation with user declining.""" - mcp = MCPServer(name="StdioElicitationDeclineServer") +async def test_elicitation_decline_reaches_the_tool_without_content(): + """A declined elicitation reports the decline to the tool, with no content attached.""" + mcp = MCPServer(name="ElicitationDeclineServer") create_ask_user_tool(mcp) async def elicitation_callback(context: RequestContext[ClientSession], params: ElicitRequestParams): diff --git a/tests/server/test_stdio.py 
b/tests/server/test_stdio.py index 677a993567..054a157b3b 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -1,17 +1,26 @@ import io import sys +import threading +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager from io import TextIOWrapper import anyio import pytest +from mcp.server.mcpserver import MCPServer from mcp.server.stdio import stdio_server from mcp.shared.message import SessionMessage from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter @pytest.mark.anyio -async def test_stdio_server(): +async def test_stdio_server_round_trips_messages_over_injected_streams() -> None: + """stdio_server frames JSON-RPC messages as one line each in both directions. + + Parses one message per stdin line and writes each outgoing message as exactly one + line, driven over injected in-process streams. + """ stdin = io.StringIO() stdout = io.StringIO() @@ -24,52 +33,45 @@ async def test_stdio_server(): stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n") stdin.seek(0) - async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( - read_stream, - write_stream, - ): - received_messages: list[JSONRPCMessage] = [] - async with read_stream: - async for message in read_stream: - if isinstance(message, Exception): # pragma: no cover - raise message - received_messages.append(message.message) - if len(received_messages) == 2: - break - - # Verify received messages - assert len(received_messages) == 2 - assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) - - # Test sending responses from the server - responses = [ - JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), - JSONRPCResponse(jsonrpc="2.0", id=4, result={}), - ] - - async with write_stream: + with anyio.fail_after(5): + async with 
stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( + read_stream, + write_stream, + ): + async with read_stream: + received_messages: list[JSONRPCMessage] = [] + for _ in range(2): + received = await read_stream.receive() + assert not isinstance(received, Exception) + received_messages.append(received.message) + + assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) + + responses = [ + JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), + JSONRPCResponse(jsonrpc="2.0", id=4, result={}), + ] + for response in responses: - session_message = SessionMessage(response) - await write_stream.send(session_message) + await write_stream.send(SessionMessage(response)) + await write_stream.aclose() stdout.seek(0) output_lines = stdout.readlines() assert len(output_lines) == 2 received_responses = [jsonrpc_message_adapter.validate_json(line.strip()) for line in output_lines] - assert len(received_responses) == 2 assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping") assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={}) @pytest.mark.anyio -async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch): - """Non-UTF-8 bytes on stdin must not crash the server. +async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch) -> None: + """Non-UTF-8 stdin bytes surface as an in-stream exception without killing the stream. - Invalid bytes are replaced with U+FFFD, which then fails JSON parsing and - is delivered as an in-stream exception. Subsequent valid messages must - still be processed. + Invalid bytes are replaced with U+FFFD, fail JSON parsing, and arrive as an in-stream + exception; subsequent valid messages are still processed. """ # \xff\xfe are invalid UTF-8 start bytes. 
valid = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") @@ -92,3 +94,78 @@ async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch): second = await read_stream.receive() assert isinstance(second, SessionMessage) assert second.message == valid + + +class _KeepOpenBytesIO(io.BytesIO): + """A BytesIO that survives its TextIOWrapper being closed. + + Lets the test read what was written after `run()` has torn the wrapper down. + """ + + def close(self) -> None: + pass + + +def _run_stdio_bounded(server: MCPServer) -> None: + """Run the blocking `server.run("stdio")` in a daemon thread joined with a 5s bound. + + `run()` creates its own event loop, so a sync test cannot arm `anyio.fail_after`; + the join timeout turns a run loop that never returns on stdin EOF into a red test + instead of a silent CI hang. An exception escaping `run()` still fails the test: + pytest's unhandled-thread warning is escalated by `filterwarnings = ["error"]`. + """ + + def target() -> None: + server.run("stdio") + + thread = threading.Thread(target=target, daemon=True) + thread.start() + thread.join(5) + assert not thread.is_alive(), 'run("stdio") did not return after stdin EOF' + + +def test_mcpserver_run_stdio_serves_until_stdin_closes(monkeypatch: pytest.MonkeyPatch) -> None: + """`MCPServer.run("stdio")` serves over process stdio and returns at stdin EOF. + + Answers a request over the process's stdio and returns when stdin reaches EOF, + rather than serving forever. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + stdin_bytes = io.BytesIO(ping.model_dump_json(by_alias=True, exclude_none=True).encode() + b"\n") + captured = _KeepOpenBytesIO() + monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin_bytes, encoding="utf-8")) + monkeypatch.setattr(sys, "stdout", TextIOWrapper(captured, encoding="utf-8")) + + _run_stdio_bounded(MCPServer(name="RunStdioServer")) + + response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) + assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + + +def test_mcpserver_run_stdio_runs_lifespan_cleanup_after_stdin_closes(monkeypatch: pytest.MonkeyPatch) -> None: + """Code after `yield` in a lifespan runs when stdin EOF ends `run("stdio")`. + + Regression lock for the issue #1027 shutdown chain: the run loop must end on + stdin EOF and unwind the lifespan rather than be killed before returning. + """ + events: list[str] = [] + + @asynccontextmanager + async def lifespan(server: MCPServer) -> AsyncIterator[None]: + events.append("setup") + try: + yield + finally: + events.append("cleanup") + + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + stdin_bytes = io.BytesIO(ping.model_dump_json(by_alias=True, exclude_none=True).encode() + b"\n") + captured = _KeepOpenBytesIO() + monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin_bytes, encoding="utf-8")) + monkeypatch.setattr(sys, "stdout", TextIOWrapper(captured, encoding="utf-8")) + + _run_stdio_bounded(MCPServer(name="LifespanStdioServer", lifespan=lifespan)) + + assert events == ["setup", "cleanup"] + response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) + assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) diff --git a/tests/shared/test_win32_utils.py b/tests/shared/test_win32_utils.py deleted file mode 100644 index e0f9cb4995..0000000000 --- a/tests/shared/test_win32_utils.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Windows-specific test 
utilities.""" - - -def escape_path_for_python(path: str) -> str: - """Escape a file path for use in Python code strings. - - Converts backslashes to forward slashes which work on all platforms - and don't need escaping in Python strings. - """ - return repr(path.replace("\\", "/")) diff --git a/tests/transports/__init__.py b/tests/transports/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/transports/stdio/__init__.py b/tests/transports/stdio/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/transports/stdio/_liveness.py b/tests/transports/stdio/_liveness.py new file mode 100644 index 0000000000..5e4b679fe0 --- /dev/null +++ b/tests/transports/stdio/_liveness.py @@ -0,0 +1,80 @@ +"""Kernel-synchronized liveness probes for the real-subprocess stdio lifecycle suite. + +A spawned (grand)child connects back to a test-owned TCP listener and sends +`b'alive'`; the kernel then provides every signal a test needs, with no sleeps or +polling. The kernel closes all of a process's file descriptors on exit, so EOF +(clean close / FIN) or `BrokenResourceError` (abrupt close / RST, typical of +SIGKILL and Windows job termination) proves death; only a running process can +answer an echo, so a reply proves liveness without racing a kill. + +Extracted from the real-process section of tests/client/test_stdio.py; the two +copies on this branch are deliberate -- consolidating them is follow-up work. +""" + +import anyio +import anyio.abc +import pytest + + +def connect_back_script(port: int, *, echo: bool = False) -> str: + """Return a `python -c` script body that connects to 127.0.0.1:`port` and sends `b'alive'`. + + After the banner the script blocks forever -- or, with `echo=True`, echoes every + received chunk back so `assert_peer_echoes` can prove the process still runs. + """ + # lax no cover: echo mode is used only by POSIX-gated tests; Windows runners enforce 100% per job. 
+ if echo: # pragma: lax no cover + tail = "while True:\n data = s.recv(65536)\n if not data:\n break\n s.sendall(data)\n" + else: + tail = "time.sleep(3600)\n" + return f"import socket, time\ns = socket.create_connection(('127.0.0.1', {port}))\ns.sendall(b'alive')\n" + tail + + +async def open_liveness_listener() -> tuple[anyio.abc.SocketListener, int]: + """Open a TCP listener on localhost and return it along with its port.""" + multi = await anyio.create_tcp_listener(local_host="127.0.0.1") + sock = multi.listeners[0] + assert isinstance(sock, anyio.abc.SocketListener) + addr = sock.extra(anyio.abc.SocketAttribute.local_address) + # IPv4 local_address is (host: str, port: int) + assert isinstance(addr, tuple) and len(addr) >= 2 and isinstance(addr[1], int) + return sock, addr[1] + + +async def accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStream: + """Accept one connection and assert the peer sent `b'alive'`. + + Reads until the full 5-byte banner arrives (TCP may legally split even a tiny + send). Callers bound this with `anyio.fail_after` to catch a subprocess that + never started. + """ + stream = await sock.accept() + msg = b"" + while len(msg) < 5: + msg += await stream.receive(5 - len(msg)) + assert msg == b"alive", f"expected b'alive', got {msg!r}" + return stream + + +async def assert_stream_closed(stream: anyio.abc.SocketStream) -> None: + """Assert the peer holding the other end of `stream` has terminated.""" + with anyio.fail_after(5.0), pytest.raises((anyio.EndOfStream, anyio.BrokenResourceError)): + await stream.receive(1) + + +async def assert_peer_echoes(stream: anyio.abc.SocketStream) -> None: # pragma: lax no cover + """Assert the peer holding the other end of `stream` is still running. + + Round-trips one echo through the stream (the peer must use `echo=True`); a dead + process can never answer, so this cannot pass spuriously. 
+ + lax no cover: only POSIX-gated survival tests call this; Windows runners + enforce 100% coverage per job. + """ + with anyio.fail_after(5.0): + await stream.send(b"ping") + # Read until the full echo has arrived: TCP may legally split even a tiny send. + echoed = b"" + while len(echoed) < 4: + echoed += await stream.receive(4 - len(echoed)) + assert echoed == b"ping", f"expected b'ping', got {echoed!r}" diff --git a/tests/transports/stdio/conftest.py b/tests/transports/stdio/conftest.py new file mode 100644 index 0000000000..e9601ac6d5 --- /dev/null +++ b/tests/transports/stdio/conftest.py @@ -0,0 +1,77 @@ +"""Fixtures for the stdio lifecycle suite. + +Provides recording seams around `stdio_client`'s spawn and tree-termination +internals (the real implementations still run), plus a teardown that keeps a +crashed test from orphaning its sleep-forever subprocesses. +""" + +import os +import signal +import sys +from collections.abc import Generator +from contextlib import suppress +from pathlib import Path +from typing import TextIO + +import anyio.abc +import pytest + +from mcp.client import stdio +from mcp.client.stdio import _create_platform_compatible_process, _terminate_process_tree +from mcp.os.win32.utilities import FallbackProcess + + +@pytest.fixture +def spawned_processes( + monkeypatch: pytest.MonkeyPatch, +) -> Generator[list[anyio.abc.Process | FallbackProcess]]: + """Record every process `stdio_client` spawns; the real spawn still runs. + + Teardown SIGKILLs each spawn-time process group on POSIX: the safety net for a + test that dies mid-body and the reaper for deliberate survivors. On Windows + there is no group to signal (the Job Object covers strays). 
+ """ + spawned: list[anyio.abc.Process | FallbackProcess] = [] + + async def recording_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> anyio.abc.Process | FallbackProcess: + process = await _create_platform_compatible_process(command, args, env, errlog, cwd) + spawned.append(process) + return process + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", recording_spawn) + yield spawned + _kill_spawn_groups(spawned) + + +@pytest.fixture +def terminate_calls(monkeypatch: pytest.MonkeyPatch) -> list[anyio.abc.Process | FallbackProcess]: + """Record every invocation of `stdio_client`'s tree-termination seam; the real termination still runs. + + An empty list after the context exits proves the graceful path: a FIN looks the + same whether the peer exited on stdin closure or was killed. + """ + terminated: list[anyio.abc.Process | FallbackProcess] = [] + + async def recording_terminate(process: anyio.abc.Process | FallbackProcess) -> None: + terminated.append(process) + await _terminate_process_tree(process) + + monkeypatch.setattr(stdio, "_terminate_process_tree", recording_terminate) + return terminated + + +# lax no cover: registered on every platform but a no-op on Windows, whose runners enforce 100% per job. +def _kill_spawn_groups(spawned: list[anyio.abc.Process | FallbackProcess]) -> None: # pragma: lax no cover + """SIGKILL each spawn-time process group; see `spawned_processes`.""" + if sys.platform == "win32": + return + for process in spawned: + # macOS killpg raises EPERM for a group holding only unreaped zombies. 
+ with suppress(ProcessLookupError, PermissionError): + os.killpg(process.pid, signal.SIGKILL) diff --git a/tests/transports/stdio/test_lifecycle.py b/tests/transports/stdio/test_lifecycle.py new file mode 100644 index 0000000000..8a370c10f6 --- /dev/null +++ b/tests/transports/stdio/test_lifecycle.py @@ -0,0 +1,276 @@ +"""Real-subprocess stdio lifecycle tests that hold on both POSIX and Windows. + +The `stdio_client` tests each launch a real server through the public API and pin +one lifecycle behaviour, with kernel-level liveness sockets as the only +synchronization; the `FallbackProcess` tests wrap a raw `subprocess.Popen` +directly. Platform-divergent shutdown policy lives in test_posix.py / +test_windows.py; the full protocol round trip is pinned by +tests/interaction/transports/test_stdio.py and in-process shutdown logic by +tests/client/test_stdio.py. +""" + +import os +import subprocess +import sys +import threading +from contextlib import AsyncExitStack +from pathlib import Path + +import anyio +import anyio.abc +import pytest + +from mcp.client import stdio +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.os.win32.utilities import FallbackProcess +from tests.transports.stdio._liveness import ( + accept_alive, + assert_stream_closed, + connect_back_script, + open_liveness_listener, +) + + +@pytest.mark.anyio +async def test_a_server_that_exits_on_stdin_close_is_reaped_and_never_terminated( + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """The happy path: closing stdin alone shuts a well-behaved server down. + + The server exits with code 0 and the escalation seam is never invoked. + """ + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + # The server exits on its own at stdin EOF -- the well-behaved response + # to shutdown's first step. 
+ server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.stdin.read()\n" + ) + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # The bound covers one interpreter cold start on a loaded runner; a healthy + # run takes well under a second. + with anyio.fail_after(10.0): + async with stdio_client(params): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + + await assert_stream_closed(stream) + + assert spawned_processes[0].returncode == 0 + assert terminate_calls == [] + + +@pytest.mark.anyio +async def test_cancelling_the_client_mid_session_terminates_the_whole_server_tree( + monkeypatch: pytest.MonkeyPatch, + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """Cancellation still runs the full shutdown against a real process tree. + + Cancellation here stands in for a client timeout or app shutdown: a server that + ignores stdin closure is escalated against, and its child dies with it. + """ + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 0.2) + + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + child = connect_back_script(port) + # The parent never reads stdin and blocks forever, so only the escalation + # can end it -- which cancellation must not skip. 
+ parent = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\n" + connect_back_script( + port + ) + params = StdioServerParameters(command=sys.executable, args=["-c", parent]) + + entered = anyio.Event() + # Cancel a scope owned by the client's task, not the test's task group: a + # host self-cancel is delivered by throwing through this test function's + # suspended frames, and Python 3.11's tracer loses coverage events after + # such a throw() traversal (python/cpython#106749). + cancel_scope = anyio.CancelScope() + + async def run_client_until_cancelled() -> None: + with cancel_scope: + async with stdio_client(params): + entered.set() + await anyio.sleep_forever() + + streams: list[anyio.abc.SocketStream] = [] + # The bound covers two interpreter cold starts on a loaded runner plus the + # shortened escalation wait; a healthy run takes around a second. + with anyio.fail_after(10.0): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client_until_cancelled) + await entered.wait() + for _ in range(2): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + streams.append(stream) + cancel_scope.cancel() + + for stream in streams: + await assert_stream_closed(stream) + + assert terminate_calls == spawned_processes + + +@pytest.mark.anyio +async def test_a_server_that_exits_mid_session_keeps_its_own_exit_code( + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """A server that dies on its own mid-session is reaped with the exit code it chose. + + The client surfaces the child's true status rather than synthesizing one, and + the escalation seam confirms nothing was terminated along the way. 
+ """ + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.exit(7)\n" + ) + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # The bound covers one interpreter cold start on a loaded runner; a healthy + # run takes well under a second. + with anyio.fail_after(10.0): + # no branch: coverage mis-traces the exit arcs of a nested `async with` on 3.11+. + async with stdio_client(params): # pragma: no branch + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + # The server is already gone before shutdown begins. + await assert_stream_closed(stream) + + assert spawned_processes[0].returncode == 7 + assert terminate_calls == [] + + +@pytest.mark.anyio +async def test_server_stderr_output_reaches_the_errlog_file( + tmp_path: Path, + spawned_processes: list[anyio.abc.Process | FallbackProcess], +) -> None: + """What the server writes to stderr lands in the file passed as `errlog`. + + The spawn hands over errlog's file descriptor as the child's stderr, so it must + be a real file -- an in-memory StringIO has no fileno. + """ + marker = "stdio-lifecycle stderr marker 4242" + + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.stderr.write({marker!r} + '\\n')\n" + f"sys.stderr.flush()\n" + f"sys.stdin.read()\n" + ) + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + with (tmp_path / "errlog.txt").open("w+", encoding="utf-8") as errlog: + # The bound covers one interpreter cold start on a loaded runner; a + # healthy run takes well under a second. 
+ with anyio.fail_after(10.0): + async with stdio_client(params, errlog=errlog): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + + # The server exited on stdin EOF, so every stderr write it made has + # reached the file descriptor. + errlog.seek(0) + content = errlog.read() + + assert marker in content + assert spawned_processes[0].returncode == 0 + + +@pytest.mark.skipif( + not hasattr(os, "waitid"), reason="needs os.waitid(WNOWAIT); absent on Windows and macOS before 3.13" +) +# lax no cover: Windows runners enforce 100% per job but lack os.waitid and skip this +# test; test_windows.py's SelectorEventLoop lifecycle test exercises the property there. +def test_fallback_process_reports_death_through_returncode_without_a_wait_call() -> None: # pragma: lax no cover + """`FallbackProcess.returncode` observes process death on its own. + + Pre-fix it returned Popen's cached value, which stays None until someone calls wait()/poll(). + + `os.waitid(WEXITED | WNOWAIT)` waits for the child to become reapable without + reaping it or priming Popen's cache (which would mask the regression); the + pre-fix cached read would still see None here. stdout EOF is NOT such a signal: + the kernel closes the pipes before the exit status is published, so an + EOF-then-assert version flakes. + """ + popen = subprocess.Popen( + [sys.executable, "-c", "pass"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + assert popen.stdin is not None and popen.stdout is not None + try: + process = FallbackProcess(popen) + + os.waitid(os.P_PID, popen.pid, os.WEXITED | os.WNOWAIT) + assert process.returncode == 0 + finally: + popen.stdin.close() + popen.stdout.close() + # The WNOWAIT above left the child unreaped; reap it so no zombie (and no + # Popen ResourceWarning) outlives the test. 
+ popen.wait() + + +@pytest.mark.anyio +async def test_fallback_process_wait_is_cancellable_while_the_child_lives() -> None: + """`FallbackProcess.wait()` honours cancellation while the child is still running. + + Pre-fix it parked `Popen.wait()` in a worker thread anyio will not abandon, + which blocks every cancellation aimed at it. Runs everywhere: the wrapper holds + a plain Popen. + """ + popen = subprocess.Popen( + [sys.executable, "-c", "import sys; sys.stdin.read()"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + assert popen.stdin is not None and popen.stdout is not None + # Pre-fix, no timeout below can fire while the worker thread is parked in + # Popen.wait(); killing the child turns that regression's hang into a clean failure. + watchdog = threading.Timer(8.0, popen.kill) + watchdog.start() + try: + process = FallbackProcess(popen) + + # move_on_after's short deadline is the time-based feature under test -- + # cancellability -- not a wait for an async condition. + with anyio.fail_after(5): + with anyio.move_on_after(0.1) as scope: + await process.wait() + + assert scope.cancelled_caught + # Only the wait was cancelled; the child itself is untouched. + assert popen.poll() is None + finally: + watchdog.cancel() + popen.kill() + popen.wait() + popen.stdin.close() + popen.stdout.close() diff --git a/tests/transports/stdio/test_posix.py b/tests/transports/stdio/test_posix.py new file mode 100644 index 0000000000..521b8bd772 --- /dev/null +++ b/tests/transports/stdio/test_posix.py @@ -0,0 +1,116 @@ +"""POSIX-only stdio lifecycle tests: a gracefully-exited server's children survive the client shutdown. + +SDK-defined policy, not spec-mandated (docs/migration.md, "`stdio_client` no +longer kills children of a gracefully-exited server on POSIX"). Windows has the +opposite documented outcome; see tests/transports/stdio/test_windows.py. 
+"""
+
+import errno
+import sys
+from contextlib import suppress
+
+import anyio
+import anyio.abc
+import pytest
+
+from mcp.client.stdio import StdioServerParameters, stdio_client
+from mcp.os.win32.utilities import FallbackProcess
+from tests.transports.stdio._liveness import (
+    accept_alive,
+    assert_peer_echoes,
+    connect_back_script,
+    open_liveness_listener,
+)
+
+pytestmark = pytest.mark.skipif(sys.platform == "win32", reason="POSIX process-group semantics")
+
+
+@pytest.mark.anyio
+# lax no cover: the per-job 100% coverage gate also runs on Windows, where this file is skipped.
+async def test_a_gracefully_exiting_servers_child_survives_the_client_shutdown(  # pragma: lax no cover
+    spawned_processes: list[anyio.abc.Process | FallbackProcess],
+    terminate_calls: list[anyio.abc.Process | FallbackProcess],
+) -> None:
+    """A server that exits on stdin closure keeps its background child running after `stdio_client` returns.
+
+    The client never escalates against the gracefully-exited server. SDK-defined
+    policy per docs/migration.md; regression for the pre-fix client that
+    tree-killed the child. The Windows twin in test_windows.py pins the opposite outcome.
+    """
+    sock, port = await open_liveness_listener()
+    async with sock:
+        child = connect_back_script(port, echo=True)
+        # The server hands its inherited pipes to a child, then exits as soon as
+        # its stdin closes: the well-behaved graceful path.
+        server = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\nsys.stdin.read()\n"
+        params = StdioServerParameters(command=sys.executable, args=["-c", server])
+
+        # Two interpreter cold starts on a loaded runner; healthy runs take ~0.3s.
+        with anyio.fail_after(10.0):
+            async with stdio_client(params):
+                child_stream = await accept_alive(sock)
+            async with child_stream:
+                # Only a live process answers an echo: the child survived shutdown.
+                await assert_peer_echoes(child_stream)
+
+    # A FIN-shaped probe cannot tell graceful exit from a kill; the seam can:
+    # no escalation was invoked, and the leader exited 0 on stdin closure.
+    assert terminate_calls == []
+    leader = spawned_processes[0]
+    assert leader.returncode == 0
+    # The child is deliberately left running; the spawned_processes teardown
+    # SIGKILLs the spawn-time process group to reap it.
+
+
+@pytest.mark.anyio
+@pytest.mark.usefixtures("spawned_processes")  # failure-path safety net for the parked child
+# lax no cover: same Windows-runner coverage-gate reason as above.
+async def test_a_surviving_childs_write_to_the_inherited_stdout_fails_with_epipe() -> None:  # pragma: lax no cover
+    """A surviving child writing to the stdout pipe it inherited from the server gets EPIPE once the client is gone.
+
+    The pipe's only read end was the client's, and shutdown closed it
+    deterministically rather than at GC time. Pins the docs/migration.md claim
+    "a surviving child that keeps writing to an inherited stdout receives
+    EPIPE/SIGPIPE once the client is gone" (SDK-defined).
+
+    Steps: the server hands its stdio pipes to a child and exits on stdin closure;
+    the child parks on its socket until `stdio_client` has fully exited (so the
+    write cannot race transport teardown), then writes one byte to its inherited
+    fd 1 and reports the errno (0 on success) back over the socket.
+    """
+    sock, port = await open_liveness_listener()
+    async with sock:
+        # Pin SIGPIPE to SIG_IGN explicitly (CPython already starts that way) so
+        # the write fails with EPIPE instead of relying on interpreter startup details.
+        child = (
+            f"import os, signal, socket\n"
+            f"signal.signal(signal.SIGPIPE, signal.SIG_IGN)\n"
+            f"s = socket.create_connection(('127.0.0.1', {port}))\n"
+            f"s.sendall(b'alive')\n"
+            f"s.recv(4)\n"
+            f"try:\n"
+            f" os.write(1, b'x')\n"
+            f" result = b'0'\n"
+            f"except OSError as e:\n"
+            f" result = str(e.errno).encode()\n"
+            f"s.sendall(result)\n"
+        )
+        server = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\nsys.stdin.read()\n"
+        params = StdioServerParameters(command=sys.executable, args=["-c", server])
+
+        # Two interpreter cold starts on a loaded runner; healthy runs take ~0.3s.
+        with anyio.fail_after(10.0):
+            async with stdio_client(params):
+                child_stream = await accept_alive(sock)
+            async with child_stream:
+                # The context has fully exited: the transport, and with it the
+                # pipe's only read end, is closed. Release the child's write.
+                await child_stream.send(b"go")
+                # The child sends its errno report and exits, so read to EOF: the complete reply is everything
+                # before the kernel's FIN (empty if the child died first, so it is compared as bytes, not parsed).
+                reply = b""
+                with suppress(anyio.EndOfStream):
+                    while True:
+                        reply += await child_stream.receive(16)
+
+    assert reply == str(errno.EPIPE).encode(), f"child reported errno {reply!r}, expected EPIPE"
diff --git a/tests/transports/stdio/test_windows.py b/tests/transports/stdio/test_windows.py
new file mode 100644
index 0000000000..0e7ad092c7
--- /dev/null
+++ b/tests/transports/stdio/test_windows.py
@@ -0,0 +1,235 @@
+"""Windows-only stdio lifecycle behaviors, against real subprocesses.
+
+Each test pins a contract that exists only on Windows: Job-Object reaping of a
+gracefully-exited server's children (the deliberate divergence from the POSIX
+policy in test_posix.py), the SelectorEventLoop fallback wrapper, and the CRLF
+line endings a native text-mode server emits. Synchronization is kernel-level
+only (liveness sockets); see `_liveness`.
+
+Per-test no-cover pragmas (as in tests/issues/test_552_windows_hang.py): bodies run
+only on windows-latest CI legs, the per-job 100% gate would count them uncovered on
+non-Windows runners, and strict-no-cover is skipped on Windows where they execute.
+"""
+
+import asyncio
+import sys
+from contextlib import AsyncExitStack
+from pathlib import Path
+
+import anyio
+import anyio.abc
+import pytest
+
+from mcp.client.stdio import StdioServerParameters, stdio_client
+from mcp.os.win32.utilities import FallbackProcess
+from mcp.shared.message import SessionMessage
+from mcp.types import JSONRPCRequest, JSONRPCResponse
+from tests.transports.stdio._liveness import (
+    accept_alive,
+    assert_stream_closed,
+    connect_back_script,
+    open_liveness_listener,
+)
+
+pytestmark = [
+    pytest.mark.anyio,
+    pytest.mark.skipif(sys.platform != "win32", reason="Windows Job Object / event-loop semantics"),
+]
+
+
+async def test_a_gracefully_exited_servers_child_is_reaped_when_the_job_handle_closes(  # pragma: no cover
+    tmp_path: Path,
+    spawned_processes: list[anyio.abc.Process | FallbackProcess],
+    terminate_calls: list[anyio.abc.Process | FallbackProcess],
+) -> None:
+    """A gracefully-exited server's child is killed deterministically when shutdown closes the job handle.
+
+    The server exits cleanly on stdin closure, leaving a child behind; shutdown's
+    close of the server's Job Object handle (`close_process_job` +
+    `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`) kills that child deterministically, not at
+    GC time. Documented divergence from POSIX (docs/migration.md; the POSIX twin is
+    test_posix.py::test_a_gracefully_exiting_servers_child_survives_the_client_shutdown).
+
+    `terminate_calls == []` is the load-bearing distinction: the child died through
+    the graceful path's job-handle close, not the escalation's `TerminateJobObject`;
+    the two kills are indistinguishable on the socket.
+
+    Both processes connect back and their stderr is captured via `errlog`, so a
+    timeout failure can report which process never showed and the child's fate
+    (xdist swallows subprocess stderr on CI).
+    """
+    async with AsyncExitStack() as stack:
+        sock, port = await open_liveness_listener()
+        stack.push_async_callback(sock.aclose)
+
+        # The startup marker (and any child traceback, via stderr=sys.stderr below)
+        # lands in errlog, splitting "never started" from "started but never connected".
+        child = "import sys\nprint('child-started', file=sys.stderr, flush=True)\n" + connect_back_script(port)
+        # The server spawns a child, connects back itself, then exits as soon as
+        # its stdin closes: the graceful path, so the escalation never runs.
+        # The child inherits Job membership: the SDK assigns the server to the Job
+        # synchronously after spawn, long before the cold-starting interpreter can
+        # Popen the child (membership is inherited at CreateProcess, never
+        # acquired retroactively).
+        #
+        # The child's stdin must be DEVNULL: CPython startup queries fd 0, and
+        # Windows serializes that query behind the server's pending blocking
+        # `sys.stdin.read()` on the inherited pipe, so the child would freeze at
+        # interpreter startup until the next inbound byte or EOF.
+        #
+        # After stdin EOF ends the server, it reports the child's `poll()` status:
+        # `None` means alive at server exit; an exit/NTSTATUS code names the killer.
+        server = (
+            f"import socket, subprocess, sys\n"
+            f"try:\n"
+            f" p = subprocess.Popen([sys.executable, '-c', {child!r}], "
+            f"stdin=subprocess.DEVNULL, stderr=sys.stderr)\n"
+            f"except BaseException as exc:\n"
+            f" print(exc, file=sys.stderr, flush=True)\n"
+            f" raise\n"
+            f"s = socket.create_connection(('127.0.0.1', {port}))\n"
+            f"s.sendall(b'alive')\n"
+            f"sys.stdin.read()\n"
+            f"print('child-rc:%s' % p.poll(), file=sys.stderr, flush=True)\n"
+        )
+        server_params = StdioServerParameters(command=sys.executable, args=["-c", server])
+
+        with (tmp_path / "errlog.txt").open("w+", encoding="utf-8") as errlog:
+            # Diagnostic helper for failure messages: rewinds `errlog` and reads it through to the end.
+            def server_stderr() -> str:
+                errlog.seek(0)
+                return errlog.read()
+
+            streams: list[anyio.abc.SocketStream] = []
+            spawn_started = anyio.current_time()
+            entered_at: float | None = None
+            try:
+                # Two interpreter cold starts on a loaded runner; healthy runs
+                # take well under a second.
+                with anyio.fail_after(15.0):
+                    async with stdio_client(server_params, errlog=errlog):
+                        entered_at = anyio.current_time()
+                        # The server and child race to connect; accept both,
+                        # order-agnostic (accept_alive verifies each banner).
+                        for _ in range(2):
+                            stream = await accept_alive(sock)
+                            stack.push_async_callback(stream.aclose)
+                            streams.append(stream)
+            except TimeoutError:
+                # `stdio_client.__aexit__` has already completed its shielded shutdown, so the stderr read carries
+                # the server's final `child-rc` line, not a mid-flight snapshot. NOTE(review): with one stream,
+                # `missing_leg` presumes the server connected first, yet the accept loop is order-agnostic — confirm.
+                missing_leg = "the server never ran its connect line" if not streams else "the child never connected"
+                spawn_split = (
+                    "the context never entered"
+                    if entered_at is None
+                    else f"the context entered {entered_at - spawn_started:.1f}s after spawn began"
+                )
+                pytest.fail(
+                    f"{len(streams)}/2 liveness connections arrived ({missing_leg}); "
+                    f"{spawn_split}; server stderr: {server_stderr()!r}"
+                )
+
+            # Context exit closed the job handle: KILL_ON_JOB_CLOSE killed the
+            # child and the server exited gracefully, so both sockets close.
+            # The `spawned_processes` strong reference is load-bearing: `_process_jobs`
+            # is weak-keyed, so without it a GC between context exit and this assert
+            # could close the job handle itself and mask a regression in the
+            # deterministic close.
+            try:
+                for stream in streams:
+                    await assert_stream_closed(stream)
+            except TimeoutError:
+                pytest.fail(f"a socket stayed open after shutdown; server stderr: {server_stderr()!r}")
+
+            leader = spawned_processes[0]
+            # The graceful path: the server exited on stdin closure with code 0,
+            # and the tree-termination escalation was never invoked.
+            assert leader.returncode == 0, server_stderr()
+            assert terminate_calls == [], server_stderr()
+
+
+# Overrides the suite-wide anyio_backend fixture for this test only: a selector
+# event loop cannot run asyncio subprocesses, forcing stdio_client onto FallbackProcess.
+@pytest.mark.parametrize("anyio_backend", [("asyncio", {"loop_factory": asyncio.SelectorEventLoop})])
+async def test_a_selector_event_loop_session_uses_the_fallback_process_and_exits_cleanly(  # pragma: no cover
+    spawned_processes: list[anyio.abc.Process | FallbackProcess],
+    terminate_calls: list[anyio.abc.Process | FallbackProcess],
+) -> None:
+    """Under a `SelectorEventLoop`, `stdio_client` falls back to `FallbackProcess` and still exits cleanly.
+
+    A selector event loop has no asyncio subprocess support, so `stdio_client`
+    falls back to the Popen-based `FallbackProcess` wrapper; a well-behaved server
+    still completes the full clean lifecycle: spawn, liveness, exit on stdin
+    closure, reaped, never escalated against.
+
+    The `isinstance` check is the engagement proof: if a future anyio gains selector
+    subprocess support, the spawn would silently return a normal Process. A hang here
+    most likely means the known fallback hazard documented in `stdio_client`'s
+    shutdown comment (reader thread parked in a synchronous `ReadFile`), which is
+    why this test pins only the clean-exit path, never a kill path.
+    """
+    async with AsyncExitStack() as stack:
+        sock, port = await open_liveness_listener()
+        stack.push_async_callback(sock.aclose)
+
+        # Connect back for liveness, then exit as soon as stdin closes: the
+        # well-behaved server, so shutdown's first step suffices.
+        server = (
+            f"import socket, sys\n"
+            f"s = socket.create_connection(('127.0.0.1', {port}))\n"
+            f"s.sendall(b'alive')\n"
+            f"sys.stdin.read()\n"
+        )
+        server_params = StdioServerParameters(command=sys.executable, args=["-c", server])
+
+        # One interpreter cold start on a loaded runner; healthy runs take ~0.3s.
+        with anyio.fail_after(10.0):
+            async with stdio_client(server_params):
+                stream = await accept_alive(sock)
+                stack.push_async_callback(stream.aclose)
+                # The engagement proof, asserted while the session is live.
+                assert isinstance(spawned_processes[0], FallbackProcess)
+
+        # The server exited on stdin closure: socket closed, exit code 0, and the
+        # escalation never fired.
+        await assert_stream_closed(stream)
+        assert spawned_processes[0].returncode == 0
+        assert terminate_calls == []
+
+
+async def test_a_native_server_emitting_crlf_line_endings_round_trips_messages() -> None:  # pragma: no cover
+    """The client round-trips messages from a text-mode Windows server that frames its output with \\r\\n.
+
+    `TextIOWrapper`'s `newline=None` translates "\\n" to `os.linesep`, so such a
+    server emits \\r\\n; the client still parses each line because the reader
+    splits on "\\n" only and the JSON parser tolerates the trailing "\\r" as
+    whitespace. The SDK's own server writes through such a wrapper, so this
+    tolerance is load-bearing for Windows interop.
+
+    tests/issues/test_552_windows_hang.py exercises the same wire form implicitly
+    through `initialize()`; this test is the explicit owner of the framing claim.
+    """
+    # Read one request, answer it via print() (which emits \r\n on Windows), then
+    # exit when stdin closes. json.loads/dumps keep the script free of SDK imports.
+    server = (
+        "import json, sys\n"
+        "line = sys.stdin.readline()\n"
+        "request = json.loads(line)\n"
+        "print(json.dumps({'jsonrpc': '2.0', 'id': request['id'], 'result': {}}))\n"
+        "sys.stdout.flush()\n"
+        "sys.stdin.read()\n"
+    )
+    server_params = StdioServerParameters(command=sys.executable, args=["-c", server])
+
+    ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")
+
+    # One interpreter cold start on a loaded runner; healthy runs take ~0.3s.
+    with anyio.fail_after(10.0):
+        async with stdio_client(server_params) as (read_stream, write_stream):
+            await write_stream.send(SessionMessage(ping))
+            received = await read_stream.receive()
+            # A reader that choked on the trailing \r would deliver a ValueError
+            # here instead of a parsed message.
+            assert isinstance(received, SessionMessage)
+            assert received.message == JSONRPCResponse(jsonrpc="2.0", id=1, result={})