-
Notifications
You must be signed in to change notification settings - Fork 1.3k
chore: Rename inputs parameter to sources for on demand feature views #2442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
7c6221d
Rename `inputs` parameter to `sources` for odfv
felixwang9817 e47449c
Address CR comments
felixwang9817 d966787
Fix Go references to ODFV proto
felixwang9817 dbafedd
Fix tests
felixwang9817 05b393e
Fix Java
felixwang9817 1b9d808
Fix Java again
felixwang9817 18ca377
Fix Python integration tests
felixwang9817 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next
Next commit
Rename
inputs parameter to sources for odfv
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
- Loading branch information
commit 7c6221d6d2084d784e249cf9e8f440caa6a95eba
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import copy | ||
| import functools | ||
| import warnings | ||
| from types import MethodType | ||
| from typing import Dict, List, Optional, Type, Union | ||
|
|
||
|
|
@@ -18,7 +19,7 @@ | |
| from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( | ||
| OnDemandFeatureViewMeta, | ||
| OnDemandFeatureViewSpec, | ||
| OnDemandInput, | ||
| OnDemandSource, | ||
| ) | ||
| from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( | ||
| UserDefinedFunction as UserDefinedFunctionProto, | ||
|
|
@@ -30,17 +31,22 @@ | |
| from feast.usage import log_exceptions | ||
| from feast.value_type import ValueType | ||
|
|
||
| warnings.simplefilter("once", DeprecationWarning) | ||
|
|
||
|
|
||
| class OnDemandFeatureView(BaseFeatureView): | ||
| """ | ||
| [Experimental] An OnDemandFeatureView defines a logical group of features, along with | ||
| transformations to be applied on those features and additional request data. | ||
| [Experimental] An OnDemandFeatureView defines a logical group of features that are | ||
| generated by applying a transformation on a set of input sources, such as feature | ||
| views and request data sources. | ||
|
|
||
| Attributes: | ||
| name: The unique name of the on demand feature view. | ||
| features: The list of features in the output of the on demand feature view, after | ||
| the transformation has been applied. | ||
| inputs: The feature views and request data sources passed into the transformation. | ||
| features: The list of features in the output of the on demand feature view. | ||
| source_feature_view_projections: A map from input source names to actual input | ||
| sources with type FeatureViewProjection. | ||
| source_request_data_sources: A map from input source names to the actual input | ||
| sources with type RequestDataSource. | ||
| udf: The user defined transformation function, which must take pandas dataframes | ||
| as inputs. | ||
| description: A human-readable description. | ||
|
|
@@ -52,8 +58,8 @@ class OnDemandFeatureView(BaseFeatureView): | |
| # TODO(adchia): remove inputs from proto and declaration | ||
| name: str | ||
| features: List[Feature] | ||
| input_feature_view_projections: Dict[str, FeatureViewProjection] | ||
| input_request_data_sources: Dict[str, RequestDataSource] | ||
| source_feature_view_projections: Dict[str, FeatureViewProjection] | ||
| source_request_data_sources: Dict[str, RequestDataSource] | ||
| udf: MethodType | ||
| description: str | ||
| tags: Dict[str, str] | ||
|
|
@@ -64,8 +70,13 @@ def __init__( | |
| self, | ||
| name: str, | ||
| features: List[Feature], | ||
| inputs: Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]], | ||
| udf: MethodType, | ||
| inputs: Optional[ | ||
| Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]] | ||
| ] = None, | ||
| sources: Optional[ | ||
| Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]] | ||
| ] = None, | ||
| description: str = "", | ||
| tags: Optional[Dict[str, str]] = None, | ||
| owner: str = "", | ||
|
|
@@ -77,24 +88,46 @@ def __init__( | |
| name: The unique name of the on demand feature view. | ||
| features: The list of features in the output of the on demand feature view, after | ||
| the transformation has been applied. | ||
| inputs: The feature views and request data sources passed into the transformation. | ||
| udf: The user defined transformation function, which must take pandas dataframes | ||
| as inputs. | ||
| inputs (optional): A map from input source names to the actual input sources, | ||
| which may be feature views, feature view projections, or request data sources. | ||
| These sources serve as inputs to the udf, which will refer to them by name. | ||
| sources (optional): A map from input source names to the actual input sources, | ||
| which may be feature views, feature view projections, or request data sources. | ||
| These sources serve as inputs to the udf, which will refer to them by name. | ||
| description (optional): A human-readable description. | ||
| tags (optional): A dictionary of key-value pairs to store arbitrary metadata. | ||
| owner (optional): The owner of the on demand feature view, typically the email | ||
| of the primary maintainer. | ||
| """ | ||
| super().__init__(name, features, description, tags, owner) | ||
| self.input_feature_view_projections: Dict[str, FeatureViewProjection] = {} | ||
| self.input_request_data_sources: Dict[str, RequestDataSource] = {} | ||
| for input_ref, odfv_input in inputs.items(): | ||
| if isinstance(odfv_input, RequestDataSource): | ||
| self.input_request_data_sources[input_ref] = odfv_input | ||
| elif isinstance(odfv_input, FeatureViewProjection): | ||
| self.input_feature_view_projections[input_ref] = odfv_input | ||
| if inputs and sources: | ||
| raise ValueError("At most one of `inputs` or `sources` can be specified.") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, I would say |
||
| elif inputs: | ||
| warnings.warn( | ||
| ( | ||
| "The `inputs` parameter is being deprecated. Please use `sources` instead. " | ||
| "Feast 0.21 and onwards will not support the `inputs` parameter." | ||
| ), | ||
| DeprecationWarning, | ||
| ) | ||
| sources = inputs | ||
| elif not inputs and not sources: | ||
| raise ValueError("At least one of `inputs` or `sources` must be specified.") | ||
|
|
||
| assert sources is not None | ||
| self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} | ||
| self.source_request_data_sources: Dict[str, RequestDataSource] = {} | ||
| for source_name, odfv_source in sources.items(): | ||
| if isinstance(odfv_source, RequestDataSource): | ||
| self.source_request_data_sources[source_name] = odfv_source | ||
| elif isinstance(odfv_source, FeatureViewProjection): | ||
| self.source_feature_view_projections[source_name] = odfv_source | ||
| else: | ||
| self.input_feature_view_projections[input_ref] = odfv_input.projection | ||
| self.source_feature_view_projections[ | ||
| source_name | ||
| ] = odfv_source.projection | ||
|
|
||
| self.udf = udf | ||
|
|
||
|
|
@@ -106,10 +139,14 @@ def __copy__(self): | |
| fv = OnDemandFeatureView( | ||
| name=self.name, | ||
| features=self.features, | ||
| inputs=dict( | ||
| **self.input_feature_view_projections, **self.input_request_data_sources | ||
| sources=dict( | ||
| **self.source_feature_view_projections, | ||
| **self.source_request_data_sources, | ||
| ), | ||
| udf=self.udf, | ||
| description=self.description, | ||
| tags=self.tags, | ||
| owner=self.owner, | ||
| ) | ||
| fv.projection = copy.copy(self.projection) | ||
| return fv | ||
|
|
@@ -119,17 +156,13 @@ def __eq__(self, other): | |
| return False | ||
|
|
||
| if ( | ||
| not self.input_feature_view_projections | ||
| == other.input_feature_view_projections | ||
| not self.source_feature_view_projections | ||
| == other.source_feature_view_projections | ||
| or not self.source_request_data_sources == other.source_request_data_sources | ||
| or not self.udf.__code__.co_code == other.udf.__code__.co_code | ||
| ): | ||
| return False | ||
|
|
||
| if not self.input_request_data_sources == other.input_request_data_sources: | ||
| return False | ||
|
|
||
| if not self.udf.__code__.co_code == other.udf.__code__.co_code: | ||
| return False | ||
|
|
||
| return True | ||
|
|
||
| def __hash__(self): | ||
|
|
@@ -147,20 +180,23 @@ def to_proto(self) -> OnDemandFeatureViewProto: | |
| meta.created_timestamp.FromDatetime(self.created_timestamp) | ||
| if self.last_updated_timestamp: | ||
| meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp) | ||
| inputs = {} | ||
| for input_ref, fv_projection in self.input_feature_view_projections.items(): | ||
| inputs[input_ref] = OnDemandInput( | ||
| sources = {} | ||
| for source_name, fv_projection in self.source_feature_view_projections.items(): | ||
| sources[source_name] = OnDemandSource( | ||
| feature_view_projection=fv_projection.to_proto() | ||
| ) | ||
| for input_ref, request_data_source in self.input_request_data_sources.items(): | ||
| inputs[input_ref] = OnDemandInput( | ||
| for ( | ||
| source_name, | ||
| request_data_source, | ||
| ) in self.source_request_data_sources.items(): | ||
| sources[source_name] = OnDemandSource( | ||
| request_data_source=request_data_source.to_proto() | ||
| ) | ||
|
|
||
| spec = OnDemandFeatureViewSpec( | ||
| name=self.name, | ||
| features=[feature.to_proto() for feature in self.features], | ||
| inputs=inputs, | ||
| sources=sources, | ||
| user_defined_function=UserDefinedFunctionProto( | ||
| name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), | ||
| ), | ||
|
|
@@ -182,22 +218,22 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): | |
| Returns: | ||
| A OnDemandFeatureView object based on the on-demand feature view protobuf. | ||
| """ | ||
| inputs = {} | ||
| sources = {} | ||
| for ( | ||
| input_name, | ||
| on_demand_input, | ||
| ) in on_demand_feature_view_proto.spec.inputs.items(): | ||
| if on_demand_input.WhichOneof("input") == "feature_view": | ||
| inputs[input_name] = FeatureView.from_proto( | ||
| on_demand_input.feature_view | ||
| source_name, | ||
| on_demand_source, | ||
| ) in on_demand_feature_view_proto.spec.sources.items(): | ||
| if on_demand_source.WhichOneof("source") == "feature_view": | ||
| sources[source_name] = FeatureView.from_proto( | ||
| on_demand_source.feature_view | ||
| ).projection | ||
| elif on_demand_input.WhichOneof("input") == "feature_view_projection": | ||
| inputs[input_name] = FeatureViewProjection.from_proto( | ||
| on_demand_input.feature_view_projection | ||
| elif on_demand_source.WhichOneof("source") == "feature_view_projection": | ||
| sources[source_name] = FeatureViewProjection.from_proto( | ||
| on_demand_source.feature_view_projection | ||
| ) | ||
| else: | ||
| inputs[input_name] = RequestDataSource.from_proto( | ||
| on_demand_input.request_data_source | ||
| sources[source_name] = RequestDataSource.from_proto( | ||
| on_demand_source.request_data_source | ||
| ) | ||
| on_demand_feature_view_obj = cls( | ||
| name=on_demand_feature_view_proto.spec.name, | ||
|
|
@@ -209,7 +245,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): | |
| ) | ||
| for feature in on_demand_feature_view_proto.spec.features | ||
| ], | ||
| inputs=inputs, | ||
| sources=sources, | ||
| udf=dill.loads( | ||
| on_demand_feature_view_proto.spec.user_defined_function.body | ||
| ), | ||
|
|
@@ -237,7 +273,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): | |
|
|
||
| def get_request_data_schema(self) -> Dict[str, ValueType]: | ||
| schema: Dict[str, ValueType] = {} | ||
| for request_data_source in self.input_request_data_sources.values(): | ||
| for request_data_source in self.source_request_data_sources.values(): | ||
| schema.update(request_data_source.schema) | ||
| return schema | ||
|
|
||
|
|
@@ -246,9 +282,9 @@ def get_transformed_features_df( | |
| ) -> pd.DataFrame: | ||
| # Apply on demand transformations | ||
| columns_to_cleanup = [] | ||
| for input_fv_projection in self.input_feature_view_projections.values(): | ||
| for feature in input_fv_projection.features: | ||
| full_feature_ref = f"{input_fv_projection.name}__{feature.name}" | ||
| for source_fv_projection in self.source_feature_view_projections.values(): | ||
| for feature in source_fv_projection.features: | ||
| full_feature_ref = f"{source_fv_projection.name}__{feature.name}" | ||
| if full_feature_ref in df_with_features.keys(): | ||
| # Make sure the partial feature name is always present | ||
| df_with_features[feature.name] = df_with_features[full_feature_ref] | ||
|
|
@@ -287,14 +323,14 @@ def infer_features(self): | |
| RegistryInferenceFailure: The set of features could not be inferred. | ||
| """ | ||
| df = pd.DataFrame() | ||
| for feature_view_projection in self.input_feature_view_projections.values(): | ||
| for feature_view_projection in self.source_feature_view_projections.values(): | ||
| for feature in feature_view_projection.features: | ||
| dtype = feast_value_type_to_pandas_type(feature.dtype) | ||
| df[f"{feature_view_projection.name}__{feature.name}"] = pd.Series( | ||
| dtype=dtype | ||
| ) | ||
| df[f"{feature.name}"] = pd.Series(dtype=dtype) | ||
| for request_data in self.input_request_data_sources.values(): | ||
| for request_data in self.source_request_data_sources.values(): | ||
| for feature_name, feature_type in request_data.schema.items(): | ||
| dtype = feast_value_type_to_pandas_type(feature_type) | ||
| df[f"{feature_name}"] = pd.Series(dtype=dtype) | ||
|
|
@@ -340,20 +376,20 @@ def get_requested_odfvs(feature_refs, project, registry): | |
|
|
||
|
|
||
| def on_demand_feature_view( | ||
| features: List[Feature], inputs: Dict[str, Union[FeatureView, RequestDataSource]] | ||
| features: List[Feature], sources: Dict[str, Union[FeatureView, RequestDataSource]] | ||
| ): | ||
| """ | ||
| Declare an on-demand feature view | ||
|
|
||
| :param features: Output schema with feature names | ||
| :param inputs: The inputs passed into the transform. | ||
| :param sources: The sources passed into the transform. | ||
| :return: An On Demand Feature View. | ||
| """ | ||
|
|
||
| def decorator(user_function): | ||
| on_demand_feature_view_obj = OnDemandFeatureView( | ||
| name=user_function.__name__, | ||
| inputs=inputs, | ||
| sources=sources, | ||
| features=features, | ||
| udf=user_function, | ||
| ) | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think making this a
kwargfrom anargis a backwards incompatibile change. e.g.will no longer be valid. Are you thinking that this is okay because it's experimental?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reordering parameters so that this doesn't break