Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: wire clustering into ingestion pipeline and API
  • Loading branch information
o-love committed Apr 8, 2026
commit 3846d212ee774aaf9de7ecd723b33c06888dc75e
3 changes: 3 additions & 0 deletions .github/workflows/installation-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ jobs:
shell: bash
run: |
set -eo pipefail
python -m pip install *common*.whl
whl_name=$(ls *server*.whl)
python -m pip install --find-links . "$whl_name"

Expand Down Expand Up @@ -105,6 +106,7 @@ jobs:
run: |
set -eo pipefail
export PYTHONUTF8=1
python -m pip install *common*.whl
whl_name=$(ls *server*.whl)
python -m pip install --find-links . "$whl_name"

Expand All @@ -124,6 +126,7 @@ jobs:
shell: bash
run: |
set -eo pipefail
python -m pip install *common*.whl
whl_name=$(ls *server*.whl)
python -m pip install --find-links . "$whl_name"

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,4 @@ site/

# Ignore documentation generated by extensions
.spelling
.worktrees/
10 changes: 10 additions & 0 deletions packages/client/client_tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ def test_get_semantic_memory_config(self, config, mock_client):
"database": "postgres-db",
"llm_model": "gpt-4",
"embedding_model": "openai-embedder",
"cluster_similarity_threshold": 0.4,
"cluster_max_time_gap_seconds": 3600,
}
)
result = config.get_semantic_memory_config()
Expand All @@ -420,6 +422,8 @@ def test_get_semantic_memory_config(self, config, mock_client):
assert result.database == "postgres-db"
assert result.llm_model == "gpt-4"
assert result.embedding_model == "openai-embedder"
assert result.cluster_similarity_threshold == 0.4
assert result.cluster_max_time_gap_seconds == 3600
mock_client.request.assert_called_once_with(
"GET",
"http://localhost:8080/api/v2/config/memory/semantic",
Expand All @@ -442,6 +446,9 @@ def test_update_semantic_memory_config(self, config, mock_client):
embedding_model="openai-embedder",
ingestion_trigger_messages=10,
ingestion_trigger_age_seconds=3600,
cluster_idle_ttl_seconds=1800,
cluster_similarity_threshold=0.45,
cluster_max_time_gap_seconds=7200,
)
assert isinstance(result, UpdateMemoryConfigResponse)
assert result.success is True
Expand All @@ -457,6 +464,9 @@ def test_update_semantic_memory_config(self, config, mock_client):
assert body["embedding_model"] == "openai-embedder"
assert body["ingestion_trigger_messages"] == 10
assert body["ingestion_trigger_age_seconds"] == 3600
assert body["cluster_idle_ttl_seconds"] == 1800
assert body["cluster_similarity_threshold"] == 0.45
assert body["cluster_max_time_gap_seconds"] == 7200

def test_update_semantic_memory_config_partial(self, config, mock_client):
mock_client.request.return_value = _mock_response(
Expand Down
56 changes: 49 additions & 7 deletions packages/client/src/memmachine_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
logger = logging.getLogger(__name__)


def _set_if_not_none(target: dict[str, Any], key: str, value: object) -> None:
if value is not None:
target[key] = value


class Config:
"""
Configuration interface for managing MemMachine server settings.
Expand Down Expand Up @@ -438,8 +443,12 @@ def update_semantic_memory_config(
database: str | None = None,
llm_model: str | None = None,
embedding_model: str | None = None,
cluster_split_reranker: str | None = None,
ingestion_trigger_messages: int | None = None,
ingestion_trigger_age_seconds: int | None = None,
cluster_idle_ttl_seconds: int | None = None,
cluster_similarity_threshold: float | None = None,
cluster_max_time_gap_seconds: int | None = None,
timeout: int | None = None,
) -> UpdateMemoryConfigResponse:
"""
Expand All @@ -450,8 +459,12 @@ def update_semantic_memory_config(
database: Name of the database to use for semantic memory
llm_model: Name of the language model to use for feature extraction
embedding_model: Name of the embedder to use for semantic similarity
cluster_split_reranker: Reranker ID used for cluster split scoring
ingestion_trigger_messages: Number of messages before triggering ingestion
ingestion_trigger_age_seconds: Age threshold in seconds for triggering ingestion
cluster_idle_ttl_seconds: Idle TTL in seconds for empty cluster GC
cluster_similarity_threshold: Cosine similarity threshold for clustering
cluster_max_time_gap_seconds: Maximum time gap in seconds for clustering
timeout: Request timeout in seconds (uses client default if not provided)

Returns:
Expand All @@ -463,14 +476,43 @@ def update_semantic_memory_config(

"""
self._check_closed()
spec = UpdateSemanticMemorySpec(
enabled=enabled,
database=database,
llm_model=llm_model,
embedding_model=embedding_model,
ingestion_trigger_messages=ingestion_trigger_messages,
ingestion_trigger_age_seconds=ingestion_trigger_age_seconds,
spec_data: dict[str, Any] = {}
_set_if_not_none(spec_data, "enabled", enabled)
_set_if_not_none(spec_data, "database", database)
_set_if_not_none(spec_data, "llm_model", llm_model)
_set_if_not_none(spec_data, "embedding_model", embedding_model)
Comment on lines +479 to +483
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server update logic relies on distinguishing “field omitted” vs “field explicitly set to null” (e.g., cluster_idle_ttl_seconds / cluster_max_time_gap_seconds / cluster_split_reranker use model_fields_set so users can clear values). This client helper currently only includes keys when the value is non-None and then does model_dump(exclude_none=True), so there’s no way for callers to send an explicit null to clear these fields. Consider using a sentinel default (e.g., UNSET) or adding explicit clear_* parameters so the client can exercise the full API.

Copilot uses AI. Check for mistakes.
_set_if_not_none(
spec_data,
"cluster_split_reranker",
cluster_split_reranker,
)
_set_if_not_none(
spec_data,
"ingestion_trigger_messages",
ingestion_trigger_messages,
)
_set_if_not_none(
spec_data,
"ingestion_trigger_age_seconds",
ingestion_trigger_age_seconds,
)
_set_if_not_none(
spec_data,
"cluster_idle_ttl_seconds",
cluster_idle_ttl_seconds,
)
_set_if_not_none(
spec_data,
"cluster_similarity_threshold",
cluster_similarity_threshold,
)
_set_if_not_none(
spec_data,
"cluster_max_time_gap_seconds",
cluster_max_time_gap_seconds,
)

spec = UpdateSemanticMemorySpec(**spec_data)
payload = spec.model_dump(exclude_none=True)
try:
response = self.client.request(
Expand Down
45 changes: 45 additions & 0 deletions packages/common/src/memmachine_common/api/config_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,30 @@ class SemanticMemoryConfigResponse(BaseModel):
str | None,
Field(default=None, description=SpecDoc.SEMANTIC_EMBEDDING_MODEL),
]
cluster_split_reranker: Annotated[
str | None,
Field(default=None, description=SpecDoc.SEMANTIC_CLUSTER_SPLIT_RERANKER),
]
cluster_similarity_threshold: Annotated[
float | None,
Field(default=None, description=SpecDoc.SEMANTIC_CLUSTER_SIMILARITY_THRESHOLD),
]
cluster_max_time_gap_seconds: Annotated[
int | None,
Field(default=None, description=SpecDoc.SEMANTIC_CLUSTER_MAX_TIME_GAP),
]
ingestion_trigger_messages: Annotated[
int | None,
Field(default=None, description=SpecDoc.SEMANTIC_INGESTION_MESSAGES),
]
ingestion_trigger_age_seconds: Annotated[
int | None,
Field(default=None, description=SpecDoc.SEMANTIC_INGESTION_AGE),
]
cluster_idle_ttl_seconds: Annotated[
int | None,
Field(default=None, description=SpecDoc.SEMANTIC_CLUSTER_IDLE_TTL),
]


class GetConfigResponse(BaseModel):
Expand Down Expand Up @@ -439,6 +463,10 @@ class UpdateSemanticMemorySpec(BaseModel):
str | None,
Field(default=None, description=SpecDoc.SEMANTIC_EMBEDDING_MODEL),
]
cluster_split_reranker: Annotated[
str | None,
Field(default=None, description=SpecDoc.SEMANTIC_CLUSTER_SPLIT_RERANKER),
]
ingestion_trigger_messages: Annotated[
int | None,
Field(default=None, gt=0, description=SpecDoc.SEMANTIC_INGESTION_MESSAGES),
Expand All @@ -447,6 +475,23 @@ class UpdateSemanticMemorySpec(BaseModel):
int | None,
Field(default=None, gt=0, description=SpecDoc.SEMANTIC_INGESTION_AGE),
]
cluster_idle_ttl_seconds: Annotated[
int | None,
Field(default=None, gt=0, description=SpecDoc.SEMANTIC_CLUSTER_IDLE_TTL),
]
cluster_similarity_threshold: Annotated[
float | None,
Field(
default=None,
ge=0.0,
le=1.0,
description=SpecDoc.SEMANTIC_CLUSTER_SIMILARITY_THRESHOLD,
),
]
cluster_max_time_gap_seconds: Annotated[
int | None,
Field(default=None, gt=0, description=SpecDoc.SEMANTIC_CLUSTER_MAX_TIME_GAP),
]


class UpdateMemoryConfigSpec(BaseModel):
Expand Down
28 changes: 23 additions & 5 deletions packages/common/src/memmachine_common/api/doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,28 @@ class SpecDoc:
Must reference an embedder configured in the resources section."""

SEMANTIC_INGESTION_MESSAGES = """
The number of uningested messages that triggers an ingestion cycle."""
The number of pending messages in a semantic cluster that triggers ingestion
for that cluster."""

SEMANTIC_INGESTION_AGE = """
The maximum age (in seconds) of uningested messages before
triggering an ingestion cycle."""
The maximum age (in seconds) of the oldest pending message in a semantic
cluster before triggering ingestion for that cluster."""

SEMANTIC_CLUSTER_IDLE_TTL = """
The idle time-to-live (in seconds) for empty semantic clusters before they are
garbage collected. Set to null to disable cluster GC."""

SEMANTIC_CLUSTER_SIMILARITY_THRESHOLD = """
Cosine similarity threshold for grouping messages into semantic clusters.
Higher values produce tighter clusters; lower values merge more messages."""

SEMANTIC_CLUSTER_MAX_TIME_GAP = """
Maximum time gap (in seconds) allowed between messages in the same cluster.
Set to null to disable time-based cluster splitting."""

SEMANTIC_CLUSTER_SPLIT_RERANKER = """
Reranker ID used to score cluster split boundaries during ingestion.
Defaults to the long-term memory reranker when unset."""

UPDATE_EPISODIC_MEMORY = """
Partial update for episodic memory configuration. Only supplied
Expand Down Expand Up @@ -1118,8 +1135,9 @@ class RouterDoc:
- database: The database resource to use for storing semantic memories
- llm_model: The language model to use for feature extraction
- embedding_model: The embedder to use for semantic similarity
- ingestion_trigger_messages: Number of messages before triggering ingestion
- ingestion_trigger_age_seconds: Age threshold for triggering ingestion
- ingestion_trigger_messages: Pending messages in a cluster before ingestion
- ingestion_trigger_age_seconds: Oldest pending message age before ingestion
- cluster_idle_ttl_seconds: Idle TTL before empty clusters are garbage collected
"""

# --- Semantic Set Type API Router Docs ---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,23 @@ def test_semantic_config_timedelta_float():
conf = SemanticMemoryConf(**raw_conf)
assert conf.ingestion_trigger_messages == 24
assert conf.ingestion_trigger_age == timedelta(minutes=2, milliseconds=500)


def test_semantic_config_cluster_settings():
    """Cluster tuning fields accept floats, integer seconds, and ISO-8601 durations."""
    base_fields: dict[str, Any] = {
        "database": "database",
        "llm_model": "llm",
        "embedding_model": "embedding",
        "ingestion_trigger_messages": 5,
        "ingestion_trigger_age": "PT1M",
        "config_database": "database",
    }
    cluster_fields: dict[str, Any] = {
        "cluster_similarity_threshold": 0.45,
        "cluster_max_time_gap": 300,
        "cluster_idle_ttl": "PT2H",
    }

    conf = SemanticMemoryConf(**base_fields, **cluster_fields)

    assert conf.cluster_similarity_threshold == 0.45
    assert conf.cluster_max_time_gap == timedelta(seconds=300)
    assert conf.cluster_idle_ttl == timedelta(hours=2)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Any, cast

import pytest
Expand All @@ -7,6 +8,7 @@
chunk_text_balanced,
cluster_texts,
extract_sentences,
merge_async_iterators,
unflatten_like,
)

Expand Down Expand Up @@ -229,3 +231,88 @@ def test_extract_sentences():
}
for sentence in expected_sentences:
assert any(sentence in s for s in sentences)


async def _evented_iterator(values: list[str], events: list[asyncio.Event]):
for value, event in zip(values, events, strict=True):
await event.wait()
yield value


async def _raising_iterator(event: asyncio.Event, error: Exception):
await event.wait()
raise error
if False:
yield None


async def _blocking_iterator(cancel_event: asyncio.Event):
try:
await asyncio.Event().wait()
yield None
finally:
cancel_event.set()


@pytest.mark.asyncio
async def test_merge_async_iterators_empty():
    """Merging an empty collection of iterators yields nothing."""
    collected = [value async for value in merge_async_iterators([])]
    assert collected == []


@pytest.mark.asyncio
async def test_merge_async_iterators_interleaves():
    """Items arrive in the order their producers make them available."""
    gates_a = [asyncio.Event(), asyncio.Event()]
    gates_b = [asyncio.Event(), asyncio.Event()]
    merged = merge_async_iterators(
        [
            _evented_iterator(["a1", "a2"], gates_a),
            _evented_iterator(["b1", "b2"], gates_b),
        ]
    )

    # Release the producers in an interleaved order and expect that order out.
    schedule = [
        (gates_b[0], "b1"),
        (gates_a[0], "a1"),
        (gates_b[1], "b2"),
        (gates_a[1], "a2"),
    ]
    for gate, expected in schedule:
        gate.set()
        assert await asyncio.wait_for(anext(merged), timeout=1) == expected

    with pytest.raises(StopAsyncIteration):
        await asyncio.wait_for(anext(merged), timeout=1)


@pytest.mark.asyncio
async def test_merge_async_iterators_propagates_error():
    """An exception raised by one producer surfaces through the merged stream."""
    ready = asyncio.Event()
    boom = asyncio.Event()
    merged = merge_async_iterators(
        [
            _evented_iterator(["ok"], [ready]),
            _raising_iterator(boom, ValueError("boom")),
        ]
    )

    # The healthy producer still delivers before the failure is triggered.
    ready.set()
    first = await asyncio.wait_for(anext(merged), timeout=1)
    assert first == "ok"

    boom.set()
    with pytest.raises(ValueError, match="boom"):
        await asyncio.wait_for(anext(merged), timeout=1)


@pytest.mark.asyncio
async def test_merge_async_iterators_cancels_producers():
    """Closing the merged stream tears down producers that are still blocked."""
    producer_cancelled = asyncio.Event()
    first_ready = asyncio.Event()
    merged = merge_async_iterators(
        [
            _evented_iterator(["first"], [first_ready]),
            _blocking_iterator(producer_cancelled),
        ]
    )

    first_ready.set()
    assert await asyncio.wait_for(anext(merged), timeout=1) == "first"
    await merged.aclose()

    # The blocked producer's finally-block must have run after aclose().
    await asyncio.wait_for(producer_cancelled.wait(), timeout=1)
Loading