Skip to content

Commit 9dc9e60

Browse files
authored
Initial scaffolding for on demand feature view (#1803)
* Initial scaffolding for on demand feature view, with initial support for transforms on online fetches
  Signed-off-by: Danny Chiao <danny@tecton.ai>
* Fixing comments
  Signed-off-by: Danny Chiao <danny@tecton.ai>
* Comments
  Signed-off-by: Danny Chiao <danny@tecton.ai>
* Added basic test
  Signed-off-by: Danny Chiao <danny@tecton.ai>
* Simplifying function serialization
  Signed-off-by: Danny Chiao <danny@tecton.ai>
* Refactor logic into odfv
  Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent 021daf0 commit 9dc9e60

File tree

12 files changed

+434
-11
lines changed

12 files changed

+434
-11
lines changed

protos/feast/core/FeatureView.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ message FeatureView {
3535
FeatureViewMeta meta = 2;
3636
}
3737

38+
// TODO(adchia): refactor common fields from this and ODFV into separate metadata proto
3839
message FeatureViewSpec {
3940
// Name of the feature view. Must be unique. Not updated.
4041
string name = 1;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// Copyright 2020 The Feast Authors
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
18+
syntax = "proto3";
19+
package feast.core;
20+
21+
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
22+
option java_outer_classname = "OnDemandFeatureViewProto";
23+
option java_package = "feast.proto.core";
24+
25+
import "feast/core/FeatureView.proto";
26+
import "feast/core/Feature.proto";
27+
28+
message OnDemandFeatureView {
29+
// User-specified specification of this on demand feature view.
30+
OnDemandFeatureViewSpec spec = 1;
31+
}
32+
33+
message OnDemandFeatureViewSpec {
34+
// Name of the feature view. Must be unique. Not updated.
35+
string name = 1;
36+
37+
// Name of Feast project that this feature view belongs to.
38+
string project = 2;
39+
40+
// List of feature specifications, one for each output feature defined by this on demand feature view.
41+
repeated FeatureSpecV2 features = 3;
42+
43+
// Input feature views passed into the transformation, keyed by a string name.
44+
// TODO(adchia): add support for request data
45+
map<string, FeatureView> inputs = 4;
46+
47+
UserDefinedFunction user_defined_function = 5;
48+
}
49+
50+
// Serialized representation of a Python function.
51+
message UserDefinedFunction {
52+
// The function name
53+
string name = 1;
54+
55+
// The Python function object serialized to bytes with dill (not source text).
56+
bytes body = 2;
57+
}

protos/feast/core/Registry.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ import "feast/core/Entity.proto";
2525
import "feast/core/FeatureService.proto";
2626
import "feast/core/FeatureTable.proto";
2727
import "feast/core/FeatureView.proto";
28+
import "feast/core/OnDemandFeatureView.proto";
2829
import "google/protobuf/timestamp.proto";
2930

3031
message Registry {
3132
repeated Entity entities = 1;
3233
repeated FeatureTable feature_tables = 2;
3334
repeated FeatureView feature_views = 6;
35+
repeated OnDemandFeatureView on_demand_feature_views = 8;
3436
repeated FeatureService feature_services = 7;
3537

3638
string registry_schema_version = 3; // to support migrations; incremented when schema is changed

sdk/python/feast/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .feature_store import FeatureStore
1414
from .feature_table import FeatureTable
1515
from .feature_view import FeatureView
16+
from .on_demand_feature_view import OnDemandFeatureView
1617
from .repo_config import RepoConfig
1718
from .value_type import ValueType
1819

@@ -37,6 +38,7 @@
3738
"FeatureStore",
3839
"FeatureTable",
3940
"FeatureView",
41+
"OnDemandFeatureView",
4042
"RepoConfig",
4143
"SourceType",
4244
"ValueType",

sdk/python/feast/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,10 @@ def __init__(self, entity_type: type):
200200
f"The entity dataframe you have provided must be a Pandas DataFrame or a SQL query, "
201201
f"but we found: {entity_type} "
202202
)
203+
204+
205+
class ConflictingFeatureViewNames(Exception):
206+
def __init__(self, feature_view_name: str):
207+
super().__init__(
208+
f"The feature view name: {feature_view_name} refers to both an on-demand feature view and a feature view"
209+
)

sdk/python/feast/feature_store.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
update_entities_with_inferred_types_from_feature_views,
3838
)
3939
from feast.infra.provider import Provider, RetrievalJob, get_provider
40+
from feast.on_demand_feature_view import OnDemandFeatureView
4041
from feast.online_response import OnlineResponse, _infer_online_entity_rows
4142
from feast.protos.feast.serving.ServingService_pb2 import (
4243
GetOnlineFeaturesRequestV2,
@@ -45,6 +46,7 @@
4546
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
4647
from feast.registry import Registry
4748
from feast.repo_config import RepoConfig, load_repo_config
49+
from feast.type_map import python_value_to_proto_value
4850
from feast.usage import log_exceptions, log_exceptions_and_usage
4951
from feast.version import get_version
5052

@@ -267,8 +269,9 @@ def apply(
267269
objects: Union[
268270
Entity,
269271
FeatureView,
272+
OnDemandFeatureView,
270273
FeatureService,
271-
List[Union[FeatureView, Entity, FeatureService]],
274+
List[Union[FeatureView, OnDemandFeatureView, Entity, FeatureService]],
272275
],
273276
commit: bool = True,
274277
):
@@ -314,6 +317,7 @@ def apply(
314317
assert isinstance(objects, list)
315318

316319
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
320+
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
317321
_validate_feature_views(views_to_update)
318322
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
319323
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
@@ -332,11 +336,15 @@ def apply(
332336

333337
if len(views_to_update) + len(entities_to_update) + len(
334338
services_to_update
335-
) != len(objects):
339+
) + len(odfvs_to_update) != len(objects):
336340
raise ValueError("Unknown object type provided as part of apply() call")
337341

338342
for view in views_to_update:
339343
self._registry.apply_feature_view(view, project=self.project, commit=False)
344+
for odfv in odfvs_to_update:
345+
self._registry.apply_on_demand_feature_view(
346+
odfv, project=self.project, commit=False
347+
)
340348
for ent in entities_to_update:
341349
self._registry.apply_entity(ent, project=self.project, commit=False)
342350
for feature_service in services_to_update:
@@ -717,7 +725,6 @@ def get_online_features(
717725
all_feature_views = self._registry.list_feature_views(
718726
project=self.project, allow_cache=True
719727
)
720-
721728
_validate_feature_refs(_feature_refs, full_feature_names)
722729
grouped_refs = _group_feature_refs(_feature_refs, all_feature_views)
723730
for table, requested_features in grouped_refs:
@@ -759,6 +766,47 @@ def get_online_features(
759766
feature_ref
760767
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
761768

769+
initial_response = OnlineResponse(
770+
GetOnlineFeaturesResponse(field_values=result_rows)
771+
)
772+
return self._augment_response_with_on_demand_transforms(
773+
_feature_refs, full_feature_names, initial_response, result_rows
774+
)
775+
776+
def _augment_response_with_on_demand_transforms(
777+
self,
778+
feature_refs: List[str],
779+
full_feature_names: bool,
780+
initial_response: OnlineResponse,
781+
result_rows: List[GetOnlineFeaturesResponse.FieldValues],
782+
) -> OnlineResponse:
783+
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
784+
project=self.project, allow_cache=True
785+
)
786+
if len(all_on_demand_feature_views) == 0:
787+
return initial_response
788+
initial_response_df = initial_response.to_df()
789+
# Apply on demand transformations
790+
for odfv in all_on_demand_feature_views:
791+
feature_ref = odfv.name
792+
if feature_ref in feature_refs:
793+
transformed_features_df = odfv.get_transformed_features_df(
794+
full_feature_names, initial_response_df
795+
)
796+
for row_idx in range(len(result_rows)):
797+
result_row = result_rows[row_idx]
798+
# TODO(adchia): support multiple output features in an ODFV, which requires different naming
799+
# conventions
800+
result_row.fields[odfv.name].CopyFrom(
801+
python_value_to_proto_value(
802+
transformed_features_df[odfv.features[0].name].values[
803+
row_idx
804+
]
805+
)
806+
)
807+
result_row.statuses[
808+
feature_ref
809+
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
762810
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))
763811

764812
@log_exceptions_and_usage
@@ -791,7 +839,9 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F
791839
ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
792840
]
793841
else:
794-
feature_names = [ref.split(":")[1] for ref in feature_refs]
842+
feature_names = [
843+
ref.split(":")[1] if ":" in ref else ref for ref in feature_refs
844+
]
795845
collided_feature_names = [
796846
ref
797847
for ref, occurrences in Counter(feature_names).items()
@@ -820,6 +870,9 @@ def _group_feature_refs(
820870

821871
if isinstance(features, list) and isinstance(features[0], str):
822872
for ref in features:
873+
if ":" not in ref:
874+
# This is an on demand feature view ref
875+
continue
823876
view_name, feat_name = ref.split(":")
824877
if view_name not in view_index:
825878
raise FeatureViewNotFoundException(view_name)
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import functools
2+
from types import MethodType
3+
from typing import Dict, List
4+
5+
import dill
6+
import pandas as pd
7+
8+
from feast.feature import Feature
9+
from feast.feature_view import FeatureView
10+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
11+
OnDemandFeatureView as OnDemandFeatureViewProto,
12+
)
13+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
14+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
15+
UserDefinedFunction as UserDefinedFunctionProto,
16+
)
17+
from feast.usage import log_exceptions
18+
from feast.value_type import ValueType
19+
20+
21+
class OnDemandFeatureView:
22+
"""
23+
An OnDemandFeatureView defines on demand transformations on existing feature view values and request data.
24+
25+
Args:
26+
name: Name of the group of features.
27+
features: Output schema of transformation with feature names
28+
inputs: The input feature views passed into the transform.
29+
udf: User defined transformation function that takes as input pandas dataframes
30+
"""
31+
32+
name: str
33+
features: List[Feature]
34+
inputs: Dict[str, FeatureView]
35+
udf: MethodType
36+
37+
@log_exceptions
38+
def __init__(
39+
self,
40+
name: str,
41+
features: List[Feature],
42+
inputs: Dict[str, FeatureView],
43+
udf: MethodType,
44+
):
45+
"""
46+
Creates an OnDemandFeatureView object.
47+
"""
48+
49+
self.name = name
50+
self.features = features
51+
self.inputs = inputs
52+
self.udf = udf
53+
54+
def to_proto(self) -> OnDemandFeatureViewProto:
55+
"""
56+
Converts an on demand feature view object to its protobuf representation.
57+
58+
Returns:
59+
An OnDemandFeatureViewProto protobuf.
60+
"""
61+
spec = OnDemandFeatureViewSpec(
62+
name=self.name,
63+
features=[feature.to_proto() for feature in self.features],
64+
inputs={k: fv.to_proto() for k, fv in self.inputs.items()},
65+
user_defined_function=UserDefinedFunctionProto(
66+
name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True),
67+
),
68+
)
69+
70+
return OnDemandFeatureViewProto(spec=spec)
71+
72+
@classmethod
73+
def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
74+
"""
75+
Creates an on demand feature view from a protobuf representation.
76+
77+
Args:
78+
on_demand_feature_view_proto: A protobuf representation of an on-demand feature view.
79+
80+
Returns:
81+
An OnDemandFeatureView object based on the on-demand feature view protobuf.
82+
"""
83+
on_demand_feature_view_obj = cls(
84+
name=on_demand_feature_view_proto.spec.name,
85+
features=[
86+
Feature(
87+
name=feature.name,
88+
dtype=ValueType(feature.value_type),
89+
labels=dict(feature.labels),
90+
)
91+
for feature in on_demand_feature_view_proto.spec.features
92+
],
93+
inputs={
94+
feature_view_name: FeatureView.from_proto(feature_view_proto)
95+
for feature_view_name, feature_view_proto in on_demand_feature_view_proto.spec.inputs.items()
96+
},
97+
udf=dill.loads(
98+
on_demand_feature_view_proto.spec.user_defined_function.body
99+
),
100+
)
101+
102+
return on_demand_feature_view_obj
103+
104+
def get_transformed_features_df(
105+
self, full_feature_names: bool, df_with_features: pd.DataFrame
106+
) -> pd.DataFrame:
107+
# Apply on demand transformations
108+
# TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
109+
# Copy over un-prefixed features even if not requested since transform may need it
110+
columns_to_cleanup = []
111+
if full_feature_names:
112+
for input_fv in self.inputs.values():
113+
for feature in input_fv.features:
114+
full_feature_ref = f"{input_fv.name}__{feature.name}"
115+
if full_feature_ref in df_with_features.keys():
116+
df_with_features[feature.name] = df_with_features[
117+
full_feature_ref
118+
]
119+
columns_to_cleanup.append(feature.name)
120+
121+
# Compute transformed values and apply to each result row
122+
df_with_transformed_features = self.udf.__call__(df_with_features)
123+
124+
# Cleanup extra columns used for transformation
125+
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
126+
return df_with_transformed_features
127+
128+
129+
def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]):
130+
"""
131+
Declare an on-demand feature view
132+
133+
:param features: Output schema with feature names
134+
:param inputs: The inputs passed into the transform.
135+
:return: An On Demand Feature View.
136+
"""
137+
138+
def decorator(user_function):
139+
on_demand_feature_view_obj = OnDemandFeatureView(
140+
name=user_function.__name__,
141+
inputs=inputs,
142+
features=features,
143+
udf=user_function,
144+
)
145+
functools.update_wrapper(
146+
wrapper=on_demand_feature_view_obj, wrapped=user_function
147+
)
148+
return on_demand_feature_view_obj
149+
150+
return decorator

0 commit comments

Comments
 (0)