Skip to content

Commit 6a73322

Browse files
committed
feat: add cluster engine for semantic message grouping
Introduce ClusterManager, ClusterSplitter, and ClusterStore abstraction with SQLAlchemy and in-memory implementations. Clusters group incoming messages by semantic similarity before ingestion.
1 parent 1e58170 commit 6a73322

10 files changed

Lines changed: 2170 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Test helpers for cluster state storage."""
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
from datetime import UTC, datetime, timedelta
2+
3+
import pytest
4+
import pytest_asyncio
5+
6+
from memmachine_server.semantic_memory.cluster_manager import (
7+
ClusterInfo,
8+
ClusterSplitRecord,
9+
ClusterState,
10+
)
11+
from memmachine_server.semantic_memory.cluster_store.cluster_store import (
12+
ClusterStateStorage,
13+
)
14+
from memmachine_server.semantic_memory.cluster_store.cluster_store_sqlalchemy import (
15+
BaseClusterStore,
16+
ClusterStateStorageSqlAlchemy,
17+
)
18+
from memmachine_server.semantic_memory.cluster_store.in_memory_cluster_store import (
19+
InMemoryClusterStateStorage,
20+
)
21+
22+
23+
@pytest_asyncio.fixture
async def sqlite_cluster_state_storage(sqlalchemy_sqlite_engine):
    """Provide a SQLite-backed ``ClusterStateStorageSqlAlchemy`` with fresh tables.

    All cluster tables are dropped and recreated before the test runs and
    dropped again on teardown so no state leaks between tests.
    """
    engine = sqlalchemy_sqlite_engine

    async with engine.begin() as connection:
        await connection.run_sync(BaseClusterStore.metadata.drop_all)
        await connection.run_sync(BaseClusterStore.metadata.create_all)

    store = ClusterStateStorageSqlAlchemy(engine)
    await store.startup()
    yield store

    # Teardown: leave no cluster tables behind for the next test.
    async with engine.begin() as connection:
        await connection.run_sync(BaseClusterStore.metadata.drop_all)
35+
36+
37+
@pytest_asyncio.fixture
async def pg_cluster_state_storage(sqlalchemy_pg_engine):
    """Provide a Postgres-backed ``ClusterStateStorageSqlAlchemy`` with fresh tables.

    All cluster tables are dropped and recreated before the test runs and
    dropped again on teardown so no state leaks between tests.
    """
    engine = sqlalchemy_pg_engine

    async with engine.begin() as connection:
        await connection.run_sync(BaseClusterStore.metadata.drop_all)
        await connection.run_sync(BaseClusterStore.metadata.create_all)

    store = ClusterStateStorageSqlAlchemy(engine)
    await store.startup()
    yield store

    # Teardown: leave no cluster tables behind for the next test.
    async with engine.begin() as connection:
        await connection.run_sync(BaseClusterStore.metadata.drop_all)
49+
50+
51+
@pytest_asyncio.fixture
async def in_memory_cluster_state_storage():
    """Provide a started ``InMemoryClusterStateStorage``, cleared on teardown."""
    store = InMemoryClusterStateStorage()
    await store.startup()
    yield store
    # Teardown: drop everything so tests stay independent.
    await store.delete_all()
57+
58+
59+
@pytest.fixture(
    params=[
        pytest.param("sqlite_cluster_state_storage"),
        pytest.param("pg_cluster_state_storage", marks=pytest.mark.integration),
        pytest.param("in_memory_cluster_state_storage"),
    ]
)
def cluster_state_storage(request):
    """Run each storage test against every ``ClusterStateStorage`` backend.

    The Postgres-backed variant is marked ``integration`` so it only runs
    when integration tests are selected.
    """
    return request.getfixturevalue(request.param)
68+
69+
70+
def _sample_state(
    now: datetime,
    split_records: dict[str, ClusterSplitRecord] | None = None,
) -> ClusterState:
    """Build a small two-cluster ``ClusterState`` anchored at *now*.

    Args:
        now: Timestamp the cluster and pending-event times are derived from.
        split_records: Optional split records to attach. When ``None``
            (the default) the ``split_records`` argument is omitted entirely,
            leaving the ``ClusterState`` default in place — backward
            compatible with the original single-argument helper.

    Returns:
        A ``ClusterState`` with two clusters, two routed events, and one
        pending event per cluster.
    """
    # Only pass split_records through when explicitly provided, so the
    # model's own default is used otherwise.
    extra: dict = {}
    if split_records is not None:
        extra["split_records"] = split_records
    return ClusterState(
        clusters={
            "cluster_0": ClusterInfo(
                centroid=[1.0, 0.0],
                count=2,
                last_ts=now - timedelta(minutes=1),
            ),
            "cluster_1": ClusterInfo(
                centroid=[0.0, 1.0],
                count=1,
                last_ts=now,
            ),
        },
        event_to_cluster={
            "event-a": "cluster_0",
            "event-b": "cluster_1",
        },
        pending_events={
            "cluster_0": {"event-a": now - timedelta(minutes=2)},
            "cluster_1": {"event-b": now - timedelta(minutes=1)},
        },
        next_cluster_id=2,
        **extra,
    )
94+
95+
96+
@pytest.mark.asyncio
async def test_get_state_returns_none_when_missing(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """Looking up a set id that was never saved yields ``None``."""
    fetched = await cluster_state_storage.get_state(set_id="missing")
    assert fetched is None
102+
103+
104+
@pytest.mark.asyncio
async def test_round_trip_state(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """A saved state comes back unchanged from a subsequent ``get_state``."""
    anchor = datetime.now(tz=UTC)
    saved = _sample_state(anchor)
    await cluster_state_storage.save_state(set_id="set-a", state=saved)

    fetched = await cluster_state_storage.get_state(set_id="set-a")

    assert fetched is not None
    assert fetched == saved
116+
117+
118+
@pytest.mark.asyncio
async def test_delete_state(cluster_state_storage: ClusterStateStorage) -> None:
    """``delete_state`` removes a previously saved state."""
    anchor = datetime.now(tz=UTC)
    await cluster_state_storage.save_state(set_id="set-b", state=_sample_state(anchor))

    await cluster_state_storage.delete_state(set_id="set-b")

    assert await cluster_state_storage.get_state(set_id="set-b") is None
127+
128+
129+
@pytest.mark.asyncio
async def test_delete_all(cluster_state_storage: ClusterStateStorage) -> None:
    """``delete_all`` wipes every stored set at once."""
    anchor = datetime.now(tz=UTC)
    for set_id in ("set-c", "set-d"):
        await cluster_state_storage.save_state(
            set_id=set_id, state=_sample_state(anchor)
        )

    await cluster_state_storage.delete_all()

    for set_id in ("set-c", "set-d"):
        assert await cluster_state_storage.get_state(set_id=set_id) is None
139+
140+
141+
@pytest.mark.asyncio
async def test_save_overwrites_state(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """Saving under an existing set id fully replaces the old state."""
    anchor = datetime.now(tz=UTC)
    await cluster_state_storage.save_state(set_id="set-e", state=_sample_state(anchor))

    # A completely different state under the same set id.
    replacement = ClusterState(
        next_cluster_id=3,
        clusters={
            "cluster_2": ClusterInfo(
                centroid=[0.5, 0.5],
                count=1,
                last_ts=anchor + timedelta(minutes=5),
            )
        },
        event_to_cluster={"event-c": "cluster_2"},
        pending_events={"cluster_2": {"event-c": anchor + timedelta(minutes=4)}},
    )

    await cluster_state_storage.save_state(set_id="set-e", state=replacement)

    assert await cluster_state_storage.get_state(set_id="set-e") == replacement
165+
166+
167+
@pytest.mark.asyncio
async def test_save_reload_and_update_state(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """A reloaded state can be mutated, re-saved, and read back intact."""
    anchor = datetime.now(tz=UTC)
    await cluster_state_storage.save_state(set_id="set-f", state=_sample_state(anchor))

    fetched = await cluster_state_storage.get_state(set_id="set-f")
    assert fetched is not None

    # Simulate the manager routing a new event into a brand-new cluster.
    fetched.clusters["cluster_2"] = ClusterInfo(
        centroid=[0.25, 0.75],
        count=1,
        last_ts=anchor + timedelta(minutes=10),
    )
    fetched.event_to_cluster["event-c"] = "cluster_2"
    fetched.pending_events.setdefault("cluster_2", {})["event-c"] = anchor + timedelta(
        minutes=9
    )
    fetched.next_cluster_id = 3

    await cluster_state_storage.save_state(set_id="set-f", state=fetched)

    assert await cluster_state_storage.get_state(set_id="set-f") == fetched
193+
194+
195+
@pytest.mark.asyncio
async def test_round_trip_state_with_split_records(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """Split records survive a save/load round trip alongside cluster data."""
    anchor = datetime.now(tz=UTC)
    seeded = ClusterState(
        next_cluster_id=2,
        clusters={
            "cluster_0": ClusterInfo(
                centroid=[1.0, 0.0],
                count=2,
                last_ts=anchor - timedelta(minutes=1),
            ),
            "cluster_1": ClusterInfo(
                centroid=[0.0, 1.0],
                count=1,
                last_ts=anchor,
            ),
        },
        event_to_cluster={
            "event-a": "cluster_0",
            "event-b": "cluster_1",
        },
        pending_events={
            "cluster_0": {"event-a": anchor - timedelta(minutes=2)},
            "cluster_1": {"event-b": anchor - timedelta(minutes=1)},
        },
        split_records={
            "cluster_0": ClusterSplitRecord(
                original_cluster_id="cluster_0",
                segment_ids=["cluster_0", "abcdef1234567890"],
                input_hash="sha256hex",
            )
        },
    )

    await cluster_state_storage.save_state(set_id="set-split-a", state=seeded)

    assert await cluster_state_storage.get_state(set_id="set-split-a") == seeded
235+
236+
237+
@pytest.mark.asyncio
async def test_overwrite_state_replaces_split_records(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """Re-saving a set id replaces its split records, not just its clusters."""
    anchor = datetime.now(tz=UTC)

    first = ClusterState(
        next_cluster_id=1,
        clusters={
            "cluster_0": ClusterInfo(
                centroid=[1.0, 0.0],
                count=1,
                last_ts=anchor,
            )
        },
        event_to_cluster={"event-a": "cluster_0"},
        pending_events={"cluster_0": {"event-a": anchor}},
        split_records={
            "cluster_0": ClusterSplitRecord(
                original_cluster_id="cluster_0",
                segment_ids=["cluster_0", "old-seg"],
                input_hash="hash-a",
            )
        },
    )
    await cluster_state_storage.save_state(
        set_id="set-split-b",
        state=first,
    )

    # Second save carries an entirely different cluster and split record.
    second = ClusterState(
        next_cluster_id=2,
        clusters={
            "cluster_1": ClusterInfo(
                centroid=[0.25, 0.75],
                count=1,
                last_ts=anchor + timedelta(minutes=5),
            )
        },
        event_to_cluster={"event-b": "cluster_1"},
        pending_events={"cluster_1": {"event-b": anchor + timedelta(minutes=4)}},
        split_records={
            "cluster_1": ClusterSplitRecord(
                original_cluster_id="cluster_1",
                segment_ids=["cluster_1", "new-seg"],
                input_hash="hash-b",
            )
        },
    )
    await cluster_state_storage.save_state(set_id="set-split-b", state=second)

    assert await cluster_state_storage.get_state(set_id="set-split-b") == second
289+
290+
291+
@pytest.mark.asyncio
async def test_delete_state_removes_split_records(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """``delete_state`` removes a state even when split records are attached."""
    anchor = datetime.now(tz=UTC)
    seeded = ClusterState(
        next_cluster_id=1,
        clusters={
            "cluster_0": ClusterInfo(
                centroid=[1.0, 0.0],
                count=1,
                last_ts=anchor,
            )
        },
        event_to_cluster={"event-a": "cluster_0"},
        pending_events={"cluster_0": {"event-a": anchor}},
        split_records={
            "cluster_0": ClusterSplitRecord(
                original_cluster_id="cluster_0",
                segment_ids=["cluster_0", "seg-1"],
                input_hash="hash-a",
            )
        },
    )
    await cluster_state_storage.save_state(set_id="set-split-c", state=seeded)

    await cluster_state_storage.delete_state(set_id="set-split-c")

    assert await cluster_state_storage.get_state(set_id="set-split-c") is None
321+
322+
323+
@pytest.mark.asyncio
async def test_round_trip_empty_split_records(
    cluster_state_storage: ClusterStateStorage,
) -> None:
    """An explicitly empty ``split_records`` mapping round-trips as empty."""
    anchor = datetime.now(tz=UTC)
    seeded = ClusterState(
        next_cluster_id=1,
        clusters={
            "cluster_0": ClusterInfo(
                centroid=[1.0, 0.0],
                count=1,
                last_ts=anchor,
            )
        },
        event_to_cluster={"event-a": "cluster_0"},
        pending_events={"cluster_0": {"event-a": anchor}},
        split_records={},
    )

    await cluster_state_storage.save_state(set_id="set-split-d", state=seeded)

    fetched = await cluster_state_storage.get_state(set_id="set-split-d")
    assert fetched is not None
    assert fetched.split_records == {}

0 commit comments

Comments
 (0)