Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,17 +811,58 @@ def _is_metrics_active():
# Apply transformation. Note: aggregations and transformation configs are mutually exclusive
# TODO: Fix to make it work for having both aggregation and transformation
# ticket: https://github.com/feast-dev/feast/issues/5689
# A UDF should only get features from the sources it declared. Drop
# features that belong to other feature views so it can't read them by
# accident (#6158). Join keys, request data and the declared features
# stay. substrait filters on its own, so leave it alone.
declared_source_names = {
projection.name
for projection in odfv.source_feature_view_projections.values()
}
undeclared_columns: set[str] = set()
# features the caller asked for that aren't one of our sources
for feature_ref in feature_refs:
ref_view_name, _, ref_feature_name = _parse_feature_ref(feature_ref)
if ref_view_name in requested_odfv_feature_names:
continue
if ref_view_name not in declared_source_names:
undeclared_columns.add(ref_feature_name)
undeclared_columns.add(f"{ref_view_name}__{ref_feature_name}")
# features that are only in the response because another requested
# ODFV needs them as input
for other_odfv in requested_on_demand_feature_views:
for projection in other_odfv.source_feature_view_projections.values():
if projection.name in declared_source_names:
continue
for feature in projection.features:
undeclared_columns.add(feature.name)
undeclared_columns.add(f"{projection.name}__{feature.name}")

if odfv.mode == "python":
if initial_response_dict is None:
initial_response_dict = initial_response.to_dict()
odfv_input_dict = {
k: v
for k, v in initial_response_dict.items()
if k not in undeclared_columns
}
transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
initial_response_dict
odfv_input_dict
)
elif odfv.mode in {"pandas", "substrait"}:
if initial_response_arrow is None:
initial_response_arrow = initial_response.to_arrow()
odfv_input_arrow = initial_response_arrow
if odfv.mode == "pandas":
odfv_input_arrow = initial_response_arrow.select(
[
c
for c in initial_response_arrow.column_names
if c not in undeclared_columns
]
)
transformed_features_arrow = odfv.transform_arrow(
initial_response_arrow, full_feature_names
odfv_input_arrow, full_feature_names
)
else:
raise Exception(
Expand Down
209 changes: 209 additions & 0 deletions sdk/python/tests/unit/test_on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,212 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
),
):
store.apply([request_source, pandas_view])


def test_odfv_udf_does_not_receive_undeclared_source_columns():
    """#6158: a pandas ODFV's UDF should only see its own source columns.

    Both feature views are requested together with an ODFV that declares only
    ``fv1`` as a source. The UDF reports which columns it was handed, so the
    assertions check that ``fv2``'s feature is hidden while the declared
    feature and the join key are still visible.
    """
    with tempfile.TemporaryDirectory() as data_dir:
        # Shared fixture: fv1 carries conv_rate, fv2 carries avg_daily_trips,
        # both backed by the same file source. The ODFV declares only fv1.
        store, driver, source, fv_declared, fv_undeclared, driver_df = (
            _two_fv_store(data_dir)
        )

        @on_demand_feature_view(
            sources=[fv_declared],
            schema=[
                Field(name="saw_undeclared", dtype=Bool),
                Field(name="saw_declared", dtype=Bool),
                Field(name="saw_join_key", dtype=Bool),
            ],
            mode="pandas",
        )
        def guard_view(inputs: pd.DataFrame) -> pd.DataFrame:
            # Emit one boolean per input row for each column of interest.
            out = pd.DataFrame()
            n = range(len(inputs))
            out["saw_undeclared"] = ["avg_daily_trips" in inputs.columns for _ in n]
            out["saw_declared"] = ["conv_rate" in inputs.columns for _ in n]
            out["saw_join_key"] = ["driver_id" in inputs.columns for _ in n]
            return out

        store.apply([driver, source, fv_declared, fv_undeclared, guard_view])
        store.write_to_online_store(feature_view_name="fv1", df=driver_df)
        store.write_to_online_store(feature_view_name="fv2", df=driver_df)

        response = store.get_online_features(
            entity_rows=[{"driver_id": 1001}],
            features=[
                "fv1:conv_rate",
                "fv2:avg_daily_trips",
                "guard_view:saw_undeclared",
                "guard_view:saw_declared",
                "guard_view:saw_join_key",
            ],
        ).to_dict()

        # the other FV's column should be hidden; ours and the join key should not
        assert response["saw_undeclared"] == [False]
        assert response["saw_declared"] == [True]
        assert response["saw_join_key"] == [True]


def _two_fv_store(data_dir):
    """Build a local feature store and two single-feature views under data_dir.

    Driver stats for the 15 days ending at the current hour are generated for
    driver 1001 and saved as a parquet file that backs one file source. ``fv1``
    exposes ``conv_rate`` and ``fv2`` exposes ``avg_daily_trips`` from that
    source. Nothing is applied to the registry or written to the online store
    here; callers do that themselves.

    Returns:
        The tuple ``(store, driver, src, fv1, fv2, driver_df)``.
    """
    window_end = datetime.now().replace(microsecond=0, second=0, minute=0)
    window_start = window_end - timedelta(days=15)
    stats_df = create_driver_hourly_stats_df([1001], window_start, window_end)
    parquet_path = os.path.join(data_dir, "driver_stats.parquet")
    stats_df.to_parquet(path=parquet_path, allow_truncated_timestamps=True)

    online_store_config = SqliteOnlineStoreConfig(
        path=os.path.join(data_dir, "online.db")
    )
    repo_config = RepoConfig(
        project="test_odfv_source_isolation",
        registry=os.path.join(data_dir, "registry.db"),
        provider="local",
        entity_key_serialization_version=3,
        online_store=online_store_config,
    )
    feature_store = FeatureStore(config=repo_config)

    driver_entity = Entity(name="driver", join_keys=["driver_id"])
    file_source = FileSource(
        name="driver_hourly_stats_source",
        path=parquet_path,
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    )

    def _single_feature_view(view_name, field):
        # Both views differ only in their name and the one field they expose.
        return FeatureView(
            name=view_name,
            entities=[driver_entity],
            ttl=timedelta(days=0),
            schema=[field],
            online=True,
            source=file_source,
        )

    conv_rate_view = _single_feature_view(
        "fv1", Field(name="conv_rate", dtype=Float32)
    )
    daily_trips_view = _single_feature_view(
        "fv2", Field(name="avg_daily_trips", dtype=Int64)
    )
    return (
        feature_store,
        driver_entity,
        file_source,
        conv_rate_view,
        daily_trips_view,
        stats_df,
    )


def test_odfv_python_mode_does_not_receive_undeclared_source_columns():
    """#6158, python mode: the dict passed to the UDF must not carry features
    of a feature view the ODFV did not declare, even when the caller requests
    that feature view in the same call."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        fs, driver_entity, file_source, declared_fv, other_fv, stats_df = (
            _two_fv_store(tmp_dir)
        )

        output_schema = [
            Field(name="saw_undeclared", dtype=Bool),
            Field(name="saw_declared", dtype=Bool),
        ]

        @on_demand_feature_view(
            sources=[declared_fv],
            schema=output_schema,
            mode="python",
        )
        def guard_py(inputs: dict) -> dict:
            # Report which feature keys are present in the input dict.
            has_undeclared = "avg_daily_trips" in inputs
            has_declared = "conv_rate" in inputs
            return {
                "saw_undeclared": [has_undeclared],
                "saw_declared": [has_declared],
            }

        fs.apply([driver_entity, file_source, declared_fv, other_fv, guard_py])
        for view_name in ("fv1", "fv2"):
            fs.write_to_online_store(feature_view_name=view_name, df=stats_df)

        requested_features = [
            "fv1:conv_rate",
            "fv2:avg_daily_trips",
            "guard_py:saw_undeclared",
            "guard_py:saw_declared",
        ]
        online_response = fs.get_online_features(
            entity_rows=[{"driver_id": 1001}],
            features=requested_features,
        )
        result = online_response.to_dict()

        # fv2's column is hidden from the UDF; fv1's column is still passed in
        assert result["saw_undeclared"] == [False]
        assert result["saw_declared"] == [True]


def test_odfv_does_not_see_other_requested_odfv_source_columns():
    """#6158: one ODFV's UDF should not see another ODFV's source column, even
    when that feature view is only in the response as the other ODFV's input.

    ``odfv_a`` declares ``fv1`` and reports whether it was handed
    ``avg_daily_trips``; ``odfv_b`` declares ``fv2`` and echoes that column
    back, which shows the column is still delivered to the ODFV that owns it.
    """
    with tempfile.TemporaryDirectory() as data_dir:
        store, driver, src, fv1, fv2, driver_df = _two_fv_store(data_dir)

        @on_demand_feature_view(
            sources=[fv1],
            schema=[Field(name="a_saw_fv2", dtype=Bool)],
            mode="pandas",
        )
        def odfv_a(inputs: pd.DataFrame) -> pd.DataFrame:
            out = pd.DataFrame()
            out["a_saw_fv2"] = [
                "avg_daily_trips" in inputs.columns for _ in range(len(inputs))
            ]
            return out

        @on_demand_feature_view(
            sources=[fv2],
            schema=[Field(name="b_out", dtype=Int64)],
            mode="pandas",
        )
        def odfv_b(inputs: pd.DataFrame) -> pd.DataFrame:
            out = pd.DataFrame()
            out["b_out"] = list(inputs["avg_daily_trips"])
            return out

        store.apply([driver, src, fv1, fv2, odfv_a, odfv_b])
        store.write_to_online_store(feature_view_name="fv1", df=driver_df)
        store.write_to_online_store(feature_view_name="fv2", df=driver_df)

        # ask for just the two ODFVs - fv2 is only here as odfv_b's input
        response = store.get_online_features(
            entity_rows=[{"driver_id": 1001}],
            features=["odfv_a:a_saw_fv2", "odfv_b:b_out"],
        ).to_dict()

        # odfv_a shouldn't see fv2's column; odfv_b still uses it fine
        assert response["a_saw_fv2"] == [False]
        # The response maps each feature to a list, so checking the list itself
        # against None could never fail. Check the single row's value instead.
        assert len(response["b_out"]) == 1
        assert response["b_out"][0] is not None