From 1ed577f4a3bd34c1dab8166a796eb49c730bc915 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sat, 23 May 2026 12:16:18 +0200 Subject: [PATCH 1/3] fix(streamable-http): avoid startup race after initialize (#1675) --- src/mcp/client/streamable_http.py | 7 +++-- tests/shared/test_streamable_http.py | 39 ++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9a119c6338..0f47ab6a2c 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -10,7 +10,7 @@ import anyio import httpx -from anyio.abc import TaskGroup +from anyio.abc import TaskGroup, TaskStatus from httpx_sse import EventSource, ServerSentEvent, aconnect_sse from pydantic import ValidationError @@ -437,10 +437,13 @@ async def post_writer( write_stream: ContextSendStream[SessionMessage], start_get_stream: Callable[[], None], tg: TaskGroup, + *, + task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED, ) -> None: """Handle writing requests to the server.""" try: async with write_stream_reader, read_stream_writer, write_stream: + task_status.started() async def _handle_message(session_message: SessionMessage) -> None: message = session_message.message @@ -570,7 +573,7 @@ async def streamable_http_client( def start_get_stream() -> None: tg.start_soon(transport.handle_get_stream, client, read_stream_writer) - tg.start_soon( + await tg.start( transport.post_writer, client, write_stream_reader, diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb61..e19523f9b2 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -1071,6 +1071,45 @@ async def test_streamable_http_client_basic_connection(basic_server: None, basic assert result.server_info.name == SERVER_NAME +@pytest.mark.anyio +async def test_streamable_http_client_no_race_on_consecutive_requests(basic_server: None, basic_server_url: str): + """Regression test for a start-up race immediately after initialize(). + + In some cases, the first request after initialize() (e.g. list_tools()) + could behave inconsistently. This test runs multiple short-lived sessions + to reliably catch any start-up race. + """ + for iteration in range(10): # pragma: no branch + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + tools = await session.list_tools() + assert len(tools.tools) == 10, f"Iteration {iteration}: expected 10 tools, got {len(tools.tools)}" + assert tools.tools[0].name == "test_tool" + + tools2 = await session.list_tools() + assert len(tools2.tools) == 10 + + resource = await session.read_resource(uri="foobar://test-iteration") + assert len(resource.contents) == 1 + + +@pytest.mark.anyio +async def test_streamable_http_client_rapid_request_sequence(basic_server: None, basic_server_url: str): + """Stress test for rapid sequences of requests.""" + async with streamable_http_client(f"{basic_server_url}/mcp") as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + for i in range(20): + tools = await session.list_tools() + assert len(tools.tools) == 10, f"Request {i}: expected 10 tools, got {len(tools.tools)}" + + resource = await session.read_resource(uri="foobar://final-test") + assert len(resource.contents) == 1 + + @pytest.mark.anyio async def test_streamable_http_client_resource_read(initialized_client_session: ClientSession): """Test client resource read functionality.""" From 894e46b8b9b3c964ec83a24bd2ebc800355f65da Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sat, 23 May 2026 12:30:50 +0200 Subject: [PATCH 2/3] fix(stdio): close stdout on teardown --- src/mcp/client/stdio.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/mcp/client/stdio.py b/src/mcp/client/stdio.py index 902dc8576c..110d0d353c 100644 --- a/src/mcp/client/stdio.py +++ b/src/mcp/client/stdio.py @@ -205,6 +205,12 @@ async def stdin_writer(): except ProcessLookupError: # pragma: no cover # Process already exited, which is fine pass + + if process.stdout: # pragma: no branch + try: + await process.stdout.aclose() + except Exception: # pragma: no cover + pass await read_stream.aclose() await write_stream.aclose() await read_stream_writer.aclose() From 29ae0feccb7845e4ce488abbc0f1e1fc84e79914 Mon Sep 17 00:00:00 2001 From: Jianke LIN Date: Sat, 23 May 2026 12:38:50 +0200 Subject: [PATCH 3/3] fix(streamable-http): signal post_writer readiness --- src/mcp/client/streamable_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 0f47ab6a2c..0f5f27942a 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -443,7 +443,7 @@ async def post_writer( """Handle writing requests to the server.""" try: async with write_stream_reader, read_stream_writer, write_stream: - task_status.started() + task_status.started(None) async def _handle_message(session_message: SessionMessage) -> None: message = session_message.message