forked from modelcontextprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconftest.py
More file actions
137 lines (107 loc) · 4.68 KB
/
conftest.py
File metadata and controls
137 lines (107 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from contextlib import asynccontextmanager
from unittest.mock import patch
import pytest
import mcp.shared.memory
from mcp.shared.message import SessionMessage
from mcp.types import (
JSONRPCNotification,
JSONRPCRequest,
)
class SpyMemoryObjectSendStream:
def __init__(self, original_stream):
self.original_stream = original_stream
self.sent_messages: list[SessionMessage] = []
async def send(self, message):
self.sent_messages.append(message)
await self.original_stream.send(message)
async def aclose(self):
await self.original_stream.aclose()
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.aclose()
class StreamSpyCollection:
def __init__(
self,
client_spy: SpyMemoryObjectSendStream,
server_spy: SpyMemoryObjectSendStream,
):
self.client = client_spy
self.server = server_spy
def clear(self) -> None:
"""Clear all captured messages."""
self.client.sent_messages.clear()
self.server.sent_messages.clear()
def get_client_requests(self, method: str | None = None) -> list[JSONRPCRequest]:
"""Get client-sent requests, optionally filtered by method."""
return [
req.message.root
for req in self.client.sent_messages
if isinstance(req.message.root, JSONRPCRequest) and (method is None or req.message.root.method == method)
]
def get_server_requests(self, method: str | None = None) -> list[JSONRPCRequest]:
"""Get server-sent requests, optionally filtered by method."""
return [
req.message.root
for req in self.server.sent_messages
if isinstance(req.message.root, JSONRPCRequest) and (method is None or req.message.root.method == method)
]
def get_client_notifications(self, method: str | None = None) -> list[JSONRPCNotification]:
"""Get client-sent notifications, optionally filtered by method."""
return [
notif.message.root
for notif in self.client.sent_messages
if isinstance(notif.message.root, JSONRPCNotification)
and (method is None or notif.message.root.method == method)
]
def get_server_notifications(self, method: str | None = None) -> list[JSONRPCNotification]:
"""Get server-sent notifications, optionally filtered by method."""
return [
notif.message.root
for notif in self.server.sent_messages
if isinstance(notif.message.root, JSONRPCNotification)
and (method is None or notif.message.root.method == method)
]
@pytest.fixture
def stream_spy():
"""Fixture that provides spies for both client and server write streams.
Example usage:
async def test_something(stream_spy):
# ... set up server and client ...
spies = stream_spy()
# Run some operation that sends messages
await client.some_operation()
# Check the messages
requests = spies.get_client_requests(method="some/method")
assert len(requests) == 1
# Clear for the next operation
spies.clear()
"""
client_spy = None
server_spy = None
# Store references to our spy objects
def capture_spies(c_spy, s_spy):
nonlocal client_spy, server_spy
client_spy = c_spy
server_spy = s_spy
# Create patched version of stream creation
original_create_streams = mcp.shared.memory.create_client_server_memory_streams
@asynccontextmanager
async def patched_create_streams():
async with original_create_streams() as (client_streams, server_streams):
client_read, client_write = client_streams
server_read, server_write = server_streams
# Create spy wrappers
spy_client_write = SpyMemoryObjectSendStream(client_write)
spy_server_write = SpyMemoryObjectSendStream(server_write)
# Capture references for the test to use
capture_spies(spy_client_write, spy_server_write)
yield (client_read, spy_client_write), (server_read, spy_server_write)
# Apply the patch for the duration of the test
with patch("mcp.shared.memory.create_client_server_memory_streams", patched_create_streams):
# Return a collection with helper methods
def get_spy_collection() -> StreamSpyCollection:
assert client_spy is not None, "client_spy was not initialized"
assert server_spy is not None, "server_spy was not initialized"
return StreamSpyCollection(client_spy, server_spy)
yield get_spy_collection