Skip to content

Commit 43bdcc1

Browse files
HaoXuAIjfw-ppi
authored and committed
feat: Kickoff Transformation implementation: transformation code base (feast-dev#5181)
* transformation code base Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * add back master change to resolve unit test error Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * add back master change to resolve unit test error Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * fix linting Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * fix linting Signed-off-by: HaoXuAI <sduxuhao@gmail.com> * add back master change to resolve unit test error Signed-off-by: HaoXuAI <sduxuhao@gmail.com> --------- Signed-off-by: HaoXuAI <sduxuhao@gmail.com> Signed-off-by: Jacob Weinhold <29459386+j-wine@users.noreply.github.com>
1 parent 43da0d4 commit 43bdcc1

18 files changed

Lines changed: 591 additions & 175 deletions

sdk/python/feast/batch_feature_view.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1+
import functools
12
import warnings
23
from datetime import datetime, timedelta
3-
from typing import Dict, List, Optional, Tuple
4+
from types import FunctionType
5+
from typing import Dict, List, Optional, Tuple, Union
6+
7+
import dill
48

59
from feast import flags_helper
610
from feast.data_source import DataSource
711
from feast.entity import Entity
812
from feast.feature_view import FeatureView
913
from feast.field import Field
1014
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
15+
from feast.transformation.base import Transformation
16+
from feast.transformation.mode import TransformationMode
1117

1218
warnings.simplefilter("once", RuntimeWarning)
1319

@@ -42,6 +48,7 @@ class BatchFeatureView(FeatureView):
4248
"""
4349

4450
name: str
51+
mode: Union[TransformationMode, str]
4552
entities: List[str]
4653
ttl: Optional[timedelta]
4754
source: DataSource
@@ -54,11 +61,15 @@ class BatchFeatureView(FeatureView):
5461
owner: str
5562
timestamp_field: str
5663
materialization_intervals: List[Tuple[datetime, datetime]]
64+
udf: Optional[FunctionType]
65+
udf_string: Optional[str]
66+
feature_transformation: Transformation
5767

5868
def __init__(
5969
self,
6070
*,
6171
name: str,
72+
mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
6273
source: DataSource,
6374
entities: Optional[List[Entity]] = None,
6475
ttl: Optional[timedelta] = None,
@@ -67,6 +78,9 @@ def __init__(
6778
description: str = "",
6879
owner: str = "",
6980
schema: Optional[List[Field]] = None,
81+
udf: Optional[FunctionType] = None,
82+
udf_string: Optional[str] = "",
83+
feature_transformation: Optional[Transformation] = None,
7084
):
7185
if not flags_helper.is_test():
7286
warnings.warn(
@@ -84,6 +98,13 @@ def __init__(
8498
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
8599
)
86100

101+
self.mode = mode
102+
self.udf = udf
103+
self.udf_string = udf_string
104+
self.feature_transformation = (
105+
feature_transformation or self.get_feature_transformation()
106+
)
107+
87108
super().__init__(
88109
name=name,
89110
entities=entities,
@@ -95,3 +116,79 @@ def __init__(
95116
schema=schema,
96117
source=source,
97118
)
119+
120+
def get_feature_transformation(self) -> Transformation:
    """Build a ``Transformation`` from this view's ``udf``, ``udf_string`` and ``mode``.

    ``__init__`` falls back to this method when no explicit
    ``feature_transformation`` argument is supplied.

    Returns:
        A ``Transformation`` created with ``self.mode``, ``self.udf`` and
        ``self.udf_string`` (an empty string is passed when ``udf_string``
        is falsy).

    Raises:
        ValueError: If ``self.udf`` is not set, or if ``self.mode`` is not one
            of the pandas / python / sql modes accepted below.
    """
    if not self.udf:
        raise ValueError(
            "Either a UDF or a feature transformation must be provided for BatchFeatureView"
        )
    # The mode may be given as a TransformationMode member or as one of the
    # literal strings listed here; both forms are accepted.
    if self.mode in (
        TransformationMode.PANDAS,
        TransformationMode.PYTHON,
        TransformationMode.SQL,
    ) or self.mode in ("pandas", "python", "sql"):
        return Transformation(
            mode=self.mode, udf=self.udf, udf_string=self.udf_string or ""
        )
    # The message names this class; it previously said "StreamFeatureView".
    raise ValueError(
        f"Unsupported transformation mode: {self.mode} for BatchFeatureView"
    )
137+
138+
139+
def batch_feature_view(
    *,
    name: Optional[str] = None,
    mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
    entities: Optional[List[str]] = None,
    ttl: Optional[timedelta] = None,
    source: Optional[DataSource] = None,
    tags: Optional[Dict[str, str]] = None,
    online: bool = True,
    description: str = "",
    owner: str = "",
    schema: Optional[List[Field]] = None,
):
    """
    Create a decorator that wraps a user function in a BatchFeatureView.

    Every argument is forwarded unchanged to the ``BatchFeatureView``
    constructor, together with the decorated function (as ``udf``) and its
    source text (as ``udf_string``).

    Args:
        name: Name of the feature view. When falsy, the decorated function's
            ``__name__`` is used instead.
        mode: Transformation mode, as a ``TransformationMode`` or a string.
        entities: Forwarded as ``entities``.
            NOTE(review): annotated ``List[str]`` here while
            ``BatchFeatureView.__init__`` annotates ``List[Entity]`` - confirm
            which is intended.
        ttl: Forwarded as ``ttl``.
        source: Forwarded as ``source``.
        tags: Forwarded as ``tags``.
        online: Forwarded as ``online``.
        description: Forwarded as ``description``.
        owner: Forwarded as ``owner``.
        schema: Forwarded as ``schema``.

    Returns:
        A decorator that takes the user function and returns the
        ``BatchFeatureView`` built from it.
    """

    def decorator(user_function):
        # Read the function's source text first, then rewrite its module.
        source_text = dill.source.getsource(user_function)

        # Needed to allow dill to properly serialize the udf. Otherwise, clients
        # will need to have a file with the same name as the original file
        # defining the feature view.
        if user_function.__module__ != "__main__":
            user_function.__module__ = "__main__"

        view_kwargs = dict(
            name=name or user_function.__name__,
            mode=mode,
            entities=entities,
            ttl=ttl,
            source=source,
            tags=tags,
            online=online,
            description=description,
            owner=owner,
            schema=schema,
            udf=user_function,
            udf_string=source_text,
        )
        feature_view = BatchFeatureView(**view_kwargs)

        # Copy the wrapped function's metadata (name, docstring, ...) onto the view.
        functools.update_wrapper(wrapper=feature_view, wrapped=user_function)
        return feature_view

    return decorator

sdk/python/feast/on_demand_feature_view.py

Lines changed: 50 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
import copy
22
import functools
3-
import inspect
43
import warnings
54
from types import FunctionType
6-
from typing import Any, List, Optional, Union, get_type_hints
5+
from typing import Any, List, Optional, Union, cast
76

87
import dill
9-
import pandas as pd
108
import pyarrow
119
from typeguard import typechecked
1210

@@ -31,6 +29,8 @@
3129
from feast.protos.feast.core.Transformation_pb2 import (
3230
UserDefinedFunctionV2 as UserDefinedFunctionProto,
3331
)
32+
from feast.transformation.base import Transformation
33+
from feast.transformation.mode import TransformationMode
3434
from feast.transformation.pandas_transformation import PandasTransformation
3535
from feast.transformation.python_transformation import PythonTransformation
3636
from feast.transformation.substrait_transformation import SubstraitTransformation
@@ -66,15 +66,15 @@ class OnDemandFeatureView(BaseFeatureView):
6666
features: List[Field]
6767
source_feature_view_projections: dict[str, FeatureViewProjection]
6868
source_request_sources: dict[str, RequestSource]
69-
feature_transformation: Union[
70-
PandasTransformation, PythonTransformation, SubstraitTransformation
71-
]
69+
feature_transformation: Transformation
7270
mode: str
7371
description: str
7472
tags: dict[str, str]
7573
owner: str
7674
write_to_online_store: bool
7775
singleton: bool
76+
udf: Optional[FunctionType]
77+
udf_string: Optional[str]
7878

7979
def __init__( # noqa: C901
8080
self,
@@ -90,10 +90,8 @@ def __init__( # noqa: C901
9090
]
9191
],
9292
udf: Optional[FunctionType] = None,
93-
udf_string: str = "",
94-
feature_transformation: Union[
95-
PandasTransformation, PythonTransformation, SubstraitTransformation
96-
],
93+
udf_string: Optional[str] = "",
94+
feature_transformation: Optional[Transformation] = None,
9795
mode: str = "pandas",
9896
description: str = "",
9997
tags: Optional[dict[str, str]] = None,
@@ -112,9 +110,9 @@ def __init__( # noqa: C901
112110
sources: A map from input source names to the actual input sources, which may be
113111
feature views, or request data sources. These sources serve as inputs to the udf,
114112
which will refer to them by name.
115-
udf (deprecated): The user defined transformation function, which must take pandas
113+
udf: The user defined transformation function, which must take pandas
116114
dataframes as inputs.
117-
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
115+
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
118116
feature_transformation: The user defined transformation.
119117
mode: Mode of execution (e.g., Pandas or Python native)
120118
description (optional): A human-readable description.
@@ -136,29 +134,10 @@ def __init__( # noqa: C901
136134

137135
schema = schema or []
138136
self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME]
137+
self.sources = sources
139138
self.mode = mode.lower()
140-
141-
if self.mode not in {"python", "pandas", "substrait"}:
142-
raise ValueError(
143-
f"Unknown mode {self.mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
144-
)
145-
146-
if not feature_transformation:
147-
if udf:
148-
warnings.warn(
149-
"udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.",
150-
DeprecationWarning,
151-
)
152-
# Note inspecting the return signature won't work with isinstance so this is the best alternative
153-
if self.mode == "pandas":
154-
feature_transformation = PandasTransformation(udf, udf_string)
155-
elif self.mode == "python":
156-
feature_transformation = PythonTransformation(udf, udf_string)
157-
else:
158-
raise ValueError(
159-
"OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
160-
)
161-
139+
self.udf = udf
140+
self.udf_string = udf_string
162141
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
163142
self.source_request_sources: dict[str, RequestSource] = {}
164143
for odfv_source in sources:
@@ -206,12 +185,33 @@ def __init__( # noqa: C901
206185
features.append(field)
207186

208187
self.features = features
209-
self.feature_transformation = feature_transformation
188+
self.feature_transformation = (
189+
feature_transformation or self.get_feature_transformation()
190+
)
210191
self.write_to_online_store = write_to_online_store
211192
self.singleton = singleton
212193
if self.singleton and self.mode != "python":
213194
raise ValueError("Singleton is only supported for Python mode.")
214195

196+
def get_feature_transformation(self) -> Transformation:
    """Derive the transformation for this view from ``udf`` and ``mode``.

    ``__init__`` uses the result when no explicit ``feature_transformation``
    argument is supplied.

    Returns:
        For the pandas / python modes, a ``Transformation`` created with
        ``self.mode``, ``self.udf`` and ``self.udf_string`` (an empty string
        is passed when ``udf_string`` is falsy). For the substrait mode, the
        result of ``SubstraitTransformation.from_ibis(self.udf, self.sources)``.

    Raises:
        ValueError: If ``self.udf`` is not set, or if ``self.mode`` is none of
            the modes handled here.
    """
    if not self.udf:
        raise ValueError(
            "Either udf or feature_transformation must be provided to create an OnDemandFeatureView"
        )

    selected_mode = self.mode

    # The mode is matched against both the enum members and the literal strings.
    uses_plain_udf = selected_mode in (
        TransformationMode.PANDAS,
        TransformationMode.PYTHON,
    ) or selected_mode in ("pandas", "python")
    if uses_plain_udf:
        return Transformation(
            mode=selected_mode, udf=self.udf, udf_string=self.udf_string or ""
        )

    uses_substrait = (
        selected_mode == TransformationMode.SUBSTRAIT or selected_mode == "substrait"
    )
    if not uses_substrait:
        raise ValueError(
            f"Unsupported transformation mode: {self.mode} for OnDemandFeatureView"
        )
    return SubstraitTransformation.from_ibis(self.udf, self.sources)
214+
215215
@property
216216
def proto_class(self) -> type[OnDemandFeatureViewProto]:
217217
return OnDemandFeatureViewProto
@@ -312,16 +312,25 @@ def to_proto(self) -> OnDemandFeatureViewProto:
312312
request_data_source=request_sources.to_proto()
313313
)
314314

315-
feature_transformation = FeatureTransformationProto(
316-
user_defined_function=self.feature_transformation.to_proto()
315+
user_defined_function_proto = cast(
316+
UserDefinedFunctionProto,
317+
self.feature_transformation.to_proto()
317318
if isinstance(
318319
self.feature_transformation,
319320
(PandasTransformation, PythonTransformation),
320321
)
321322
else None,
322-
substrait_transformation=self.feature_transformation.to_proto()
323+
)
324+
325+
substrait_transformation_proto = (
326+
self.feature_transformation.to_proto()
323327
if isinstance(self.feature_transformation, SubstraitTransformation)
324-
else None,
328+
else None
329+
)
330+
331+
feature_transformation = FeatureTransformationProto(
332+
user_defined_function=user_defined_function_proto,
333+
substrait_transformation=substrait_transformation_proto,
325334
)
326335
spec = OnDemandFeatureViewSpec(
327336
name=self.name,
@@ -786,38 +795,22 @@ def mainify(obj) -> None:
786795
obj.__module__ = "__main__"
787796

788797
def decorator(user_function):
789-
return_annotation = get_type_hints(user_function).get("return", inspect._empty)
790798
udf_string = dill.source.getsource(user_function)
791799
mainify(user_function)
792-
if mode == "pandas":
793-
if return_annotation not in (inspect._empty, pd.DataFrame):
794-
raise TypeError(
795-
f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame"
796-
)
797-
transformation = PandasTransformation(user_function, udf_string)
798-
elif mode == "python":
799-
transformation = PythonTransformation(user_function, udf_string)
800-
elif mode == "substrait":
801-
from ibis.expr.types.relations import Table
802-
803-
if return_annotation not in (inspect._empty, Table):
804-
raise TypeError(
805-
f"return signature for {user_function} is {return_annotation} but should be ibis.expr.types.relations.Table"
806-
)
807-
transformation = SubstraitTransformation.from_ibis(user_function, sources)
808800

809801
on_demand_feature_view_obj = OnDemandFeatureView(
810802
name=name if name is not None else user_function.__name__,
811803
sources=sources,
812804
schema=schema,
813-
feature_transformation=transformation,
814805
mode=mode,
815806
description=description,
816807
tags=tags,
817808
owner=owner,
818809
write_to_online_store=write_to_online_store,
819810
entities=entities,
820811
singleton=singleton,
812+
udf=user_function,
813+
udf_string=udf_string,
821814
)
822815
functools.update_wrapper(
823816
wrapper=on_demand_feature_view_obj, wrapped=user_function

0 commit comments

Comments
 (0)