Skip to content

Commit eec5375

Browse files
tokokotqtensor
authored andcommitted
feat: Decouple transformation types from ODFVs (feast-dev#3949)
* decouple transformation from odfvs Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * OnDemandFeatureView: keep udf and udf_string parameters for backwards compatibility Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * fix linting issues Signed-off-by: tokoko <togurg14@freeuni.edu.ge> * remove unused import in registry protos Signed-off-by: tokoko <togurg14@freeuni.edu.ge> --------- Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
1 parent 9621cfd commit eec5375

7 files changed

Lines changed: 145 additions & 46 deletions

File tree

protos/feast/core/OnDemandFeatureView.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ message OnDemandFeatureViewSpec {
4848
// Map of sources for this feature view.
4949
map<string, OnDemandSource> sources = 4;
5050

51-
UserDefinedFunction user_defined_function = 5;
51+
oneof transformation {
52+
UserDefinedFunction user_defined_function = 5;
53+
}
5254

5355
// Description of the on demand feature view.
5456
string description = 6;

protos/feast/registry/RegistryServer.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ syntax = "proto3";
22

33
package feast.registry;
44

5-
import "google/protobuf/timestamp.proto";
65
import "google/protobuf/empty.proto";
76
import "feast/core/Registry.proto";
87
import "feast/core/Entity.proto";

sdk/python/feast/infra/registry/base_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
665665

666666
odfv_dict["spec"]["userDefinedFunction"][
667667
"body"
668-
] = on_demand_feature_view.udf_string
668+
] = on_demand_feature_view.transformation.udf_string
669669
registry_dict["onDemandFeatureViews"].append(odfv_dict)
670670
for request_feature_view in sorted(
671671
self.list_request_feature_views(project=project),

sdk/python/feast/on_demand_feature_view.py

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from feast.feature_view import FeatureView
1717
from feast.feature_view_projection import FeatureViewProjection
1818
from feast.field import Field, from_value_type
19+
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
1920
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
2021
OnDemandFeatureView as OnDemandFeatureViewProto,
2122
)
@@ -24,9 +25,6 @@
2425
OnDemandFeatureViewSpec,
2526
OnDemandSource,
2627
)
27-
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
28-
UserDefinedFunction as UserDefinedFunctionProto,
29-
)
3028
from feast.type_map import (
3129
feast_value_type_to_pandas_type,
3230
python_type_to_feast_value_type,
@@ -51,8 +49,7 @@ class OnDemandFeatureView(BaseFeatureView):
5149
sources with type FeatureViewProjection.
5250
source_request_sources: A map from input source names to the actual input
5351
sources with type RequestSource.
54-
udf: The user defined transformation function, which must take pandas dataframes
55-
as inputs.
52+
transformation: The user defined transformation.
5653
description: A human-readable description.
5754
tags: A dictionary of key-value pairs to store arbitrary metadata.
5855
owner: The owner of the on demand feature view, typically the email of the primary
@@ -63,8 +60,7 @@ class OnDemandFeatureView(BaseFeatureView):
6360
features: List[Field]
6461
source_feature_view_projections: Dict[str, FeatureViewProjection]
6562
source_request_sources: Dict[str, RequestSource]
66-
udf: FunctionType
67-
udf_string: str
63+
transformation: Union[OnDemandPandasTransformation]
6864
description: str
6965
tags: Dict[str, str]
7066
owner: str
@@ -82,8 +78,9 @@ def __init__( # noqa: C901
8278
FeatureViewProjection,
8379
]
8480
],
85-
udf: FunctionType,
81+
udf: Optional[FunctionType] = None,
8682
udf_string: str = "",
83+
transformation: Optional[Union[OnDemandPandasTransformation]] = None,
8784
description: str = "",
8885
tags: Optional[Dict[str, str]] = None,
8986
owner: str = "",
@@ -98,9 +95,10 @@ def __init__( # noqa: C901
9895
sources: A map from input source names to the actual input sources, which may be
9996
feature views, or request data sources. These sources serve as inputs to the udf,
10097
which will refer to them by name.
101-
udf: The user defined transformation function, which must take pandas
98+
udf (deprecated): The user defined transformation function, which must take pandas
10299
dataframes as inputs.
103-
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
100+
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
101+
transformation: The user defined transformation.
104102
description (optional): A human-readable description.
105103
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
106104
owner (optional): The owner of the on demand feature view, typically the email
@@ -114,6 +112,18 @@ def __init__( # noqa: C901
114112
owner=owner,
115113
)
116114

115+
if not transformation:
116+
if udf:
117+
warnings.warn(
118+
"udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.",
119+
DeprecationWarning,
120+
)
121+
transformation = OnDemandPandasTransformation(udf, udf_string)
122+
else:
123+
raise Exception(
124+
"OnDemandFeatureView needs to be initialized with either transformation or udf arguments"
125+
)
126+
117127
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
118128
self.source_request_sources: Dict[str, RequestSource] = {}
119129
for odfv_source in sources:
@@ -126,8 +136,7 @@ def __init__( # noqa: C901
126136
odfv_source.name
127137
] = odfv_source.projection
128138

129-
self.udf = udf # type: ignore
130-
self.udf_string = udf_string
139+
self.transformation = transformation
131140

132141
@property
133142
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
@@ -139,8 +148,7 @@ def __copy__(self):
139148
schema=self.features,
140149
sources=list(self.source_feature_view_projections.values())
141150
+ list(self.source_request_sources.values()),
142-
udf=self.udf,
143-
udf_string=self.udf_string,
151+
transformation=self.transformation,
144152
description=self.description,
145153
tags=self.tags,
146154
owner=self.owner,
@@ -161,8 +169,7 @@ def __eq__(self, other):
161169
self.source_feature_view_projections
162170
!= other.source_feature_view_projections
163171
or self.source_request_sources != other.source_request_sources
164-
or self.udf_string != other.udf_string
165-
or self.udf.__code__.co_code != other.udf.__code__.co_code
172+
or self.transformation != other.transformation
166173
):
167174
return False
168175

@@ -200,11 +207,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
200207
name=self.name,
201208
features=[feature.to_proto() for feature in self.features],
202209
sources=sources,
203-
user_defined_function=UserDefinedFunctionProto(
204-
name=self.udf.__name__,
205-
body=dill.dumps(self.udf, recurse=True),
206-
body_text=self.udf_string,
207-
),
210+
user_defined_function=self.transformation.to_proto()
211+
if type(self.transformation) == OnDemandPandasTransformation
212+
else None,
208213
description=self.description,
209214
tags=self.tags,
210215
owner=self.owner,
@@ -243,6 +248,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
243248
RequestSource.from_proto(on_demand_source.request_data_source)
244249
)
245250

251+
if (
252+
on_demand_feature_view_proto.spec.WhichOneof("transformation")
253+
== "user_defined_function"
254+
):
255+
transformation = OnDemandPandasTransformation.from_proto(
256+
on_demand_feature_view_proto.spec.user_defined_function
257+
)
258+
else:
259+
raise Exception("At least one transformation type needs to be provided")
260+
246261
on_demand_feature_view_obj = cls(
247262
name=on_demand_feature_view_proto.spec.name,
248263
schema=[
@@ -253,10 +268,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
253268
for feature in on_demand_feature_view_proto.spec.features
254269
],
255270
sources=sources,
256-
udf=dill.loads(
257-
on_demand_feature_view_proto.spec.user_defined_function.body
258-
),
259-
udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text,
271+
transformation=transformation,
260272
description=on_demand_feature_view_proto.spec.description,
261273
tags=dict(on_demand_feature_view_proto.spec.tags),
262274
owner=on_demand_feature_view_proto.spec.owner,
@@ -315,7 +327,8 @@ def get_transformed_features_df(
315327
columns_to_cleanup.append(full_feature_ref)
316328

317329
# Compute transformed values and apply to each result row
318-
df_with_transformed_features = self.udf.__call__(df_with_features)
330+
331+
df_with_transformed_features = self.transformation.transform(df_with_features)
319332

320333
# Work out whether the correct columns names are used.
321334
rename_columns: Dict[str, str] = {}
@@ -335,7 +348,7 @@ def get_transformed_features_df(
335348
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
336349
return df_with_transformed_features.rename(columns=rename_columns)
337350

338-
def infer_features(self):
351+
def infer_features(self) -> None:
339352
"""
340353
Infers the set of features associated to this feature view from the input source.
341354
@@ -365,7 +378,7 @@ def infer_features(self):
365378
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
366379
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
367380
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)
368-
output_df: pd.DataFrame = self.udf.__call__(df)
381+
output_df: pd.DataFrame = self.transformation.transform(df)
369382
inferred_features = []
370383
for f, dt in zip(output_df.columns, output_df.dtypes):
371384
inferred_features.append(
@@ -396,7 +409,9 @@ def infer_features(self):
396409
)
397410

398411
@staticmethod
399-
def get_requested_odfvs(feature_refs, project, registry):
412+
def get_requested_odfvs(
413+
feature_refs, project, registry
414+
) -> List["OnDemandFeatureView"]:
400415
all_on_demand_feature_views = registry.list_on_demand_feature_views(
401416
project, allow_cache=True
402417
)
@@ -438,7 +453,7 @@ def on_demand_feature_view(
438453
of the primary maintainer.
439454
"""
440455

441-
def mainify(obj):
456+
def mainify(obj) -> None:
442457
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
443458
# name as the original file defining the ODFV.
444459
if obj.__module__ != "__main__":
@@ -447,15 +462,17 @@ def mainify(obj):
447462
def decorator(user_function):
448463
udf_string = dill.source.getsource(user_function)
449464
mainify(user_function)
465+
466+
transformation = OnDemandPandasTransformation(user_function, udf_string)
467+
450468
on_demand_feature_view_obj = OnDemandFeatureView(
451469
name=user_function.__name__,
452470
sources=sources,
453471
schema=schema,
454-
udf=user_function,
472+
transformation=transformation,
455473
description=description,
456474
tags=tags,
457475
owner=owner,
458-
udf_string=udf_string,
459476
)
460477
functools.update_wrapper(
461478
wrapper=on_demand_feature_view_obj, wrapped=user_function
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from types import FunctionType
2+
3+
import dill
4+
import pandas as pd
5+
6+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
7+
UserDefinedFunction as UserDefinedFunctionProto,
8+
)
9+
10+
11+
class OnDemandPandasTransformation:
12+
def __init__(self, udf: FunctionType, udf_string: str = ""):
13+
"""
14+
Creates an OnDemandPandasTransformation object.
15+
16+
Args:
17+
udf: The user defined transformation function, which must take pandas
18+
dataframes as inputs.
19+
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
20+
"""
21+
self.udf = udf
22+
self.udf_string = udf_string
23+
24+
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
25+
return self.udf.__call__(df)
26+
27+
def __eq__(self, other):
28+
if not isinstance(other, OnDemandPandasTransformation):
29+
raise TypeError(
30+
"Comparisons should only involve OnDemandPandasTransformation class objects."
31+
)
32+
33+
if not super().__eq__(other):
34+
return False
35+
36+
if (
37+
self.udf_string != other.udf_string
38+
or self.udf.__code__.co_code != other.udf.__code__.co_code
39+
):
40+
return False
41+
42+
return True
43+
44+
def to_proto(self) -> UserDefinedFunctionProto:
45+
return UserDefinedFunctionProto(
46+
name=self.udf.__name__,
47+
body=dill.dumps(self.udf, recurse=True),
48+
body_text=self.udf_string,
49+
)
50+
51+
@classmethod
52+
def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto):
53+
return OnDemandPandasTransformation(
54+
udf=dill.loads(user_defined_function_proto.body),
55+
udf_string=user_defined_function_proto.body_text,
56+
)

sdk/python/tests/integration/feature_repos/universal/feature_views.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from feast.data_source import DataSource, RequestSource
1717
from feast.feature_view_projection import FeatureViewProjection
18+
from feast.on_demand_feature_view import OnDemandPandasTransformation
1819
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64
1920
from tests.integration.feature_repos.universal.entities import (
2021
customer,
@@ -70,8 +71,9 @@ def conv_rate_plus_100_feature_view(
7071
name=conv_rate_plus_100.__name__,
7172
schema=[] if infer_features else _features,
7273
sources=sources,
73-
udf=conv_rate_plus_100,
74-
udf_string="raw udf source",
74+
transformation=OnDemandPandasTransformation(
75+
udf=conv_rate_plus_100, udf_string="raw udf source"
76+
),
7577
)
7678

7779

@@ -108,8 +110,9 @@ def similarity_feature_view(
108110
name=similarity.__name__,
109111
sources=sources,
110112
schema=[] if infer_features else _fields,
111-
udf=similarity,
112-
udf_string="similarity raw udf",
113+
transformation=OnDemandPandasTransformation(
114+
udf=similarity, udf_string="similarity raw udf"
115+
),
113116
)
114117

115118

0 commit comments

Comments
 (0)