Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
uploading current progress...calling it a night
Signed-off-by: Francisco Javier Arceo <franciscojavierarceo@users.noreply.github.com>
  • Loading branch information
franciscojavierarceo committed Mar 26, 2024
commit bbc55dec5307b421bb9800926461fee242903ec1
239 changes: 97 additions & 142 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,32 @@ class OnDemandFeatureView(BaseFeatureView):
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
feature_transformation: Union[PandasTransformation, SubstraitTransformation]
feature_transformation: Union[PandasTransformation, PythonTransformation, SubstraitTransformation]
mode: str
description: str
tags: Dict[str, str]
owner: str

@log_exceptions # noqa: C901
def __init__( # noqa: C901
self,
*,
name: str,
schema: List[Field],
sources: List[
Union[
FeatureView,
RequestSource,
FeatureViewProjection,
]
],
udf: Optional[FunctionType] = None,
udf_string: str = "",
feature_transformation: Optional[
Union[PandasTransformation, SubstraitTransformation]
] = None,
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
self,
*,
name: str,
schema: List[Field],
sources: List[
Union[
FeatureView,
RequestSource,
FeatureViewProjection,
]
],
udf: Optional[FunctionType] = None,
udf_string: str = "",
feature_transformation: Union[PandasTransformation, PythonTransformation, SubstraitTransformation],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
):
"""
Creates an OnDemandFeatureView object.
Expand Down Expand Up @@ -139,8 +137,10 @@ def __init__( # noqa: C901
)
if isinstance(inspect.signature(udf).return_annotation, pd.DataFrame):
feature_transformation = PandasTransformation(udf, udf_string)
else:
elif isinstance(inspect.signature(udf).return_annotation, Dict):
feature_transformation = PythonTransformation(udf, udf_string)
else:
pass

else:
raise Exception(
Expand Down Expand Up @@ -170,7 +170,7 @@ def __copy__(self):
name=self.name,
schema=self.features,
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
+ list(self.source_request_sources.values()),
feature_transformation=self.feature_transformation,
mode=self.mode,
description=self.description,
Expand All @@ -190,11 +190,11 @@ def __eq__(self, other):
return False

if (
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.mode != other.mode
or self.feature_transformation != other.feature_transformation
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.mode != other.mode
or self.feature_transformation != other.feature_transformation
):
return False

Expand All @@ -221,8 +221,8 @@ def to_proto(self) -> OnDemandFeatureViewProto:
feature_view_projection=fv_projection.to_proto()
)
for (
source_name,
request_sources,
source_name,
request_sources,
) in self.source_request_sources.items():
sources[source_name] = OnDemandSource(
request_data_source=request_sources.to_proto()
Expand Down Expand Up @@ -254,9 +254,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:

@classmethod
def from_proto(
cls,
on_demand_feature_view_proto: OnDemandFeatureViewProto,
skip_udf: bool = False,
cls,
on_demand_feature_view_proto: OnDemandFeatureViewProto,
skip_udf: bool = False,
):
"""
Creates an on demand feature view from a protobuf representation.
Expand All @@ -270,8 +270,8 @@ def from_proto(
"""
sources = []
for (
_,
on_demand_source,
_,
on_demand_source,
) in on_demand_feature_view_proto.spec.sources.items():
if on_demand_source.WhichOneof("source") == "feature_view":
sources.append(
Expand All @@ -289,29 +289,29 @@ def from_proto(
)

if (
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "user_defined_function"
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
!= ""
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "user_defined_function"
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
!= ""
):
transformation = PandasTransformation.from_proto(
on_demand_feature_view_proto.spec.feature_transformation.user_defined_function
)
elif (
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "substrait_transformation"
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "substrait_transformation"
):
transformation = SubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.feature_transformation.substrait_transformation
)
elif (
hasattr(on_demand_feature_view_proto.spec, "user_defined_function")
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
== ""
hasattr(on_demand_feature_view_proto.spec, "user_defined_function")
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
== ""
):
backwards_compatible_udf = UserDefinedFunctionProto(
name=on_demand_feature_view_proto.spec.user_defined_function.name,
Expand Down Expand Up @@ -386,9 +386,9 @@ def _get_projected_feature_name(self, feature: str) -> str:
return f"{self.projection.name_to_use()}__{feature}"

def get_transformed_features_df(
self,
df_with_features: pd.DataFrame,
full_feature_names: bool = False,
self,
df_with_features: pd.DataFrame,
full_feature_names: bool = False,
) -> pd.DataFrame:
# Apply on demand transformations
columns_to_cleanup = []
Expand Down Expand Up @@ -416,8 +416,8 @@ def get_transformed_features_df(
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
if (
short_name in df_with_transformed_features.columns
and full_feature_names
short_name in df_with_transformed_features.columns
and full_feature_names
):
rename_columns[short_name] = long_name
elif not full_feature_names:
Expand All @@ -428,10 +428,10 @@ def get_transformed_features_df(
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features.rename(columns=rename_columns)

def _get_transformed_features_dict(
self,
feature_dict: Dict[str, List[Any]],
full_feature_names: bool = False,
def get_transformed_features_dict(
self,
feature_dict: Dict[str, List[Any]],
full_feature_names: bool = False,
) -> Dict[str, Any]:
# generates a mapping between feature names and fv__feature names (and vice versa)
name_map: Dict[str, str] = {}
Expand Down Expand Up @@ -473,6 +473,29 @@ def _get_transformed_features_dict(
)
return output_dict

def get_transformed_features(
self,
features: Union[Dict[str, List[Any]], pd.DataFrame],
full_feature_names: bool = False,
) -> Union[Dict[str, List[Any]], pd.DataFrame]:
# TODO: classic inheritance pattern....maybe fix this
if self.mode == "python":
assert isinstance(features, dict)
return self.get_transformed_features_dict(
feature_dict=cast(features, Dict[str, List[Any]]),
full_feature_names=full_feature_names,
)
elif self.mode == "pandas":
assert isinstance(features, pd.DataFrame)
return self.get_transformed_features_df(
df_with_features=cast(features, pd.DataFrame),
full_feature_names=full_feature_names,
)
else:
raise Exception(
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
)

def infer_features(self) -> None:
if self.mode == "pandas":
self._infer_features_df()
Expand Down Expand Up @@ -611,7 +634,7 @@ def _infer_features_df(self) -> None:

@staticmethod
def get_requested_odfvs(
feature_refs, project, registry
feature_refs, project, registry
) -> List["OnDemandFeatureView"]:
all_on_demand_feature_views = registry.list_on_demand_feature_views(
project, allow_cache=True
Expand All @@ -626,19 +649,19 @@ def get_requested_odfvs(


def on_demand_feature_view(
*,
schema: List[Field],
sources: List[
Union[
FeatureView,
RequestSource,
FeatureViewProjection,
]
],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
*,
schema: List[Field],
sources: List[
Union[
FeatureView,
RequestSource,
FeatureViewProjection,
]
],
mode: str = "pandas",
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
):
"""
Creates an OnDemandFeatureView object with the given user function as udf.
Expand All @@ -665,9 +688,9 @@ def mainify(obj) -> None:
def decorator(user_function):
return_annotation = inspect.signature(user_function).return_annotation
if (
return_annotation
and return_annotation.__module__ == "ibis.expr.types.relations"
and return_annotation.__name__ == "Table"
return_annotation
and return_annotation.__module__ == "ibis.expr.types.relations"
and return_annotation.__name__ == "Table"
):
import ibis
import ibis.expr.datatypes as dt
Expand Down Expand Up @@ -722,74 +745,6 @@ def decorator(user_function):

return decorator

def get_transformed_features_dict(
self,
feature_dict: Dict[str, List[Any]],
full_feature_names: bool = False,
) -> Dict[str, Any]:
# generates a mapping between feature names and fv__feature names (and vice versa)
name_map: Dict[str, str] = {}
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
if full_feature_ref in feature_dict:
name_map[full_feature_ref] = feature.name
elif feature.name in feature_dict:
name_map[feature.name] = name_map[full_feature_ref]

rows = []
# this doesn't actually require 2 x |key_space| space; k and name_map[k] point to the same object in memory
for values in zip(*feature_dict.values()):
rows.append(
{
**{k: v for k, v in zip(feature_dict.keys(), values)},
**{name_map[k]: v for k, v in zip(feature_dict.keys(), values)},
}
)

# construct output dictionary and mapping from expected feature names to alternative feature names
output_dict: Dict[str, List[Any]] = {}
correct_feature_name_to_alias: Dict[str, str] = {}
for feature in self.features:
long_name = self._get_projected_feature_name(feature.name)
correct_name = long_name if full_feature_names else feature.name
correct_feature_name_to_alias[correct_name] = (
feature.name if full_feature_names else long_name
)
output_dict[correct_name] = [None] * len(rows)

# populate output dictionary per row
for i, row in enumerate(rows):
row_output = self.udf.__call__(row)
for feature in output_dict:
output_dict[feature][i] = row_output.get(
feature, row_output[correct_feature_name_to_alias[feature]]
)
return output_dict

def get_transformed_features(
self,
features: Union[Dict[str, List[Any]], pd.DataFrame],
full_feature_names: bool = False,
) -> Union[Dict[str, List[Any]], pd.DataFrame]:
# TODO: classic inheritance pattern....maybe fix this
if self.mode == "python":
assert isinstance(features, dict)
return self.get_transformed_features_dict(
feature_dict=cast(features, Dict[str, List[Any]]),
full_feature_names=full_feature_names,
)
elif self.mode == "pandas":
assert isinstance(features, pd.DataFrame)
return self._get_transformed_features(
df_with_features=cast(features, pd.DataFrame),
full_feature_names=full_feature_names,
)
else:
raise Exception(
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
)


def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
bfv = BatchFeatureView(
Expand Down