Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ba1feb6
Revert "fix: Update milvus connect function to work with remote insta…
franciscojavierarceo May 29, 2025
86b6a20
test: Add Operator E2E tests for Feast Apply and Materialize function…
Srihari1192 May 29, 2025
1ed32d4
fix: Updating milvus connect function to work with remote instance (#…
Fiona-Waters May 30, 2025
2bb7248
feat: Add MCP support to feature server configuration
YassinNouh21 May 31, 2025
479eb7a
fix linter
YassinNouh21 May 31, 2025
edbb58f
add example
YassinNouh21 May 31, 2025
a8b17e1
test: add test cases for the mcp server
YassinNouh21 May 31, 2025
bc00dba
fix linter
YassinNouh21 May 31, 2025
e3a30d7
formatting
YassinNouh21 May 31, 2025
8a644b5
docs: update README for MCP setup instructions
YassinNouh21 May 31, 2025
08c4dd6
fix: update transformation service endpoint and refactor MCP integration
YassinNouh21 May 31, 2025
6c6f6ee
feat: add Model Context Protocol (MCP) support and update documentation
YassinNouh21 May 31, 2025
c3043c1
fix: update entity key serialization version and improve README clarity
YassinNouh21 May 31, 2025
8b26804
Merge branch 'master' into feat/mcp
YassinNouh21 May 31, 2025
1bec81f
fix: refactor MCP imports to use the correct module path
YassinNouh21 May 31, 2025
8aa31a7
feat: Add MCP server implementation files
YassinNouh21 Jun 1, 2025
9409e6a
test: MCP server unit tests and integration tests
YassinNouh21 Jun 1, 2025
426744b
fix formatting
YassinNouh21 Jun 1, 2025
c40809b
fix formatting
YassinNouh21 Jun 1, 2025
6135cde
adding compiled requirements
franciscojavierarceo Jun 4, 2025
b31cc87
Merge branch 'master' into feat/mcp
franciscojavierarceo Jun 4, 2025
b9a84f3
fix linter
franciscojavierarceo Jun 4, 2025
dc71713
reverting duckdb change
franciscojavierarceo Jun 4, 2025
ba6f51a
needed to do file source and duckdb
franciscojavierarceo Jun 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: update transformation service endpoint and refactor MCP integration
Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com>
  • Loading branch information
YassinNouh21 committed May 31, 2025
commit 08c4dd65c6fbf8771db491b25ebf18e98ef9781c
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/feature_servers/mcp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ class McpFeatureServerConfig(BaseFeatureServerConfig):
mcp_transport: Optional[StrictStr] = None

# The endpoint definition for transformation_service (inherited from base)
transformation_service_endpoint: StrictStr = "localhost:6569"
transformation_service_endpoint: StrictStr = "localhost:6566"
275 changes: 25 additions & 250 deletions sdk/python/feast/infra/feature_servers/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@
"""

import logging
from typing import Any, Dict, List, Optional, Union
from typing import Optional

import feast
from feast.feature_service import FeatureService
from feast.feature_store import FeatureStore

logger = logging.getLogger(__name__)

try:
from fastapi_mcp import FastMCPIntegration
from fastapi_mcp import FastApiMCP

MCP_AVAILABLE = True
except ImportError:
Expand All @@ -25,259 +23,36 @@
)
MCP_AVAILABLE = False
# Create placeholder classes for testing
FastMCPIntegration = None
FastApiMCP = None


class FeastMCPServer:
"""MCP Server implementation for Feast."""

def __init__(
self,
store: FeatureStore,
server_name: str = "feast-mcp-server",
version: str = "1.0.0",
):
self.store = store
self.server_name = server_name
self.version = version
self._mcp_integration = None

if not MCP_AVAILABLE:
raise ImportError(
"fastapi_mcp is required for MCP support. Install it with: pip install fastapi_mcp"
)

def create_mcp_integration(self, app) -> Optional["FastMCPIntegration"]:
"""Create and configure MCP integration with the FastAPI app."""
if not MCP_AVAILABLE:
logger.warning("MCP not available, skipping integration")
return None

try:
# Initialize MCP integration
mcp_integration = FastMCPIntegration(
server_name=self.server_name, server_version=self.version
)

# Register MCP tools
self._register_tools(mcp_integration)

# Register MCP resources
self._register_resources(mcp_integration)

# Attach to FastAPI app
mcp_integration.attach_to_app(app)

self._mcp_integration = mcp_integration
logger.info(
f"MCP integration initialized for {self.server_name} v{self.version}"
)

return mcp_integration

except Exception as e:
logger.error(f"Failed to initialize MCP integration: {e}")
return None

def _register_tools(self, mcp_integration: "FastMCPIntegration"):
"""Register MCP tools that clients can invoke."""

@mcp_integration.tool()
async def get_online_features(
entities: Dict[str, List[Any]],
features: Optional[List[str]] = None,
feature_service: Optional[str] = None,
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""
Get online features from Feast feature store.

Args:
entities: Dictionary of entity values
features: List of feature references (optional if feature_service is provided)
feature_service: Name of feature service (optional if features is provided)
full_feature_names: Whether to return full feature names

Returns:
Dictionary containing feature values
"""
try:
features_to_get: Union[List[str], FeatureService]
if feature_service:
fs_obj = self.store.get_feature_service(feature_service)
if not fs_obj:
raise ValueError(
f"FeatureService '{feature_service}' not found."
)
features_to_get = fs_obj
elif features:
features_to_get = features
else:
raise ValueError(
"Either 'features' or 'feature_service' must be provided."
)

result = self.store.get_online_features(
features=features_to_get,
entity_rows=entities,
full_feature_names=full_feature_names,
)

return result.to_dict()
except Exception as e:
logger.error(f"Error getting online features: {e}")
raise

@mcp_integration.tool()
async def list_feature_views(self) -> List[Dict[str, Any]]:
"""List all Feature Views in the Feast feature store."""
try:
feature_views = self.store.list_feature_views()
return [
{
"name": fv.name,
"entities": [
e.name if hasattr(e, "name") else e for e in fv.entities
],
"features": [f.name for f in fv.features],
"ttl": fv.ttl.total_seconds() if fv.ttl else None,
"online": fv.online,
"description": fv.description,
# Add other relevant FeatureView attributes as needed
}
for fv in feature_views
]
except Exception as e:
logger.error(f"Error listing feature views: {e}")
raise

@mcp_integration.tool()
async def list_feature_services() -> List[Dict[str, Any]]:
"""
List all feature services in the feature store.

Returns:
List of feature service information
"""
try:
feature_services = self.store.list_feature_services()
return [
{
"name": fs.name,
"features": [
f"{proj.name}:{feat}"
for proj in fs.feature_view_projections
for feat in proj.features
],
"description": getattr(fs, "description", None),
"tags": getattr(fs, "tags", {}),
}
for fs in feature_services
]
except Exception as e:
logger.error(f"Error listing feature services: {e}")
raise

@mcp_integration.tool()
async def get_feature_store_info() -> Dict[str, Any]:
"""
Get information about the Feast feature store.

Returns:
Dictionary containing feature store information
"""
try:
return {
"project": self.store.project,
"provider": self.store.config.provider,
"online_store_type": getattr(
self.store.config.online_store, "type", "unknown"
),
"registry_type": getattr(
self.store.config.registry, "registry_type", "unknown"
),
"version": feast.__version__,
}
except Exception as e:
logger.error(f"Error getting feature store info: {e}")
raise

def _register_resources(self, mcp_integration: "FastMCPIntegration"):
"""Register MCP resources that clients can access."""

@mcp_integration.resource("feast://feature-views")
async def get_feature_views_resource() -> str:
"""
Resource containing all feature views in JSON format.
"""
try:
feature_views = await self._get_tool_handler("list_feature_views")()
import json

return json.dumps(feature_views, indent=2)
except Exception as e:
logger.error(f"Error getting feature views resource: {e}")
raise

@mcp_integration.resource("feast://feature-services")
async def get_feature_services_resource() -> str:
"""
Resource containing all feature services in JSON format.
"""
try:
feature_services = await self._get_tool_handler(
"list_feature_services"
)()
import json

return json.dumps(feature_services, indent=2)
except Exception as e:
logger.error(f"Error getting feature services resource: {e}")
raise

def _get_tool_handler(self, tool_name: str):
"""Get a tool handler by name."""
if self._mcp_integration and hasattr(self._mcp_integration, "_tools"):
return self._mcp_integration._tools.get(tool_name)
return None


def add_mcp_support_to_app(
app, store: FeatureStore, config
) -> Optional[FeastMCPServer]:
"""
Add MCP support to a FastAPI application.

Args:
app: FastAPI application instance
store: Feast FeatureStore instance
config: MCP configuration

Returns:
FeastMCPServer instance if successful, None otherwise
"""
if not config or not getattr(config, "mcp_enabled", False):
logger.info("MCP support is disabled")
return None

def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastApiMCP"]:
"""Add MCP support to the FastAPI app if enabled in configuration."""
if not MCP_AVAILABLE:
logger.warning("MCP support requested but fastapi_mcp is not available")
return None

try:
server_name = getattr(config, "mcp_server_name", "feast-mcp-server")
version = getattr(config, "mcp_server_version", "1.0.0")

mcp_server = FeastMCPServer(store, server_name, version)
mcp_integration = mcp_server.create_mcp_integration(app)

if mcp_integration:
logger.info("MCP support successfully added to Feast feature server")
return mcp_server
else:
logger.error("Failed to create MCP integration")
return None
# Create MCP server from the FastAPI app
mcp = FastApiMCP(
app,
name=getattr(config, "mcp_server_name", "feast-feature-store"),
description="Feast Feature Store MCP Server - Access feature store data and operations through MCP",
)

# Mount the MCP server to the FastAPI app
mcp.mount()

logger.info(
"MCP support has been enabled for the Feast feature server at /mcp endpoint"
)
logger.info(
f"MCP integration initialized for {getattr(config, 'mcp_server_name', 'feast-feature-store')} "
f"v{getattr(config, 'mcp_server_version', '1.0.0')}"
)

return mcp

except Exception as e:
logger.error(f"Error adding MCP support: {e}")
logger.error(f"Failed to initialize MCP integration: {e}")
return None
Loading