Skip to content

Commit d91d7e0

Browse files
authored
fix: Improve the code related to on-demand-featureview. (#4203)
* fix: Improve the code related to on-demand-featureview.

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>

* fix: add the pyarrow.substrait import.

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>

---------

Signed-off-by: Shuchu Han <shuchu.han@gmail.com>
1 parent 0e42150 commit d91d7e0

File tree

8 files changed

+73
-109
lines changed

8 files changed

+73
-109
lines changed

sdk/python/feast/on_demand_feature_view.py

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import warnings
55
from datetime import datetime
66
from types import FunctionType
7-
from typing import Any, Dict, List, Optional, Type, Union
7+
from typing import Any, Optional, Union
88

99
import dill
1010
import pandas as pd
@@ -62,24 +62,24 @@ class OnDemandFeatureView(BaseFeatureView):
6262
"""
6363

6464
name: str
65-
features: List[Field]
66-
source_feature_view_projections: Dict[str, FeatureViewProjection]
67-
source_request_sources: Dict[str, RequestSource]
65+
features: list[Field]
66+
source_feature_view_projections: dict[str, FeatureViewProjection]
67+
source_request_sources: dict[str, RequestSource]
6868
feature_transformation: Union[
6969
PandasTransformation, PythonTransformation, SubstraitTransformation
7070
]
7171
mode: str
7272
description: str
73-
tags: Dict[str, str]
73+
tags: dict[str, str]
7474
owner: str
7575

7676
@log_exceptions # noqa: C901
7777
def __init__( # noqa: C901
7878
self,
7979
*,
8080
name: str,
81-
schema: List[Field],
82-
sources: List[
81+
schema: list[Field],
82+
sources: list[
8383
Union[
8484
FeatureView,
8585
RequestSource,
@@ -93,7 +93,7 @@ def __init__( # noqa: C901
9393
],
9494
mode: str = "pandas",
9595
description: str = "",
96-
tags: Optional[Dict[str, str]] = None,
96+
tags: Optional[dict[str, str]] = None,
9797
owner: str = "",
9898
):
9999
"""
@@ -124,32 +124,31 @@ def __init__( # noqa: C901
124124
owner=owner,
125125
)
126126

127-
if mode not in {"python", "pandas", "substrait"}:
128-
raise Exception(
129-
f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
127+
self.mode = mode.lower()
128+
129+
if self.mode not in {"python", "pandas", "substrait"}:
130+
raise ValueError(
131+
f"Unknown mode {self.mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
130132
)
131-
else:
132-
self.mode = mode
133+
133134
if not feature_transformation:
134135
if udf:
135136
warnings.warn(
136137
"udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.",
137138
DeprecationWarning,
138139
)
139140
# Note inspecting the return signature won't work with isinstance so this is the best alternative
140-
if mode == "pandas":
141+
if self.mode == "pandas":
141142
feature_transformation = PandasTransformation(udf, udf_string)
142-
elif mode == "python":
143+
elif self.mode == "python":
143144
feature_transformation = PythonTransformation(udf, udf_string)
144-
else:
145-
pass
146145
else:
147-
raise Exception(
146+
raise ValueError(
148147
"OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
149148
)
150149

151-
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
152-
self.source_request_sources: Dict[str, RequestSource] = {}
150+
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
151+
self.source_request_sources: dict[str, RequestSource] = {}
153152
for odfv_source in sources:
154153
if isinstance(odfv_source, RequestSource):
155154
self.source_request_sources[odfv_source.name] = odfv_source
@@ -163,7 +162,7 @@ def __init__( # noqa: C901
163162
self.feature_transformation = feature_transformation
164163

165164
@property
166-
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
165+
def proto_class(self) -> type[OnDemandFeatureViewProto]:
167166
return OnDemandFeatureViewProto
168167

169168
def __copy__(self):
@@ -336,7 +335,7 @@ def from_proto(
336335
user_defined_function_proto=backwards_compatible_udf,
337336
)
338337
else:
339-
raise Exception("At least one transformation type needs to be provided")
338+
raise ValueError("At least one transformation type needs to be provided")
340339

341340
on_demand_feature_view_obj = cls(
342341
name=on_demand_feature_view_proto.spec.name,
@@ -372,18 +371,18 @@ def from_proto(
372371

373372
return on_demand_feature_view_obj
374373

375-
def get_request_data_schema(self) -> Dict[str, ValueType]:
376-
schema: Dict[str, ValueType] = {}
374+
def get_request_data_schema(self) -> dict[str, ValueType]:
375+
schema: dict[str, ValueType] = {}
377376
for request_source in self.source_request_sources.values():
378-
if isinstance(request_source.schema, List):
377+
if isinstance(request_source.schema, list):
379378
new_schema = {}
380379
for field in request_source.schema:
381380
new_schema[field.name] = field.dtype.to_value_type()
382381
schema.update(new_schema)
383-
elif isinstance(request_source.schema, Dict):
382+
elif isinstance(request_source.schema, dict):
384383
schema.update(request_source.schema)
385384
else:
386-
raise Exception(
385+
raise TypeError(
387386
f"Request source schema is not correct type: ${str(type(request_source.schema))}"
388387
)
389388
return schema
@@ -401,7 +400,10 @@ def transform_ibis(
401400
if not isinstance(ibis_table, Table):
402401
raise TypeError("transform_ibis only accepts ibis.expr.types.Table")
403402

404-
assert type(self.feature_transformation) == SubstraitTransformation
403+
if not isinstance(self.feature_transformation, SubstraitTransformation):
404+
raise TypeError(
405+
"The feature_transformation is not SubstraitTransformation type while calling transform_ibis()."
406+
)
405407

406408
columns_to_cleanup = []
407409
for source_fv_projection in self.source_feature_view_projections.values():
@@ -423,7 +425,7 @@ def transform_ibis(
423425

424426
transformed_table = transformed_table.drop(*columns_to_cleanup)
425427

426-
rename_columns: Dict[str, str] = {}
428+
rename_columns: dict[str, str] = {}
427429
for feature in self.features:
428430
short_name = feature.name
429431
long_name = self._get_projected_feature_name(feature.name)
@@ -454,11 +456,9 @@ def transform_arrow(
454456
pa_table = pa_table.append_column(
455457
feature.name, pa_table[full_feature_ref]
456458
)
457-
# pa_table[feature.name] = pa_table[full_feature_ref]
458459
columns_to_cleanup.append(feature.name)
459460
elif feature.name in pa_table.column_names:
460461
# Make sure the full feature name is always present
461-
# pa_table[full_feature_ref] = pa_table[feature.name]
462462
pa_table = pa_table.append_column(
463463
full_feature_ref, pa_table[feature.name]
464464
)
@@ -469,7 +469,7 @@ def transform_arrow(
469469
)
470470

471471
# Work out whether the correct columns names are used.
472-
rename_columns: Dict[str, str] = {}
472+
rename_columns: dict[str, str] = {}
473473
for feature in self.features:
474474
short_name = feature.name
475475
long_name = self._get_projected_feature_name(feature.name)
@@ -494,12 +494,12 @@ def transform_arrow(
494494

495495
def transform_dict(
496496
self,
497-
feature_dict: Dict[str, Any], # type: ignore
498-
) -> Dict[str, Any]:
497+
feature_dict: dict[str, Any], # type: ignore
498+
) -> dict[str, Any]:
499499
# we need a mapping from full feature name to short and back to do a renaming
500500
# The simplest thing to do is to make the full reference, copy the columns with the short reference
501501
# and rerun
502-
columns_to_cleanup: List[str] = []
502+
columns_to_cleanup: list[str] = []
503503
for source_fv_projection in self.source_feature_view_projections.values():
504504
for feature in source_fv_projection.features:
505505
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
@@ -512,7 +512,7 @@ def transform_dict(
512512
feature_dict[full_feature_ref] = feature_dict[feature.name]
513513
columns_to_cleanup.append(str(full_feature_ref))
514514

515-
output_dict: Dict[str, Any] = self.feature_transformation.transform(
515+
output_dict: dict[str, Any] = self.feature_transformation.transform(
516516
feature_dict
517517
)
518518
for feature_name in columns_to_cleanup:
@@ -542,8 +542,8 @@ def infer_features(self) -> None:
542542
f"Could not infer Features for the feature view '{self.name}'.",
543543
)
544544

545-
def _construct_random_input(self) -> Dict[str, List[Any]]:
546-
rand_dict_value: Dict[ValueType, List[Any]] = {
545+
def _construct_random_input(self) -> dict[str, list[Any]]:
546+
rand_dict_value: dict[ValueType, list[Any]] = {
547547
ValueType.BYTES: [str.encode("hello world")],
548548
ValueType.STRING: ["hello world"],
549549
ValueType.INT32: [1],
@@ -582,11 +582,11 @@ def _construct_random_input(self) -> Dict[str, List[Any]]:
582582
@staticmethod
583583
def get_requested_odfvs(
584584
feature_refs, project, registry
585-
) -> List["OnDemandFeatureView"]:
585+
) -> list["OnDemandFeatureView"]:
586586
all_on_demand_feature_views = registry.list_on_demand_feature_views(
587587
project, allow_cache=True
588588
)
589-
requested_on_demand_feature_views: List[OnDemandFeatureView] = []
589+
requested_on_demand_feature_views: list[OnDemandFeatureView] = []
590590
for odfv in all_on_demand_feature_views:
591591
for feature in odfv.features:
592592
if f"{odfv.name}:{feature.name}" in feature_refs:
@@ -597,8 +597,8 @@ def get_requested_odfvs(
597597

598598
def on_demand_feature_view(
599599
*,
600-
schema: List[Field],
601-
sources: List[
600+
schema: list[Field],
601+
sources: list[
602602
Union[
603603
FeatureView,
604604
RequestSource,
@@ -607,7 +607,7 @@ def on_demand_feature_view(
607607
],
608608
mode: str = "pandas",
609609
description: str = "",
610-
tags: Optional[Dict[str, str]] = None,
610+
tags: Optional[dict[str, str]] = None,
611611
owner: str = "",
612612
):
613613
"""
@@ -643,9 +643,9 @@ def decorator(user_function):
643643
)
644644
transformation = PandasTransformation(user_function, udf_string)
645645
elif mode == "python":
646-
if return_annotation not in (inspect._empty, Dict[str, Any]):
646+
if return_annotation not in (inspect._empty, dict[str, Any]):
647647
raise TypeError(
648-
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
648+
f"return signature for {user_function} is {return_annotation} but should be dict[str, Any]"
649649
)
650650
transformation = PythonTransformation(user_function, udf_string)
651651
elif mode == "substrait":

sdk/python/feast/transformation/pandas_transformation.py

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from types import FunctionType
2-
from typing import Any, Dict, List
2+
from typing import Any
33

44
import dill
55
import pandas as pd
@@ -28,35 +28,16 @@ def __init__(self, udf: FunctionType, udf_string: str = ""):
2828
self.udf_string = udf_string
2929

3030
def transform_arrow(
31-
self, pa_table: pyarrow.Table, features: List[Field]
31+
self, pa_table: pyarrow.Table, features: list[Field]
3232
) -> pyarrow.Table:
33-
if not isinstance(pa_table, pyarrow.Table):
34-
raise TypeError(
35-
f"pa_table should be type pyarrow.Table but got {type(pa_table).__name__}"
36-
)
37-
output_df = self.udf.__call__(pa_table.to_pandas())
38-
output_df = pyarrow.Table.from_pandas(output_df)
39-
if not isinstance(output_df, pyarrow.Table):
40-
raise TypeError(
41-
f"output_df should be type pyarrow.Table but got {type(output_df).__name__}"
42-
)
43-
return output_df
33+
output_df_pandas = self.udf.__call__(pa_table.to_pandas())
34+
return pyarrow.Table.from_pandas(output_df_pandas)
4435

4536
def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
46-
if not isinstance(input_df, pd.DataFrame):
47-
raise TypeError(
48-
f"input_df should be type pd.DataFrame but got {type(input_df).__name__}"
49-
)
50-
output_df = self.udf.__call__(input_df)
51-
if not isinstance(output_df, pd.DataFrame):
52-
raise TypeError(
53-
f"output_df should be type pd.DataFrame but got {type(output_df).__name__}"
54-
)
55-
return output_df
37+
return self.udf.__call__(input_df)
5638

57-
def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
39+
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
5840
df = pd.DataFrame.from_dict(random_input)
59-
6041
output_df: pd.DataFrame = self.transform(df)
6142

6243
return [

sdk/python/feast/transformation/python_transformation.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from types import FunctionType
2-
from typing import Any, Dict, List
2+
from typing import Any
33

44
import dill
55
import pyarrow
@@ -26,27 +26,19 @@ def __init__(self, udf: FunctionType, udf_string: str = ""):
2626
self.udf_string = udf_string
2727

2828
def transform_arrow(
29-
self, pa_table: pyarrow.Table, features: List[Field]
29+
self, pa_table: pyarrow.Table, features: list[Field]
3030
) -> pyarrow.Table:
3131
raise Exception(
32-
'OnDemandFeatureView mode "python" not supported for offline processing.'
32+
'OnDemandFeatureView with mode "python" does not support offline processing.'
3333
)
3434

35-
def transform(self, input_dict: Dict) -> Dict:
36-
if not isinstance(input_dict, Dict):
37-
raise TypeError(
38-
f"input_dict should be type Dict[str, Any] but got {type(input_dict).__name__}"
39-
)
35+
def transform(self, input_dict: dict) -> dict:
4036
# Ensuring that the inputs are included as well
4137
output_dict = self.udf.__call__(input_dict)
42-
if not isinstance(output_dict, Dict):
43-
raise TypeError(
44-
f"output_dict should be type Dict[str, Any] but got {type(output_dict).__name__}"
45-
)
4638
return {**input_dict, **output_dict}
4739

48-
def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
49-
output_dict: Dict[str, List[Any]] = self.transform(random_input)
40+
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
41+
output_dict: dict[str, list[Any]] = self.transform(random_input)
5042

5143
return [
5244
Field(

sdk/python/feast/transformation/substrait_transformation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from types import FunctionType
2-
from typing import Any, Dict, List
2+
from typing import Any
33

44
import dill
55
import pandas as pd
@@ -42,7 +42,7 @@ def transform_ibis(self, table):
4242
return self.ibis_function(table)
4343

4444
def transform_arrow(
45-
self, pa_table: pyarrow.Table, features: List[Field] = []
45+
self, pa_table: pyarrow.Table, features: list[Field] = []
4646
) -> pyarrow.Table:
4747
def table_provider(names, schema: pyarrow.Schema):
4848
return pa_table.select(schema.names)
@@ -56,7 +56,7 @@ def table_provider(names, schema: pyarrow.Schema):
5656

5757
return table
5858

59-
def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
59+
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
6060
df = pd.DataFrame.from_dict(random_input)
6161
output_df: pd.DataFrame = self.transform(df)
6262

sdk/python/tests/unit/infra/test_inference_unit_tests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame:
8383
],
8484
mode="python",
8585
)
86-
def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, Any]:
87-
output_dict: Dict[str, Any] = {
86+
def python_native_test_view(input_dict: dict[str, Any]) -> dict[str, Any]:
87+
output_dict: dict[str, Any] = {
8888
"output": input_dict["some_date"],
8989
"object_output": str(input_dict["some_date"]),
9090
}

0 commit comments

Comments
 (0)