Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed lint and udf example...should be singleton
  • Loading branch information
franciscojavierarceo committed Mar 28, 2024
commit aa90fc1436c9c823624e8142a4c0c060b777c02a
2 changes: 2 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2115,10 +2115,12 @@ def _augment_response_with_on_demand_transforms(
elif odfv.mode in {"pandas", "substrait"}:
if initial_response_df is None:
initial_response_df = initial_response.to_df()
print("about to happen\n", initial_response_df)
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
initial_response_df,
full_feature_names,
)
print("it did not happen")
Comment thread
franciscojavierarceo marked this conversation as resolved.
Outdated
else:
raise Exception(
f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'."
Expand Down
79 changes: 35 additions & 44 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import warnings
from datetime import datetime
from types import FunctionType
from typing import Any, Dict, List, Optional, Type, Union, cast
from typing import Any, Dict, List, Optional, Type, Union

import dill
import pandas as pd
Expand Down Expand Up @@ -387,6 +387,7 @@ def get_transformed_features_df(
df_with_features: pd.DataFrame,
full_feature_names: bool = False,
) -> pd.DataFrame:
print("-" * 40, "something happened")
# Apply on demand transformations
if not isinstance(df_with_features, pd.DataFrame):
raise TypeError("get_transformed_features_df only accepts pd.DataFrame")
Expand Down Expand Up @@ -428,64 +429,54 @@ def get_transformed_features_df(

def get_transformed_features_dict(
self,
feature_dict: Dict[str, List[Any]],
full_feature_names: bool = False,
feature_dict: Dict[str, Any], # type: ignore
) -> Dict[str, Any]:
# generates a mapping between feature names and fv__feature names (and vice versa)
name_map: Dict[str, str] = {}

# we need a mapping from full feature name to short and back to do a renaming
# The simplest thing to do is to make the full reference, copy the columns with the short reference
# and rerun
columns_to_cleanup: List[str] = []
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
if full_feature_ref in feature_dict:
name_map[full_feature_ref] = feature.name
elif feature.name in feature_dict:
name_map[feature.name] = name_map[full_feature_ref]

rows = []
# this doesn't actually require 2 x |key_space| space; k and name_map[k] point to the same object in memory
for values in zip(*feature_dict.values()):
rows.append(
{
**{k: v for k, v in zip(feature_dict.keys(), values)},
**{name_map[k]: v for k, v in zip(feature_dict.keys(), values)},
}
)
if full_feature_ref in feature_dict.keys():
# Make sure the partial feature name is always present
feature_dict[feature.name] = feature_dict[full_feature_ref]
columns_to_cleanup.append(str(feature.name))
elif feature.name in feature_dict.keys():
# Make sure the full feature name is always present
feature_dict[full_feature_ref] = feature_dict[feature.name]
columns_to_cleanup.append(str(full_feature_ref))

# construct output dictionary and mapping from expected feature names to alternative feature names
output_dict: Dict[str, List[Any]] = {}
correct_feature_name_to_alias: Dict[str, str] = {}
for feature in self.features:
long_name = self._get_projected_feature_name(feature.name)
correct_name = long_name if full_feature_names else feature.name
correct_feature_name_to_alias[correct_name] = (
feature.name if full_feature_names else long_name
)
output_dict[correct_name] = [None] * len(rows)

# populate output dictionary per row
for i, row in enumerate(rows):
row_output = self.feature_transformation.transform(row)
for feature_name in output_dict:
output_dict[feature_name][i] = row_output.get(
feature_name,
row_output[correct_feature_name_to_alias[feature_name]],
)
output_dict: Dict[str, Any] = self.feature_transformation.transform(
feature_dict
)
for feature_name in columns_to_cleanup:
del output_dict[feature_name]
return output_dict

def get_transformed_features(
self,
features: Union[Dict[str, List[Any]], pd.DataFrame],
features: Union[Dict[str, Any], pd.DataFrame],
full_feature_names: bool = False,
) -> Union[Dict[str, List[Any]], pd.DataFrame]:
) -> Union[Dict[str, Any], pd.DataFrame]:
# TODO: classic inheritance pattern....maybe fix this
if self.mode == "python" and isinstance(features, dict):
if self.mode == "python" and isinstance(features, Dict):
# note full_feature_names is not needed for the dictionary
return self.get_transformed_features_dict(
feature_dict=cast(features, Dict[str, List[Any]]), # type: ignore
full_feature_names=full_feature_names,
feature_dict=features,
)
elif self.mode == "pandas" and isinstance(features, pd.DataFrame):
print(
"*" * 30,
"\n",
type(features),
self.mode,
"\n",
"*" * 30,
)
return self.get_transformed_features_df(
df_with_features=cast(features, pd.DataFrame), # type: ignore
df_with_features=features,
full_feature_names=full_feature_names,
)
else:
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def transform(self, input_dict: Dict) -> Dict:
raise TypeError(
f"input_dict should be type Dict[str, List[Any]] but got {type(input_dict).__name__}"
)
output_dict = self.udf.__call__(input_dict)
# Ensuring that the inputs are included as well
output_dict = {**input_dict, **self.udf.__call__(input_dict)}
if not isinstance(output_dict, Dict):
Comment thread
franciscojavierarceo marked this conversation as resolved.
raise TypeError(
f"output_dict should be type Dict[str, List[Any]] but got {type(output_dict).__name__}"
Expand Down
15 changes: 11 additions & 4 deletions sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def udf1(features_df: pd.DataFrame) -> pd.DataFrame:
def udf2(features_df: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["output1"] = features_df["feature1"] + 100
df["output2"] = features_df["feature2"] + 100
df["output2"] = features_df["feature2"] + 101
return df


def python_native_udf(features_dict: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
def python_native_udf(features_dict: Dict[str, List[Any]]) -> Dict[str, Any]:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franciscojavierarceo sorry for the late find. Can you look at this test examples? The signatures are correct, but actual transformation seems to assume that features_dict["feature1"] is an int, but it should be a list of ints, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops let me update the test case tomorrow but it should be a singleton, the type hint is wrong. I adjusted this late and missed this one.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, got it. If it should be a singleton, then also double-check what is actually passed from odfv class as well. I may be wrong, but I think it's passing a list.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add another test when using get_online_features for an ODFV to confirm it more explicitly. I do have a test below confirming the correct behavior for self.feature_transformation.transform() returning the expected result.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning to add an explicit get_online_features test for substrait as well. Just testing transform is clearly not enough.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's the fix for the type: #4054

output_dict: Dict[str, List[Any]] = {
"output1": [features_dict["feature1"] + 100],
"output2": [features_dict["feature2"] + 100],
"output1": features_dict["feature1"] + 100,
"output2": features_dict["feature2"] + 101,
}
return output_dict

Expand Down Expand Up @@ -204,6 +204,13 @@ def test_python_native_transformation_mode():
}
)

assert on_demand_feature_view_python_native.get_transformed_features(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HaoXuAI see here as example of usage.

{
"feature1": 0,
"feature2": 1,
}
) == {"feature1": 0, "feature2": 1, "output1": 100, "output2": 102}


@pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated")
def test_from_proto_backwards_compatible_udf():
Expand Down