
feat: wire clustering into ingestion pipeline and API #1241

Open

o-love wants to merge 3 commits into cluster-engine from cluster-integration

Conversation

Contributor

o-love commented Mar 18, 2026

Purpose of the change

Wire the cluster engine into the semantic ingestion pipeline, session manager, resource manager, and config/API layer so that incoming messages are clustered before feature extraction.

Description

  • Integrate ClusterManager into semantic_ingestion.py — messages are grouped into clusters, and ingestion triggers per-cluster
  • Add Reranker to Resources model and wire through semantic_manager.py
  • Expose cluster config fields in config_spec.py: cluster_idle_ttl_seconds, cluster_similarity_threshold, cluster_max_time_gap_seconds, cluster_split_reranker
  • Update semantic_session_manager.py to handle cluster lifecycle (create, ingest, garbage collect)
  • Update API router and config service to serve new cluster configuration
  • Update CI workflow and pyproject.toml for new dependencies

Stack: PR 3/4 (main → storage-interface-refactor → cluster-engine → cluster-integration → eval-harness)

Depends on #1240

Type of change

  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

  • Unit Test
  • Integration Test

Tests updated or added:

  • test_semantic_ingestion.py — cluster-aware ingestion triggers and extraction
  • test_semantic_session_manager.py — session + cluster lifecycle
  • test_config_service.py — cluster config fields round-trip through API
  • test_memmachine_mock.py — end-to-end with cluster wiring
  • test_utils.py — utility function changes
  • test_semantic_conf.py — configuration validation

Test Results: All tests pass locally.

Checklist

  • My code follows the style guidelines of this project (See STYLE_GUIDE.md)
  • I have performed a self-review of my own code
  • I have commented my code
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added unit tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules
  • I have checked my code and corrected any misspellings

Maintainer Checklist

  • Confirmed all checks passed
  • Contributor has signed the commit(s)
  • Reviewed the code
  • Run, Tested, and Verified the change(s) work as expected

Screenshots/Gifs

N/A

Further comments

This is the largest PR in the stack (34 files, ~2400 net lines) as it touches the most integration points. The cluster config fields have sensible defaults so existing deployments are unaffected.


Copilot AI left a comment


Pull request overview

Integrates the cluster engine into the server’s semantic ingestion flow and configuration/API layers so incoming messages are grouped into semantic clusters (and optionally split via reranker) before feature extraction and consolidation.

Changes:

  • Wire cluster state storage + cluster assignment/splitting into the ingestion pipeline and background ingestion loop.
  • Extend server/client config + API schemas to expose cluster configuration knobs (TTL, similarity threshold, time gap, split reranker).
  • Update resource management and tests to support reranker + clustering end-to-end.

Reviewed changes

Copilot reviewed 27 out of 28 changed files in this pull request and generated 4 comments.

Per-file summary:
pyproject.toml Adjust ruff/pytest config (test ignores, pythonpath).
packages/server/src/memmachine_server/server/api_v2/router.py Normalize feature metadata serialization (citations/other handling).
packages/server/src/memmachine_server/server/api_v2/config_service.py Apply/serve new semantic cluster config fields via API.
packages/server/src/memmachine_server/semantic_memory/semantic_session_manager.py Broaden config-store protocol types to Sequence.
packages/server/src/memmachine_server/semantic_memory/semantic_model.py Add optional reranker to Resources bundle for ingestion/splitting.
packages/server/src/memmachine_server/semantic_memory/semantic_memory.py Pass cluster params/state storage into ingestion; adjust background ingestion set selection.
packages/server/src/memmachine_server/semantic_memory/semantic_llm.py Generalize types to Sequence/Mapping for LLM helpers.
packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py Core change: cluster-aware ingestion, state persistence, optional reranker-driven splitting, cluster-scoped feature ops.
packages/server/src/memmachine_server/semantic_memory/config_store/config_store_sqlalchemy.py Update SQLAlchemy config store return/param types to Sequence.
packages/server/src/memmachine_server/semantic_memory/config_store/config_store.py Update config storage protocol type hints to Sequence.
packages/server/src/memmachine_server/semantic_memory/config_store/caching_semantic_config_storage.py Update caching layer type hints to MutableMapping/Sequence.
packages/server/src/memmachine_server/common/resource_manager/semantic_manager.py Provision cluster state storage + resolve split reranker; pass cluster params into SemanticService.
packages/server/src/memmachine_server/common/resource_manager/__init__.py Add config property to the CommonResourceManager protocol (TYPE_CHECKING import).
packages/server/src/memmachine_server/common/configuration/__init__.py Add cluster-related semantic memory configuration fields + docs.
packages/server/server_tests/memmachine_server/server/api_v2/test_config_service.py Add validation tests for cluster config update spec constraints.
packages/server/server_tests/memmachine_server/semantic_memory/test_semantic_memory_integration.py Update integration fixture for cluster state storage + reranker accessor.
packages/server/server_tests/memmachine_server/semantic_memory/test_semantic_memory_background.py Update background ingestion tests to inject cluster state storage.
packages/server/server_tests/memmachine_server/semantic_memory/test_semantic_ingestion.py Major test expansion for cluster assignment, persistence, splitting, and cluster metadata behavior.
packages/server/server_tests/memmachine_server/semantic_memory/conftest.py Add in-memory cluster state storage + mock reranker fixtures.
packages/server/server_tests/memmachine_server/main/test_memmachine_integration.py Update smoke test to new internal ingestion trigger field names.
packages/server/server_tests/memmachine_server/common/test_utils.py Add tests for merge_async_iterators utility.
packages/server/server_tests/memmachine_server/common/configuration/test_semantic_conf.py Validate new semantic cluster config parsing.
packages/common/src/memmachine_common/api/doc.py Document cluster-aware ingestion semantics and new config fields.
packages/common/src/memmachine_common/api/config_spec.py Expose new cluster fields in config API response/update models.
packages/client/src/memmachine_client/config.py Extend client config update/get methods for new semantic cluster fields.
packages/client/client_tests/test_config.py Update client tests for new semantic cluster fields round-trip.
.gitignore Ignore .worktrees/.
.github/workflows/installation-test.yml Install common wheel prior to server wheel during installation tests.


Comment on lines +271 to +275

    if spec.ingestion_trigger_messages is None:
        return

    sm.ingestion_trigger_messages = spec.ingestion_trigger_messages
    changes.append(

Copilot AI Mar 24, 2026


_apply_semantic_memory_ingestion_updates() returns early when ingestion_trigger_messages is unset, which also prevents applying ingestion_trigger_age_seconds updates unless the caller also sets ingestion_trigger_messages. This breaks the “partial update” behavior described by the API (a client should be able to update age without changing message-count). Consider checking spec.model_fields_set for each field independently (similar to the cluster_* helpers) and applying ingestion_trigger_age_seconds whenever it is provided.

Comment on lines +827 to +828

    dirty_sets = self._semantic_storage.get_history_set_ids(
        min_uningested_messages=1,

Copilot AI Mar 24, 2026


Background ingestion now queries get_history_set_ids(min_uningested_messages=1) every loop, meaning any set with at least one pending message will be revisited at the background interval even if no cluster is eligible to ingest yet (e.g., below message threshold and not old enough). On busy deployments with many “not-ready” sets this can create significant unnecessary load (repeated DB scans + state reads). If possible, reintroduce a coarse prefilter (e.g., min_uningested_messages=max(trigger_messages, 1) and/or older_than=now-trigger_age) or add logic to skip sets until their next eligible time based on cluster state.

Suggested change:

    # Use the ingestion message limit as a coarse prefilter to avoid
    # repeatedly scanning sets that cannot yet trigger ingestion.
    if (
        getattr(self, "_cluster_ingestion_message_limit", None) is not None
        and self._cluster_ingestion_message_limit > 0
    ):
        min_uningested_messages = self._cluster_ingestion_message_limit
    else:
        min_uningested_messages = 1
    dirty_sets = self._semantic_storage.get_history_set_ids(
        min_uningested_messages=min_uningested_messages,

Comment context:

    logger = logging.getLogger(__name__)
    _CLUSTER_METADATA_KEY = "cluster_id"

Copilot AI Mar 24, 2026


_MAX_CLUSTERS_PER_RUN = 5 introduces a hard-coded throttle on how many ready clusters can be processed per ingestion run, but the value isn’t documented or configurable. This makes throughput tuning difficult and can lead to surprising latency under load. Consider wiring this as a parameter/config value (or at least adding a short rationale comment next to the constant).

Suggested change:

    # Hard cap on how many ready clusters are processed per ingestion run.
    # This is a conservative default to avoid unbounded work and resource contention;
    # increase cautiously if higher throughput is required.
Comment on lines +479 to +483

    spec_data: dict[str, Any] = {}
    _set_if_not_none(spec_data, "enabled", enabled)
    _set_if_not_none(spec_data, "database", database)
    _set_if_not_none(spec_data, "llm_model", llm_model)
    _set_if_not_none(spec_data, "embedding_model", embedding_model)

Copilot AI Mar 24, 2026


The server update logic relies on distinguishing “field omitted” vs “field explicitly set to null” (e.g., cluster_idle_ttl_seconds / cluster_max_time_gap_seconds / cluster_split_reranker use model_fields_set so users can clear values). This client helper currently only includes keys when the value is non-None and then does model_dump(exclude_none=True), so there’s no way for callers to send an explicit null to clear these fields. Consider using a sentinel default (e.g., UNSET) or adding explicit clear_* parameters so the client can exercise the full API.
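
The sentinel pattern the reviewer proposes could look roughly like this. `UNSET`, the helper name, and the two fields shown are illustrative, not the client's actual API:

```python
from typing import Any


class _Unset:
    """Sentinel distinguishing 'argument omitted' from 'explicitly None'."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET: Any = _Unset()


def build_spec_data(
    cluster_idle_ttl_seconds: Any = UNSET,
    cluster_split_reranker: Any = UNSET,
) -> dict[str, Any]:
    spec_data: dict[str, Any] = {}
    # Include a key whenever the caller passed *anything*, including None,
    # so an explicit null reaches the server and clears the field.
    if cluster_idle_ttl_seconds is not UNSET:
        spec_data["cluster_idle_ttl_seconds"] = cluster_idle_ttl_seconds
    if cluster_split_reranker is not UNSET:
        spec_data["cluster_split_reranker"] = cluster_split_reranker
    return spec_data
```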

@sscargal sscargal added this to the v0.3.4 milestone Apr 6, 2026
@sscargal sscargal self-requested a review April 6, 2026 21:38
@o-love o-love force-pushed the cluster-integration branch 2 times, most recently from 70a6258 to bae9bb0 on April 8, 2026 at 22:00
o-love added 2 commits April 8, 2026 15:03
The ResourceManager protocol requires get_reranker, but the test's local _ResourceManager was missing it, causing the isinstance check to fail at runtime.
@o-love o-love force-pushed the cluster-integration branch from bae9bb0 to f7ae24e on April 8, 2026 at 22:03