1616from feast .feature_view import FeatureView
1717from feast .feature_view_projection import FeatureViewProjection
1818from feast .field import Field , from_value_type
19+ from feast .on_demand_pandas_transformation import OnDemandPandasTransformation
1920from feast .protos .feast .core .OnDemandFeatureView_pb2 import (
2021 OnDemandFeatureView as OnDemandFeatureViewProto ,
2122)
2425 OnDemandFeatureViewSpec ,
2526 OnDemandSource ,
2627)
27- from feast .protos .feast .core .OnDemandFeatureView_pb2 import (
28- UserDefinedFunction as UserDefinedFunctionProto ,
29- )
3028from feast .type_map import (
3129 feast_value_type_to_pandas_type ,
3230 python_type_to_feast_value_type ,
@@ -51,8 +49,7 @@ class OnDemandFeatureView(BaseFeatureView):
5149 sources with type FeatureViewProjection.
5250 source_request_sources: A map from input source names to the actual input
5351 sources with type RequestSource.
54- udf: The user defined transformation function, which must take pandas dataframes
55- as inputs.
52+ transformation: The user defined transformation.
5653 description: A human-readable description.
5754 tags: A dictionary of key-value pairs to store arbitrary metadata.
5855 owner: The owner of the on demand feature view, typically the email of the primary
@@ -63,8 +60,7 @@ class OnDemandFeatureView(BaseFeatureView):
6360 features : List [Field ]
6461 source_feature_view_projections : Dict [str , FeatureViewProjection ]
6562 source_request_sources : Dict [str , RequestSource ]
66- udf : FunctionType
67- udf_string : str
63+ transformation : Union [OnDemandPandasTransformation ]
6864 description : str
6965 tags : Dict [str , str ]
7066 owner : str
@@ -82,8 +78,9 @@ def __init__( # noqa: C901
8278 FeatureViewProjection ,
8379 ]
8480 ],
85- udf : FunctionType ,
81+ udf : Optional [ FunctionType ] = None ,
8682 udf_string : str = "" ,
83+ transformation : Optional [Union [OnDemandPandasTransformation ]] = None ,
8784 description : str = "" ,
8885 tags : Optional [Dict [str , str ]] = None ,
8986 owner : str = "" ,
@@ -98,9 +95,10 @@ def __init__( # noqa: C901
9895 sources: A map from input source names to the actual input sources, which may be
9996 feature views, or request data sources. These sources serve as inputs to the udf,
10097 which will refer to them by name.
101- udf: The user defined transformation function, which must take pandas
98+ udf (deprecated) : The user defined transformation function, which must take pandas
10299 dataframes as inputs.
103- udf_string: The source code version of the udf (for diffing and displaying in Web UI)
100+ udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
101+ transformation: The user defined transformation.
104102 description (optional): A human-readable description.
105103 tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
106104 owner (optional): The owner of the on demand feature view, typically the email
@@ -114,6 +112,18 @@ def __init__( # noqa: C901
114112 owner = owner ,
115113 )
116114
115+ if not transformation :
116+ if udf :
117+ warnings .warn (
118+ "udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead." ,
119+ DeprecationWarning ,
120+ )
121+ transformation = OnDemandPandasTransformation (udf , udf_string )
122+ else :
123+ raise Exception (
124+ "OnDemandFeatureView needs to be initialized with either transformation or udf arguments"
125+ )
126+
117127 self .source_feature_view_projections : Dict [str , FeatureViewProjection ] = {}
118128 self .source_request_sources : Dict [str , RequestSource ] = {}
119129 for odfv_source in sources :
@@ -126,8 +136,7 @@ def __init__( # noqa: C901
126136 odfv_source .name
127137 ] = odfv_source .projection
128138
129- self .udf = udf # type: ignore
130- self .udf_string = udf_string
139+ self .transformation = transformation
131140
132141 @property
133142 def proto_class (self ) -> Type [OnDemandFeatureViewProto ]:
@@ -139,8 +148,7 @@ def __copy__(self):
139148 schema = self .features ,
140149 sources = list (self .source_feature_view_projections .values ())
141150 + list (self .source_request_sources .values ()),
142- udf = self .udf ,
143- udf_string = self .udf_string ,
151+ transformation = self .transformation ,
144152 description = self .description ,
145153 tags = self .tags ,
146154 owner = self .owner ,
@@ -161,8 +169,7 @@ def __eq__(self, other):
161169 self .source_feature_view_projections
162170 != other .source_feature_view_projections
163171 or self .source_request_sources != other .source_request_sources
164- or self .udf_string != other .udf_string
165- or self .udf .__code__ .co_code != other .udf .__code__ .co_code
172+ or self .transformation != other .transformation
166173 ):
167174 return False
168175
@@ -200,11 +207,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
200207 name = self .name ,
201208 features = [feature .to_proto () for feature in self .features ],
202209 sources = sources ,
203- user_defined_function = UserDefinedFunctionProto (
204- name = self .udf .__name__ ,
205- body = dill .dumps (self .udf , recurse = True ),
206- body_text = self .udf_string ,
207- ),
210+ user_defined_function = self .transformation .to_proto ()
211+ if type (self .transformation ) == OnDemandPandasTransformation
212+ else None ,
208213 description = self .description ,
209214 tags = self .tags ,
210215 owner = self .owner ,
@@ -243,6 +248,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
243248 RequestSource .from_proto (on_demand_source .request_data_source )
244249 )
245250
251+ if (
252+ on_demand_feature_view_proto .spec .WhichOneof ("transformation" )
253+ == "user_defined_function"
254+ ):
255+ transformation = OnDemandPandasTransformation .from_proto (
256+ on_demand_feature_view_proto .spec .user_defined_function
257+ )
258+ else :
259+ raise Exception ("At least one transformation type needs to be provided" )
260+
246261 on_demand_feature_view_obj = cls (
247262 name = on_demand_feature_view_proto .spec .name ,
248263 schema = [
@@ -253,10 +268,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
253268 for feature in on_demand_feature_view_proto .spec .features
254269 ],
255270 sources = sources ,
256- udf = dill .loads (
257- on_demand_feature_view_proto .spec .user_defined_function .body
258- ),
259- udf_string = on_demand_feature_view_proto .spec .user_defined_function .body_text ,
271+ transformation = transformation ,
260272 description = on_demand_feature_view_proto .spec .description ,
261273 tags = dict (on_demand_feature_view_proto .spec .tags ),
262274 owner = on_demand_feature_view_proto .spec .owner ,
@@ -315,7 +327,8 @@ def get_transformed_features_df(
315327 columns_to_cleanup .append (full_feature_ref )
316328
317329 # Compute transformed values and apply to each result row
318- df_with_transformed_features = self .udf .__call__ (df_with_features )
330+
331+ df_with_transformed_features = self .transformation .transform (df_with_features )
319332
320333 # Work out whether the correct columns names are used.
321334 rename_columns : Dict [str , str ] = {}
@@ -335,7 +348,7 @@ def get_transformed_features_df(
335348 df_with_features .drop (columns = columns_to_cleanup , inplace = True )
336349 return df_with_transformed_features .rename (columns = rename_columns )
337350
338- def infer_features (self ):
351+ def infer_features (self ) -> None :
339352 """
340353 Infers the set of features associated to this feature view from the input source.
341354
@@ -365,7 +378,7 @@ def infer_features(self):
365378 dtype = feast_value_type_to_pandas_type (field .dtype .to_value_type ())
366379 sample_val = rand_df_value [dtype ] if dtype in rand_df_value else None
367380 df [f"{ field .name } " ] = pd .Series (sample_val , dtype = dtype )
368- output_df : pd .DataFrame = self .udf . __call__ (df )
381+ output_df : pd .DataFrame = self .transformation . transform (df )
369382 inferred_features = []
370383 for f , dt in zip (output_df .columns , output_df .dtypes ):
371384 inferred_features .append (
@@ -396,7 +409,9 @@ def infer_features(self):
396409 )
397410
398411 @staticmethod
399- def get_requested_odfvs (feature_refs , project , registry ):
412+ def get_requested_odfvs (
413+ feature_refs , project , registry
414+ ) -> List ["OnDemandFeatureView" ]:
400415 all_on_demand_feature_views = registry .list_on_demand_feature_views (
401416 project , allow_cache = True
402417 )
@@ -438,7 +453,7 @@ def on_demand_feature_view(
438453 of the primary maintainer.
439454 """
440455
441- def mainify (obj ):
456+ def mainify (obj ) -> None :
442457 # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
443458 # name as the original file defining the ODFV.
444459 if obj .__module__ != "__main__" :
@@ -447,15 +462,17 @@ def mainify(obj):
447462 def decorator (user_function ):
448463 udf_string = dill .source .getsource (user_function )
449464 mainify (user_function )
465+
466+ transformation = OnDemandPandasTransformation (user_function , udf_string )
467+
450468 on_demand_feature_view_obj = OnDemandFeatureView (
451469 name = user_function .__name__ ,
452470 sources = sources ,
453471 schema = schema ,
454- udf = user_function ,
472+ transformation = transformation ,
455473 description = description ,
456474 tags = tags ,
457475 owner = owner ,
458- udf_string = udf_string ,
459476 )
460477 functools .update_wrapper (
461478 wrapper = on_demand_feature_view_obj , wrapped = user_function
0 commit comments