11import copy
22import functools
3- import inspect
43import warnings
54from types import FunctionType
6- from typing import Any , List , Optional , Union , get_type_hints
5+ from typing import Any , List , Optional , Union , cast
76
87import dill
9- import pandas as pd
108import pyarrow
119from typeguard import typechecked
1210
3129from feast .protos .feast .core .Transformation_pb2 import (
3230 UserDefinedFunctionV2 as UserDefinedFunctionProto ,
3331)
32+ from feast .transformation .base import Transformation
33+ from feast .transformation .mode import TransformationMode
3434from feast .transformation .pandas_transformation import PandasTransformation
3535from feast .transformation .python_transformation import PythonTransformation
3636from feast .transformation .substrait_transformation import SubstraitTransformation
@@ -66,15 +66,15 @@ class OnDemandFeatureView(BaseFeatureView):
6666 features : List [Field ]
6767 source_feature_view_projections : dict [str , FeatureViewProjection ]
6868 source_request_sources : dict [str , RequestSource ]
69- feature_transformation : Union [
70- PandasTransformation , PythonTransformation , SubstraitTransformation
71- ]
69+ feature_transformation : Transformation
7270 mode : str
7371 description : str
7472 tags : dict [str , str ]
7573 owner : str
7674 write_to_online_store : bool
7775 singleton : bool
76+ udf : Optional [FunctionType ]
77+ udf_string : Optional [str ]
7878
7979 def __init__ ( # noqa: C901
8080 self ,
@@ -90,10 +90,8 @@ def __init__( # noqa: C901
9090 ]
9191 ],
9292 udf : Optional [FunctionType ] = None ,
93- udf_string : str = "" ,
94- feature_transformation : Union [
95- PandasTransformation , PythonTransformation , SubstraitTransformation
96- ],
93+ udf_string : Optional [str ] = "" ,
94+ feature_transformation : Optional [Transformation ] = None ,
9795 mode : str = "pandas" ,
9896 description : str = "" ,
9997 tags : Optional [dict [str , str ]] = None ,
@@ -112,9 +110,9 @@ def __init__( # noqa: C901
112110 sources: A map from input source names to the actual input sources, which may be
113111 feature views, or request data sources. These sources serve as inputs to the udf,
114112 which will refer to them by name.
115- udf (deprecated) : The user defined transformation function, which must take pandas
113+ udf: The user defined transformation function, which must take pandas
116114 dataframes as inputs.
117- udf_string (deprecated) : The source code version of the udf (for diffing and displaying in Web UI)
115+ udf_string: The source code version of the udf (for diffing and displaying in Web UI)
118116 feature_transformation: The user defined transformation.
119117 mode: Mode of execution (e.g., Pandas or Python native)
120118 description (optional): A human-readable description.
@@ -136,29 +134,10 @@ def __init__( # noqa: C901
136134
137135 schema = schema or []
138136 self .entities = [e .name for e in entities ] if entities else [DUMMY_ENTITY_NAME ]
137+ self .sources = sources
139138 self .mode = mode .lower ()
140-
141- if self .mode not in {"python" , "pandas" , "substrait" }:
142- raise ValueError (
143- f"Unknown mode { self .mode } . OnDemandFeatureView only supports python or pandas UDFs and substrait."
144- )
145-
146- if not feature_transformation :
147- if udf :
148- warnings .warn (
149- "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead." ,
150- DeprecationWarning ,
151- )
152- # Note inspecting the return signature won't work with isinstance so this is the best alternative
153- if self .mode == "pandas" :
154- feature_transformation = PandasTransformation (udf , udf_string )
155- elif self .mode == "python" :
156- feature_transformation = PythonTransformation (udf , udf_string )
157- else :
158- raise ValueError (
159- "OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
160- )
161-
139+ self .udf = udf
140+ self .udf_string = udf_string
162141 self .source_feature_view_projections : dict [str , FeatureViewProjection ] = {}
163142 self .source_request_sources : dict [str , RequestSource ] = {}
164143 for odfv_source in sources :
@@ -206,12 +185,33 @@ def __init__( # noqa: C901
206185 features .append (field )
207186
208187 self .features = features
209- self .feature_transformation = feature_transformation
188+ self .feature_transformation = (
189+ feature_transformation or self .get_feature_transformation ()
190+ )
210191 self .write_to_online_store = write_to_online_store
211192 self .singleton = singleton
212193 if self .singleton and self .mode != "python" :
213194 raise ValueError ("Singleton is only supported for Python mode." )
214195
196+ def get_feature_transformation (self ) -> Transformation :
197+ if not self .udf :
198+ raise ValueError (
199+ "Either udf or feature_transformation must be provided to create an OnDemandFeatureView"
200+ )
201+ if self .mode in (
202+ TransformationMode .PANDAS ,
203+ TransformationMode .PYTHON ,
204+ ) or self .mode in ("pandas" , "python" ):
205+ return Transformation (
206+ mode = self .mode , udf = self .udf , udf_string = self .udf_string or ""
207+ )
208+ elif self .mode == TransformationMode .SUBSTRAIT or self .mode == "substrait" :
209+ return SubstraitTransformation .from_ibis (self .udf , self .sources )
210+ else :
211+ raise ValueError (
212+ f"Unsupported transformation mode: { self .mode } for OnDemandFeatureView"
213+ )
214+
215215 @property
216216 def proto_class (self ) -> type [OnDemandFeatureViewProto ]:
217217 return OnDemandFeatureViewProto
@@ -312,16 +312,25 @@ def to_proto(self) -> OnDemandFeatureViewProto:
312312 request_data_source = request_sources .to_proto ()
313313 )
314314
315- feature_transformation = FeatureTransformationProto (
316- user_defined_function = self .feature_transformation .to_proto ()
315+ user_defined_function_proto = cast (
316+ UserDefinedFunctionProto ,
317+ self .feature_transformation .to_proto ()
317318 if isinstance (
318319 self .feature_transformation ,
319320 (PandasTransformation , PythonTransformation ),
320321 )
321322 else None ,
322- substrait_transformation = self .feature_transformation .to_proto ()
323+ )
324+
325+ substrait_transformation_proto = (
326+ self .feature_transformation .to_proto ()
323327 if isinstance (self .feature_transformation , SubstraitTransformation )
324- else None ,
328+ else None
329+ )
330+
331+ feature_transformation = FeatureTransformationProto (
332+ user_defined_function = user_defined_function_proto ,
333+ substrait_transformation = substrait_transformation_proto ,
325334 )
326335 spec = OnDemandFeatureViewSpec (
327336 name = self .name ,
@@ -786,38 +795,22 @@ def mainify(obj) -> None:
786795 obj .__module__ = "__main__"
787796
788797 def decorator (user_function ):
789- return_annotation = get_type_hints (user_function ).get ("return" , inspect ._empty )
790798 udf_string = dill .source .getsource (user_function )
791799 mainify (user_function )
792- if mode == "pandas" :
793- if return_annotation not in (inspect ._empty , pd .DataFrame ):
794- raise TypeError (
795- f"return signature for { user_function } is { return_annotation } but should be pd.DataFrame"
796- )
797- transformation = PandasTransformation (user_function , udf_string )
798- elif mode == "python" :
799- transformation = PythonTransformation (user_function , udf_string )
800- elif mode == "substrait" :
801- from ibis .expr .types .relations import Table
802-
803- if return_annotation not in (inspect ._empty , Table ):
804- raise TypeError (
805- f"return signature for { user_function } is { return_annotation } but should be ibis.expr.types.relations.Table"
806- )
807- transformation = SubstraitTransformation .from_ibis (user_function , sources )
808800
809801 on_demand_feature_view_obj = OnDemandFeatureView (
810802 name = name if name is not None else user_function .__name__ ,
811803 sources = sources ,
812804 schema = schema ,
813- feature_transformation = transformation ,
814805 mode = mode ,
815806 description = description ,
816807 tags = tags ,
817808 owner = owner ,
818809 write_to_online_store = write_to_online_store ,
819810 entities = entities ,
820811 singleton = singleton ,
812+ udf = user_function ,
813+ udf_string = udf_string ,
821814 )
822815 functools .update_wrapper (
823816 wrapper = on_demand_feature_view_obj , wrapped = user_function
0 commit comments