feat: wire clustering into ingestion pipeline and API#1241
o-love wants to merge 3 commits into `cluster-engine`.
Pull request overview
Integrates the cluster engine into the server’s semantic ingestion flow and configuration/API layers so incoming messages are grouped into semantic clusters (and optionally split via reranker) before feature extraction and consolidation.
Changes:
- Wire cluster state storage + cluster assignment/splitting into the ingestion pipeline and background ingestion loop.
- Extend server/client config + API schemas to expose cluster configuration knobs (TTL, similarity threshold, time gap, split reranker).
- Update resource management and tests to support reranker + clustering end-to-end.
Reviewed changes
Copilot reviewed 27 out of 28 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pyproject.toml | Adjust ruff/pytest config (test ignores, pythonpath). |
| packages/server/src/memmachine_server/server/api_v2/router.py | Normalize feature metadata serialization (citations/other handling). |
| packages/server/src/memmachine_server/server/api_v2/config_service.py | Apply/serve new semantic cluster config fields via API. |
| packages/server/src/memmachine_server/semantic_memory/semantic_session_manager.py | Broaden config-store protocol types to Sequence. |
| packages/server/src/memmachine_server/semantic_memory/semantic_model.py | Add optional reranker to Resources bundle for ingestion/splitting. |
| packages/server/src/memmachine_server/semantic_memory/semantic_memory.py | Pass cluster params/state storage into ingestion; adjust background ingestion set selection. |
| packages/server/src/memmachine_server/semantic_memory/semantic_llm.py | Generalize types to Sequence/Mapping for LLM helpers. |
| packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py | Core change: cluster-aware ingestion, state persistence, optional reranker-driven splitting, cluster-scoped feature ops. |
| packages/server/src/memmachine_server/semantic_memory/config_store/config_store_sqlalchemy.py | Update SQLAlchemy config store return/param types to Sequence. |
| packages/server/src/memmachine_server/semantic_memory/config_store/config_store.py | Update config storage protocol type hints to Sequence. |
| packages/server/src/memmachine_server/semantic_memory/config_store/caching_semantic_config_storage.py | Update caching layer type hints to MutableMapping/Sequence. |
| packages/server/src/memmachine_server/common/resource_manager/semantic_manager.py | Provision cluster state storage + resolve split reranker; pass cluster params into SemanticService. |
| packages/server/src/memmachine_server/common/resource_manager/\_\_init\_\_.py | Add config property to the CommonResourceManager protocol (TYPE_CHECKING import). |
| packages/server/src/memmachine_server/common/configuration/\_\_init\_\_.py | Add cluster-related semantic memory configuration fields + docs. |
| packages/server/server_tests/memmachine_server/server/api_v2/test_config_service.py | Add validation tests for cluster config update spec constraints. |
| packages/server/server_tests/memmachine_server/semantic_memory/test_semantic_memory_integration.py | Update integration fixture for cluster state storage + reranker accessor. |
| packages/server/server_tests/memmachine_server/semantic_memory/test_semantic_memory_background.py | Update background ingestion tests to inject cluster state storage. |
| packages/server/server_tests/memmachine_server/semantic_memory/test_semantic_ingestion.py | Major test expansion for cluster assignment, persistence, splitting, and cluster metadata behavior. |
| packages/server/server_tests/memmachine_server/semantic_memory/conftest.py | Add in-memory cluster state storage + mock reranker fixtures. |
| packages/server/server_tests/memmachine_server/main/test_memmachine_integration.py | Update smoke test to new internal ingestion trigger field names. |
| packages/server/server_tests/memmachine_server/common/test_utils.py | Add tests for merge_async_iterators utility. |
| packages/server/server_tests/memmachine_server/common/configuration/test_semantic_conf.py | Validate new semantic cluster config parsing. |
| packages/common/src/memmachine_common/api/doc.py | Document cluster-aware ingestion semantics and new config fields. |
| packages/common/src/memmachine_common/api/config_spec.py | Expose new cluster fields in config API response/update models. |
| packages/client/src/memmachine_client/config.py | Extend client config update/get methods for new semantic cluster fields. |
| packages/client/client_tests/test_config.py | Update client tests for new semantic cluster fields round-trip. |
| .gitignore | Ignore .worktrees/. |
| .github/workflows/installation-test.yml | Install common wheel prior to server wheel during installation tests. |
```python
if spec.ingestion_trigger_messages is None:
    return
# ...
sm.ingestion_trigger_messages = spec.ingestion_trigger_messages
changes.append(
```
_apply_semantic_memory_ingestion_updates() returns early when ingestion_trigger_messages is unset, which also prevents applying ingestion_trigger_age_seconds updates unless the caller also sets ingestion_trigger_messages. This breaks the “partial update” behavior described by the API (a client should be able to update age without changing message-count). Consider checking spec.model_fields_set for each field independently (similar to the cluster_* helpers) and applying ingestion_trigger_age_seconds whenever it is provided.
```python
dirty_sets = self._semantic_storage.get_history_set_ids(
    min_uningested_messages=1,
```
Background ingestion now queries get_history_set_ids(min_uningested_messages=1) every loop, meaning any set with at least one pending message will be revisited at the background interval even if no cluster is eligible to ingest yet (e.g., below message threshold and not old enough). On busy deployments with many “not-ready” sets this can create significant unnecessary load (repeated DB scans + state reads). If possible, reintroduce a coarse prefilter (e.g., min_uningested_messages=max(trigger_messages, 1) and/or older_than=now-trigger_age) or add logic to skip sets until their next eligible time based on cluster state.
Suggested change:

```python
# Use the ingestion message limit as a coarse prefilter to avoid
# repeatedly scanning sets that cannot yet trigger ingestion.
if (
    getattr(self, "_cluster_ingestion_message_limit", None) is not None
    and self._cluster_ingestion_message_limit > 0
):
    min_uningested_messages = self._cluster_ingestion_message_limit
else:
    min_uningested_messages = 1
dirty_sets = self._semantic_storage.get_history_set_ids(
    min_uningested_messages=min_uningested_messages,
```
```python
logger = logging.getLogger(__name__)

_CLUSTER_METADATA_KEY = "cluster_id"
```
_MAX_CLUSTERS_PER_RUN = 5 introduces a hard-coded throttle on how many ready clusters can be processed per ingestion run, but the value isn’t documented or configurable. This makes throughput tuning difficult and can lead to surprising latency under load. Consider wiring this as a parameter/config value (or at least adding a short rationale comment next to the constant).
Suggested change:

```python
# Hard cap on how many ready clusters are processed per ingestion run.
# This is a conservative default to avoid unbounded work and resource contention;
# increase cautiously if higher throughput is required.
```
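Going one step further than a rationale comment, the throttle could be threaded through a config object. A minimal sketch, assuming a hypothetical `ClusterIngestionParams` dataclass and `select_clusters_for_run` helper (neither is in the PR):

```python
from dataclasses import dataclass

# Illustrative default mirroring the hard-coded constant flagged above.
DEFAULT_MAX_CLUSTERS_PER_RUN = 5


@dataclass
class ClusterIngestionParams:
    """Carries the per-run throttle through config instead of a
    module-level constant, so deployments can tune throughput."""

    max_clusters_per_run: int = DEFAULT_MAX_CLUSTERS_PER_RUN


def select_clusters_for_run(
    ready_cluster_ids: list[str], params: ClusterIngestionParams
) -> list[str]:
    """Take at most max_clusters_per_run ready clusters; the remainder
    is picked up on the next ingestion run."""
    if params.max_clusters_per_run <= 0:
        # Treat a non-positive value as "no cap".
        return list(ready_cluster_ids)
    return ready_cluster_ids[: params.max_clusters_per_run]
```

The "non-positive disables the cap" convention is one possible design choice; an alternative is to validate the field as strictly positive at config load time.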
```python
spec_data: dict[str, Any] = {}
_set_if_not_none(spec_data, "enabled", enabled)
_set_if_not_none(spec_data, "database", database)
_set_if_not_none(spec_data, "llm_model", llm_model)
_set_if_not_none(spec_data, "embedding_model", embedding_model)
```
The server update logic relies on distinguishing “field omitted” vs “field explicitly set to null” (e.g., cluster_idle_ttl_seconds / cluster_max_time_gap_seconds / cluster_split_reranker use model_fields_set so users can clear values). This client helper currently only includes keys when the value is non-None and then does model_dump(exclude_none=True), so there’s no way for callers to send an explicit null to clear these fields. Consider using a sentinel default (e.g., UNSET) or adding explicit clear_* parameters so the client can exercise the full API.
The ResourceManager protocol requires get_reranker, but the test's local _ResourceManager was missing it, causing the isinstance check to fail at runtime.
Purpose of the change
Wire the cluster engine into the semantic ingestion pipeline, session manager, resource manager, and config/API layer so that incoming messages are clustered before feature extraction.
Description
- Integrate `ClusterManager` into `semantic_ingestion.py` — messages are grouped into clusters, and ingestion triggers per-cluster
- Add `Reranker` to the `Resources` model and wire it through `semantic_manager.py`
- New config fields in `config_spec.py`: `cluster_idle_ttl_seconds`, `cluster_similarity_threshold`, `cluster_max_time_gap_seconds`, `cluster_split_reranker`
- Update `semantic_session_manager.py` to handle cluster lifecycle (create, ingest, garbage collect)

Stack: PR 3/4 — `main` ← `storage-interface-refactor` ← `cluster-engine` ← `cluster-integration` ← `eval-harness`

Depends on #1240
Type of change
How Has This Been Tested?
Unit Test
Integration Test
- `test_semantic_ingestion.py` — cluster-aware ingestion triggers and extraction
- `test_semantic_session_manager.py` — session + cluster lifecycle
- `test_config_service.py` — cluster config fields round-trip through API
- `test_memmachine_mock.py` — end-to-end with cluster wiring
- `test_utils.py` — utility function changes
- `test_semantic_conf.py` — configuration validation

Test Results: All tests pass locally.
Checklist
Maintainer Checklist
Screenshots/Gifs
N/A
Further comments
This is the largest PR in the stack (34 files, ~2400 net lines) as it touches the most integration points. The cluster config fields have sensible defaults so existing deployments are unaffected.