Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ snowflake = [
]
sqlite_vec = ["sqlite-vec==v0.1.6"]
mcp = ["fastapi_mcp"]
mlflow = ["mlflow>=2.10.0"]

dbt = ["dbt-artifacts-parser"]

Expand Down
237 changes: 233 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
import copy
import itertools
import logging
import os
import time
import warnings
Expand Down Expand Up @@ -105,6 +106,32 @@
_track_materialization = None # Lazy-loaded on first materialization call
_track_materialization_loaded = False

_mlflow_log_fn = None # Lazy-loaded on first feature retrieval
_mlflow_log_fn_loaded = False


_logger = logging.getLogger(__name__)


def _get_mlflow_log_fn():
"""Lazy-import mlflow logger only when MLflow integration is configured."""
global _mlflow_log_fn, _mlflow_log_fn_loaded
if not _mlflow_log_fn_loaded:
try:
from feast.mlflow_integration.logger import (
log_feature_retrieval_to_mlflow,
)

_mlflow_log_fn = log_feature_retrieval_to_mlflow
_mlflow_log_fn_loaded = True
except ImportError:
_mlflow_log_fn_loaded = True
_mlflow_log_fn = None
except Exception as e:
_logger.warning("MLflow auto-log import failed (will retry): %s", e)
_mlflow_log_fn = None
return _mlflow_log_fn


def _get_track_materialization():
"""Lazy-import feast.metrics only when materialization tracking is needed.
Expand Down Expand Up @@ -194,6 +221,123 @@ def __init__(
# Initialize feature service cache for performance optimization
self._feature_service_cache = {}

# Cache for _resolve_feature_service_name lookups
self._fs_name_cache: Dict[frozenset, Optional[str]] = {}

# Configure MLflow tracking URI globally from config
self._init_mlflow_tracking()

def _init_mlflow_tracking(self):
"""Configure MLflow globally from feature_store.yaml.

Sets the tracking URI and experiment name.
The experiment is named after the Feast project.

When no tracking_uri is specified, defaults to http://127.0.0.1:5000
(a local MLflow tracking server).
"""
try:
mlflow_cfg = self.config.mlflow
if mlflow_cfg is None or not mlflow_cfg.enabled:
return

import mlflow

tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000"
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(self.config.project)
except ImportError:
pass
except Exception as e:
warnings.warn(f"Failed to configure MLflow tracking: {e}")

def _resolve_feature_service_name(self, feature_refs: List[str]) -> Optional[str]:
"""Find the best-matching feature service for the given feature refs.

Resolution: exact match wins immediately; otherwise the smallest
superset (fewest extra features) is returned. Results are cached
per FeatureStore instance for O(1) repeated lookups.
"""
try:
ref_key = frozenset(feature_refs)
if ref_key in self._fs_name_cache:
return self._fs_name_cache[ref_key]

best_match = None
best_extra = float("inf")

for fs in self.registry.list_feature_services(
self.project, allow_cache=True
):
fs_refs = frozenset(
f"{p.name}:{f.name}"
for p in fs.feature_view_projections
for f in p.features
)
Comment on lines +272 to +276
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 _resolve_feature_service_name uses p.name instead of p.name_to_use(), causing feature service matching failures

The _resolve_feature_service_name method at sdk/python/feast/feature_store.py:273 constructs feature service refs as f"{p.name}:{f.name}", but the _feature_refs it receives from callers (via utils._get_features at sdk/python/feast/utils.py:1164) are built using f"{projection.name_to_use()}:{f.name}". The name_to_use() method (sdk/python/feast/feature_view_projection.py:52-56) returns self.name_alias or self.name and may append @v{version_tag}. When a feature view projection has an alias or a version tag, the frozensets will never match, so the method silently returns None and the feast.feature_service tag is never set in MLflow for those retrievals.

Suggested change
fs_refs = frozenset(
f"{p.name}:{f.name}"
for p in fs.feature_view_projections
for f in p.features
)
fs_refs = frozenset(
f"{p.name_to_use()}:{f.name}"
for p in fs.feature_view_projections
for f in p.features
)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved: The current analysis (ANALYSIS_0001) performed a more thorough trace of all calling paths and determined this is NOT a bug: _resolve_feature_service_name is only called when features is a List[str] (not a FeatureService), and in that code path _get_features returns the user-provided strings as-is. User-provided feature refs use the original FV name (matching p.name), so the comparison is consistent. The previous bug was a false positive.

if ref_key == fs_refs:
self._fs_name_cache[ref_key] = fs.name
return fs.name
if ref_key.issubset(fs_refs):
extra = len(fs_refs) - len(ref_key)
if extra < best_extra:
best_match = fs.name
best_extra = extra

self._fs_name_cache[ref_key] = best_match
return best_match
except Exception as e:
_logger.debug("Failed to resolve feature service name: %s", e)
return None

def _auto_log_entity_df_info(self, entity_df, start_date=None, end_date=None):
    """Record entity_df details on the active MLflow run for reproducibility.

    Supported entity_df flavors:
      - str (SQL query): logged as a param, truncated to fit MLflow limits
      - pd.DataFrame: row/column metadata, plus a parquet artifact when the
        frame has at most 100k rows
      - None with start_date/end_date: the range bounds are logged

    All failures are swallowed (debug-logged) so retrieval never breaks.
    """
    try:
        import mlflow

        active = mlflow.active_run()
        if active is None:
            return
        uri = self.config.mlflow.tracking_uri or "http://127.0.0.1:5000"
        client = mlflow.MlflowClient(tracking_uri=uri)
        run_id = active.info.run_id

        if isinstance(entity_df, str):
            # MLflow params cap at 500 chars; keep a safe margin.
            if len(entity_df) > 490:
                query = entity_df[:487] + "..."
            else:
                query = entity_df
            client.log_param(run_id, "feast.entity_df_query", query)
            client.set_tag(run_id, "feast.entity_df_type", "sql")

        elif isinstance(entity_df, pd.DataFrame):
            client.set_tag(run_id, "feast.entity_df_type", "dataframe")
            client.log_param(run_id, "feast.entity_df_rows", str(len(entity_df)))
            cols = ",".join(entity_df.columns)
            if len(cols) > 490:
                cols = cols[:487] + "..."
            client.log_param(run_id, "feast.entity_df_columns", cols)

            # Full snapshot only for reasonably small frames.
            if len(entity_df) <= 100_000:
                import tempfile

                with tempfile.TemporaryDirectory() as tmp_dir:
                    target = os.path.join(tmp_dir, "entity_df.parquet")
                    entity_df.to_parquet(target, index=False)
                    mlflow.log_artifact(target)

        elif entity_df is None and (start_date or end_date):
            client.set_tag(run_id, "feast.entity_df_type", "range")
            if start_date:
                client.log_param(run_id, "feast.start_date", str(start_date))
            if end_date:
                client.log_param(run_id, "feast.end_date", str(end_date))

    except Exception as e:
        _logger.debug("Failed to log entity_df info to MLflow: %s", e)

def _init_openlineage_emitter(self) -> Optional[Any]:
"""Initialize OpenLineage emitter if configured and enabled."""
try:
Expand Down Expand Up @@ -1483,6 +1627,8 @@ def get_historical_features(
if end_date is not None:
kwargs["end_date"] = end_date

_retrieval_start = time.monotonic()

job = provider.get_historical_features(
self.config,
feature_views,
Expand All @@ -1494,6 +1640,43 @@ def get_historical_features(
**kwargs,
)

# Auto-log to MLflow if configured
if (
self.config.mlflow is not None
and self.config.mlflow.enabled
and self.config.mlflow.auto_log
):
_log_fn = _get_mlflow_log_fn()
if _log_fn is not None:
_duration = time.monotonic() - _retrieval_start
if isinstance(entity_df, pd.DataFrame):
_entity_count = len(entity_df)
elif isinstance(entity_df, str):
_entity_count = -1
else:
_entity_count = 0
_fs = features if isinstance(features, FeatureService) else None
_fs_name = (
features.name
if isinstance(features, FeatureService)
else self._resolve_feature_service_name(_feature_refs)
)
_log_fn(
feature_refs=_feature_refs,
entity_count=_entity_count,
duration_seconds=_duration,
retrieval_type="historical",
feature_service=_fs,
feature_service_name=_fs_name,
project=self.project,
tracking_uri=self.config.mlflow.tracking_uri,
)

if self.config.mlflow.auto_log_entity_df:
self._auto_log_entity_df_info(
entity_df, start_date=start_date, end_date=end_date
)

return job

def create_saved_dataset(
Expand Down Expand Up @@ -2621,6 +2804,8 @@ def get_online_features(
"""
provider = self._get_provider()

_retrieval_start = time.monotonic()

response = provider.get_online_features(
config=self.config,
features=features,
Expand All @@ -2631,6 +2816,43 @@ def get_online_features(
include_feature_view_version_metadata=include_feature_view_version_metadata,
)

# Auto-log to MLflow if configured
if (
self.config.mlflow is not None
and self.config.mlflow.enabled
and self.config.mlflow.auto_log
):
_log_fn = _get_mlflow_log_fn()
if _log_fn is not None:
_duration = time.monotonic() - _retrieval_start
_feature_refs = utils._get_features(
self.registry, self.project, features, allow_cache=True
)
if isinstance(entity_rows, list):
_entity_count = len(entity_rows)
elif isinstance(entity_rows, Mapping):
try:
_entity_count = len(next(iter(entity_rows.values())))
except Exception:
_entity_count = 0
else:
_entity_count = 0
_fs = features if isinstance(features, FeatureService) else None
_fs_name = (
features.name
if isinstance(features, FeatureService)
else self._resolve_feature_service_name(_feature_refs)
)
_log_fn(
feature_refs=_feature_refs,
entity_count=_entity_count,
duration_seconds=_duration,
retrieval_type="online",
feature_service=_fs,
feature_service_name=_fs_name,
project=self.project,
tracking_uri=self.config.mlflow.tracking_uri,
)
return response

async def get_online_features_async(
Expand Down Expand Up @@ -2782,7 +3004,8 @@ def _doc_feature(x):
data=requested_features_data,
)
feature_types = {
f.name: f.dtype.to_value_type() for f in requested_feature_view.features
f.name: f.dtype.to_value_type()
for f in requested_feature_view.features
}
return OnlineResponse(online_features_response, feature_types=feature_types)

Expand Down Expand Up @@ -3074,8 +3297,12 @@ def _retrieve_from_online_store_v2(
online_features_response.metadata.feature_names.val.extend(
features_to_request
)
feature_types = {f.name: f.dtype.to_value_type() for f in table.features}
return OnlineResponse(online_features_response, feature_types=feature_types)
feature_types = {
f.name: f.dtype.to_value_type() for f in table.features
}
return OnlineResponse(
online_features_response, feature_types=feature_types
)

table_entity_values, idxs, output_len = utils._get_unique_entities_from_values(
entity_key_dict,
Expand All @@ -3098,7 +3325,9 @@ def _retrieve_from_online_store_v2(
data=entity_key_dict,
)

feature_types = {f.name: f.dtype.to_value_type() for f in table.features}
feature_types = {
f.name: f.dtype.to_value_type() for f in table.features
}
return OnlineResponse(online_features_response, feature_types=feature_types)

def serve(
Expand Down
48 changes: 48 additions & 0 deletions sdk/python/feast/mlflow_integration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
MLflow integration for Feast Feature Store.

This module provides seamless integration between Feast and MLflow. When enabled
in feature_store.yaml, feature metadata is logged to MLflow
during get_historical_features and get_online_features calls.

Usage:
Configure MLflow in your feature_store.yaml:

project: my_project
# ... other config ...

mlflow:
enabled: true
tracking_uri: http://localhost:5000
auto_log: true

For advanced use cases, the module also provides:
- resolve_feature_service_from_model_uri: Map an MLflow model to its Feast
feature service.
- get_entity_df_from_mlflow_run: Reproduce training by pulling entity data
from a previous MLflow run's artifacts.
"""

from feast.mlflow_integration.config import MlflowConfig
from feast.mlflow_integration.entity_df_builder import (
FeastMlflowEntityDfError,
get_entity_df_from_mlflow_run,
)
from feast.mlflow_integration.logger import (
log_feature_retrieval_to_mlflow,
log_training_dataset_to_mlflow,
)
from feast.mlflow_integration.model_resolver import (
FeastMlflowModelResolutionError,
resolve_feature_service_from_model_uri,
)

__all__ = [
"MlflowConfig",
"log_feature_retrieval_to_mlflow",
"log_training_dataset_to_mlflow",
"resolve_feature_service_from_model_uri",
"FeastMlflowModelResolutionError",
"get_entity_df_from_mlflow_run",
"FeastMlflowEntityDfError",
]
24 changes: 24 additions & 0 deletions sdk/python/feast/mlflow_integration/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Optional

from pydantic import StrictBool, StrictStr

from feast.repo_config import FeastBaseModel


class MlflowConfig(FeastBaseModel):
    """Configuration for the optional MLflow integration, read from the
    ``mlflow:`` block of feature_store.yaml."""

    enabled: StrictBool = False
    """ bool: Whether MLflow integration is enabled. Defaults to False. """

    tracking_uri: Optional[StrictStr] = None
    """ str: MLflow tracking URI. If not set, defaults to
    http://127.0.0.1:5000 (local MLflow tracking server).
    Set explicitly for remote/shared MLflow deployments. """

    auto_log: StrictBool = True
    """ bool: Automatically log feature retrieval metadata to the active
    MLflow run when get_historical_features or get_online_features is
    called. Defaults to True. """

    auto_log_entity_df: StrictBool = False
    """ bool: When True, the input entity_df (or SQL query) is recorded in
    the MLflow run. Defaults to False. """
Loading
Loading