From a101f0fab950c658b556dcba9191dd56eb2df131 Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Tue, 14 Oct 2025 15:28:05 -0700 Subject: [PATCH 1/9] add aggregation to odfv Signed-off-by: hao-xu5 --- .../compute_engines/backends/__init__.py | 0 .../infra/compute_engines/backends/base.py | 79 ++++++++++++++++ .../infra/compute_engines/backends/factory.py | 53 +++++++++++ .../backends/pandas_backend.py | 46 ++++++++++ .../backends/polars_backend.py | 47 ++++++++++ .../infra/compute_engines/local/compute.py | 4 +- .../compute_engines/local/feature_builder.py | 2 +- .../infra/compute_engines/local/nodes.py | 2 +- sdk/python/feast/utils.py | 89 +++++++++++++++++++ 9 files changed, 318 insertions(+), 4 deletions(-) create mode 100644 sdk/python/feast/infra/compute_engines/backends/__init__.py create mode 100644 sdk/python/feast/infra/compute_engines/backends/base.py create mode 100644 sdk/python/feast/infra/compute_engines/backends/factory.py create mode 100644 sdk/python/feast/infra/compute_engines/backends/pandas_backend.py create mode 100644 sdk/python/feast/infra/compute_engines/backends/polars_backend.py diff --git a/sdk/python/feast/infra/compute_engines/backends/__init__.py b/sdk/python/feast/infra/compute_engines/backends/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/infra/compute_engines/backends/base.py b/sdk/python/feast/infra/compute_engines/backends/base.py new file mode 100644 index 00000000000..3c8d25abe00 --- /dev/null +++ b/sdk/python/feast/infra/compute_engines/backends/base.py @@ -0,0 +1,79 @@ +from abc import ABC, abstractmethod +from datetime import timedelta + + +class DataFrameBackend(ABC): + """ + Abstract interface for DataFrame operations used by the LocalComputeEngine. + + This interface defines the contract for implementing pluggable DataFrame backends + such as Pandas, Polars, or DuckDB. Each backend must support core table operations + such as joins, filtering, aggregation, conversion to/from Arrow, and deduplication. + + The purpose of this abstraction is to allow seamless swapping of execution backends + without changing DAGNode or ComputeEngine logic. All nodes operate on pyarrow.Table + as the standard input/output format, while the backend defines how the computation + is actually performed. + + Expected implementations include: + - PandasBackend + - PolarsBackend + - DuckDBBackend (future) + + Methods + ------- + from_arrow(table: pa.Table) -> Any + Convert a pyarrow.Table to the backend-native DataFrame format. + + to_arrow(df: Any) -> pa.Table + Convert a backend-native DataFrame to pyarrow.Table. + + join(left: Any, right: Any, on: List[str], how: str) -> Any + Join two dataframes on specified keys with given join type. + + groupby_agg(df: Any, group_keys: List[str], agg_ops: Dict[str, Tuple[str, str]]) -> Any + Group and aggregate the dataframe. `agg_ops` maps output column names + to (aggregation function, source column name) pairs. + + filter(df: Any, expr: str) -> Any + Apply a filter expression (string-based) to the DataFrame. + + to_timedelta_value(delta: timedelta) -> Any + Convert a Python timedelta object to a backend-compatible value + that can be subtracted from a timestamp column. + + drop_duplicates(df: Any, keys: List[str], sort_by: List[str], ascending: bool = False) -> Any + Deduplicate the DataFrame by key columns, keeping the first row + by descending or ascending sort order. + + rename_columns(df: Any, columns: Dict[str, str]) -> Any + Rename columns in the DataFrame according to the provided mapping. + """ + + @abstractmethod + def columns(self, df): ... + + @abstractmethod + def from_arrow(self, table): ... + + @abstractmethod + def join(self, left, right, on, how): ... + + @abstractmethod + def groupby_agg(self, df, group_keys, agg_ops): ... + + @abstractmethod + def filter(self, df, expr): ... + + @abstractmethod + def to_arrow(self, df): ... + + @abstractmethod + def to_timedelta_value(self, delta: timedelta): ... + + @abstractmethod + def drop_duplicates(self, df, keys, sort_by, ascending: bool = False): + pass + + @abstractmethod + def rename_columns(self, df, columns: dict[str, str]): ... diff --git a/sdk/python/feast/infra/compute_engines/backends/factory.py b/sdk/python/feast/infra/compute_engines/backends/factory.py new file mode 100644 index 00000000000..ffe969f3003 --- /dev/null +++ b/sdk/python/feast/infra/compute_engines/backends/factory.py @@ -0,0 +1,53 @@ +from typing import Optional + +import pandas as pd +import pyarrow + +from feast.infra.compute_engines.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend + + +class BackendFactory: + """ + Factory class for constructing DataFrameBackend implementations based on backend name + or runtime entity_df type. + """ + + @staticmethod + def from_name(name: str) -> DataFrameBackend: + if name == "pandas": + return PandasBackend() + if name == "polars": + return BackendFactory._get_polars_backend() + raise ValueError(f"Unsupported backend name: {name}") + + @staticmethod + def infer_from_entity_df(entity_df) -> Optional[DataFrameBackend]: + if ( + not entity_df + or isinstance(entity_df, pyarrow.Table) + or isinstance(entity_df, pd.DataFrame) + ): + return PandasBackend() + + if BackendFactory._is_polars(entity_df): + return BackendFactory._get_polars_backend() + return None + + @staticmethod + def _is_polars(entity_df) -> bool: + try: + import polars as pl + except ImportError: + raise ImportError( + "Polars is not installed. Please install it to use Polars backend." + ) + return isinstance(entity_df, pl.DataFrame) + + @staticmethod + def _get_polars_backend(): + from feast.infra.compute_engines.backends.polars_backend import ( + PolarsBackend, + ) + + return PolarsBackend() diff --git a/sdk/python/feast/infra/compute_engines/backends/pandas_backend.py b/sdk/python/feast/infra/compute_engines/backends/pandas_backend.py new file mode 100644 index 00000000000..8ea5a4a9213 --- /dev/null +++ b/sdk/python/feast/infra/compute_engines/backends/pandas_backend.py @@ -0,0 +1,46 @@ +from datetime import timedelta + +import pandas as pd +import pyarrow as pa + +from feast.infra.compute_engines.backends.base import DataFrameBackend + + +class PandasBackend(DataFrameBackend): + def columns(self, df): + return df.columns.tolist() + + def from_arrow(self, table): + return table.to_pandas() + + def join(self, left, right, on, how): + return left.merge(right, on=on, how=how) + + def groupby_agg(self, df, group_keys, agg_ops): + return ( + df.groupby(group_keys) + .agg( + **{ + alias: pd.NamedAgg(column=col, aggfunc=func) + for alias, (func, col) in agg_ops.items() + } + ) + .reset_index() + ) + + def filter(self, df, expr): + return df.query(expr) + + def to_arrow(self, df): + return pa.Table.from_pandas(df) + + def to_timedelta_value(self, delta: timedelta): + return pd.to_timedelta(delta) + + def drop_duplicates(self, df, keys, sort_by, ascending: bool = False): + return df.sort_values(by=sort_by, ascending=ascending).drop_duplicates( + subset=keys + ) + + def rename_columns(self, df, columns: dict[str, str]): + return df.rename(columns=columns) diff --git a/sdk/python/feast/infra/compute_engines/backends/polars_backend.py b/sdk/python/feast/infra/compute_engines/backends/polars_backend.py new file mode 100644 index 00000000000..92e348c3652 --- /dev/null +++ b/sdk/python/feast/infra/compute_engines/backends/polars_backend.py @@ -0,0 +1,47 @@ +from datetime import timedelta + +import polars as pl +import pyarrow as pa + +from feast.infra.compute_engines.backends.base import DataFrameBackend + + +class PolarsBackend(DataFrameBackend): + def columns(self, df: pl.DataFrame) -> list[str]: + return df.columns + + def from_arrow(self, table: pa.Table) -> pl.DataFrame: + return pl.from_arrow(table) + + def to_arrow(self, df: pl.DataFrame) -> pa.Table: + return df.to_arrow() + + def join(self, left: pl.DataFrame, right: pl.DataFrame, on, how) -> pl.DataFrame: + return left.join(right, on=on, how=how) + + def groupby_agg(self, df: pl.DataFrame, group_keys, agg_ops) -> pl.DataFrame: + agg_exprs = [ + getattr(pl.col(col), func)().alias(alias) + for alias, (func, col) in agg_ops.items() + ] + return df.groupby(group_keys).agg(agg_exprs) + + def filter(self, df: pl.DataFrame, expr: str) -> pl.DataFrame: + return df.filter(pl.sql_expr(expr)) + + def to_timedelta_value(self, delta: timedelta): + return pl.duration(milliseconds=delta.total_seconds() * 1000) + + def drop_duplicates( + self, + df: pl.DataFrame, + keys: list[str], + sort_by: list[str], + ascending: bool = False, + ) -> pl.DataFrame: + return df.sort(by=sort_by, descending=not ascending).unique( + subset=keys, keep="first" + ) + + def rename_columns(self, df: pl.DataFrame, columns: dict[str, str]) -> pl.DataFrame: + return df.rename(columns) diff --git a/sdk/python/feast/infra/compute_engines/local/compute.py b/sdk/python/feast/infra/compute_engines/local/compute.py index 556468f5e1d..cea7ba99547 100644 --- a/sdk/python/feast/infra/compute_engines/local/compute.py +++ b/sdk/python/feast/infra/compute_engines/local/compute.py @@ -14,8 +14,8 @@ from feast.infra.common.retrieval_task import HistoricalRetrievalTask from feast.infra.compute_engines.base import ComputeEngine from feast.infra.compute_engines.dag.context import ExecutionContext -from feast.infra.compute_engines.local.backends.base import DataFrameBackend -from feast.infra.compute_engines.local.backends.factory import BackendFactory +from feast.infra.compute_engines.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.factory import BackendFactory from feast.infra.compute_engines.local.feature_builder import LocalFeatureBuilder from feast.infra.compute_engines.local.job import ( LocalMaterializationJob, diff --git a/sdk/python/feast/infra/compute_engines/local/feature_builder.py b/sdk/python/feast/infra/compute_engines/local/feature_builder.py index 9b2306c0f01..4eed3e577c4 100644 --- a/sdk/python/feast/infra/compute_engines/local/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/local/feature_builder.py @@ -3,7 +3,7 @@ from feast.infra.common.materialization_job import MaterializationTask from feast.infra.common.retrieval_task import HistoricalRetrievalTask from feast.infra.compute_engines.feature_builder import FeatureBuilder -from feast.infra.compute_engines.local.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.local.nodes import ( LocalAggregationNode, LocalDedupNode, diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py index 870a098261d..d070e50885c 100644 --- a/sdk/python/feast/infra/compute_engines/local/nodes.py +++ b/sdk/python/feast/infra/compute_engines/local/nodes.py @@ -9,7 +9,7 @@ from feast.infra.compute_engines.dag.model import DAGFormat from feast.infra.compute_engines.dag.node import DAGNode from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue -from feast.infra.compute_engines.local.backends.base import DataFrameBackend +from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.local.local_node import LocalNode from feast.infra.compute_engines.utils import ( create_offline_store_retrieval_job, diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 939adbe933f..05c73914bf8 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -33,6 +33,7 @@ RequestDataNotFoundInEntityRowsException, ) from feast.field import Field +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.key_encoding_utils import deserialize_entity_key from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, @@ -561,6 +562,72 @@ def construct_response_feature_vector( ) +def _get_aggregate_operations(agg_specs) -> dict: + """ + Convert Aggregation specs to agg_ops format for PandasBackend. + + Reused from LocalFeatureBuilder logic. + TODO: This logic is duplicated from LocalFeatureBuilder._get_aggregate_operations(). + Consider refactoring to a shared utility module in the future. + """ + agg_ops = {} + for agg in agg_specs: + if agg.time_window is not None: + raise ValueError( + "Time window aggregation is not supported in online serving." + ) + alias = f"{agg.function}_{agg.column}" + agg_ops[alias] = (agg.function, agg.column) + return agg_ops + + +def _apply_aggregations_to_response( + response_data: Union[pyarrow.Table, Dict[str, List[Any]]], + aggregations, + group_keys: List[str], + mode: str, +) -> Union[pyarrow.Table, Dict[str, List[Any]]]: + """ + Apply aggregations using PandasBackend. + + Args: + response_data: Either a pyarrow.Table or dict of lists containing the data + aggregations: List of Aggregation objects to apply + group_keys: List of column names to group by + mode: Transformation mode ("python", "pandas", or "substrait") + + Returns: + Aggregated data in the same format as input + + TODO: Consider refactoring to support backends other than pandas in the future. + """ + if not aggregations: + return response_data + + backend = PandasBackend() + + # Convert to pandas DataFrame + if isinstance(response_data, dict): + df = pd.DataFrame(response_data) + else: # pyarrow.Table + df = backend.from_arrow(response_data) + + if df.empty: + return response_data + + # Convert aggregations to agg_ops format + agg_ops = _get_aggregate_operations(aggregations) + + # Apply aggregations using PandasBackend + result_df = backend.groupby_agg(df, group_keys, agg_ops) + + # Convert back to original format + if mode == "python": + return {col: result_df[col].tolist() for col in result_df.columns} + else: # pandas or substrait + return backend.to_arrow(result_df) + + def _augment_response_with_on_demand_transforms( online_features_response: GetOnlineFeaturesResponse, feature_refs: List[str], @@ -605,6 +672,28 @@ def _augment_response_with_on_demand_transforms( for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] if not odfv.write_to_online_store: + # Apply aggregations BEFORE transformation if defined + if odfv.aggregations: + if odfv.mode == "python": + if initial_response_dict is None: + initial_response_dict = initial_response.to_dict() + initial_response_dict = _apply_aggregations_to_response( + initial_response_dict, + odfv.aggregations, + odfv.entities, + odfv.mode, + ) + elif odfv.mode in {"pandas", "substrait"}: + if initial_response_arrow is None: + initial_response_arrow = initial_response.to_arrow() + initial_response_arrow = _apply_aggregations_to_response( + initial_response_arrow, + odfv.aggregations, + odfv.entities, + odfv.mode, + ) + + # Apply transformation if odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() From 398dbfaf3496b3d497a8671600db7d458eaef813 Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Tue, 14 Oct 2025 15:30:40 -0700 Subject: [PATCH 2/9] add aggregation to odfv Signed-off-by: hao-xu5 --- .../local/backends/__init__.py | 0 .../compute_engines/local/backends/base.py | 79 ------------------- .../compute_engines/local/backends/factory.py | 53 ------------- .../local/backends/pandas_backend.py | 46 ----------- .../local/backends/polars_backend.py | 47 ----------- 5 files changed, 225 deletions(-) delete mode 100644 sdk/python/feast/infra/compute_engines/local/backends/__init__.py delete mode 100644 sdk/python/feast/infra/compute_engines/local/backends/base.py delete mode 100644 sdk/python/feast/infra/compute_engines/local/backends/factory.py delete mode 100644 sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py delete mode 100644 sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py diff --git a/sdk/python/feast/infra/compute_engines/local/backends/__init__.py b/sdk/python/feast/infra/compute_engines/local/backends/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/python/feast/infra/compute_engines/local/backends/base.py b/sdk/python/feast/infra/compute_engines/local/backends/base.py deleted file mode 100644 index 3c8d25abe00..00000000000 --- a/sdk/python/feast/infra/compute_engines/local/backends/base.py +++ /dev/null @@ -1,79 +0,0 @@ -from abc import ABC, abstractmethod -from datetime import timedelta - - -class DataFrameBackend(ABC): - """ - Abstract interface for DataFrame operations used by the LocalComputeEngine. - - This interface defines the contract for implementing pluggable DataFrame backends - such as Pandas, Polars, or DuckDB. Each backend must support core table operations - such as joins, filtering, aggregation, conversion to/from Arrow, and deduplication. - - The purpose of this abstraction is to allow seamless swapping of execution backends - without changing DAGNode or ComputeEngine logic. All nodes operate on pyarrow.Table - as the standard input/output format, while the backend defines how the computation - is actually performed. - - Expected implementations include: - - PandasBackend - - PolarsBackend - - DuckDBBackend (future) - - Methods - ------- - from_arrow(table: pa.Table) -> Any - Convert a pyarrow.Table to the backend-native DataFrame format. - - to_arrow(df: Any) -> pa.Table - Convert a backend-native DataFrame to pyarrow.Table. - - join(left: Any, right: Any, on: List[str], how: str) -> Any - Join two dataframes on specified keys with given join type. - - groupby_agg(df: Any, group_keys: List[str], agg_ops: Dict[str, Tuple[str, str]]) -> Any - Group and aggregate the dataframe. `agg_ops` maps output column names - to (aggregation function, source column name) pairs. - - filter(df: Any, expr: str) -> Any - Apply a filter expression (string-based) to the DataFrame. - - to_timedelta_value(delta: timedelta) -> Any - Convert a Python timedelta object to a backend-compatible value - that can be subtracted from a timestamp column. - - drop_duplicates(df: Any, keys: List[str], sort_by: List[str], ascending: bool = False) -> Any - Deduplicate the DataFrame by key columns, keeping the first row - by descending or ascending sort order. - - rename_columns(df: Any, columns: Dict[str, str]) -> Any - Rename columns in the DataFrame according to the provided mapping. - """ - - @abstractmethod - def columns(self, df): ... - - @abstractmethod - def from_arrow(self, table): ... - - @abstractmethod - def join(self, left, right, on, how): ... - - @abstractmethod - def groupby_agg(self, df, group_keys, agg_ops): ... - - @abstractmethod - def filter(self, df, expr): ... - - @abstractmethod - def to_arrow(self, df): ... - - @abstractmethod - def to_timedelta_value(self, delta: timedelta): ... - - @abstractmethod - def drop_duplicates(self, df, keys, sort_by, ascending: bool = False): - pass - - @abstractmethod - def rename_columns(self, df, columns: dict[str, str]): ... diff --git a/sdk/python/feast/infra/compute_engines/local/backends/factory.py b/sdk/python/feast/infra/compute_engines/local/backends/factory.py deleted file mode 100644 index 6d3774f6393..00000000000 --- a/sdk/python/feast/infra/compute_engines/local/backends/factory.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Optional - -import pandas as pd -import pyarrow - -from feast.infra.compute_engines.local.backends.base import DataFrameBackend -from feast.infra.compute_engines.local.backends.pandas_backend import PandasBackend - - -class BackendFactory: - """ - Factory class for constructing DataFrameBackend implementations based on backend name - or runtime entity_df type. - """ - - @staticmethod - def from_name(name: str) -> DataFrameBackend: - if name == "pandas": - return PandasBackend() - if name == "polars": - return BackendFactory._get_polars_backend() - raise ValueError(f"Unsupported backend name: {name}") - - @staticmethod - def infer_from_entity_df(entity_df) -> Optional[DataFrameBackend]: - if ( - not entity_df - or isinstance(entity_df, pyarrow.Table) - or isinstance(entity_df, pd.DataFrame) - ): - return PandasBackend() - - if BackendFactory._is_polars(entity_df): - return BackendFactory._get_polars_backend() - return None - - @staticmethod - def _is_polars(entity_df) -> bool: - try: - import polars as pl - except ImportError: - raise ImportError( - "Polars is not installed. Please install it to use Polars backend." - ) - return isinstance(entity_df, pl.DataFrame) - - @staticmethod - def _get_polars_backend(): - from feast.infra.compute_engines.local.backends.polars_backend import ( - PolarsBackend, - ) - - return PolarsBackend() diff --git a/sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py b/sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py deleted file mode 100644 index 76ddd688424..00000000000 --- a/sdk/python/feast/infra/compute_engines/local/backends/pandas_backend.py +++ /dev/null @@ -1,46 +0,0 @@ -from datetime import timedelta - -import pandas as pd -import pyarrow as pa - -from feast.infra.compute_engines.local.backends.base import DataFrameBackend - - -class PandasBackend(DataFrameBackend): - def columns(self, df): - return df.columns.tolist() - - def from_arrow(self, table): - return table.to_pandas() - - def join(self, left, right, on, how): - return left.merge(right, on=on, how=how) - - def groupby_agg(self, df, group_keys, agg_ops): - return ( - df.groupby(group_keys) - .agg( - **{ - alias: pd.NamedAgg(column=col, aggfunc=func) - for alias, (func, col) in agg_ops.items() - } - ) - .reset_index() - ) - - def filter(self, df, expr): - return df.query(expr) - - def to_arrow(self, df): - return pa.Table.from_pandas(df) - - def to_timedelta_value(self, delta: timedelta): - return pd.to_timedelta(delta) - - def drop_duplicates(self, df, keys, sort_by, ascending: bool = False): - return df.sort_values(by=sort_by, ascending=ascending).drop_duplicates( - subset=keys - ) - - def rename_columns(self, df, columns: dict[str, str]): - return df.rename(columns=columns) diff --git a/sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py b/sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py deleted file mode 100644 index 352ffecdab8..00000000000 --- a/sdk/python/feast/infra/compute_engines/local/backends/polars_backend.py +++ /dev/null @@ -1,47 +0,0 @@ -from datetime import timedelta - -import polars as pl -import pyarrow as pa - -from feast.infra.compute_engines.local.backends.base import DataFrameBackend - - -class PolarsBackend(DataFrameBackend): - def columns(self, df: pl.DataFrame) -> list[str]: - return df.columns - - def from_arrow(self, table: pa.Table) -> pl.DataFrame: - return pl.from_arrow(table) - - def to_arrow(self, df: pl.DataFrame) -> pa.Table: - return df.to_arrow() - - def join(self, left: pl.DataFrame, right: pl.DataFrame, on, how) -> pl.DataFrame: - return left.join(right, on=on, how=how) - - def groupby_agg(self, df: pl.DataFrame, group_keys, agg_ops) -> pl.DataFrame: - agg_exprs = [ - getattr(pl.col(col), func)().alias(alias) - for alias, (func, col) in agg_ops.items() - ] - return df.groupby(group_keys).agg(agg_exprs) - - def filter(self, df: pl.DataFrame, expr: str) -> pl.DataFrame: - return df.filter(pl.sql_expr(expr)) - - def to_timedelta_value(self, delta: timedelta): - return pl.duration(milliseconds=delta.total_seconds() * 1000) - - def drop_duplicates( - self, - df: pl.DataFrame, - keys: list[str], - sort_by: list[str], - ascending: bool = False, - ) -> pl.DataFrame: - return df.sort(by=sort_by, descending=not ascending).unique( - subset=keys, keep="first" - ) - - def rename_columns(self, df: pl.DataFrame, columns: dict[str, str]) -> pl.DataFrame: - return df.rename(columns) From 5ccd4eacd255d2db01fa72f0ca0bfcd6a56e9465 Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Tue, 14 Oct 2025 17:09:08 -0700 Subject: [PATCH 3/9] add aggregation to odfv Signed-off-by: hao-xu5 --- sdk/python/feast/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 05c73914bf8..c1234503e30 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -672,7 +672,7 @@ def _augment_response_with_on_demand_transforms( for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] if not odfv.write_to_online_store: - # Apply aggregations BEFORE transformation if defined + # Apply aggregations if configured. if odfv.aggregations: if odfv.mode == "python": if initial_response_dict is None: @@ -694,7 +694,7 @@ def _augment_response_with_on_demand_transforms( ) # Apply transformation - if odfv.mode == "python": + elif odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( From 6a398997264b502c3994e1cc56d37743e5966433 Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Mon, 20 Oct 2025 09:39:35 -0700 Subject: [PATCH 4/9] fix linting Signed-off-by: hao-xu5 --- .../infra/compute_engines/local/compute.py | 4 +- .../compute_engines/local/feature_builder.py | 2 +- .../infra/compute_engines/local/nodes.py | 2 +- sdk/python/feast/utils.py | 12 ++- ...test_on_demand_feature_view_aggregation.py | 89 +++++++++++++++++++ 5 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py diff --git a/sdk/python/feast/infra/compute_engines/local/compute.py b/sdk/python/feast/infra/compute_engines/local/compute.py index cea7ba99547..26f537da7cf 100644 --- a/sdk/python/feast/infra/compute_engines/local/compute.py +++ b/sdk/python/feast/infra/compute_engines/local/compute.py @@ -12,10 +12,10 @@ MaterializationTask, ) from feast.infra.common.retrieval_task import HistoricalRetrievalTask -from feast.infra.compute_engines.base import ComputeEngine -from feast.infra.compute_engines.dag.context import ExecutionContext from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.backends.factory import BackendFactory +from feast.infra.compute_engines.base import ComputeEngine +from feast.infra.compute_engines.dag.context import ExecutionContext from feast.infra.compute_engines.local.feature_builder import LocalFeatureBuilder from feast.infra.compute_engines.local.job import ( LocalMaterializationJob, diff --git a/sdk/python/feast/infra/compute_engines/local/feature_builder.py b/sdk/python/feast/infra/compute_engines/local/feature_builder.py index 4eed3e577c4..a98573621fb 100644 --- a/sdk/python/feast/infra/compute_engines/local/feature_builder.py +++ b/sdk/python/feast/infra/compute_engines/local/feature_builder.py @@ -2,8 +2,8 @@ from feast.infra.common.materialization_job import MaterializationTask from feast.infra.common.retrieval_task import HistoricalRetrievalTask -from feast.infra.compute_engines.feature_builder import FeatureBuilder from feast.infra.compute_engines.backends.base import DataFrameBackend +from feast.infra.compute_engines.feature_builder import FeatureBuilder from feast.infra.compute_engines.local.nodes import ( LocalAggregationNode, LocalDedupNode, diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py index d070e50885c..985a089daae 100644 --- a/sdk/python/feast/infra/compute_engines/local/nodes.py +++ b/sdk/python/feast/infra/compute_engines/local/nodes.py @@ -5,11 +5,11 @@ from feast import BatchFeatureView, StreamFeatureView from feast.data_source import DataSource +from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext from feast.infra.compute_engines.dag.model import DAGFormat from feast.infra.compute_engines.dag.node import DAGNode from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue -from feast.infra.compute_engines.backends.base import DataFrameBackend from feast.infra.compute_engines.local.local_node import LocalNode from feast.infra.compute_engines.utils import ( create_offline_store_retrieval_job, diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index c1234503e30..3e9a27c051f 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -584,7 +584,7 @@ def _get_aggregate_operations(agg_specs) -> dict: def _apply_aggregations_to_response( response_data: Union[pyarrow.Table, Dict[str, List[Any]]], aggregations, - group_keys: List[str], + group_keys: Optional[List[str]], mode: str, ) -> Union[pyarrow.Table, Dict[str, List[Any]]]: """ @@ -593,7 +593,7 @@ def _apply_aggregations_to_response( Args: response_data: Either a pyarrow.Table or dict of lists containing the data aggregations: List of Aggregation objects to apply - group_keys: List of column names to group by + group_keys: List of column names to group by (optional) mode: Transformation mode ("python", "pandas", or "substrait") Returns: @@ -619,7 +619,11 @@ def _apply_aggregations_to_response( agg_ops = _get_aggregate_operations(aggregations) # Apply aggregations using PandasBackend - result_df = backend.groupby_agg(df, group_keys, agg_ops) + if group_keys: + result_df = backend.groupby_agg(df, group_keys, agg_ops) + else: + # No grouping - aggregate over entire dataset + result_df = backend.groupby_agg(df, [], agg_ops) # Convert back to original format if mode == "python": @@ -693,7 +697,7 @@ def _augment_response_with_on_demand_transforms( odfv.mode, ) - # Apply transformation + # Apply transformation. Note, aggregations and transformation configs are mutually exclusive elif odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() diff --git a/sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py b/sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py new file mode 100644 index 00000000000..3d6199be3a0 --- /dev/null +++ b/sdk/python/tests/unit/test_on_demand_feature_view_aggregation.py @@ -0,0 +1,89 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for OnDemandFeatureView aggregations in online serving.""" + +import pyarrow as pa + +from feast.aggregation import Aggregation +from feast.utils import _apply_aggregations_to_response + + +def test_aggregation_python_mode(): + """Test aggregations in Python mode (dict format).""" + data = { + "driver_id": [1, 1, 2, 2], + "trips": [10, 20, 15, 25], + } + aggs = [Aggregation(column="trips", function="sum")] + + result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python") + + assert result == {"driver_id": [1, 2], "sum_trips": [30, 40]} + + +def test_aggregation_pandas_mode(): + """Test aggregations in Pandas mode (Arrow table format).""" + table = pa.table( + { + "driver_id": [1, 1, 2, 2], + "trips": [10, 20, 15, 25], + } + ) + aggs = [Aggregation(column="trips", function="sum")] + + result = _apply_aggregations_to_response(table, aggs, ["driver_id"], "pandas") + + assert isinstance(result, pa.Table) + result_df = result.to_pandas() + assert list(result_df["driver_id"]) == [1, 2] + assert list(result_df["sum_trips"]) == [30, 40] + + +def test_multiple_aggregations(): + """Test multiple aggregation functions.""" + data = { + "driver_id": [1, 1, 2, 2], + "trips": [10, 20, 15, 25], + "revenue": [100.0, 200.0, 150.0, 250.0], + } + aggs = [ + Aggregation(column="trips", function="sum"), + Aggregation(column="revenue", function="mean"), + ] + + result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python") + + assert result["driver_id"] == [1, 2] + assert result["sum_trips"] == [30, 40] + assert result["mean_revenue"] == [150.0, 200.0] + + +def test_no_aggregations_returns_original(): + """Test that no aggregations returns original data.""" + data = {"driver_id": [1, 2], "trips": [10, 20]} + + result = _apply_aggregations_to_response(data, [], ["driver_id"], "python") + + assert result == data + + +def test_empty_data_returns_empty(): + """Test that empty data returns empty result.""" + data = {"driver_id": [], "trips": []} + aggs = [Aggregation(column="trips", function="sum")] + + result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python") + + assert result == data From 81466fa2552c0b0941ec8bd05b19e8e27966bd90 Mon Sep 17 00:00:00 2001 From: Hao Xu Date: Mon, 20 Oct 2025 09:56:50 -0700 Subject: [PATCH 5/9] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- sdk/python/feast/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 3e9a27c051f..d503a77ee99 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -567,7 +567,7 @@ def _get_aggregate_operations(agg_specs) -> dict: Convert Aggregation specs to agg_ops format for PandasBackend. Reused from LocalFeatureBuilder logic. - TODO: This logic is duplicated from LocalFeatureBuilder._get_aggregate_operations(). + TODO: This logic is duplicated from feast.infra.compute_engines.local.feature_builder.LocalFeatureBuilder._get_aggregate_operations(). Consider refactoring to a shared utility module in the future. """ agg_ops = {} @@ -697,7 +697,7 @@ def _augment_response_with_on_demand_transforms( odfv.mode, ) - # Apply transformation. Note, aggregations and transformation configs are mutually exclusive + # Apply transformation. Note: aggregations and transformation configs are mutually exclusive elif odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() From b9541a1e21f1311e69a9aaacce1533cb9bfe2857 Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Mon, 20 Oct 2025 11:40:30 -0700 Subject: [PATCH 6/9] fix linting Signed-off-by: hao-xu5 --- .../feast.infra.compute_engines.local.backends.rst | 10 +++++----- .../docs/source/feast.infra.compute_engines.local.rst | 2 +- .../unit/infra/compute_engines/local/test_nodes.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst b/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst index 39205f3c4df..69eeed70bb5 100644 --- a/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst +++ b/sdk/python/docs/source/feast.infra.compute_engines.local.backends.rst @@ -7,7 +7,7 @@ Submodules feast.infra.compute\_engines.local.backends.base module ------------------------------------------------------- -.. automodule:: feast.infra.compute_engines.local.backends.base +.. automodule:: feast.infra.compute_engines.backends.base :members: :undoc-members: :show-inheritance: @@ -15,7 +15,7 @@ feast.infra.compute\_engines.local.backends.base module feast.infra.compute\_engines.local.backends.factory module ---------------------------------------------------------- -.. automodule:: feast.infra.compute_engines.local.backends.factory +.. automodule:: feast.infra.compute_engines.backends.factory :members: :undoc-members: :show-inheritance: @@ -23,7 +23,7 @@ feast.infra.compute\_engines.local.backends.factory module feast.infra.compute\_engines.local.backends.pandas\_backend module ------------------------------------------------------------------ -.. automodule:: feast.infra.compute_engines.local.backends.pandas_backend +.. automodule:: feast.infra.compute_engines.backends.pandas_backend :members: :undoc-members: :show-inheritance: @@ -31,7 +31,7 @@ feast.infra.compute\_engines.local.backends.pandas\_backend module feast.infra.compute\_engines.local.backends.polars\_backend module ------------------------------------------------------------------ -.. automodule:: feast.infra.compute_engines.local.backends.polars_backend +.. automodule:: feast.infra.compute_engines.backends.polars_backend :members: :undoc-members: :show-inheritance: @@ -39,7 +39,7 @@ feast.infra.compute\_engines.local.backends.polars\_backend module Module contents --------------- -.. automodule:: feast.infra.compute_engines.local.backends +.. automodule:: feast.infra.compute_engines.backends :members: :undoc-members: :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.compute_engines.local.rst b/sdk/python/docs/source/feast.infra.compute_engines.local.rst index 6525199e6dc..d6ce47cb140 100644 --- a/sdk/python/docs/source/feast.infra.compute_engines.local.rst +++ b/sdk/python/docs/source/feast.infra.compute_engines.local.rst @@ -7,7 +7,7 @@ Subpackages .. toctree:: :maxdepth: 4 - feast.infra.compute_engines.local.backends + feast.infra.compute_engines.backends Submodules ---------- diff --git a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py index 20e23c35e03..df2fa961694 100644 --- a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py +++ b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py @@ -6,7 +6,7 @@ from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue -from feast.infra.compute_engines.local.backends.pandas_backend import PandasBackend +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.compute_engines.local.nodes import ( LocalAggregationNode, LocalDedupNode, From 2e85113a4755723b0e1f9525d59657d529addbcc Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Mon, 20 Oct 2025 11:52:04 -0700 Subject: [PATCH 7/9] fix linting Signed-off-by: hao-xu5 --- sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py index df2fa961694..905ea65ae42 100644 --- a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py +++ b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py @@ -4,9 +4,9 @@ import pandas as pd import pyarrow as pa +from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue -from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.compute_engines.local.nodes import ( LocalAggregationNode, LocalDedupNode, From 41d20bc276f4c0fcf321c0ca268952979737e756 Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Tue, 21 Oct 2025 22:49:40 -0700 Subject: [PATCH 8/9] update doc Signed-off-by: hao-xu5 --- docs/reference/beta-on-demand-feature-view.md | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/docs/reference/beta-on-demand-feature-view.md b/docs/reference/beta-on-demand-feature-view.md index f45620a1cee..2482bbc1c8f 100644 --- a/docs/reference/beta-on-demand-feature-view.md +++ b/docs/reference/beta-on-demand-feature-view.md @@ -35,10 +35,40 @@ When defining an ODFV, you can specify the transformation mode using the `mode` ### Singleton Transformations in Native Python Mode -Native Python mode supports transformations on singleton dictionaries by setting `singleton=True`. This allows you to -write transformation functions that operate on a single row at a time, making the code more intuitive and aligning with +Native Python mode supports transformations on singleton dictionaries by setting `singleton=True`. This allows you to +write transformation functions that operate on a single row at a time, making the code more intuitive and aligning with how data scientists typically think about data transformations. +## Aggregations + +On Demand Feature Views support aggregations that compute aggregate statistics over groups of rows. When using aggregations, data is grouped by entity columns (e.g., `driver_id`) and aggregated before being passed to the transformation function. + +**Important**: Aggregations and transformations are mutually exclusive. When aggregations are specified, they replace the transformation function. + +### Usage + +```python +from feast import Aggregation +from datetime import timedelta + +@on_demand_feature_view( + sources=[driver_hourly_stats_view], + schema=[ + Field(name="total_trips", dtype=Int64), + Field(name="avg_rating", dtype=Float64), + ], + aggregations=[ + Aggregation(column="trips", function="sum"), + Aggregation(column="rating", function="mean"), + ], +) +def driver_aggregated_stats(inputs): + # No transformation function needed when using aggregations + pass +``` + +Aggregated columns are automatically named using the pattern `{function}_{column}` (e.g., `sum_trips`, `mean_rating`). + ## Example See [https://github.com/feast-dev/on-demand-feature-views-demo](https://github.com/feast-dev/on-demand-feature-views-demo) for an example on how to use on demand feature views. From db15d1ff856b89bf9349fd6ade94a9d22577dfea Mon Sep 17 00:00:00 2001 From: hao-xu5 Date: Wed, 22 Oct 2025 10:52:44 -0700 Subject: [PATCH 9/9] update doc Signed-off-by: hao-xu5 --- sdk/python/feast/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index d503a77ee99..01ffa774ccd 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -698,6 +698,8 @@ def _augment_response_with_on_demand_transforms( ) # Apply transformation. Note: aggregations and transformation configs are mutually exclusive + # TODO: Fix to make it work for having both aggregation and transformation + # ticket: https://github.com/feast-dev/feast/issues/5689 elif odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict()