From 7c6221d6d2084d784e249cf9e8f440caa6a95eba Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 23 Mar 2022 15:50:38 -0700 Subject: [PATCH 1/7] Rename `inputs` parameter to `sources` for odfv Signed-off-by: Felix Wang --- protos/feast/core/OnDemandFeatureView.proto | 8 +- sdk/python/feast/feature_store.py | 4 +- sdk/python/feast/on_demand_feature_view.py | 148 ++++++++++++-------- 3 files changed, 98 insertions(+), 62 deletions(-) diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index b265f73966..33c51f5c4d 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -45,8 +45,8 @@ message OnDemandFeatureViewSpec { // List of features specifications for each feature defined with this feature view. repeated FeatureSpecV2 features = 3; - // Map of inputs for this feature view. - map inputs = 4; + // Map of sources for this feature view. + map sources = 4; UserDefinedFunction user_defined_function = 5; @@ -68,8 +68,8 @@ message OnDemandFeatureViewMeta { google.protobuf.Timestamp last_updated_timestamp = 2; } -message OnDemandInput { - oneof input { +message OnDemandSource { + oneof source { FeatureView feature_view = 1; FeatureViewProjection feature_view_projection = 3; DataSource request_data_source = 2; diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 73c2f14a63..fc1c92654d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1878,7 +1878,7 @@ def _get_feature_views_to_use( odfv = od_fvs[fv_name].with_projection(copy.copy(projection)) od_fvs_to_use.append(odfv) # Let's make sure to include an FVs which the ODFV requires Features from. - for projection in odfv.input_feature_view_projections.values(): + for projection in odfv.source_feature_view_projections.values(): fv = fvs[projection.name].with_projection(copy.copy(projection)) if fv not in fvs_to_use: fvs_to_use.append(fv) @@ -2005,7 +2005,7 @@ def _group_feature_refs( # Let's also add in any FV Feature dependencies here. for input_fv_projection in on_demand_view_index[ view_name - ].input_feature_view_projections.values(): + ].source_feature_view_projections.values(): for input_feat in input_fv_projection.features: views_features[input_fv_projection.name].add(input_feat.name) elif view_name in request_view_index: diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f0eaf987ef..f9052f8832 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -1,5 +1,6 @@ import copy import functools +import warnings from types import MethodType from typing import Dict, List, Optional, Type, Union @@ -18,7 +19,7 @@ from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureViewMeta, OnDemandFeatureViewSpec, - OnDemandInput, + OnDemandSource, ) from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( UserDefinedFunction as UserDefinedFunctionProto, @@ -30,17 +31,22 @@ from feast.usage import log_exceptions from feast.value_type import ValueType +warnings.simplefilter("once", DeprecationWarning) + class OnDemandFeatureView(BaseFeatureView): """ - [Experimental] An OnDemandFeatureView defines a logical group of features, along with - transformations to be applied on those features and additional request data. + [Experimental] An OnDemandFeatureView defines a logical group of features that are + generated by applying a transformation on a set of input sources, such as feature + views and request data sources. Attributes: name: The unique name of the on demand feature view. - features: The list of features in the output of the on demand feature view, after - the transformation has been applied. - inputs: The feature views and request data sources passed into the transformation. + features: The list of features in the output of the on demand feature view. + source_feature_view_projections: A map from input source names to actual input + sources with type FeatureViewProjection. + source_request_data_sources: A map from input source names to the actual input + sources with type RequestDataSource. udf: The user defined transformation function, which must take pandas dataframes as inputs. description: A human-readable description. @@ -52,8 +58,8 @@ class OnDemandFeatureView(BaseFeatureView): # TODO(adchia): remove inputs from proto and declaration name: str features: List[Feature] - input_feature_view_projections: Dict[str, FeatureViewProjection] - input_request_data_sources: Dict[str, RequestDataSource] + source_feature_view_projections: Dict[str, FeatureViewProjection] + source_request_data_sources: Dict[str, RequestDataSource] udf: MethodType description: str tags: Dict[str, str] @@ -64,8 +70,13 @@ def __init__( self, name: str, features: List[Feature], - inputs: Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]], udf: MethodType, + inputs: Optional[ + Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]] + ] = None, + sources: Optional[ + Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]] + ] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -77,24 +88,46 @@ def __init__( name: The unique name of the on demand feature view. features: The list of features in the output of the on demand feature view, after the transformation has been applied. - inputs: The feature views and request data sources passed into the transformation. udf: The user defined transformation function, which must take pandas dataframes as inputs. + inputs (optional): A map from input source names to the actual input sources, + which may be feature views, feature view projections, or request data sources. + These sources serve as inputs to the udf, which will refer to them by name. + sources (optional): A map from input source names to the actual input sources, + which may be feature views, feature view projections, or request data sources. + These sources serve as inputs to the udf, which will refer to them by name. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. """ super().__init__(name, features, description, tags, owner) - self.input_feature_view_projections: Dict[str, FeatureViewProjection] = {} - self.input_request_data_sources: Dict[str, RequestDataSource] = {} - for input_ref, odfv_input in inputs.items(): - if isinstance(odfv_input, RequestDataSource): - self.input_request_data_sources[input_ref] = odfv_input - elif isinstance(odfv_input, FeatureViewProjection): - self.input_feature_view_projections[input_ref] = odfv_input + if inputs and sources: + raise ValueError("At most one of `inputs` or `sources` can be specified.") + elif inputs: + warnings.warn( + ( + "The `inputs` parameter is being deprecated. Please use `sources` instead. " + "Feast 0.21 and onwards will not support the `inputs` parameter." + ), + DeprecationWarning, + ) + sources = inputs + elif not inputs and not sources: + raise ValueError("At least one of `inputs` or `sources` must be specified.") + + assert sources is not None + self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} + self.source_request_data_sources: Dict[str, RequestDataSource] = {} + for source_name, odfv_source in sources.items(): + if isinstance(odfv_source, RequestDataSource): + self.source_request_data_sources[source_name] = odfv_source + elif isinstance(odfv_source, FeatureViewProjection): + self.source_feature_view_projections[source_name] = odfv_source else: - self.input_feature_view_projections[input_ref] = odfv_input.projection + self.source_feature_view_projections[ + source_name + ] = odfv_source.projection self.udf = udf @@ -106,10 +139,14 @@ def __copy__(self): fv = OnDemandFeatureView( name=self.name, features=self.features, - inputs=dict( - **self.input_feature_view_projections, **self.input_request_data_sources + sources=dict( + **self.source_feature_view_projections, + **self.source_request_data_sources, ), udf=self.udf, + description=self.description, + tags=self.tags, + owner=self.owner, ) fv.projection = copy.copy(self.projection) return fv @@ -119,17 +156,13 @@ def __eq__(self, other): return False if ( - not self.input_feature_view_projections - == other.input_feature_view_projections + not self.source_feature_view_projections + == other.source_feature_view_projections + or not self.source_request_data_sources == other.source_request_data_sources + or not self.udf.__code__.co_code == other.udf.__code__.co_code ): return False - if not self.input_request_data_sources == other.input_request_data_sources: - return False - - if not self.udf.__code__.co_code == other.udf.__code__.co_code: - return False - return True def __hash__(self): @@ -147,20 +180,23 @@ def to_proto(self) -> OnDemandFeatureViewProto: meta.created_timestamp.FromDatetime(self.created_timestamp) if self.last_updated_timestamp: meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp) - inputs = {} - for input_ref, fv_projection in self.input_feature_view_projections.items(): - inputs[input_ref] = OnDemandInput( + sources = {} + for source_name, fv_projection in self.source_feature_view_projections.items(): + sources[source_name] = OnDemandSource( feature_view_projection=fv_projection.to_proto() ) - for input_ref, request_data_source in self.input_request_data_sources.items(): - inputs[input_ref] = OnDemandInput( + for ( + source_name, + request_data_source, + ) in self.source_request_data_sources.items(): + sources[source_name] = OnDemandSource( request_data_source=request_data_source.to_proto() ) spec = OnDemandFeatureViewSpec( name=self.name, features=[feature.to_proto() for feature in self.features], - inputs=inputs, + sources=sources, user_defined_function=UserDefinedFunctionProto( name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), ), @@ -182,22 +218,22 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): Returns: A OnDemandFeatureView object based on the on-demand feature view protobuf. """ - inputs = {} + sources = {} for ( - input_name, - on_demand_input, - ) in on_demand_feature_view_proto.spec.inputs.items(): - if on_demand_input.WhichOneof("input") == "feature_view": - inputs[input_name] = FeatureView.from_proto( - on_demand_input.feature_view + source_name, + on_demand_source, + ) in on_demand_feature_view_proto.spec.sources.items(): + if on_demand_source.WhichOneof("source") == "feature_view": + sources[source_name] = FeatureView.from_proto( + on_demand_source.feature_view ).projection - elif on_demand_input.WhichOneof("input") == "feature_view_projection": - inputs[input_name] = FeatureViewProjection.from_proto( - on_demand_input.feature_view_projection + elif on_demand_source.WhichOneof("source") == "feature_view_projection": + sources[source_name] = FeatureViewProjection.from_proto( + on_demand_source.feature_view_projection ) else: - inputs[input_name] = RequestDataSource.from_proto( - on_demand_input.request_data_source + sources[source_name] = RequestDataSource.from_proto( + on_demand_source.request_data_source ) on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, @@ -209,7 +245,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ) for feature in on_demand_feature_view_proto.spec.features ], - inputs=inputs, + sources=sources, udf=dill.loads( on_demand_feature_view_proto.spec.user_defined_function.body ), @@ -237,7 +273,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): def get_request_data_schema(self) -> Dict[str, ValueType]: schema: Dict[str, ValueType] = {} - for request_data_source in self.input_request_data_sources.values(): + for request_data_source in self.source_request_data_sources.values(): schema.update(request_data_source.schema) return schema @@ -246,9 +282,9 @@ def get_transformed_features_df( ) -> pd.DataFrame: # Apply on demand transformations columns_to_cleanup = [] - for input_fv_projection in self.input_feature_view_projections.values(): - for feature in input_fv_projection.features: - full_feature_ref = f"{input_fv_projection.name}__{feature.name}" + for source_fv_projection in self.source_feature_view_projections.values(): + for feature in source_fv_projection.features: + full_feature_ref = f"{source_fv_projection.name}__{feature.name}" if full_feature_ref in df_with_features.keys(): # Make sure the partial feature name is always present df_with_features[feature.name] = df_with_features[full_feature_ref] @@ -287,14 +323,14 @@ def infer_features(self): RegistryInferenceFailure: The set of features could not be inferred. """ df = pd.DataFrame() - for feature_view_projection in self.input_feature_view_projections.values(): + for feature_view_projection in self.source_feature_view_projections.values(): for feature in feature_view_projection.features: dtype = feast_value_type_to_pandas_type(feature.dtype) df[f"{feature_view_projection.name}__{feature.name}"] = pd.Series( dtype=dtype ) df[f"{feature.name}"] = pd.Series(dtype=dtype) - for request_data in self.input_request_data_sources.values(): + for request_data in self.source_request_data_sources.values(): for feature_name, feature_type in request_data.schema.items(): dtype = feast_value_type_to_pandas_type(feature_type) df[f"{feature_name}"] = pd.Series(dtype=dtype) @@ -340,20 +376,20 @@ def get_requested_odfvs(feature_refs, project, registry): def on_demand_feature_view( - features: List[Feature], inputs: Dict[str, Union[FeatureView, RequestDataSource]] + features: List[Feature], sources: Dict[str, Union[FeatureView, RequestDataSource]] ): """ Declare an on-demand feature view :param features: Output schema with feature names - :param inputs: The inputs passed into the transform. + :param sources: The sources passed into the transform. :return: An On Demand Feature View. """ def decorator(user_function): on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, - inputs=inputs, + sources=sources, features=features, udf=user_function, ) From e47449c870cae347e07f8c06e37e1e4c9ae01d9c Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 25 Mar 2022 14:19:14 -0700 Subject: [PATCH 2/7] Address CR comments Signed-off-by: Felix Wang --- sdk/python/feast/on_demand_feature_view.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f9052f8832..7d3e62a036 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -70,11 +70,11 @@ def __init__( self, name: str, features: List[Feature], - udf: MethodType, - inputs: Optional[ + sources: Optional[ Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]] ] = None, - sources: Optional[ + udf: Optional[MethodType] = None, + inputs: Optional[ Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]] ] = None, description: str = "", @@ -88,12 +88,12 @@ def __init__( name: The unique name of the on demand feature view. features: The list of features in the output of the on demand feature view, after the transformation has been applied. - udf: The user defined transformation function, which must take pandas dataframes - as inputs. - inputs (optional): A map from input source names to the actual input sources, + sources (optional): A map from input source names to the actual input sources, which may be feature views, feature view projections, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. - sources (optional): A map from input source names to the actual input sources, + udf (optional): The user defined transformation function, which must take pandas + dataframes as inputs. + inputs (optional): A map from input source names to the actual input sources, which may be feature views, feature view projections, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. description (optional): A human-readable description. @@ -103,7 +103,7 @@ def __init__( """ super().__init__(name, features, description, tags, owner) if inputs and sources: - raise ValueError("At most one of `inputs` or `sources` can be specified.") + raise ValueError("At most one of `sources` or `inputs` can be specified.") elif inputs: warnings.warn( ( @@ -129,6 +129,9 @@ def __init__( source_name ] = odfv_source.projection + if udf is None: + raise ValueError("The `udf` parameter must be specified.") + assert udf self.udf = udf @property From d9667871f0f0927c386231a0f952b1f1ded9bc6c Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 25 Mar 2022 14:19:20 -0700 Subject: [PATCH 3/7] Fix Go references to ODFV proto Signed-off-by: Felix Wang --- go/internal/feast/ondemandfeatureview.go | 32 ++++++++++++------------ 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/go/internal/feast/ondemandfeatureview.go b/go/internal/feast/ondemandfeatureview.go index 293365d9da..35155355d8 100644 --- a/go/internal/feast/ondemandfeatureview.go +++ b/go/internal/feast/ondemandfeatureview.go @@ -7,27 +7,27 @@ import ( type OnDemandFeatureView struct { base *BaseFeatureView - inputFeatureViewProjections map[string]*FeatureViewProjection - inputRequestDataSources map[string]*core.DataSource_RequestDataOptions + sourceFeatureViewProjections map[string]*FeatureViewProjection + sourceRequestDataSources map[string]*core.DataSource_RequestDataOptions } func NewOnDemandFeatureViewFromProto(proto *core.OnDemandFeatureView) *OnDemandFeatureView { onDemandFeatureView := &OnDemandFeatureView{base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features), - inputFeatureViewProjections: make(map[string]*FeatureViewProjection), - inputRequestDataSources: make(map[string]*core.DataSource_RequestDataOptions), + sourceFeatureViewProjections: make(map[string]*FeatureViewProjection), + sourceRequestDataSources: make(map[string]*core.DataSource_RequestDataOptions), } - for inputName, onDemandInput := range proto.Spec.Inputs { - if onDemandInputFeatureView, ok := onDemandInput.Input.(*core.OnDemandInput_FeatureView); ok { - featureViewProto := onDemandInputFeatureView.FeatureView + for sourceName, onDemandSource := range proto.Spec.Sources { + if onDemandSourceFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureView); ok { + featureViewProto := onDemandSourceFeatureView.FeatureView featureView := NewFeatureViewFromProto(featureViewProto) - onDemandFeatureView.inputFeatureViewProjections[inputName] = featureView.base.projection - } else if onDemandInputFeatureViewProjection, ok := onDemandInput.Input.(*core.OnDemandInput_FeatureViewProjection); ok { - featureProjectionProto := onDemandInputFeatureViewProjection.FeatureViewProjection - onDemandFeatureView.inputFeatureViewProjections[inputName] = NewFeatureViewProjectionFromProto(featureProjectionProto) - } else if onDemandInputRequestFeatureView, ok := onDemandInput.Input.(*core.OnDemandInput_RequestDataSource); ok { - - if dataSourceRequestOptions, ok := onDemandInputRequestFeatureView.RequestDataSource.Options.(*core.DataSource_RequestDataOptions_); ok { - onDemandFeatureView.inputRequestDataSources[inputName] = dataSourceRequestOptions.RequestDataOptions + onDemandFeatureView.sourceFeatureViewProjections[sourceName] = featureView.base.projection + } else if onDemandSourceFeatureViewProjection, ok := onDemandSource.Source.(*core.OnDemandSource_FeatureViewProjection); ok { + featureProjectionProto := onDemandSourceFeatureViewProjection.FeatureViewProjection + onDemandFeatureView.sourceFeatureViewProjections[sourceName] = NewFeatureViewProjectionFromProto(featureProjectionProto) + } else if onDemandSourceRequestFeatureView, ok := onDemandSource.Source.(*core.OnDemandSource_RequestDataSource); ok { + + if dataSourceRequestOptions, ok := onDemandSourceRequestFeatureView.RequestDataSource.Options.(*core.DataSource_RequestDataOptions_); ok { + onDemandFeatureView.sourceRequestDataSources[sourceName] = dataSourceRequestOptions.RequestDataOptions } } } @@ -43,7 +43,7 @@ func (fs *OnDemandFeatureView) NewOnDemandFeatureViewFromBase(base *BaseFeatureV func (fs *OnDemandFeatureView) getRequestDataSchema() map[string]types.ValueType_Enum { schema := make(map[string]types.ValueType_Enum) - for _, requestDataSource := range fs.inputRequestDataSources { + for _, requestDataSource := range fs.sourceRequestDataSources { for fieldName, fieldValueType := range requestDataSource.Schema { schema[fieldName] = fieldValueType } From dbafeddf498bb0a18ee019bc8d0804f3937b5512 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 25 Mar 2022 17:17:51 -0700 Subject: [PATCH 4/7] Fix tests Signed-off-by: Felix Wang --- .../feature_repos/universal/feature_views.py | 8 ++++---- .../tests/integration/registration/test_inference.py | 12 ++++++------ .../tests/integration/registration/test_registry.py | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 1a9f1f1865..cef1e89d24 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -52,7 +52,7 @@ def conv_rate_plus_100(features_df: pd.DataFrame) -> pd.DataFrame: def conv_rate_plus_100_feature_view( - inputs: Dict[str, Union[RequestDataSource, FeatureView]], + sources: Dict[str, Union[RequestDataSource, FeatureView]], infer_features: bool = False, features: Optional[List[Feature]] = None, ) -> OnDemandFeatureView: @@ -63,7 +63,7 @@ def conv_rate_plus_100_feature_view( ] return OnDemandFeatureView( name=conv_rate_plus_100.__name__, - inputs=inputs, + sources=sources, features=[] if infer_features else _features, udf=conv_rate_plus_100, ) @@ -87,7 +87,7 @@ def similarity(features_df: pd.DataFrame) -> pd.DataFrame: def similarity_feature_view( - inputs: Dict[str, Union[RequestDataSource, FeatureView]], + sources: Dict[str, Union[RequestDataSource, FeatureView]], infer_features: bool = False, features: Optional[List[Feature]] = None, ) -> OnDemandFeatureView: @@ -97,7 +97,7 @@ def similarity_feature_view( ] return OnDemandFeatureView( name=similarity.__name__, - inputs=inputs, + sources=sources, features=[] if infer_features else _features, udf=similarity, ) diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 0ea6276669..54af9f6ff2 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -169,7 +169,7 @@ def test_on_demand_features_type_inference(): ) @on_demand_feature_view( - inputs={"date_request": date_request}, + sources={"date_request": date_request}, features=[ Feature("output", ValueType.UNIX_TIMESTAMP), Feature("string_output", ValueType.STRING), @@ -184,7 +184,7 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: test_view.infer_features() @on_demand_feature_view( - inputs={"date_request": date_request}, + sources={"date_request": date_request}, features=[ Feature("output", ValueType.UNIX_TIMESTAMP), Feature("object_output", ValueType.STRING), @@ -200,7 +200,7 @@ def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: invalid_test_view.infer_features() @on_demand_feature_view( - inputs={"date_request": date_request}, + sources={"date_request": date_request}, features=[ Feature("output", ValueType.UNIX_TIMESTAMP), Feature("missing", ValueType.STRING), @@ -222,7 +222,7 @@ def test_datasource_inference(): ) @on_demand_feature_view( - inputs={"date_request": date_request}, + sources={"date_request": date_request}, features=[ Feature("output", ValueType.UNIX_TIMESTAMP), Feature("string_output", ValueType.STRING), @@ -237,7 +237,7 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: test_view.infer_features() @on_demand_feature_view( - inputs={"date_request": date_request}, + sources={"date_request": date_request}, features=[ Feature("output", ValueType.UNIX_TIMESTAMP), Feature("object_output", ValueType.STRING), @@ -253,7 +253,7 @@ def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: invalid_test_view.infer_features() @on_demand_feature_view( - inputs={"date_request": date_request}, + sources={"date_request": date_request}, features=[ Feature("output", ValueType.UNIX_TIMESTAMP), Feature("missing", ValueType.STRING), diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 535497634d..fb68770db8 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -264,7 +264,7 @@ def test_modify_feature_views_success(test_registry): Feature(name="odfv1_my_feature_1", dtype=ValueType.STRING), Feature(name="odfv1_my_feature_2", dtype=ValueType.INT32), ], - inputs={"request_source": request_source}, + sources={"request_source": request_source}, ) def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() @@ -284,7 +284,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: Feature(name="odfv1_my_feature_1", dtype=ValueType.FLOAT), Feature(name="odfv1_my_feature_2", dtype=ValueType.INT32), ], - inputs={"request_source": request_source}, + sources={"request_source": request_source}, ) def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() From 05b393e12443b86e38b0f25f0f60e84d60f3ae3f Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 25 Mar 2022 17:29:41 -0700 Subject: [PATCH 5/7] Fix Java Signed-off-by: Felix Wang --- .../service/OnlineServingServiceV2.java | 22 ++++++------ .../service/OnlineTransformationService.java | 34 +++++++++---------- .../service/TransformationService.java | 4 +-- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java b/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java index f4e330fbf7..5665f1d634 100644 --- a/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java +++ b/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java @@ -88,16 +88,16 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures( .collect(Collectors.toList()); // ToDo (pyalex): refactor transformation service to delete unused left part of the returned - // Pair from extractRequestDataFeatureNamesAndOnDemandFeatureInputs. + // Pair from extractRequestDataFeatureNamesAndOnDemandFeatureSources. // Currently, we can retrieve context variables directly from GetOnlineFeaturesRequest. - List onDemandFeatureInputs = + List onDemandFeatureSources = this.onlineTransformationService.extractOnDemandFeaturesDependencies( onDemandFeatureReferences); - // Add on demand feature inputs to list of feature references to retrieve. - for (FeatureReferenceV2 onDemandFeatureInput : onDemandFeatureInputs) { - if (!retrievedFeatureReferences.contains(onDemandFeatureInput)) { - retrievedFeatureReferences.add(onDemandFeatureInput); + // Add on demand feature sources to list of feature references to retrieve. + for (FeatureReferenceV2 onDemandFeatureSource : onDemandFeatureSources) { + if (!retrievedFeatureReferences.contains(onDemandFeatureSource)) { + retrievedFeatureReferences.add(onDemandFeatureSource); } } @@ -194,7 +194,7 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures( // data. this.populateOnDemandFeatures( onDemandFeatureReferences, - onDemandFeatureInputs, + onDemandFeatureSources, retrievedFeatureReferences, request, features, @@ -257,7 +257,7 @@ private List> getEntityRows( private void populateOnDemandFeatures( List onDemandFeatureReferences, - List onDemandFeatureInputs, + List onDemandFeatureSources, List retrievedFeatureReferences, ServingAPIProto.GetOnlineFeaturesRequest request, List> features, @@ -271,7 +271,7 @@ private void populateOnDemandFeatures( for (int featureIdx = 0; featureIdx < retrievedFeatureReferences.size(); featureIdx++) { FeatureReferenceV2 featureReference = retrievedFeatureReferences.get(featureIdx); - if (!onDemandFeatureInputs.contains(featureReference)) { + if (!onDemandFeatureSources.contains(featureReference)) { continue; } @@ -291,7 +291,7 @@ private void populateOnDemandFeatures( valueList)); } // Serialize the augmented values. - ValueType transformationInput = + ValueType transformationSource = this.onlineTransformationService.serializeValuesIntoArrowIPC(onDemandContext); // Send out requests to the FTS and process the responses. @@ -305,7 +305,7 @@ private void populateOnDemandFeatures( TransformFeaturesRequest transformFeaturesRequest = TransformFeaturesRequest.newBuilder() .setOnDemandFeatureViewName(onDemandFeatureViewName) - .setTransformationInput(transformationInput) + .setTransformationSource(transformationSource) .build(); TransformFeaturesResponse transformFeaturesResponse = diff --git a/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java b/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java index d1df763f6e..fce007595e 100644 --- a/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java +++ b/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java @@ -83,55 +83,55 @@ public TransformFeaturesResponse transformFeatures( @Override public List extractOnDemandFeaturesDependencies( List onDemandFeatureReferences) { - List onDemandFeatureInputs = new ArrayList<>(); + List onDemandFeatureSources = new ArrayList<>(); for (ServingAPIProto.FeatureReferenceV2 featureReference : onDemandFeatureReferences) { OnDemandFeatureViewProto.OnDemandFeatureViewSpec onDemandFeatureViewSpec = this.registryRepository.getOnDemandFeatureViewSpec(featureReference); - Map inputs = - onDemandFeatureViewSpec.getInputsMap(); + Map sources = + onDemandFeatureViewSpec.getSourcesMap(); - for (OnDemandFeatureViewProto.OnDemandInput input : inputs.values()) { - OnDemandFeatureViewProto.OnDemandInput.InputCase inputCase = input.getInputCase(); - switch (inputCase) { + for (OnDemandFeatureViewProto.OnDemandSource source : sources.values()) { + OnDemandFeatureViewProto.OnDemandSource.SourceCase sourceCase = source.getSourceCase(); + switch (sourceCase) { case REQUEST_DATA_SOURCE: // Do nothing. The value should be provided as dedicated request parameter break; case FEATURE_VIEW_PROJECTION: FeatureReferenceProto.FeatureViewProjection projection = - input.getFeatureViewProjection(); + source.getFeatureViewProjection(); for (FeatureProto.FeatureSpecV2 featureSpec : projection.getFeatureColumnsList()) { String featureName = featureSpec.getName(); - ServingAPIProto.FeatureReferenceV2 onDemandFeatureInput = + ServingAPIProto.FeatureReferenceV2 onDemandFeatureSource = ServingAPIProto.FeatureReferenceV2.newBuilder() .setFeatureViewName(projection.getFeatureViewName()) .setFeatureName(featureName) .build(); - onDemandFeatureInputs.add(onDemandFeatureInput); + onDemandFeatureSources.add(onDemandFeatureSource); } break; case FEATURE_VIEW: - FeatureViewProto.FeatureView featureView = input.getFeatureView(); + FeatureViewProto.FeatureView featureView = source.getFeatureView(); FeatureViewProto.FeatureViewSpec featureViewSpec = featureView.getSpec(); String featureViewName = featureViewSpec.getName(); for (FeatureProto.FeatureSpecV2 featureSpec : featureViewSpec.getFeaturesList()) { String featureName = featureSpec.getName(); - ServingAPIProto.FeatureReferenceV2 onDemandFeatureInput = + ServingAPIProto.FeatureReferenceV2 onDemandFeatureSource = ServingAPIProto.FeatureReferenceV2.newBuilder() .setFeatureViewName(featureViewName) .setFeatureName(featureName) .build(); - onDemandFeatureInputs.add(onDemandFeatureInput); + onDemandFeatureSources.add(onDemandFeatureSource); } break; default: throw Status.INTERNAL .withDescription( - "OnDemandInput proto input field has an unexpected type: " + inputCase) + "OnDemandSource proto source field has an unexpected type: " + sourceCase) .asRuntimeException(); } } } - return onDemandFeatureInputs; + return onDemandFeatureSources; } /** {@inheritDoc} */ @@ -321,8 +321,8 @@ public ValueType serializeValuesIntoArrowIPC(List extractOnDemandFeaturesDependencies( List onDemandFeatureReferences); From 1b9d808f7ef7e48b569d2a143a84879a7c9192d5 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 25 Mar 2022 17:35:00 -0700 Subject: [PATCH 6/7] Fix Java again Signed-off-by: Felix Wang --- .../java/feast/serving/service/OnlineServingServiceV2.java | 4 ++-- .../feast/serving/service/OnlineTransformationService.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java b/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java index 5665f1d634..12e8a5b158 100644 --- a/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java +++ b/java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java @@ -291,7 +291,7 @@ private void populateOnDemandFeatures( valueList)); } // Serialize the augmented values. - ValueType transformationSource = + ValueType transformationInput = this.onlineTransformationService.serializeValuesIntoArrowIPC(onDemandContext); // Send out requests to the FTS and process the responses. @@ -305,7 +305,7 @@ private void populateOnDemandFeatures( TransformFeaturesRequest transformFeaturesRequest = TransformFeaturesRequest.newBuilder() .setOnDemandFeatureViewName(onDemandFeatureViewName) - .setTransformationSource(transformationSource) + .setTransformationInput(transformationInput) .build(); TransformFeaturesResponse transformFeaturesResponse = diff --git a/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java b/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java index fce007595e..365432b84e 100644 --- a/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java +++ b/java/serving/src/main/java/feast/serving/service/OnlineTransformationService.java @@ -322,7 +322,7 @@ public ValueType serializeValuesIntoArrowIPC(List Date: Fri, 25 Mar 2022 17:56:00 -0700 Subject: [PATCH 7/7] Fix Python integration tests Signed-off-by: Felix Wang --- sdk/python/feast/cli.py | 2 +- sdk/python/feast/feature_store.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index d2a71bc561..c00fb8a425 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -334,7 +334,7 @@ def feature_view_list(ctx: click.Context): if isinstance(feature_view, FeatureView): entities.update(feature_view.entities) elif isinstance(feature_view, OnDemandFeatureView): - for backing_fv in feature_view.input_feature_view_projections.values(): + for backing_fv in feature_view.source_feature_view_projections.values(): entities.update(store.get_feature_view(backing_fv.name).entities) table.append( [ diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fc1c92654d..c34a64a583 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -672,7 +672,7 @@ def apply( data_sources_set_to_update.add(rfv.request_data_source) for odfv in odfvs_to_update: - for v in odfv.input_request_data_sources.values(): + for v in odfv.source_request_data_sources.values(): data_sources_set_to_update.add(v) data_sources_to_update = list(data_sources_set_to_update)