From 7b368eb3075f5116892e7e5a4f9e01a72ed2dede Mon Sep 17 00:00:00 2001 From: Vedant Agarwal Date: Mon, 15 Jun 2026 13:55:29 -0700 Subject: [PATCH] fix: do not pass undeclared feature view columns to ODFV UDFs An OnDemandFeatureView's UDF was receiving every feature column in the online response, including features from feature views that the ODFV did not list in its sources. That allowed a UDF to silently depend on an undeclared source. This change filters the UDF input down to the ODFV's declared sources before the transform runs, so columns from undeclared feature views are hidden. Join keys, request data and the declared features are kept. Substrait mode already restricts its inputs through its query plan, so it is left as is. Fixes #6158. Signed-off-by: Vedant Agarwal --- sdk/python/feast/utils.py | 45 +++- .../test_on_demand_pandas_transformation.py | 209 ++++++++++++++++++ 2 files changed, 252 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 4c10c1903e5..b3c1a55a83a 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -811,17 +811,58 @@ def _is_metrics_active(): # Apply transformation. Note: aggregations and transformation configs are mutually exclusive # TODO: Fix to make it work for having both aggregation and transformation # ticket: https://github.com/feast-dev/feast/issues/5689 + # A UDF should only get features from the sources it declared. Drop + # features that belong to other feature views so it can't read them by + # accident (#6158). Join keys, request data and the declared features + # stay. substrait filters on its own, so leave it alone. 
+ declared_source_names = { + projection.name + for projection in odfv.source_feature_view_projections.values() + } + undeclared_columns: set[str] = set() + # features the caller asked for that aren't one of our sources + for feature_ref in feature_refs: + ref_view_name, _, ref_feature_name = _parse_feature_ref(feature_ref) + if ref_view_name in requested_odfv_feature_names: + continue + if ref_view_name not in declared_source_names: + undeclared_columns.add(ref_feature_name) + undeclared_columns.add(f"{ref_view_name}__{ref_feature_name}") + # features that are only in the response because another requested + # ODFV needs them as input + for other_odfv in requested_on_demand_feature_views: + for projection in other_odfv.source_feature_view_projections.values(): + if projection.name in declared_source_names: + continue + for feature in projection.features: + undeclared_columns.add(feature.name) + undeclared_columns.add(f"{projection.name}__{feature.name}") + if odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() + odfv_input_dict = { + k: v + for k, v in initial_response_dict.items() + if k not in undeclared_columns + } transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( - initial_response_dict + odfv_input_dict ) elif odfv.mode in {"pandas", "substrait"}: if initial_response_arrow is None: initial_response_arrow = initial_response.to_arrow() + odfv_input_arrow = initial_response_arrow + if odfv.mode == "pandas": + odfv_input_arrow = initial_response_arrow.select( + [ + c + for c in initial_response_arrow.column_names + if c not in undeclared_columns + ] + ) transformed_features_arrow = odfv.transform_arrow( - initial_response_arrow, full_feature_names + odfv_input_arrow, full_feature_names ) else: raise Exception( diff --git a/sdk/python/tests/unit/test_on_demand_pandas_transformation.py b/sdk/python/tests/unit/test_on_demand_pandas_transformation.py index c5ab657becc..19886c1faf4 100644 --- 
a/sdk/python/tests/unit/test_on_demand_pandas_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_pandas_transformation.py @@ -341,3 +341,212 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame: ), ): store.apply([request_source, pandas_view]) + + +def test_odfv_udf_does_not_receive_undeclared_source_columns(): + """#6158: a pandas ODFV's UDF should only see its own source columns, not + features from another feature view that was requested too.""" + with tempfile.TemporaryDirectory() as data_dir: + store = FeatureStore( + config=RepoConfig( + project="test_odfv_source_isolation", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=3, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db") + ), + ) + ) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + driver_df = create_driver_hourly_stats_df([1001], start_date, end_date) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) + + driver = Entity(name="driver", join_keys=["driver_id"]) + source = FileSource( + name="driver_hourly_stats_source", + path=driver_stats_path, + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + + # Two feature views from the same source. The ODFV declares only fv_declared. 
+ fv_declared = FeatureView( + name="fv_declared", + entities=[driver], + ttl=timedelta(days=0), + schema=[Field(name="conv_rate", dtype=Float32)], + online=True, + source=source, + ) + fv_undeclared = FeatureView( + name="fv_undeclared", + entities=[driver], + ttl=timedelta(days=0), + schema=[Field(name="avg_daily_trips", dtype=Int64)], + online=True, + source=source, + ) + + @on_demand_feature_view( + sources=[fv_declared], + schema=[ + Field(name="saw_undeclared", dtype=Bool), + Field(name="saw_declared", dtype=Bool), + Field(name="saw_join_key", dtype=Bool), + ], + mode="pandas", + ) + def guard_view(inputs: pd.DataFrame) -> pd.DataFrame: + out = pd.DataFrame() + n = range(len(inputs)) + out["saw_undeclared"] = ["avg_daily_trips" in inputs.columns for _ in n] + out["saw_declared"] = ["conv_rate" in inputs.columns for _ in n] + out["saw_join_key"] = ["driver_id" in inputs.columns for _ in n] + return out + + store.apply([driver, source, fv_declared, fv_undeclared, guard_view]) + store.write_to_online_store(feature_view_name="fv_declared", df=driver_df) + store.write_to_online_store(feature_view_name="fv_undeclared", df=driver_df) + + response = store.get_online_features( + entity_rows=[{"driver_id": 1001}], + features=[ + "fv_declared:conv_rate", + "fv_undeclared:avg_daily_trips", + "guard_view:saw_undeclared", + "guard_view:saw_declared", + "guard_view:saw_join_key", + ], + ).to_dict() + + # the other FV's column should be hidden; ours and the join key should not + assert response["saw_undeclared"] == [False] + assert response["saw_declared"] == [True] + assert response["saw_join_key"] == [True] + + +def _two_fv_store(data_dir): + store = FeatureStore( + config=RepoConfig( + project="test_odfv_source_isolation", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=3, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db") + ), + ) + ) + end_date = 
datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + driver_df = create_driver_hourly_stats_df([1001], start_date, end_date) + path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet(path=path, allow_truncated_timestamps=True) + driver = Entity(name="driver", join_keys=["driver_id"]) + src = FileSource( + name="driver_hourly_stats_source", + path=path, + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + fv1 = FeatureView( + name="fv1", + entities=[driver], + ttl=timedelta(days=0), + schema=[Field(name="conv_rate", dtype=Float32)], + online=True, + source=src, + ) + fv2 = FeatureView( + name="fv2", + entities=[driver], + ttl=timedelta(days=0), + schema=[Field(name="avg_daily_trips", dtype=Int64)], + online=True, + source=src, + ) + return store, driver, src, fv1, fv2, driver_df + + +def test_odfv_python_mode_does_not_receive_undeclared_source_columns(): + """Same as above but for python mode, where the UDF gets a dict (#6158).""" + with tempfile.TemporaryDirectory() as data_dir: + store, driver, src, fv1, fv2, driver_df = _two_fv_store(data_dir) + + @on_demand_feature_view( + sources=[fv1], + schema=[ + Field(name="saw_undeclared", dtype=Bool), + Field(name="saw_declared", dtype=Bool), + ], + mode="python", + ) + def guard_py(inputs: dict) -> dict: + return { + "saw_undeclared": ["avg_daily_trips" in inputs], + "saw_declared": ["conv_rate" in inputs], + } + + store.apply([driver, src, fv1, fv2, guard_py]) + store.write_to_online_store(feature_view_name="fv1", df=driver_df) + store.write_to_online_store(feature_view_name="fv2", df=driver_df) + + response = store.get_online_features( + entity_rows=[{"driver_id": 1001}], + features=[ + "fv1:conv_rate", + "fv2:avg_daily_trips", + "guard_py:saw_undeclared", + "guard_py:saw_declared", + ], + ).to_dict() + + assert response["saw_undeclared"] == [False] + assert response["saw_declared"] == [True] + + +def 
test_odfv_does_not_see_other_requested_odfv_source_columns(): + """#6158: one ODFV's UDF should not see another ODFV's source column, even + when that feature view is only in the response as the other ODFV's input.""" + with tempfile.TemporaryDirectory() as data_dir: + store, driver, src, fv1, fv2, driver_df = _two_fv_store(data_dir) + + @on_demand_feature_view( + sources=[fv1], + schema=[Field(name="a_saw_fv2", dtype=Bool)], + mode="pandas", + ) + def odfv_a(inputs: pd.DataFrame) -> pd.DataFrame: + out = pd.DataFrame() + out["a_saw_fv2"] = [ + "avg_daily_trips" in inputs.columns for _ in range(len(inputs)) + ] + return out + + @on_demand_feature_view( + sources=[fv2], + schema=[Field(name="b_out", dtype=Int64)], + mode="pandas", + ) + def odfv_b(inputs: pd.DataFrame) -> pd.DataFrame: + out = pd.DataFrame() + out["b_out"] = list(inputs["avg_daily_trips"]) + return out + + store.apply([driver, src, fv1, fv2, odfv_a, odfv_b]) + store.write_to_online_store(feature_view_name="fv1", df=driver_df) + store.write_to_online_store(feature_view_name="fv2", df=driver_df) + + # ask for just the two ODFVs - fv2 is only here as odfv_b's input + response = store.get_online_features( + entity_rows=[{"driver_id": 1001}], + features=["odfv_a:a_saw_fv2", "odfv_b:b_out"], + ).to_dict() + + # odfv_a must not see fv2's column; odfv_b must still get one real value from it + assert response["a_saw_fv2"] == [False] + assert len(response["b_out"]) == 1 and response["b_out"][0] is not None