diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py
index 4c10c1903e5..b3c1a55a83a 100644
--- a/sdk/python/feast/utils.py
+++ b/sdk/python/feast/utils.py
@@ -811,17 +811,67 @@ def _is_metrics_active():
             # Apply transformation. Note: aggregations and transformation configs are mutually exclusive
             # TODO: Fix to make it work for having both aggregation and transformation
             # ticket: https://github.com/feast-dev/feast/issues/5689
+            # A UDF should only get features from the sources it declared. Drop
+            # features that belong to other feature views so it can't read them by
+            # accident (#6158). Join keys, request data and the declared features
+            # stay. substrait filters on its own, so leave it alone.
+            declared_source_names = {
+                projection.name
+                for projection in odfv.source_feature_view_projections.values()
+            }
+            # columns our own sources provide, short and view-prefixed; these are
+            # kept even when another view has a feature with the same short name
+            declared_columns: set[str] = set()
+            for projection in odfv.source_feature_view_projections.values():
+                for feature in projection.features:
+                    declared_columns.add(feature.name)
+                    declared_columns.add(f"{projection.name}__{feature.name}")
+            undeclared_columns: set[str] = set()
+            # features the caller asked for that aren't one of our sources
+            for feature_ref in feature_refs:
+                ref_view_name, _, ref_feature_name = _parse_feature_ref(feature_ref)
+                if ref_view_name in requested_odfv_feature_names:
+                    continue
+                if ref_view_name not in declared_source_names:
+                    undeclared_columns.add(ref_feature_name)
+                    undeclared_columns.add(f"{ref_view_name}__{ref_feature_name}")
+            # features that are only in the response because another requested
+            # ODFV needs them as input
+            for other_odfv in requested_on_demand_feature_views:
+                for projection in other_odfv.source_feature_view_projections.values():
+                    if projection.name in declared_source_names:
+                        continue
+                    for feature in projection.features:
+                        undeclared_columns.add(feature.name)
+                        undeclared_columns.add(f"{projection.name}__{feature.name}")
+            # a shared short name must not hide one of our declared columns
+            undeclared_columns -= declared_columns
+
             if odfv.mode == "python":
                 if initial_response_dict is None:
                     initial_response_dict = initial_response.to_dict()
+                odfv_input_dict = {
+                    k: v
+                    for k, v in initial_response_dict.items()
+                    if k not in undeclared_columns
+                }
                 transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
-                    initial_response_dict
+                    odfv_input_dict
                 )
             elif odfv.mode in {"pandas", "substrait"}:
                 if initial_response_arrow is None:
                     initial_response_arrow = initial_response.to_arrow()
+                odfv_input_arrow = initial_response_arrow
+                if odfv.mode == "pandas":
+                    odfv_input_arrow = initial_response_arrow.select(
+                        [
+                            c
+                            for c in initial_response_arrow.column_names
+                            if c not in undeclared_columns
+                        ]
+                    )
                 transformed_features_arrow = odfv.transform_arrow(
-                    initial_response_arrow, full_feature_names
+                    odfv_input_arrow, full_feature_names
                 )
             else:
                 raise Exception(
diff --git a/sdk/python/tests/unit/test_on_demand_pandas_transformation.py b/sdk/python/tests/unit/test_on_demand_pandas_transformation.py
index c5ab657becc..19886c1faf4 100644
--- a/sdk/python/tests/unit/test_on_demand_pandas_transformation.py
+++ b/sdk/python/tests/unit/test_on_demand_pandas_transformation.py
@@ -341,3 +341,212 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
             ),
         ):
             store.apply([request_source, pandas_view])
+
+
+def test_odfv_udf_does_not_receive_undeclared_source_columns():
+    """#6158: a pandas ODFV's UDF should only see its own source columns, not
+    features from another feature view that was requested too."""
+    with tempfile.TemporaryDirectory() as data_dir:
+        store = FeatureStore(
+            config=RepoConfig(
+                project="test_odfv_source_isolation",
+                registry=os.path.join(data_dir, "registry.db"),
+                provider="local",
+                entity_key_serialization_version=3,
+                online_store=SqliteOnlineStoreConfig(
+                    path=os.path.join(data_dir, "online.db")
+                ),
+            )
+        )
+
+        end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
+        start_date = end_date - timedelta(days=15)
+        driver_df = create_driver_hourly_stats_df([1001], start_date, end_date)
+        driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
+        driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)
+
+        driver = Entity(name="driver", join_keys=["driver_id"])
+        source = FileSource(
+            name="driver_hourly_stats_source",
+            path=driver_stats_path,
+            timestamp_field="event_timestamp",
+            created_timestamp_column="created",
+        )
+
+        # Two feature views from the same source. The ODFV declares only fv_declared.
+        fv_declared = FeatureView(
+            name="fv_declared",
+            entities=[driver],
+            ttl=timedelta(days=0),
+            schema=[Field(name="conv_rate", dtype=Float32)],
+            online=True,
+            source=source,
+        )
+        fv_undeclared = FeatureView(
+            name="fv_undeclared",
+            entities=[driver],
+            ttl=timedelta(days=0),
+            schema=[Field(name="avg_daily_trips", dtype=Int64)],
+            online=True,
+            source=source,
+        )
+
+        @on_demand_feature_view(
+            sources=[fv_declared],
+            schema=[
+                Field(name="saw_undeclared", dtype=Bool),
+                Field(name="saw_declared", dtype=Bool),
+                Field(name="saw_join_key", dtype=Bool),
+            ],
+            mode="pandas",
+        )
+        def guard_view(inputs: pd.DataFrame) -> pd.DataFrame:
+            out = pd.DataFrame()
+            n = range(len(inputs))
+            out["saw_undeclared"] = ["avg_daily_trips" in inputs.columns for _ in n]
+            out["saw_declared"] = ["conv_rate" in inputs.columns for _ in n]
+            out["saw_join_key"] = ["driver_id" in inputs.columns for _ in n]
+            return out
+
+        store.apply([driver, source, fv_declared, fv_undeclared, guard_view])
+        store.write_to_online_store(feature_view_name="fv_declared", df=driver_df)
+        store.write_to_online_store(feature_view_name="fv_undeclared", df=driver_df)
+
+        response = store.get_online_features(
+            entity_rows=[{"driver_id": 1001}],
+            features=[
+                "fv_declared:conv_rate",
+                "fv_undeclared:avg_daily_trips",
+                "guard_view:saw_undeclared",
+                "guard_view:saw_declared",
+                "guard_view:saw_join_key",
+            ],
+        ).to_dict()
+
+        # the other FV's column should be hidden; ours and the join key should not
+        assert response["saw_undeclared"] == [False]
+        assert response["saw_declared"] == [True]
+        assert response["saw_join_key"] == [True]
+
+
+def _two_fv_store(data_dir):
+    store = FeatureStore(
+        config=RepoConfig(
+            project="test_odfv_source_isolation",
+            registry=os.path.join(data_dir, "registry.db"),
+            provider="local",
+            entity_key_serialization_version=3,
+            online_store=SqliteOnlineStoreConfig(
+                path=os.path.join(data_dir, "online.db")
+            ),
+        )
+    )
+    end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
+    start_date = end_date - timedelta(days=15)
+    driver_df = create_driver_hourly_stats_df([1001], start_date, end_date)
+    path = os.path.join(data_dir, "driver_stats.parquet")
+    driver_df.to_parquet(path=path, allow_truncated_timestamps=True)
+    driver = Entity(name="driver", join_keys=["driver_id"])
+    src = FileSource(
+        name="driver_hourly_stats_source",
+        path=path,
+        timestamp_field="event_timestamp",
+        created_timestamp_column="created",
+    )
+    fv1 = FeatureView(
+        name="fv1",
+        entities=[driver],
+        ttl=timedelta(days=0),
+        schema=[Field(name="conv_rate", dtype=Float32)],
+        online=True,
+        source=src,
+    )
+    fv2 = FeatureView(
+        name="fv2",
+        entities=[driver],
+        ttl=timedelta(days=0),
+        schema=[Field(name="avg_daily_trips", dtype=Int64)],
+        online=True,
+        source=src,
+    )
+    return store, driver, src, fv1, fv2, driver_df
+
+
+def test_odfv_python_mode_does_not_receive_undeclared_source_columns():
+    """Same as above but for python mode, where the UDF gets a dict (#6158)."""
+    with tempfile.TemporaryDirectory() as data_dir:
+        store, driver, src, fv1, fv2, driver_df = _two_fv_store(data_dir)
+
+        @on_demand_feature_view(
+            sources=[fv1],
+            schema=[
+                Field(name="saw_undeclared", dtype=Bool),
+                Field(name="saw_declared", dtype=Bool),
+            ],
+            mode="python",
+        )
+        def guard_py(inputs: dict) -> dict:
+            return {
+                "saw_undeclared": ["avg_daily_trips" in inputs],
+                "saw_declared": ["conv_rate" in inputs],
+            }
+
+        store.apply([driver, src, fv1, fv2, guard_py])
+        store.write_to_online_store(feature_view_name="fv1", df=driver_df)
+        store.write_to_online_store(feature_view_name="fv2", df=driver_df)
+
+        response = store.get_online_features(
+            entity_rows=[{"driver_id": 1001}],
+            features=[
+                "fv1:conv_rate",
+                "fv2:avg_daily_trips",
+                "guard_py:saw_undeclared",
+                "guard_py:saw_declared",
+            ],
+        ).to_dict()
+
+        assert response["saw_undeclared"] == [False]
+        assert response["saw_declared"] == [True]
+
+
+def test_odfv_does_not_see_other_requested_odfv_source_columns():
+    """#6158: one ODFV's UDF should not see another ODFV's source column, even
+    when that feature view is only in the response as the other ODFV's input."""
+    with tempfile.TemporaryDirectory() as data_dir:
+        store, driver, src, fv1, fv2, driver_df = _two_fv_store(data_dir)
+
+        @on_demand_feature_view(
+            sources=[fv1],
+            schema=[Field(name="a_saw_fv2", dtype=Bool)],
+            mode="pandas",
+        )
+        def odfv_a(inputs: pd.DataFrame) -> pd.DataFrame:
+            out = pd.DataFrame()
+            out["a_saw_fv2"] = [
+                "avg_daily_trips" in inputs.columns for _ in range(len(inputs))
+            ]
+            return out
+
+        @on_demand_feature_view(
+            sources=[fv2],
+            schema=[Field(name="b_out", dtype=Int64)],
+            mode="pandas",
+        )
+        def odfv_b(inputs: pd.DataFrame) -> pd.DataFrame:
+            out = pd.DataFrame()
+            out["b_out"] = list(inputs["avg_daily_trips"])
+            return out
+
+        store.apply([driver, src, fv1, fv2, odfv_a, odfv_b])
+        store.write_to_online_store(feature_view_name="fv1", df=driver_df)
+        store.write_to_online_store(feature_view_name="fv2", df=driver_df)
+
+        # ask for just the two ODFVs - fv2 is only here as odfv_b's input
+        response = store.get_online_features(
+            entity_rows=[{"driver_id": 1001}],
+            features=["odfv_a:a_saw_fv2", "odfv_b:b_out"],
+        ).to_dict()
+
+        # odfv_a shouldn't see fv2's column; odfv_b still uses it fine
+        assert response["a_saw_fv2"] == [False]
+        assert response["b_out"][0] is not None