From 092a8085cc62077d1cd3c7afb5e546c9d0f56bdd Mon Sep 17 00:00:00 2001 From: Chaitany patel Date: Fri, 10 Apr 2026 17:03:21 +0530 Subject: [PATCH] Replace fastapi_mcp with the MCP Python SDK (FastMCP) to fix RecursionError on recursive Pydantic models, expose explicit @mcp.tool() handlers that call FeatureStore directly, and add registry discovery, data access, and materialization tools with flat LLM-friendly schemas. Signed-off-by: Chaitany patel --- sdk/python/feast/feature_server.py | 8 +- .../feast/infra/mcp_servers/__init__.py | 3 +- .../feast/infra/mcp_servers/mcp_config.py | 7 +- .../feast/infra/mcp_servers/mcp_server.py | 354 ++++++++++++++++-- .../elasticsearch.py | 5 +- .../integration/test_mcp_feature_server.py | 193 ++++------ .../infra/feature_servers/test_mcp_server.py | 334 +++++++++++------ 7 files changed, 625 insertions(+), 279 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index f60eeb9d87d..304af7c4c09 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -353,8 +353,14 @@ async def lifespan(app: FastAPI): await store.initialize() async_refresh() + + mcp_sm = getattr(app.state, "mcp_session_manager", None) try: - yield + if mcp_sm: + async with mcp_sm.run(): + yield + else: + yield finally: stop_refresh() if offline_batcher is not None: diff --git a/sdk/python/feast/infra/mcp_servers/__init__.py b/sdk/python/feast/infra/mcp_servers/__init__.py index 6f0acee9d78..6757f693151 100644 --- a/sdk/python/feast/infra/mcp_servers/__init__.py +++ b/sdk/python/feast/infra/mcp_servers/__init__.py @@ -1,9 +1,10 @@ # MCP (Model Context Protocol) server implementations for Feast from .mcp_config import McpFeatureServerConfig -from .mcp_server import add_mcp_support_to_app +from .mcp_server import add_mcp_support_to_app, create_mcp_server __all__ = [ "McpFeatureServerConfig", "add_mcp_support_to_app", + "create_mcp_server", ] diff --git a/sdk/python/feast/infra/mcp_servers/mcp_config.py b/sdk/python/feast/infra/mcp_servers/mcp_config.py index 8e48da58246..b7b3f7476d0 100644 --- a/sdk/python/feast/infra/mcp_servers/mcp_config.py +++ b/sdk/python/feast/infra/mcp_servers/mcp_config.py @@ -8,19 +8,16 @@ class McpFeatureServerConfig(BaseFeatureServerConfig): """MCP (Model Context Protocol) Feature Server configuration.""" - # Feature server type selector type: Literal["mcp"] = "mcp" - # Enable MCP server support - defaults to False as requested mcp_enabled: StrictBool = False - # MCP server name for identification mcp_server_name: StrictStr = "feast-mcp-server" - # MCP server version mcp_server_version: StrictStr = "1.0.0" mcp_transport: Literal["sse", "http"] = "sse" - # The endpoint definition for transformation_service (inherited from base) + mcp_base_path: StrictStr = "/mcp" + transformation_service_endpoint: StrictStr = "localhost:6566" diff --git a/sdk/python/feast/infra/mcp_servers/mcp_server.py b/sdk/python/feast/infra/mcp_servers/mcp_server.py index 972023cdd12..dd19499581e 100644 --- a/sdk/python/feast/infra/mcp_servers/mcp_server.py +++ b/sdk/python/feast/infra/mcp_servers/mcp_server.py @@ -1,79 +1,357 @@ """ MCP (Model Context Protocol) integration for Feast Feature Server. -This module provides MCP support for Feast by integrating with fastapi_mcp -to expose Feast functionality through the Model Context Protocol. +This module provides MCP support for Feast using the MCP Python SDK (FastMCP). +It exposes Feast functionality as MCP tools with LLM-friendly schemas, +avoiding the recursive $ref issues that occur with fastapi_mcp. """ +import json import logging -from typing import Optional +from typing import Annotated, Any, Dict, List, Optional + +from pydantic import Field from feast.feature_store import FeatureStore logger = logging.getLogger(__name__) try: - from fastapi_mcp import FastApiMCP + from mcp.server import FastMCP MCP_AVAILABLE = True except ImportError: logger.warning( - "fastapi_mcp is not installed. MCP support will be disabled. " - "Install it with: pip install fastapi_mcp" + "mcp SDK is not installed. MCP support will be disabled. " + "Install it with: pip install 'feast[mcp]'" ) MCP_AVAILABLE = False - # Create placeholder classes for testing - FastApiMCP = None + FastMCP = None # type: ignore[assignment, misc] class McpTransportNotSupportedError(RuntimeError): pass -def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastApiMCP"]: +def create_mcp_server(store: FeatureStore, config) -> "FastMCP": + """ + Create a FastMCP server with Feast tools registered. + + Each tool calls FeatureStore methods directly — no internal HTTP round-trip. + The ``store`` instance is shared with the FastAPI HTTP routes. + """ + mcp = FastMCP( + name=getattr(config, "mcp_server_name", "feast-feature-store"), + ) + + # ── Registry introspection tools ────────────────────────────────── + + @mcp.tool( + description=( + "List all feature views registered in the Feast feature store. " + "Returns name, entities, features, and tags for each feature view." + ), + ) + def list_feature_views( + tags: Annotated[ + Optional[Dict[str, str]], + Field( + description="Optional dict of tag key-value pairs to filter feature views by." + ), + ] = None, + ) -> str: + fvs = store.list_feature_views(allow_cache=True, tags=tags) + return json.dumps( + [ + { + "name": fv.name, + "entities": [e.name for e in fv.entity_columns], + "features": [f.name for f in fv.features], + "tags": dict(fv.tags) if fv.tags else {}, + } + for fv in fvs + ] + ) + + @mcp.tool( + description=( + "List all entities registered in the Feast feature store. " + "Returns name, join keys, value type, and tags for each entity." + ), + ) + def list_entities( + tags: Annotated[ + Optional[Dict[str, str]], + Field( + description="Optional dict of tag key-value pairs to filter entities by." + ), + ] = None, + ) -> str: + entities = store.list_entities(allow_cache=True, tags=tags) + return json.dumps( + [ + { + "name": e.name, + "join_key": e.join_key, + "value_type": e.value_type.name if e.value_type else None, + "tags": dict(e.tags) if e.tags else {}, + } + for e in entities + ] + ) + + @mcp.tool( + description=( + "List all feature services registered in the Feast feature store. " + "Returns name, included feature views, and tags for each." + ), + ) + def list_feature_services( + tags: Annotated[ + Optional[Dict[str, str]], + Field( + description="Optional dict of tag key-value pairs to filter feature services by." + ), + ] = None, + ) -> str: + services = store.list_feature_services(tags=tags) + return json.dumps( + [ + { + "name": svc.name, + "feature_views": [p.name for p in svc.feature_view_projections], + "tags": dict(svc.tags) if svc.tags else {}, + } + for svc in services + ] + ) + + @mcp.tool( + description=( + "List all data sources registered in the Feast feature store. " + "Returns name, type, and tags for each data source." + ), + ) + def list_data_sources( + tags: Annotated[ + Optional[Dict[str, str]], + Field( + description="Optional dict of tag key-value pairs to filter data sources by." + ), + ] = None, + ) -> str: + sources = store.list_data_sources(allow_cache=True, tags=tags) + return json.dumps( + [ + { + "name": ds.name, + "type": type(ds).__name__, + "tags": dict(ds.tags) if ds.tags else {}, + } + for ds in sources + ] + ) + + # ── Data access tools ───────────────────────────────────────────── + + @mcp.tool( + description=( + "Get online feature values for a set of entities. " + "Provide feature references like 'feature_view:feature_name' " + 'and entity key-value pairs like {"driver_id": [1001, 1002]}.' + ), + ) + def get_online_features( + features: Annotated[ + List[str], + Field( + description="List of feature references in 'feature_view:feature_name' format." + ), + ], + entities: Annotated[ + Dict[str, List[Any]], + Field( + description='Entity key-value pairs, e.g. {"driver_id": [1001, 1002]}.' + ), + ], + full_feature_names: Annotated[ + bool, + Field( + description="If true, return feature names prefixed with the feature view name." + ), + ] = False, + ) -> str: + response = store.get_online_features( + features=features, + entity_rows=entities, + full_feature_names=full_feature_names, + ) + return json.dumps(response.to_dict(), default=str) + + @mcp.tool( + description=( + "Search for similar documents using vector similarity. " + "Provide feature references and either a query vector (list of floats) " + "or a query_string for text-based search. " + "Uses v2 API when query_string is provided, v1 otherwise." + ), + ) + def search_documents( + features: Annotated[ + List[str], + Field( + description="List of feature references in 'feature_view:feature_name' format." + ), + ], + top_k: Annotated[ + int, Field(description="Number of top results to return.") + ] = 10, + query: Annotated[ + Optional[List[float]], + Field( + description="Query vector as a list of floats for vector similarity search." + ), + ] = None, + query_string: Annotated[ + Optional[str], + Field( + description="Text query string for text-based document search (uses v2 API)." + ), + ] = None, + distance_metric: Annotated[ + Optional[str], + Field(description="Distance metric to use, e.g. 'L2', 'cosine'."), + ] = None, + ) -> str: + if query_string is not None: + response = store.retrieve_online_documents_v2( + features=features, + query=query, + top_k=top_k, + query_string=query_string, + distance_metric=distance_metric or "L2", + ) + else: + response = store.retrieve_online_documents( + features=features, + query=query or [], + top_k=top_k, + distance_metric=distance_metric or "L2", + ) + return json.dumps(response.to_dict(), default=str) + + # ── Materialization tools ───────────────────────────────────────── + + @mcp.tool( + description=( + "Materialize feature data from the offline store into the online store " + "for a given time range. Timestamps should be ISO-8601 strings. " + "Optionally specify feature view names to materialize." + ), + ) + def materialize( + start_ts: Annotated[ + str, + Field( + description="Start timestamp in ISO-8601 format, e.g. '2024-01-01T00:00:00'." + ), + ], + end_ts: Annotated[ + str, + Field( + description="End timestamp in ISO-8601 format, e.g. '2024-01-02T00:00:00'." + ), + ], + feature_views: Annotated[ + Optional[List[str]], + Field( + description="List of feature view names to materialize. If omitted, all are materialized." + ), + ] = None, + ) -> str: + from dateutil import parser + + from feast import utils + + start_date = utils.make_tzaware(parser.parse(start_ts)) + end_date = utils.make_tzaware(parser.parse(end_ts)) + store.materialize( + start_date=start_date, + end_date=end_date, + feature_views=feature_views, + ) + return json.dumps({"status": "ok"}) + + @mcp.tool( + description=( + "Incrementally materialize new feature data from the offline store " + "into the online store up to the given end timestamp (ISO-8601)." + ), + ) + def materialize_incremental( + end_ts: Annotated[ + str, + Field( + description="End timestamp in ISO-8601 format, e.g. '2024-01-02T00:00:00'." + ), + ], + feature_views: Annotated[ + Optional[List[str]], + Field( + description="List of feature view names to materialize. If omitted, all are materialized." + ), + ] = None, + ) -> str: + from dateutil import parser + + from feast import utils + + end_date = utils.make_tzaware(parser.parse(end_ts)) + store.materialize_incremental( + end_date=end_date, + feature_views=feature_views, + ) + return json.dumps({"status": "ok"}) + + return mcp + + +def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastMCP"]: """Add MCP support to the FastAPI app if enabled in configuration.""" if not MCP_AVAILABLE: - logger.warning("MCP support requested but fastapi_mcp is not available") + logger.warning("MCP support requested but mcp SDK is not available") return None try: - # Create MCP server from the FastAPI app - mcp = FastApiMCP( - app, - name=getattr(config, "mcp_server_name", "feast-feature-store"), - description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", - ) + mcp = create_mcp_server(store, config) + + base_path = getattr(config, "mcp_base_path", "/mcp") transport = getattr(config, "mcp_transport", "sse") - if transport == "http": - mount_http = getattr(mcp, "mount_http", None) - if mount_http is None: - raise McpTransportNotSupportedError( - "mcp_transport=http requires fastapi_mcp with FastApiMCP.mount_http(). " - "Upgrade fastapi_mcp (or install feast[mcp]) to a newer version." - ) - mount_http() - elif transport == "sse": - mount_sse = getattr(mcp, "mount_sse", None) - if mount_sse is not None: - mount_sse() - else: - logger.warning( - "transport sse not supported, fallback to the deprecated mount()." - ) - mcp.mount() + if transport == "sse": + mcp_app = mcp.sse_app() + app.mount(base_path, mcp_app) + elif transport == "http": + mcp.settings.streamable_http_path = "/" + mcp_app = mcp.streamable_http_app() + app.mount(base_path, mcp_app) + # Starlette does not propagate lifespan events to mounted + # sub-apps, so the main app must manage the session manager. + app.state.mcp_session_manager = mcp.session_manager else: - # Defensive guard for programmatic callers. raise McpTransportNotSupportedError( f"Unsupported mcp_transport={transport!r}. Expected 'sse' or 'http'." ) logger.info( - "MCP support has been enabled for the Feast feature server at /mcp endpoint" + "MCP support enabled at %s endpoint (transport=%s)", + base_path, + transport, ) logger.info( - f"MCP integration initialized for {getattr(config, 'mcp_server_name', 'feast-feature-store')} " - f"v{getattr(config, 'mcp_server_version', '1.0.0')}" + "MCP server: %s v%s", + getattr(config, "mcp_server_name", "feast-feature-store"), + getattr(config, "mcp_server_version", "1.0.0"), ) return mcp @@ -81,5 +359,5 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp except McpTransportNotSupportedError: raise except Exception as e: - logger.error(f"Failed to initialize MCP integration: {e}", exc_info=True) + logger.error("Failed to initialize MCP integration: %s", e, exc_info=True) return None diff --git a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py index b78d003ac25..af092902e5d 100644 --- a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py @@ -172,7 +172,10 @@ def online_read( for hit in response["hits"]["hits"]: source = hit["_source"] timestamp = source.get("timestamp") - timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S") + try: + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") + except ValueError: + timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S") features: Dict[str, ValueProto] = {} diff --git a/sdk/python/tests/integration/test_mcp_feature_server.py b/sdk/python/tests/integration/test_mcp_feature_server.py index 0e59a71dfae..9236ff9b9fd 100644 --- a/sdk/python/tests/integration/test_mcp_feature_server.py +++ b/sdk/python/tests/integration/test_mcp_feature_server.py @@ -1,9 +1,9 @@ +import json import unittest -from unittest.mock import MagicMock, Mock, patch +from types import SimpleNamespace +from unittest.mock import MagicMock, Mock -import pytest from fastapi import FastAPI -from fastapi.testclient import TestClient from pydantic import ValidationError from feast.feature_store import FeatureStore @@ -11,135 +11,112 @@ class TestMCPFeatureServerIntegration(unittest.TestCase): - """Integration tests for MCP feature server functionality.""" + """Integration tests for MCP feature server with the MCP SDK.""" def test_mcp_config_integration(self): - """Test that MCP configuration integrates properly with the server.""" config = McpFeatureServerConfig( enabled=True, mcp_enabled=True, mcp_server_name="integration-test-server", mcp_server_version="2.1.0", mcp_transport="sse", + mcp_base_path="/mcp", ) - # Verify configuration is properly structured for MCP integration self.assertTrue(config.enabled) self.assertTrue(config.mcp_enabled) self.assertEqual(config.mcp_server_name, "integration-test-server") self.assertEqual(config.mcp_server_version, "2.1.0") self.assertEqual(config.mcp_transport, "sse") + self.assertEqual(config.mcp_base_path, "/mcp") - def test_mcp_server_functionality_with_mock_store(self): - """Test MCP server functionality with a mock feature store.""" - with patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True): - with patch( - "feast.infra.mcp_servers.mcp_server.FastApiMCP" - ) as mock_fast_api_mcp: - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - - # Create a more realistic mock feature store - mock_store = MagicMock(spec=FeatureStore) - mock_store.list_feature_views.return_value = [] - mock_store.list_data_sources.return_value = [] - - mock_app = FastAPI() - config = McpFeatureServerConfig( - mcp_enabled=True, - mcp_server_name="test-feast-server", - mcp_server_version="1.0.0", - ) - - mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"]) - mock_fast_api_mcp.return_value = mock_mcp_instance - - result = add_mcp_support_to_app(mock_app, mock_store, config) - - # Verify successful integration - self.assertIsNotNone(result) - self.assertEqual(result, mock_mcp_instance) - mock_fast_api_mcp.assert_called_once() - mock_mcp_instance.mount_sse.assert_called_once() - - @patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True) - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - def test_complete_mcp_setup_flow(self, mock_fast_api_mcp): - """Test the complete MCP setup flow from configuration to mounting.""" + def test_create_mcp_server_with_mock_store(self): + """Test that create_mcp_server produces a usable FastMCP instance.""" + from feast.infra.mcp_servers.mcp_server import create_mcp_server + + mock_store = MagicMock(spec=FeatureStore) + mock_store.list_feature_views.return_value = [] + mock_store.list_entities.return_value = [] + mock_store.list_feature_services.return_value = [] + mock_store.list_data_sources.return_value = [] + + config = McpFeatureServerConfig( + mcp_enabled=True, + mcp_server_name="test-feast-server", + mcp_server_version="1.0.0", + ) + + mcp = create_mcp_server(mock_store, config) + + self.assertIsNotNone(mcp) + self.assertEqual(mcp.name, "test-feast-server") + + tool_names = set(mcp._tool_manager._tools.keys()) + self.assertIn("list_feature_views", tool_names) + self.assertIn("get_online_features", tool_names) + self.assertIn("search_documents", tool_names) + + def test_add_mcp_support_mounts_on_fastapi(self): + """Test that add_mcp_support_to_app mounts the MCP app on FastAPI.""" from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - # Setup test data app = FastAPI() - mock_store = Mock(spec=FeatureStore) + mock_store = MagicMock(spec=FeatureStore) + config = McpFeatureServerConfig( enabled=True, mcp_enabled=True, mcp_server_name="e2e-test-server", mcp_server_version="1.0.0", - transformation_service_endpoint="localhost:6566", ) - mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"]) - mock_fast_api_mcp.return_value = mock_mcp_instance - - # Execute the flow result = add_mcp_support_to_app(app, mock_store, config) - # Verify all steps completed successfully self.assertIsNotNone(result) - mock_fast_api_mcp.assert_called_once_with( - app, - name="e2e-test-server", - description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", - ) - mock_mcp_instance.mount_sse.assert_called_once() - self.assertEqual(result, mock_mcp_instance) - - @pytest.mark.skipif( - condition=True, # Skip until fastapi_mcp is available - reason="Requires fastapi_mcp package to be installed", - ) - def test_real_mcp_integration(self): - """Test real MCP integration with actual FastAPI app.""" - try: - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - - # Create a real FastAPI app - app = FastAPI() - # Mock feature store for this test - mock_store = MagicMock(spec=FeatureStore) - mock_store.list_feature_views.return_value = [] - mock_store.list_data_sources.return_value = [] - - config = McpFeatureServerConfig( - enabled=True, - mcp_enabled=True, - mcp_server_name="real-integration-test", - mcp_server_version="1.0.0", - ) - - # This would require fastapi_mcp to be installed - result = add_mcp_support_to_app(app, mock_store, config) - - if result is not None: - # Test that the app has MCP endpoints - client = TestClient(app) - # The exact endpoints would depend on fastapi_mcp implementation - # Verify the client can be created and make a basic request - response = client.get("/health", follow_redirects=False) - # We expect this to either work or return a 404, but not crash - self.assertIn(response.status_code, [200, 404]) - self.assertIsNotNone(result) - else: - # If fastapi_mcp is not available, result should be None - self.assertIsNone(result) - - except ImportError: - # Expected if fastapi_mcp is not installed - self.skipTest("fastapi_mcp not available") + mounted_paths = [route.path for route in app.routes] + self.assertIn("/mcp", mounted_paths) + + def test_tool_handlers_return_valid_json(self): + """Test that registry introspection tools return valid JSON.""" + from feast.infra.mcp_servers.mcp_server import create_mcp_server + + mock_store = MagicMock(spec=FeatureStore) + + mock_fv = MagicMock() + mock_fv.name = "driver_stats" + mock_fv.entity_columns = [] + mock_fv.features = [] + mock_fv.tags = {"team": "ml"} + mock_store.list_feature_views.return_value = [mock_fv] + + mock_entity = MagicMock() + mock_entity.name = "driver_id" + mock_entity.join_key = "driver_id" + mock_entity.value_type = MagicMock() + mock_entity.value_type.name = "INT64" + mock_entity.tags = {} + mock_store.list_entities.return_value = [mock_entity] + + mock_store.list_feature_services.return_value = [] + mock_store.list_data_sources.return_value = [] + + config = SimpleNamespace(mcp_server_name="json-test") + mcp = create_mcp_server(mock_store, config) + + for tool_name in [ + "list_feature_views", + "list_entities", + "list_feature_services", + "list_data_sources", + ]: + fn = mcp._tool_manager._tools[tool_name].fn + result = fn(tags=None) + parsed = json.loads(result) + self.assertIsInstance(parsed, list) def test_feature_server_with_mcp_config(self): - """Test feature server startup with MCP configuration.""" + """Test feature server hook handles MCP config gracefully.""" from feast.feature_server import _add_mcp_support_if_enabled app = FastAPI() @@ -151,16 +128,12 @@ def test_feature_server_with_mcp_config(self): mcp_server_version="1.0.0", ) - # This should not raise an exception even if MCP is not available try: _add_mcp_support_if_enabled(app, mock_store) except Exception as e: - # Should handle gracefully self.assertIn("MCP", str(e).upper()) def test_mcp_server_configuration_validation(self): - """Test comprehensive MCP server configuration validation.""" - # Test various configuration combinations for transport in ["sse", "http"]: config = McpFeatureServerConfig( enabled=True, @@ -171,19 +144,7 @@ def test_mcp_server_configuration_validation(self): ) self.assertEqual(config.mcp_transport, transport) - config_default = McpFeatureServerConfig( - enabled=True, - mcp_enabled=True, - mcp_server_name="test-server-default", - mcp_server_version="1.0.0", - ) - self.assertEqual(config_default.mcp_transport, "sse") - with self.assertRaises(ValidationError): McpFeatureServerConfig( - enabled=True, - mcp_enabled=True, - mcp_server_name="bad-transport", - mcp_server_version="1.0.0", mcp_transport="websocket", ) diff --git a/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py b/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py index b23372d9eab..7f44f9b775d 100644 --- a/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py +++ b/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py @@ -1,6 +1,7 @@ +import json import unittest from types import SimpleNamespace -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch from pydantic import ValidationError @@ -12,7 +13,6 @@ class TestMcpFeatureServerConfig(unittest.TestCase): """Test MCP feature server configuration.""" def test_default_config(self): - """Test default MCP configuration values.""" config = McpFeatureServerConfig() self.assertEqual(config.type, "mcp") @@ -21,16 +21,17 @@ def test_default_config(self): self.assertEqual(config.mcp_server_name, "feast-mcp-server") self.assertEqual(config.mcp_server_version, "1.0.0") self.assertEqual(config.mcp_transport, "sse") + self.assertEqual(config.mcp_base_path, "/mcp") self.assertEqual(config.transformation_service_endpoint, "localhost:6566") def test_custom_config(self): - """Test custom MCP configuration values.""" config = McpFeatureServerConfig( enabled=True, mcp_enabled=True, mcp_server_name="custom-feast-server", mcp_server_version="2.0.0", - mcp_transport="sse", + mcp_transport="http", + mcp_base_path="/custom-mcp", transformation_service_endpoint="custom-host:8080", ) @@ -39,11 +40,10 @@ def test_custom_config(self): self.assertTrue(config.mcp_enabled) self.assertEqual(config.mcp_server_name, "custom-feast-server") self.assertEqual(config.mcp_server_version, "2.0.0") - self.assertEqual(config.mcp_transport, "sse") - self.assertEqual(config.transformation_service_endpoint, "custom-host:8080") + self.assertEqual(config.mcp_transport, "http") + self.assertEqual(config.mcp_base_path, "/custom-mcp") def test_config_validation(self): - """Test configuration validation.""" for transport in ["sse", "http"]: config = McpFeatureServerConfig(mcp_transport=transport) self.assertEqual(config.mcp_transport, transport) @@ -51,169 +51,271 @@ def test_config_validation(self): McpFeatureServerConfig(mcp_transport="websocket") def test_config_inheritance(self): - """Test that McpFeatureServerConfig properly inherits from BaseFeatureServerConfig.""" config = McpFeatureServerConfig() - # Verify it has the base configuration attributes self.assertTrue(hasattr(config, "type")) self.assertTrue(hasattr(config, "enabled")) @patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True) -class TestMCPServerUnit(unittest.TestCase): - """Unit tests for MCP server functionality with mocked dependencies.""" +class TestCreateMcpServer(unittest.TestCase): + """Test create_mcp_server tool registration.""" - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - def test_add_mcp_support_success(self, mock_fast_api_mcp): - """Test successful MCP support addition.""" - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app + def test_creates_fastmcp_instance(self): + from feast.infra.mcp_servers.mcp_server import create_mcp_server - mock_app = Mock() - mock_store = Mock(spec=FeatureStore) - mock_config = SimpleNamespace( - mcp_server_name="test-server", - mcp_server_version="1.0.0", - mcp_transport="sse", - ) + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace(mcp_server_name="test-server") - mock_mcp_instance = Mock(spec_set=["mount_sse", "mount", "mount_http"]) - mock_fast_api_mcp.return_value = mock_mcp_instance + mcp = create_mcp_server(mock_store, config) - result = add_mcp_support_to_app(mock_app, mock_store, mock_config) + self.assertIsNotNone(mcp) + self.assertEqual(mcp.name, "test-server") - # Verify FastApiMCP was called correctly - mock_fast_api_mcp.assert_called_once_with( - mock_app, - name="test-server", - description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", - ) + def test_uses_default_name(self): + from feast.infra.mcp_servers.mcp_server import create_mcp_server - mock_mcp_instance.mount_sse.assert_called_once() + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace() - # Verify the result - self.assertEqual(result, mock_mcp_instance) + mcp = create_mcp_server(mock_store, config) + self.assertEqual(mcp.name, "feast-feature-store") - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - def test_add_mcp_support_with_defaults(self, mock_fast_api_mcp): - """Test MCP support addition with default configuration values.""" - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app + def test_registers_expected_tools(self): + from feast.infra.mcp_servers.mcp_server import create_mcp_server - mock_app = Mock() - mock_store = Mock(spec=FeatureStore) - mock_config = SimpleNamespace(mcp_transport="sse") + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace(mcp_server_name="test-server") + + mcp = create_mcp_server(mock_store, config) - mock_mcp_instance = Mock(spec_set=["mount_sse", "mount", "mount_http"]) - mock_fast_api_mcp.return_value = mock_mcp_instance + tool_names = set(mcp._tool_manager._tools.keys()) + expected_tools = { + "list_feature_views", + "list_entities", + "list_feature_services", + "list_data_sources", + "get_online_features", + "search_documents", + "materialize", + "materialize_incremental", + } + self.assertEqual(tool_names, expected_tools) - result = add_mcp_support_to_app(mock_app, mock_store, mock_config) - # Verify FastApiMCP was called with default name - mock_fast_api_mcp.assert_called_once_with( - mock_app, - name="feast-feature-store", - description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", +@patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True) +class TestMcpToolHandlers(unittest.TestCase): + """Test that MCP tool handlers delegate to FeatureStore correctly.""" + + def _create_server(self): + from feast.infra.mcp_servers.mcp_server import create_mcp_server + + self.mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace(mcp_server_name="test-server") + return create_mcp_server(self.mock_store, config) + + def _get_tool_fn(self, mcp, name): + """Get the raw function registered for a tool.""" + return mcp._tool_manager._tools[name].fn + + def test_list_feature_views_delegates(self): + mcp = self._create_server() + mock_fv = MagicMock() + mock_fv.name = "driver_stats" + mock_fv.entity_columns = [] + mock_fv.features = [] + mock_fv.tags = {} + self.mock_store.list_feature_views.return_value = [mock_fv] + + fn = self._get_tool_fn(mcp, "list_feature_views") + result = json.loads(fn(tags=None)) + + self.mock_store.list_feature_views.assert_called_once_with( + allow_cache=True, tags=None + ) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["name"], "driver_stats") + + def test_list_entities_delegates(self): + mcp = self._create_server() + mock_entity = MagicMock() + mock_entity.name = "driver_id" + mock_entity.join_key = "driver_id" + mock_entity.value_type = MagicMock() + mock_entity.value_type.name = "INT64" + mock_entity.tags = {} + self.mock_store.list_entities.return_value = [mock_entity] + + fn = self._get_tool_fn(mcp, "list_entities") + result = json.loads(fn(tags=None)) + + self.mock_store.list_entities.assert_called_once_with( + allow_cache=True, tags=None + ) + self.assertEqual(result[0]["name"], "driver_id") + self.assertEqual(result[0]["value_type"], "INT64") + + def test_list_feature_services_delegates(self): + mcp = self._create_server() + mock_svc = MagicMock() + mock_svc.name = "driver_service" + mock_svc.feature_view_projections = [] + mock_svc.tags = {} + self.mock_store.list_feature_services.return_value = [mock_svc] + + fn = self._get_tool_fn(mcp, "list_feature_services") + result = json.loads(fn(tags=None)) + + self.mock_store.list_feature_services.assert_called_once_with(tags=None) + self.assertEqual(result[0]["name"], "driver_service") + + def test_list_data_sources_delegates(self): + mcp = self._create_server() + mock_ds = MagicMock() + mock_ds.name = "driver_source" + mock_ds.tags = {} + self.mock_store.list_data_sources.return_value = [mock_ds] + + fn = self._get_tool_fn(mcp, "list_data_sources") + result = json.loads(fn(tags=None)) + + self.mock_store.list_data_sources.assert_called_once_with( + allow_cache=True, tags=None + ) + self.assertEqual(result[0]["name"], "driver_source") + + def test_get_online_features_delegates(self): + mcp = self._create_server() + mock_response = MagicMock() + mock_response.to_dict.return_value = {"results": []} + self.mock_store.get_online_features.return_value = mock_response + + fn = self._get_tool_fn(mcp, "get_online_features") + result = fn( + features=["driver_stats:conv_rate"], + entities={"driver_id": [1001]}, ) - self.assertEqual(result, mock_mcp_instance) + self.mock_store.get_online_features.assert_called_once_with( + features=["driver_stats:conv_rate"], + entity_rows={"driver_id": [1001]}, + full_feature_names=False, + ) + self.assertIn("results", result) - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - def test_add_mcp_support_http_transport(self, mock_fast_api_mcp): - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app + def test_search_documents_with_query_string_uses_v2(self): + mcp = self._create_server() + mock_response = MagicMock() + mock_response.to_dict.return_value = {"results": []} + self.mock_store.retrieve_online_documents_v2.return_value = mock_response - mock_app = Mock() - mock_store = Mock(spec=FeatureStore) - mock_config = SimpleNamespace( - mcp_server_name="test-server", mcp_transport="http" - ) + fn = self._get_tool_fn(mcp, "search_documents") + fn(features=["doc_fv:embedding"], query_string="hello", top_k=5) - mock_mcp_instance = Mock(spec_set=["mount_http"]) - mock_fast_api_mcp.return_value = mock_mcp_instance + self.mock_store.retrieve_online_documents_v2.assert_called_once() + self.mock_store.retrieve_online_documents.assert_not_called() - result = add_mcp_support_to_app(mock_app, mock_store, mock_config) - mock_mcp_instance.mount_http.assert_called_once() - self.assertEqual(result, mock_mcp_instance) + def test_search_documents_with_vector_uses_v1(self): + mcp = self._create_server() + mock_response = MagicMock() + mock_response.to_dict.return_value = {"results": []} + self.mock_store.retrieve_online_documents.return_value = mock_response - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - def test_add_mcp_support_http_missing_mount_http_fails(self, mock_fast_api_mcp): - from feast.infra.mcp_servers.mcp_server import ( - McpTransportNotSupportedError, - add_mcp_support_to_app, - ) + fn = self._get_tool_fn(mcp, "search_documents") + fn(features=["doc_fv:embedding"], query=[0.1, 0.2], top_k=5) - mock_app = Mock() - mock_store = Mock(spec=FeatureStore) - mock_config = SimpleNamespace(mcp_transport="http") + self.mock_store.retrieve_online_documents.assert_called_once() + self.mock_store.retrieve_online_documents_v2.assert_not_called() - mock_mcp_instance = Mock(spec_set=["mount"]) - mock_fast_api_mcp.return_value = mock_mcp_instance - with self.assertRaises(McpTransportNotSupportedError): - add_mcp_support_to_app(mock_app, mock_store, mock_config) +@patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True) +class TestAddMcpSupportToApp(unittest.TestCase): + """Test add_mcp_support_to_app mounting logic.""" - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - @patch("feast.infra.mcp_servers.mcp_server.logger") - def test_add_mcp_support_with_exception(self, mock_logger, mock_fast_api_mcp): - """Test MCP support addition when FastApiMCP raises an exception.""" + def test_mounts_sse_app(self): from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - mock_app = Mock() - mock_store = Mock(spec=FeatureStore) - mock_config = SimpleNamespace( - mcp_server_name="test-server", mcp_transport="sse" + mock_app = MagicMock() + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace( + mcp_server_name="test-server", + mcp_server_version="1.0.0", + mcp_transport="sse", + mcp_base_path="/mcp", ) - # Mock FastApiMCP to raise an exception - mock_fast_api_mcp.side_effect = Exception("MCP initialization failed") + result = add_mcp_support_to_app(mock_app, mock_store, config) - result = add_mcp_support_to_app(mock_app, mock_store, mock_config) + self.assertIsNotNone(result) + mock_app.mount.assert_called_once() + call_args = mock_app.mount.call_args + self.assertEqual(call_args[0][0], "/mcp") - # Verify the result is None when exception occurs - self.assertIsNone(result) + def test_mounts_http_app(self): + from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - # Verify error was logged - mock_logger.error.assert_called_once_with( - "Failed to initialize MCP integration: MCP initialization failed", - exc_info=True, + mock_app = MagicMock() + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace( + mcp_server_name="test-server", + mcp_server_version="1.0.0", + mcp_transport="http", + mcp_base_path="/mcp", ) - @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") - def test_add_mcp_support_mount_exception(self, mock_fast_api_mcp): - """Test MCP support addition when mount() raises an exception.""" - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app + result = add_mcp_support_to_app(mock_app, mock_store, config) - mock_app = Mock() - mock_store = Mock(spec=FeatureStore) - mock_config = SimpleNamespace( - mcp_server_name="test-server", mcp_transport="sse" + self.assertIsNotNone(result) + mock_app.mount.assert_called_once() + call_args = mock_app.mount.call_args + self.assertEqual(call_args[0][0], "/mcp") + + def test_invalid_transport_raises(self): + from feast.infra.mcp_servers.mcp_server import ( + McpTransportNotSupportedError, + add_mcp_support_to_app, + ) + + mock_app = MagicMock() + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace( + mcp_server_name="test-server", + mcp_transport="grpc", ) - mock_mcp_instance = Mock(spec_set=["mount"]) - mock_mcp_instance.mount.side_effect = Exception("Mount failed") - mock_fast_api_mcp.return_value = mock_mcp_instance + with self.assertRaises(McpTransportNotSupportedError): + add_mcp_support_to_app(mock_app, mock_store, config) - result = add_mcp_support_to_app(mock_app, mock_store, mock_config) + def test_custom_base_path(self): + from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - # Verify the result is None when mount fails - self.assertIsNone(result) + mock_app = MagicMock() + mock_store = MagicMock(spec=FeatureStore) + config = SimpleNamespace( + mcp_server_name="test-server", + mcp_server_version="1.0.0", + mcp_transport="sse", + mcp_base_path="/custom-mcp", + ) + + add_mcp_support_to_app(mock_app, mock_store, config) + + call_args = mock_app.mount.call_args + self.assertEqual(call_args[0][0], "/custom-mcp") @patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", False) class TestMCPNotAvailable(unittest.TestCase): - """Test behavior when MCP is not available.""" + """Test behavior when MCP SDK is not available.""" @patch("feast.infra.mcp_servers.mcp_server.logger") def test_add_mcp_support_mcp_not_available(self, mock_logger): - """Test add_mcp_support_to_app when MCP is not available.""" from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app - mock_app = Mock() - mock_store = Mock() - mock_config = Mock() - - result = add_mcp_support_to_app(mock_app, mock_store, mock_config) + result = add_mcp_support_to_app(Mock(), Mock(), Mock()) self.assertIsNone(result) mock_logger.warning.assert_called_once_with( - "MCP support requested but fastapi_mcp is not available" + "MCP support requested but mcp SDK is not available" ) @@ -222,7 +324,6 @@ class TestFeatureServerMCPHooks(unittest.TestCase): @patch("feast.feature_server.logger") def test_add_mcp_support_if_enabled_exception(self, mock_logger): - """Test _add_mcp_support_if_enabled when an exception occurs.""" from feast.feature_server import _add_mcp_support_if_enabled mock_app = Mock() @@ -231,7 +332,6 @@ def test_add_mcp_support_if_enabled_exception(self, mock_logger): mock_store.config.feature_server.type = "mcp" mock_store.config.feature_server.mcp_enabled = True - # Mock the import to raise an exception with patch( "feast.infra.mcp_servers.mcp_server.add_mcp_support_to_app", side_effect=Exception("Test error"),