Skip to content

Commit 3fdb716

Browse files
authored
feat: Make arrow primary interchange for online ODFV execution (#4143)
* rewrite online flow to use transform_arrow

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* fix transformation server

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

---------

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
1 parent 6ef7852 commit 3fdb716

File tree

7 files changed

+29
-112
lines changed

7 files changed

+29
-112
lines changed

sdk/python/feast/feature_store.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2138,7 +2138,7 @@ def _augment_response_with_on_demand_transforms(
21382138
)
21392139

21402140
initial_response = OnlineResponse(online_features_response)
2141-
initial_response_df: Optional[pd.DataFrame] = None
2141+
initial_response_arrow: Optional[pa.Table] = None
21422142
initial_response_dict: Optional[Dict[str, List[Any]]] = None
21432143

21442144
# Apply on demand transformations and augment the result rows
@@ -2148,18 +2148,14 @@ def _augment_response_with_on_demand_transforms(
21482148
if odfv.mode == "python":
21492149
if initial_response_dict is None:
21502150
initial_response_dict = initial_response.to_dict()
2151-
transformed_features_dict: Dict[str, List[Any]] = (
2152-
odfv.get_transformed_features(
2153-
initial_response_dict,
2154-
full_feature_names,
2155-
)
2151+
transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
2152+
initial_response_dict
21562153
)
21572154
elif odfv.mode in {"pandas", "substrait"}:
2158-
if initial_response_df is None:
2159-
initial_response_df = initial_response.to_df()
2160-
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
2161-
initial_response_df,
2162-
full_feature_names,
2155+
if initial_response_arrow is None:
2156+
initial_response_arrow = initial_response.to_arrow()
2157+
transformed_features_arrow = odfv.transform_arrow(
2158+
initial_response_arrow, full_feature_names
21632159
)
21642160
else:
21652161
raise Exception(
@@ -2169,11 +2165,11 @@ def _augment_response_with_on_demand_transforms(
21692165
transformed_features = (
21702166
transformed_features_dict
21712167
if odfv.mode == "python"
2172-
else transformed_features_df
2168+
else transformed_features_arrow
21732169
)
21742170
transformed_columns = (
2175-
transformed_features.columns
2176-
if isinstance(transformed_features, pd.DataFrame)
2171+
transformed_features.column_names
2172+
if isinstance(transformed_features, pa.Table)
21772173
else transformed_features
21782174
)
21792175
selected_subset = [f for f in transformed_columns if f in _feature_refs]
@@ -2183,6 +2179,10 @@ def _augment_response_with_on_demand_transforms(
21832179
feature_vector = transformed_features[selected_feature]
21842180
proto_values.append(
21852181
python_values_to_proto_values(feature_vector, ValueType.UNKNOWN)
2182+
if odfv.mode == "python"
2183+
else python_values_to_proto_values(
2184+
feature_vector.to_numpy(), ValueType.UNKNOWN
2185+
)
21862186
)
21872187

21882188
odfv_result_names |= set(selected_subset)

sdk/python/feast/on_demand_feature_view.py

Lines changed: 1 addition & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from typeguard import typechecked
1313

1414
from feast.base_feature_view import BaseFeatureView
15-
from feast.batch_feature_view import BatchFeatureView
1615
from feast.data_source import RequestSource
1716
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
1817
from feast.feature_view import FeatureView
@@ -493,53 +492,7 @@ def transform_arrow(
493492
]
494493
)
495494

496-
def get_transformed_features_df(
497-
self,
498-
df_with_features: pd.DataFrame,
499-
full_feature_names: bool = False,
500-
) -> pd.DataFrame:
501-
# Apply on demand transformations
502-
if not isinstance(df_with_features, pd.DataFrame):
503-
raise TypeError("get_transformed_features_df only accepts pd.DataFrame")
504-
columns_to_cleanup = []
505-
for source_fv_projection in self.source_feature_view_projections.values():
506-
for feature in source_fv_projection.features:
507-
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
508-
if full_feature_ref in df_with_features.keys():
509-
# Make sure the partial feature name is always present
510-
df_with_features[feature.name] = df_with_features[full_feature_ref]
511-
columns_to_cleanup.append(feature.name)
512-
elif feature.name in df_with_features.keys():
513-
# Make sure the full feature name is always present
514-
df_with_features[full_feature_ref] = df_with_features[feature.name]
515-
columns_to_cleanup.append(full_feature_ref)
516-
517-
# Compute transformed values and apply to each result row
518-
df_with_transformed_features: pd.DataFrame = (
519-
self.feature_transformation.transform(df_with_features)
520-
)
521-
522-
# Work out whether the correct columns names are used.
523-
rename_columns: Dict[str, str] = {}
524-
for feature in self.features:
525-
short_name = feature.name
526-
long_name = self._get_projected_feature_name(feature.name)
527-
if (
528-
short_name in df_with_transformed_features.columns
529-
and full_feature_names
530-
):
531-
rename_columns[short_name] = long_name
532-
elif not full_feature_names:
533-
# Long name must be in dataframe.
534-
rename_columns[long_name] = short_name
535-
536-
# Cleanup extra columns used for transformation
537-
df_with_transformed_features = df_with_transformed_features[
538-
[f.name for f in self.features]
539-
]
540-
return df_with_transformed_features.rename(columns=rename_columns)
541-
542-
def get_transformed_features_dict(
495+
def transform_dict(
543496
self,
544497
feature_dict: Dict[str, Any], # type: ignore
545498
) -> Dict[str, Any]:
@@ -566,29 +519,6 @@ def get_transformed_features_dict(
566519
del output_dict[feature_name]
567520
return output_dict
568521

569-
def get_transformed_features(
570-
self,
571-
features: Union[Dict[str, Any], pd.DataFrame],
572-
full_feature_names: bool = False,
573-
) -> Union[Dict[str, Any], pd.DataFrame]:
574-
# TODO: classic inheritance pattern....maybe fix this
575-
if self.mode == "python" and isinstance(features, Dict):
576-
# note full_feature_names is not needed for the dictionary
577-
return self.get_transformed_features_dict(
578-
feature_dict=features,
579-
)
580-
elif self.mode in {"pandas", "substrait"} and isinstance(
581-
features, pd.DataFrame
582-
):
583-
return self.get_transformed_features_df(
584-
df_with_features=features,
585-
full_feature_names=full_feature_names,
586-
)
587-
else:
588-
raise Exception(
589-
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
590-
)
591-
592522
def infer_features(self) -> None:
593523
inferred_features = self.feature_transformation.infer_features(
594524
self._construct_random_input()
@@ -745,23 +675,6 @@ def decorator(user_function):
745675
return decorator
746676

747677

748-
def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
749-
bfv = BatchFeatureView(
750-
name=fv.name,
751-
entities=fv.entities,
752-
ttl=fv.ttl,
753-
tags=fv.tags,
754-
online=fv.online,
755-
owner=fv.owner,
756-
schema=fv.schema,
757-
source=fv.batch_source,
758-
)
759-
760-
bfv.features = copy.copy(fv.features)
761-
bfv.entities = copy.copy(fv.entities)
762-
return bfv
763-
764-
765678
def _empty_odfv_udf_fn(x: Any) -> Any:
766679
# just an identity mapping, otherwise we risk tripping some downstream tests
767680
return x

sdk/python/feast/online_response.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from typing import Any, Dict, List
1616

1717
import pandas as pd
18+
import pyarrow as pa
1819

1920
from feast.feature_view import DUMMY_ENTITY_ID
2021
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
@@ -77,3 +78,13 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
7778
"""
7879

7980
return pd.DataFrame(self.to_dict(include_event_timestamps))
81+
82+
def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table:
83+
"""
84+
Converts GetOnlineFeaturesResponse features into pyarrow Table.
85+
86+
Args:
87+
include_event_timestamps: bool Optionally include feature timestamps in the table
88+
"""
89+
90+
return pa.Table.from_pydict(self.to_dict(include_event_timestamps))

sdk/python/feast/transformation/python_transformation.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,6 @@ def __eq__(self, other):
6464
"Comparisons should only involve PythonTransformation class objects."
6565
)
6666

67-
if not super().__eq__(other):
68-
return False
69-
7067
if (
7168
self.udf_string != other.udf_string
7269
or self.udf.__code__.co_code != other.udf.__code__.co_code

sdk/python/feast/transformation/substrait_transformation.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,6 @@ def __eq__(self, other):
7777
"Comparisons should only involve SubstraitTransformation class objects."
7878
)
7979

80-
if not super().__eq__(other):
81-
return False
82-
8380
return (
8481
self.substrait_plan == other.substrait_plan
8582
and self.ibis_function.__code__.co_code

sdk/python/feast/transformation_server.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,14 @@ def TransformFeatures(self, request, context):
4545
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
4646
raise
4747

48-
df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas()
48+
df = pa.ipc.open_file(request.transformation_input.arrow_value).read_all()
4949

5050
if odfv.mode != "pandas":
5151
raise Exception(
5252
f'OnDemandFeatureView mode "{odfv.mode}" not supported by TransformationServer.'
5353
)
5454

55-
result_df = odfv.get_transformed_features_df(df, True)
56-
result_arrow = pa.Table.from_pandas(result_df)
55+
result_arrow = odfv.transform_arrow(df, True)
5756
sink = pa.BufferOutputStream()
5857
writer = pa.ipc.new_file(sink, result_arrow.schema)
5958
writer.write_table(result_arrow)

sdk/python/tests/unit/test_on_demand_feature_view.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ def test_python_native_transformation_mode():
204204
}
205205
)
206206

207-
assert on_demand_feature_view_python_native.get_transformed_features(
207+
assert on_demand_feature_view_python_native.transform_dict(
208208
{
209209
"feature1": 0,
210210
"feature2": 1,

0 commit comments

Comments
 (0)