Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Rename inputs parameter to sources for odfv
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Mar 28, 2022
commit 7c6221d6d2084d784e249cf9e8f440caa6a95eba
8 changes: 4 additions & 4 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ message OnDemandFeatureViewSpec {
// List of features specifications for each feature defined with this feature view.
repeated FeatureSpecV2 features = 3;

// Map of inputs for this feature view.
map<string, OnDemandInput> inputs = 4;
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

UserDefinedFunction user_defined_function = 5;

Expand All @@ -68,8 +68,8 @@ message OnDemandFeatureViewMeta {
google.protobuf.Timestamp last_updated_timestamp = 2;
}

message OnDemandInput {
oneof input {
message OnDemandSource {
oneof source {
FeatureView feature_view = 1;
FeatureViewProjection feature_view_projection = 3;
DataSource request_data_source = 2;
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1878,7 +1878,7 @@ def _get_feature_views_to_use(
odfv = od_fvs[fv_name].with_projection(copy.copy(projection))
od_fvs_to_use.append(odfv)
# Let's make sure to include any FVs which the ODFV requires features from.
for projection in odfv.input_feature_view_projections.values():
for projection in odfv.source_feature_view_projections.values():
fv = fvs[projection.name].with_projection(copy.copy(projection))
if fv not in fvs_to_use:
fvs_to_use.append(fv)
Expand Down Expand Up @@ -2005,7 +2005,7 @@ def _group_feature_refs(
# Let's also add in any FV Feature dependencies here.
for input_fv_projection in on_demand_view_index[
view_name
].input_feature_view_projections.values():
].source_feature_view_projections.values():
for input_feat in input_fv_projection.features:
views_features[input_fv_projection.name].add(input_feat.name)
elif view_name in request_view_index:
Expand Down
148 changes: 92 additions & 56 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import functools
import warnings
from types import MethodType
from typing import Dict, List, Optional, Type, Union

Expand All @@ -18,7 +19,7 @@
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewMeta,
OnDemandFeatureViewSpec,
OnDemandInput,
OnDemandSource,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
Expand All @@ -30,17 +31,22 @@
from feast.usage import log_exceptions
from feast.value_type import ValueType

warnings.simplefilter("once", DeprecationWarning)


class OnDemandFeatureView(BaseFeatureView):
"""
[Experimental] An OnDemandFeatureView defines a logical group of features, along with
transformations to be applied on those features and additional request data.
[Experimental] An OnDemandFeatureView defines a logical group of features that are
generated by applying a transformation on a set of input sources, such as feature
views and request data sources.

Attributes:
name: The unique name of the on demand feature view.
features: The list of features in the output of the on demand feature view, after
the transformation has been applied.
inputs: The feature views and request data sources passed into the transformation.
features: The list of features in the output of the on demand feature view.
source_feature_view_projections: A map from input source names to actual input
sources with type FeatureViewProjection.
source_request_data_sources: A map from input source names to the actual input
sources with type RequestDataSource.
udf: The user defined transformation function, which must take pandas dataframes
as inputs.
description: A human-readable description.
Expand All @@ -52,8 +58,8 @@ class OnDemandFeatureView(BaseFeatureView):
# TODO(adchia): remove inputs from proto and declaration
name: str
features: List[Feature]
input_feature_view_projections: Dict[str, FeatureViewProjection]
input_request_data_sources: Dict[str, RequestDataSource]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_data_sources: Dict[str, RequestDataSource]
udf: MethodType
description: str
tags: Dict[str, str]
Expand All @@ -64,8 +70,13 @@ def __init__(
self,
name: str,
features: List[Feature],
inputs: Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]],
udf: MethodType,
inputs: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]]
] = None,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think changing this from a positional arg to a kwarg is a backwards-incompatible change, e.g.

ODFV(
  "my name",
  [...],
  {"input1": some_feature_view}
)

will no longer be valid. Are you thinking that this is okay because it's experimental?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reordering the parameters so that existing positional calls don't break.

sources: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]]
] = None,
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -77,24 +88,46 @@ def __init__(
name: The unique name of the on demand feature view.
features: The list of features in the output of the on demand feature view, after
the transformation has been applied.
inputs: The feature views and request data sources passed into the transformation.
udf: The user defined transformation function, which must take pandas dataframes
as inputs.
inputs (optional): A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
sources (optional): A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the on demand feature view, typically the email
of the primary maintainer.
"""
super().__init__(name, features, description, tags, owner)
self.input_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.input_request_data_sources: Dict[str, RequestDataSource] = {}
for input_ref, odfv_input in inputs.items():
if isinstance(odfv_input, RequestDataSource):
self.input_request_data_sources[input_ref] = odfv_input
elif isinstance(odfv_input, FeatureViewProjection):
self.input_feature_view_projections[input_ref] = odfv_input
if inputs and sources:
raise ValueError("At most one of `inputs` or `sources` can be specified.")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I would say "one of `sources` or `inputs`" and put `sources` first.

elif inputs:
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
"Feast 0.21 and onwards will not support the `inputs` parameter."
),
DeprecationWarning,
)
sources = inputs
elif not inputs and not sources:
raise ValueError("At least one of `inputs` or `sources` must be specified.")

assert sources is not None
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_data_sources: Dict[str, RequestDataSource] = {}
for source_name, odfv_source in sources.items():
if isinstance(odfv_source, RequestDataSource):
self.source_request_data_sources[source_name] = odfv_source
elif isinstance(odfv_source, FeatureViewProjection):
self.source_feature_view_projections[source_name] = odfv_source
else:
self.input_feature_view_projections[input_ref] = odfv_input.projection
self.source_feature_view_projections[
source_name
] = odfv_source.projection

self.udf = udf

Expand All @@ -106,10 +139,14 @@ def __copy__(self):
fv = OnDemandFeatureView(
name=self.name,
features=self.features,
inputs=dict(
**self.input_feature_view_projections, **self.input_request_data_sources
sources=dict(
**self.source_feature_view_projections,
**self.source_request_data_sources,
),
udf=self.udf,
description=self.description,
tags=self.tags,
owner=self.owner,
)
fv.projection = copy.copy(self.projection)
return fv
Expand All @@ -119,17 +156,13 @@ def __eq__(self, other):
return False

if (
not self.input_feature_view_projections
== other.input_feature_view_projections
not self.source_feature_view_projections
== other.source_feature_view_projections
or not self.source_request_data_sources == other.source_request_data_sources
or not self.udf.__code__.co_code == other.udf.__code__.co_code
):
return False

if not self.input_request_data_sources == other.input_request_data_sources:
return False

if not self.udf.__code__.co_code == other.udf.__code__.co_code:
return False

return True

def __hash__(self):
Expand All @@ -147,20 +180,23 @@ def to_proto(self) -> OnDemandFeatureViewProto:
meta.created_timestamp.FromDatetime(self.created_timestamp)
if self.last_updated_timestamp:
meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp)
inputs = {}
for input_ref, fv_projection in self.input_feature_view_projections.items():
inputs[input_ref] = OnDemandInput(
sources = {}
for source_name, fv_projection in self.source_feature_view_projections.items():
sources[source_name] = OnDemandSource(
feature_view_projection=fv_projection.to_proto()
)
for input_ref, request_data_source in self.input_request_data_sources.items():
inputs[input_ref] = OnDemandInput(
for (
source_name,
request_data_source,
) in self.source_request_data_sources.items():
sources[source_name] = OnDemandSource(
request_data_source=request_data_source.to_proto()
)

spec = OnDemandFeatureViewSpec(
name=self.name,
features=[feature.to_proto() for feature in self.features],
inputs=inputs,
sources=sources,
user_defined_function=UserDefinedFunctionProto(
name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True),
),
Expand All @@ -182,22 +218,22 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
Returns:
An OnDemandFeatureView object based on the on-demand feature view protobuf.
"""
inputs = {}
sources = {}
for (
input_name,
on_demand_input,
) in on_demand_feature_view_proto.spec.inputs.items():
if on_demand_input.WhichOneof("input") == "feature_view":
inputs[input_name] = FeatureView.from_proto(
on_demand_input.feature_view
source_name,
on_demand_source,
) in on_demand_feature_view_proto.spec.sources.items():
if on_demand_source.WhichOneof("source") == "feature_view":
sources[source_name] = FeatureView.from_proto(
on_demand_source.feature_view
).projection
elif on_demand_input.WhichOneof("input") == "feature_view_projection":
inputs[input_name] = FeatureViewProjection.from_proto(
on_demand_input.feature_view_projection
elif on_demand_source.WhichOneof("source") == "feature_view_projection":
sources[source_name] = FeatureViewProjection.from_proto(
on_demand_source.feature_view_projection
)
else:
inputs[input_name] = RequestDataSource.from_proto(
on_demand_input.request_data_source
sources[source_name] = RequestDataSource.from_proto(
on_demand_source.request_data_source
)
on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand All @@ -209,7 +245,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
)
for feature in on_demand_feature_view_proto.spec.features
],
inputs=inputs,
sources=sources,
udf=dill.loads(
on_demand_feature_view_proto.spec.user_defined_function.body
),
Expand Down Expand Up @@ -237,7 +273,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):

def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
for request_data_source in self.input_request_data_sources.values():
for request_data_source in self.source_request_data_sources.values():
schema.update(request_data_source.schema)
return schema

Expand All @@ -246,9 +282,9 @@ def get_transformed_features_df(
) -> pd.DataFrame:
# Apply on demand transformations
columns_to_cleanup = []
for input_fv_projection in self.input_feature_view_projections.values():
for feature in input_fv_projection.features:
full_feature_ref = f"{input_fv_projection.name}__{feature.name}"
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
# Make sure the partial feature name is always present
df_with_features[feature.name] = df_with_features[full_feature_ref]
Expand Down Expand Up @@ -287,14 +323,14 @@ def infer_features(self):
RegistryInferenceFailure: The set of features could not be inferred.
"""
df = pd.DataFrame()
for feature_view_projection in self.input_feature_view_projections.values():
for feature_view_projection in self.source_feature_view_projections.values():
for feature in feature_view_projection.features:
dtype = feast_value_type_to_pandas_type(feature.dtype)
df[f"{feature_view_projection.name}__{feature.name}"] = pd.Series(
dtype=dtype
)
df[f"{feature.name}"] = pd.Series(dtype=dtype)
for request_data in self.input_request_data_sources.values():
for request_data in self.source_request_data_sources.values():
for feature_name, feature_type in request_data.schema.items():
dtype = feast_value_type_to_pandas_type(feature_type)
df[f"{feature_name}"] = pd.Series(dtype=dtype)
Expand Down Expand Up @@ -340,20 +376,20 @@ def get_requested_odfvs(feature_refs, project, registry):


def on_demand_feature_view(
features: List[Feature], inputs: Dict[str, Union[FeatureView, RequestDataSource]]
features: List[Feature], sources: Dict[str, Union[FeatureView, RequestDataSource]]
):
"""
Declare an on-demand feature view

:param features: Output schema with feature names
:param inputs: The inputs passed into the transform.
:param sources: The sources passed into the transform.
:return: An On Demand Feature View.
"""

def decorator(user_function):
on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
inputs=inputs,
sources=sources,
features=features,
udf=user_function,
)
Expand Down