From 43964bb9200e9476fed0d35551d8bf7c17d37f00 Mon Sep 17 00:00:00 2001 From: kaXianc2-gom Date: Mon, 22 Jun 2026 11:09:22 +0800 Subject: [PATCH] fix: reject duplicate JSON-RPC request IDs with 409 Conflict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a client reuses a JSON-RPC request id while the original request is still in flight, StreamableHTTPServerTransport now checks for a collision in _request_streams before registering a new stream slot. Previously the assignment was unconditional, which silently overwrote the prior (send, receive) pair — the original request's caller would hang until timeout and the response could be routed to the wrong caller. The fix mirrors the existing GET_STREAM_KEY collision guard at line 695. Also adds an optional request_id parameter to _create_error_response so the JSON-RPC error envelope carries the offending id instead of always using null. Fixes #2655 Co-Authored-By: Claude --- src/mcp/server/streamable_http.py | 19 +++++++- tests/shared/test_streamable_http.py | 70 ++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index c1c8a0f61..63eff3750 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -288,6 +288,8 @@ def _create_error_response( status_code: HTTPStatus, error_code: int = INVALID_REQUEST, headers: dict[str, str] | None = None, + *, + request_id: RequestId | None = None, ) -> Response: """Create an error response with a simple string message.""" response_headers = {"Content-Type": CONTENT_TYPE_JSON} @@ -300,7 +302,7 @@ def _create_error_response( # Return a properly formatted JSON error response error_response = JSONRPCError( jsonrpc="2.0", - id=None, + id=request_id, error=ErrorData(code=error_code, message=error_message), ) @@ -436,7 +438,7 @@ async def _validate_accept_header(self, request: Request, scope: Scope, send: Se return False return True - async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: + async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: # noqa: C901 """Handle POST requests containing JSON-RPC messages.""" writer = self._read_stream_writer if writer is None: # pragma: no cover @@ -523,6 +525,19 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re # Extract the request ID outside the try block for proper scope request_id = str(message.id) + + # Reject duplicate request IDs — a client that reuses an id while + # the original request is still in flight violates the JSON-RPC + # spec and would silently overwrite the prior stream slot. + if request_id in self._request_streams: + response = self._create_error_response( + f"Conflict: Request ID {request_id!r} is already in flight on this session", + HTTPStatus.CONFLICT, + request_id=message.id, + ) + await response(scope, receive, send) + return + # Register this stream for the request ID self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0) request_stream_reader = self._request_streams[request_id][1] diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 7ceac8e86..0a1efa8c7 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2384,3 +2384,73 @@ async def asgi_receive() -> Message: assert body_chunks[-1] == {"type": "http.response.body", "body": b"", "more_body": False} assert "Error in standalone SSE writer" not in caplog.text assert "Error in standalone SSE response" not in caplog.text + + +@pytest.mark.anyio +async def test_duplicate_request_id_rejected_with_409() -> None: + """A POST with a request id already in _request_streams is rejected 409. + + When a client reuses a JSON-RPC request id while the original request + is still in flight, the server must surface the violation instead of + silently overwriting the prior stream slot (gh-2655). + """ + transport = StreamableHTTPServerTransport( + mcp_session_id="test-session", + security_settings=TransportSecuritySettings(enable_dns_rebinding_protection=False), + ) + # Satisfy the read-stream guard so the POST handler proceeds. + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + transport._read_stream_writer = read_stream_writer # pyright: ignore[reportPrivateUsage] + + # Seed an in-flight request stream so the duplicate check triggers. + duplicate_id = "dup-1" + seeded_send, seeded_receive = anyio.create_memory_object_stream[EventMessage](0) + transport._request_streams[duplicate_id] = (seeded_send, seeded_receive) # pyright: ignore[reportPrivateUsage] + + sent: list[Message] = [] + + async def asgi_send(message: Message) -> None: + sent.append(message) + + async def asgi_receive() -> Message: + return { + "type": "http.request", + "body": json.dumps( + { + "jsonrpc": "2.0", + "method": "tools/call", + "params": {"name": "test", "arguments": {}}, + "id": duplicate_id, + } + ).encode(), + "more_body": False, + } + + scope: Scope = { + "type": "http", + "method": "POST", + "path": "/mcp", + "query_string": b"", + "headers": [ + (b"accept", b"application/json, text/event-stream"), + (b"content-type", b"application/json"), + (b"mcp-session-id", b"test-session"), + ], + } + + async with read_stream_writer, read_stream, seeded_send, seeded_receive: + with anyio.fail_after(5): + await transport.handle_request(scope, asgi_receive, asgi_send) + + status = next(m["status"] for m in sent if m["type"] == "http.response.start") + assert status == 409 + + body = b"".join(m["body"] for m in sent if m["type"] == "http.response.body") + error_response = json.loads(body) + assert error_response["jsonrpc"] == "2.0" + assert error_response["id"] == duplicate_id + assert error_response["error"]["code"] == -32600 + assert duplicate_id in error_response["error"]["message"] + + # The original in-flight stream must not have been replaced. + assert duplicate_id in transport._request_streams # pyright: ignore[reportPrivateUsage]