1414import asyncio
1515import copy
1616import itertools
17+ import logging
1718import os
1819import time
1920import warnings
109110_mlflow_log_fn_loaded = False
110111
111112
113+ _logger = logging .getLogger (__name__ )
114+
115+
112116def _get_mlflow_log_fn ():
113117 """Lazy-import mlflow logger only when MLflow integration is configured."""
114118 global _mlflow_log_fn , _mlflow_log_fn_loaded
115119 if not _mlflow_log_fn_loaded :
116- _mlflow_log_fn_loaded = True
117120 try :
118121 from feast .mlflow_integration .logger import (
119122 log_feature_retrieval_to_mlflow ,
120123 )
121124
122125 _mlflow_log_fn = log_feature_retrieval_to_mlflow
123- except Exception :
126+ _mlflow_log_fn_loaded = True
127+ except ImportError :
128+ _mlflow_log_fn_loaded = True
129+ _mlflow_log_fn = None
130+ except Exception as e :
131+ _logger .warning ("MLflow auto-log import failed (will retry): %s" , e )
124132 _mlflow_log_fn = None
125133 return _mlflow_log_fn
126134
@@ -213,19 +221,20 @@ def __init__(
213221 # Initialize feature service cache for performance optimization
214222 self ._feature_service_cache = {}
215223
224+ # Cache for _resolve_feature_service_name lookups
225+ self ._fs_name_cache : Dict [frozenset , Optional [str ]] = {}
226+
216227 # Configure MLflow tracking URI globally from config
217228 self ._init_mlflow_tracking ()
218229
219230 def _init_mlflow_tracking (self ):
220231 """Configure MLflow globally from feature_store.yaml.
221232
222- Sets the tracking URI and experiment name so the user never needs
223- to call mlflow.set_tracking_uri() or mlflow.set_experiment() in
224- their scripts. The experiment is named after the Feast project.
233+ Sets the tracking URI and experiment name.
234+ The experiment is named after the Feast project.
225235
226236 When no tracking_uri is specified, defaults to http://127.0.0.1:5000
227- (a local MLflow tracking server). This ensures that train.py,
228- predict.py, feast ui, and the MLflow UI all share the same backend.
237+ (a local MLflow tracking server).
229238 """
230239 try :
231240 mlflow_cfg = self .config .mlflow
@@ -242,24 +251,92 @@ def _init_mlflow_tracking(self):
242251 except Exception as e :
243252 warnings .warn (f"Failed to configure MLflow tracking: { e } " )
244253
245- def _resolve_feature_service_name (
246- self , feature_refs : List [str ]
247- ) -> Optional [str ]:
248- """Try to find a feature service that covers the given feature refs."""
254+ def _resolve_feature_service_name (self , feature_refs : List [str ]) -> Optional [str ]:
255+ """Find the best-matching feature service for the given feature refs.
256+
257+ Resolution: exact match wins immediately; otherwise the smallest
258+ superset (fewest extra features) is returned. Results are cached
259+ per FeatureStore instance for O(1) repeated lookups.
260+ """
249261 try :
250- ref_set = set (feature_refs )
262+ ref_key = frozenset (feature_refs )
263+ if ref_key in self ._fs_name_cache :
264+ return self ._fs_name_cache [ref_key ]
265+
266+ best_match = None
267+ best_extra = float ("inf" )
268+
251269 for fs in self .registry .list_feature_services (
252270 self .project , allow_cache = True
253271 ):
254- fs_refs = set ()
255- for proj in fs .feature_view_projections :
256- for feat in proj .features :
257- fs_refs .add (f"{ proj .name } :{ feat .name } " )
258- if ref_set == fs_refs or ref_set .issubset (fs_refs ):
272+ fs_refs = frozenset (
273+ f"{ p .name } :{ f .name } "
274+ for p in fs .feature_view_projections
275+ for f in p .features
276+ )
277+ if ref_key == fs_refs :
278+ self ._fs_name_cache [ref_key ] = fs .name
259279 return fs .name
260- except Exception :
261- pass
262- return None
280+ if ref_key .issubset (fs_refs ):
281+ extra = len (fs_refs ) - len (ref_key )
282+ if extra < best_extra :
283+ best_match = fs .name
284+ best_extra = extra
285+
286+ self ._fs_name_cache [ref_key ] = best_match
287+ return best_match
288+ except Exception as e :
289+ _logger .debug ("Failed to resolve feature service name: %s" , e )
290+ return None
291+
292+ def _auto_log_entity_df_info (self , entity_df , start_date = None , end_date = None ):
293+ """Log entity_df info to MLflow for reproducibility.
294+
295+ Handles three entity_df types:
296+ - pd.DataFrame: saves metadata + full parquet artifact (if under 100k rows)
297+ - str (SQL query): logs the query as a param
298+ - None (range-based): logs start_date/end_date
299+ """
300+ try :
301+ import mlflow
302+
303+ if mlflow .active_run () is None :
304+ return
305+ tracking_uri = self .config .mlflow .tracking_uri or "http://127.0.0.1:5000"
306+ client = mlflow .MlflowClient (tracking_uri = tracking_uri )
307+ run_id = mlflow .active_run ().info .run_id
308+
309+ if isinstance (entity_df , str ):
310+ query = entity_df if len (entity_df ) <= 490 else entity_df [:487 ] + "..."
311+ client .log_param (run_id , "feast.entity_df_query" , query )
312+ client .set_tag (run_id , "feast.entity_df_type" , "sql" )
313+
314+ elif isinstance (entity_df , pd .DataFrame ):
315+ client .set_tag (run_id , "feast.entity_df_type" , "dataframe" )
316+ client .log_param (run_id , "feast.entity_df_rows" , str (len (entity_df )))
317+ cols = "," .join (entity_df .columns )
318+ if len (cols ) > 490 :
319+ cols = cols [:487 ] + "..."
320+ client .log_param (run_id , "feast.entity_df_columns" , cols )
321+
322+ max_rows = 100_000
323+ if len (entity_df ) <= max_rows :
324+ import tempfile
325+
326+ with tempfile .TemporaryDirectory () as tmp_dir :
327+ path = os .path .join (tmp_dir , "entity_df.parquet" )
328+ entity_df .to_parquet (path , index = False )
329+ mlflow .log_artifact (path )
330+
331+ elif entity_df is None and (start_date or end_date ):
332+ client .set_tag (run_id , "feast.entity_df_type" , "range" )
333+ if start_date :
334+ client .log_param (run_id , "feast.start_date" , str (start_date ))
335+ if end_date :
336+ client .log_param (run_id , "feast.end_date" , str (end_date ))
337+
338+ except Exception as e :
339+ _logger .debug ("Failed to log entity_df info to MLflow: %s" , e )
263340
264341 def _init_openlineage_emitter (self ) -> Optional [Any ]:
265342 """Initialize OpenLineage emitter if configured and enabled."""
@@ -1572,11 +1649,18 @@ def get_historical_features(
15721649 _log_fn = _get_mlflow_log_fn ()
15731650 if _log_fn is not None :
15741651 _duration = time .monotonic () - _retrieval_start
1575- _entity_count = (
1576- len (entity_df ) if isinstance (entity_df , pd .DataFrame ) else 0
1577- )
1652+ if isinstance (entity_df , pd .DataFrame ):
1653+ _entity_count = len (entity_df )
1654+ elif isinstance (entity_df , str ):
1655+ _entity_count = - 1
1656+ else :
1657+ _entity_count = 0
15781658 _fs = features if isinstance (features , FeatureService ) else None
1579- _fs_name = features .name if isinstance (features , FeatureService ) else self ._resolve_feature_service_name (_feature_refs )
1659+ _fs_name = (
1660+ features .name
1661+ if isinstance (features , FeatureService )
1662+ else self ._resolve_feature_service_name (_feature_refs )
1663+ )
15801664 _log_fn (
15811665 feature_refs = _feature_refs ,
15821666 entity_count = _entity_count ,
@@ -1588,6 +1672,11 @@ def get_historical_features(
15881672 tracking_uri = self .config .mlflow .tracking_uri ,
15891673 )
15901674
1675+ if self .config .mlflow .auto_log_entity_df :
1676+ self ._auto_log_entity_df_info (
1677+ entity_df , start_date = start_date , end_date = end_date
1678+ )
1679+
15911680 return job
15921681
15931682 def create_saved_dataset (
@@ -2739,13 +2828,21 @@ def get_online_features(
27392828 _feature_refs = utils ._get_features (
27402829 self .registry , self .project , features , allow_cache = True
27412830 )
2742- _entity_count = (
2743- len (entity_rows )
2744- if isinstance (entity_rows , list )
2745- else 0
2746- )
2831+ if isinstance (entity_rows , list ):
2832+ _entity_count = len (entity_rows )
2833+ elif isinstance (entity_rows , Mapping ):
2834+ try :
2835+ _entity_count = len (next (iter (entity_rows .values ())))
2836+ except Exception :
2837+ _entity_count = 0
2838+ else :
2839+ _entity_count = 0
27472840 _fs = features if isinstance (features , FeatureService ) else None
2748- _fs_name = features .name if isinstance (features , FeatureService ) else self ._resolve_feature_service_name (_feature_refs )
2841+ _fs_name = (
2842+ features .name
2843+ if isinstance (features , FeatureService )
2844+ else self ._resolve_feature_service_name (_feature_refs )
2845+ )
27492846 _log_fn (
27502847 feature_refs = _feature_refs ,
27512848 entity_count = _entity_count ,
@@ -2756,7 +2853,6 @@ def get_online_features(
27562853 project = self .project ,
27572854 tracking_uri = self .config .mlflow .tracking_uri ,
27582855 )
2759-
27602856 return response
27612857
27622858 async def get_online_features_async (
0 commit comments