Skip to content

Commit a63d529

Browse files
committed
refine MLflow integration, UI runs endpoint, and lockfiles
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
1 parent 8076b18 commit a63d529

23 files changed

+2234
-2558
lines changed

sdk/python/feast/feature_store.py

Lines changed: 127 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import asyncio
1515
import copy
1616
import itertools
17+
import logging
1718
import os
1819
import time
1920
import warnings
@@ -109,18 +110,25 @@
109110
_mlflow_log_fn_loaded = False
110111

111112

113+
_logger = logging.getLogger(__name__)
114+
115+
112116
def _get_mlflow_log_fn():
113117
"""Lazy-import mlflow logger only when MLflow integration is configured."""
114118
global _mlflow_log_fn, _mlflow_log_fn_loaded
115119
if not _mlflow_log_fn_loaded:
116-
_mlflow_log_fn_loaded = True
117120
try:
118121
from feast.mlflow_integration.logger import (
119122
log_feature_retrieval_to_mlflow,
120123
)
121124

122125
_mlflow_log_fn = log_feature_retrieval_to_mlflow
123-
except Exception:
126+
_mlflow_log_fn_loaded = True
127+
except ImportError:
128+
_mlflow_log_fn_loaded = True
129+
_mlflow_log_fn = None
130+
except Exception as e:
131+
_logger.warning("MLflow auto-log import failed (will retry): %s", e)
124132
_mlflow_log_fn = None
125133
return _mlflow_log_fn
126134

@@ -213,19 +221,20 @@ def __init__(
213221
# Initialize feature service cache for performance optimization
214222
self._feature_service_cache = {}
215223

224+
# Cache for _resolve_feature_service_name lookups
225+
self._fs_name_cache: Dict[frozenset, Optional[str]] = {}
226+
216227
# Configure MLflow tracking URI globally from config
217228
self._init_mlflow_tracking()
218229

219230
def _init_mlflow_tracking(self):
220231
"""Configure MLflow globally from feature_store.yaml.
221232
222-
Sets the tracking URI and experiment name so the user never needs
223-
to call mlflow.set_tracking_uri() or mlflow.set_experiment() in
224-
their scripts. The experiment is named after the Feast project.
233+
Sets the tracking URI and experiment name.
234+
The experiment is named after the Feast project.
225235
226236
When no tracking_uri is specified, defaults to http://127.0.0.1:5000
227-
(a local MLflow tracking server). This ensures that train.py,
228-
predict.py, feast ui, and the MLflow UI all share the same backend.
237+
(a local MLflow tracking server).
229238
"""
230239
try:
231240
mlflow_cfg = self.config.mlflow
@@ -242,24 +251,92 @@ def _init_mlflow_tracking(self):
242251
except Exception as e:
243252
warnings.warn(f"Failed to configure MLflow tracking: {e}")
244253

245-
def _resolve_feature_service_name(
246-
self, feature_refs: List[str]
247-
) -> Optional[str]:
248-
"""Try to find a feature service that covers the given feature refs."""
254+
def _resolve_feature_service_name(self, feature_refs: List[str]) -> Optional[str]:
255+
"""Find the best-matching feature service for the given feature refs.
256+
257+
Resolution: exact match wins immediately; otherwise the smallest
258+
superset (fewest extra features) is returned. Results are cached
259+
per FeatureStore instance for O(1) repeated lookups.
260+
"""
249261
try:
250-
ref_set = set(feature_refs)
262+
ref_key = frozenset(feature_refs)
263+
if ref_key in self._fs_name_cache:
264+
return self._fs_name_cache[ref_key]
265+
266+
best_match = None
267+
best_extra = float("inf")
268+
251269
for fs in self.registry.list_feature_services(
252270
self.project, allow_cache=True
253271
):
254-
fs_refs = set()
255-
for proj in fs.feature_view_projections:
256-
for feat in proj.features:
257-
fs_refs.add(f"{proj.name}:{feat.name}")
258-
if ref_set == fs_refs or ref_set.issubset(fs_refs):
272+
fs_refs = frozenset(
273+
f"{p.name}:{f.name}"
274+
for p in fs.feature_view_projections
275+
for f in p.features
276+
)
277+
if ref_key == fs_refs:
278+
self._fs_name_cache[ref_key] = fs.name
259279
return fs.name
260-
except Exception:
261-
pass
262-
return None
280+
if ref_key.issubset(fs_refs):
281+
extra = len(fs_refs) - len(ref_key)
282+
if extra < best_extra:
283+
best_match = fs.name
284+
best_extra = extra
285+
286+
self._fs_name_cache[ref_key] = best_match
287+
return best_match
288+
except Exception as e:
289+
_logger.debug("Failed to resolve feature service name: %s", e)
290+
return None
291+
292+
def _auto_log_entity_df_info(self, entity_df, start_date=None, end_date=None):
293+
"""Log entity_df info to MLflow for reproducibility.
294+
295+
Handles three entity_df types:
296+
- pd.DataFrame: saves metadata + full parquet artifact (if under 100k rows)
297+
- str (SQL query): logs the query as a param
298+
- None (range-based): logs start_date/end_date
299+
"""
300+
try:
301+
import mlflow
302+
303+
if mlflow.active_run() is None:
304+
return
305+
tracking_uri = self.config.mlflow.tracking_uri or "http://127.0.0.1:5000"
306+
client = mlflow.MlflowClient(tracking_uri=tracking_uri)
307+
run_id = mlflow.active_run().info.run_id
308+
309+
if isinstance(entity_df, str):
310+
query = entity_df if len(entity_df) <= 490 else entity_df[:487] + "..."
311+
client.log_param(run_id, "feast.entity_df_query", query)
312+
client.set_tag(run_id, "feast.entity_df_type", "sql")
313+
314+
elif isinstance(entity_df, pd.DataFrame):
315+
client.set_tag(run_id, "feast.entity_df_type", "dataframe")
316+
client.log_param(run_id, "feast.entity_df_rows", str(len(entity_df)))
317+
cols = ",".join(entity_df.columns)
318+
if len(cols) > 490:
319+
cols = cols[:487] + "..."
320+
client.log_param(run_id, "feast.entity_df_columns", cols)
321+
322+
max_rows = 100_000
323+
if len(entity_df) <= max_rows:
324+
import tempfile
325+
326+
with tempfile.TemporaryDirectory() as tmp_dir:
327+
path = os.path.join(tmp_dir, "entity_df.parquet")
328+
entity_df.to_parquet(path, index=False)
329+
mlflow.log_artifact(path)
330+
331+
elif entity_df is None and (start_date or end_date):
332+
client.set_tag(run_id, "feast.entity_df_type", "range")
333+
if start_date:
334+
client.log_param(run_id, "feast.start_date", str(start_date))
335+
if end_date:
336+
client.log_param(run_id, "feast.end_date", str(end_date))
337+
338+
except Exception as e:
339+
_logger.debug("Failed to log entity_df info to MLflow: %s", e)
263340

264341
def _init_openlineage_emitter(self) -> Optional[Any]:
265342
"""Initialize OpenLineage emitter if configured and enabled."""
@@ -1572,11 +1649,18 @@ def get_historical_features(
15721649
_log_fn = _get_mlflow_log_fn()
15731650
if _log_fn is not None:
15741651
_duration = time.monotonic() - _retrieval_start
1575-
_entity_count = (
1576-
len(entity_df) if isinstance(entity_df, pd.DataFrame) else 0
1577-
)
1652+
if isinstance(entity_df, pd.DataFrame):
1653+
_entity_count = len(entity_df)
1654+
elif isinstance(entity_df, str):
1655+
_entity_count = -1
1656+
else:
1657+
_entity_count = 0
15781658
_fs = features if isinstance(features, FeatureService) else None
1579-
_fs_name = features.name if isinstance(features, FeatureService) else self._resolve_feature_service_name(_feature_refs)
1659+
_fs_name = (
1660+
features.name
1661+
if isinstance(features, FeatureService)
1662+
else self._resolve_feature_service_name(_feature_refs)
1663+
)
15801664
_log_fn(
15811665
feature_refs=_feature_refs,
15821666
entity_count=_entity_count,
@@ -1588,6 +1672,11 @@ def get_historical_features(
15881672
tracking_uri=self.config.mlflow.tracking_uri,
15891673
)
15901674

1675+
if self.config.mlflow.auto_log_entity_df:
1676+
self._auto_log_entity_df_info(
1677+
entity_df, start_date=start_date, end_date=end_date
1678+
)
1679+
15911680
return job
15921681

15931682
def create_saved_dataset(
@@ -2739,13 +2828,21 @@ def get_online_features(
27392828
_feature_refs = utils._get_features(
27402829
self.registry, self.project, features, allow_cache=True
27412830
)
2742-
_entity_count = (
2743-
len(entity_rows)
2744-
if isinstance(entity_rows, list)
2745-
else 0
2746-
)
2831+
if isinstance(entity_rows, list):
2832+
_entity_count = len(entity_rows)
2833+
elif isinstance(entity_rows, Mapping):
2834+
try:
2835+
_entity_count = len(next(iter(entity_rows.values())))
2836+
except Exception:
2837+
_entity_count = 0
2838+
else:
2839+
_entity_count = 0
27472840
_fs = features if isinstance(features, FeatureService) else None
2748-
_fs_name = features.name if isinstance(features, FeatureService) else self._resolve_feature_service_name(_feature_refs)
2841+
_fs_name = (
2842+
features.name
2843+
if isinstance(features, FeatureService)
2844+
else self._resolve_feature_service_name(_feature_refs)
2845+
)
27492846
_log_fn(
27502847
feature_refs=_feature_refs,
27512848
entity_count=_entity_count,
@@ -2756,7 +2853,6 @@ def get_online_features(
27562853
project=self.project,
27572854
tracking_uri=self.config.mlflow.tracking_uri,
27582855
)
2759-
27602856
return response
27612857

27622858
async def get_online_features_async(

sdk/python/feast/mlflow_integration/__init__.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
"""
22
MLflow integration for Feast Feature Store.
33
4-
This module provides seamless integration between Feast and MLflow for
5-
automatic experiment tracking of feature retrieval operations. When enabled
6-
in feature_store.yaml, feature metadata is logged automatically to MLflow
4+
This module provides seamless integration between Feast and MLflow. When enabled
5+
in feature_store.yaml, feature metadata is logged to MLflow
76
during get_historical_features and get_online_features calls.
87
98
Usage:
@@ -17,9 +16,6 @@
1716
tracking_uri: http://localhost:5000
1817
auto_log: true
1918
20-
Then use Feast normally - feature retrieval metadata is logged automatically
21-
to any active MLflow run.
22-
2319
For advanced use cases, the module also provides:
2420
- resolve_feature_service_from_model_uri: Map an MLflow model to its Feast
2521
feature service.

sdk/python/feast/mlflow_integration/config.py

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,19 @@
66

77

88
class MlflowConfig(FeastBaseModel):
9-
"""Configuration for MLflow integration.
10-
11-
This enables automatic logging of feature retrieval metadata to MLflow
12-
during get_historical_features and get_online_features calls.
13-
14-
Example configuration in feature_store.yaml:
15-
mlflow:
16-
enabled: true
17-
tracking_uri: http://localhost:5000
18-
auto_log: true
19-
"""
20-
219
enabled: StrictBool = False
2210
""" bool: Whether MLflow integration is enabled. Defaults to False. """
2311

2412
tracking_uri: Optional[StrictStr] = None
25-
""" str: MLflow tracking URI. If not set, uses MLflow's default
26-
(MLFLOW_TRACKING_URI env var or local ./mlruns). """
13+
""" str: MLflow tracking URI. If not set, defaults to
14+
http://127.0.0.1:5000 (local MLflow tracking server).
15+
Set explicitly for remote/shared MLflow deployments. """
2716

2817
auto_log: StrictBool = True
2918
""" bool: Automatically log feature retrieval metadata to the active
3019
MLflow run when get_historical_features or get_online_features is
3120
called. Defaults to True. """
3221

33-
auto_log_dataset: StrictBool = False
34-
""" bool: When True, the training DataFrame produced by
35-
get_historical_features().to_df() is logged as an MLflow dataset
36-
input on the active run. Defaults to False because the DataFrame
37-
can be large. """
22+
auto_log_entity_df: StrictBool = False
23+
""" bool: When True, the input entity_df (or SQL query) is recorded in
24+
the MLflow run. Defaults to False. """

sdk/python/feast/mlflow_integration/entity_df_builder.py

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@ def get_entity_df_from_mlflow_run(
1818
run_id: str,
1919
tracking_uri: Optional[str] = None,
2020
timestamp_column: str = "event_timestamp",
21+
max_rows: Optional[int] = None,
2122
) -> pd.DataFrame:
22-
"""Build an entity DataFrame from an MLflow run's artifacts or params.
23+
"""Build an entity DataFrame from an MLflow run's artifacts.
2324
2425
Convention: the run should have an artifact named ``entity_df.parquet``
25-
(or ``entity_df.csv``). Alternatively, a run param
26-
``feast.entity_df_path`` pointing to a local/remote file path.
26+
(or ``entity_df.csv``), saved automatically when
27+
``auto_log_entity_df: true`` is set in ``feature_store.yaml``.
2728
2829
Args:
2930
run_id: The MLflow run ID.
3031
tracking_uri: Optional MLflow tracking URI.
3132
timestamp_column: Expected name of the timestamp column in the
3233
entity DataFrame.
34+
max_rows: Optional limit on number of rows to load. When set,
35+
only the first ``max_rows`` rows are returned (useful for
36+
large artifacts to avoid OOM).
3337
3438
Returns:
3539
A ``pd.DataFrame`` suitable for passing to
@@ -47,50 +51,33 @@ def get_entity_df_from_mlflow_run(
4751
"mlflow is not installed. Install with: pip install feast[mlflow]"
4852
)
4953

50-
if tracking_uri:
51-
mlflow.set_tracking_uri(tracking_uri)
52-
53-
client = mlflow.MlflowClient()
54+
client = mlflow.MlflowClient(tracking_uri=tracking_uri)
5455

5556
try:
56-
run = client.get_run(run_id)
57+
client.get_run(run_id)
5758
except MlflowException as e:
5859
raise FeastMlflowEntityDfError(f"Run '{run_id}' not found: {e}")
5960

6061
# Strategy 1: artifact entity_df.parquet
6162
df = _try_artifact(client, run_id, "entity_df.parquet", "parquet")
6263
if df is not None:
64+
if max_rows is not None:
65+
df = df.head(max_rows)
6366
_validate_timestamp_col(df, timestamp_column)
6467
return df
6568

6669
# Strategy 2: artifact entity_df.csv
6770
df = _try_artifact(client, run_id, "entity_df.csv", "csv")
6871
if df is not None:
72+
if max_rows is not None:
73+
df = df.head(max_rows)
6974
_validate_timestamp_col(df, timestamp_column)
7075
return df
7176

72-
# Strategy 3: run param feast.entity_df_path
73-
params = run.data.params
74-
path = params.get("feast.entity_df_path")
75-
if path:
76-
try:
77-
if path.endswith(".parquet"):
78-
df = pd.read_parquet(path)
79-
else:
80-
df = pd.read_csv(path)
81-
_validate_timestamp_col(df, timestamp_column)
82-
return df
83-
except FeastMlflowEntityDfError:
84-
raise
85-
except Exception as e:
86-
raise FeastMlflowEntityDfError(
87-
f"Could not load entity df from param path '{path}': {e}"
88-
)
89-
9077
raise FeastMlflowEntityDfError(
9178
f"No entity data found for run '{run_id}'. "
92-
f"Expected artifact 'entity_df.parquet' or 'entity_df.csv', "
93-
f"or param 'feast.entity_df_path'."
79+
f"Expected artifact 'entity_df.parquet' or 'entity_df.csv'. "
80+
f"Ensure auto_log_entity_df is enabled in feature_store.yaml."
9481
)
9582

9683

@@ -101,7 +88,10 @@ def _try_artifact(client, run_id: str, artifact_name: str, fmt: str):
10188
if fmt == "parquet":
10289
return pd.read_parquet(local_path)
10390
return pd.read_csv(local_path)
104-
except Exception:
91+
except Exception as e:
92+
_logger.debug(
93+
"Artifact '%s' not found for run '%s': %s", artifact_name, run_id, e
94+
)
10595
return None
10696

10797

0 commit comments

Comments
 (0)