Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix linting
Signed-off-by: hao-xu5 <hxu44@apple.com>
  • Loading branch information
hao-xu5 committed Oct 20, 2025
commit 6a398997264b502c3994e1cc56d37743e5966433
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/compute_engines/local/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.compute_engines.dag.context import ExecutionContext
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.backends.factory import BackendFactory
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.compute_engines.dag.context import ExecutionContext
from feast.infra.compute_engines.local.feature_builder import LocalFeatureBuilder
from feast.infra.compute_engines.local.job import (
LocalMaterializationJob,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from feast.infra.common.materialization_job import MaterializationTask
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.feature_builder import FeatureBuilder
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.feature_builder import FeatureBuilder
from feast.infra.compute_engines.local.nodes import (
LocalAggregationNode,
LocalDedupNode,
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/compute_engines/local/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

from feast import BatchFeatureView, StreamFeatureView
from feast.data_source import DataSource
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
from feast.infra.compute_engines.dag.model import DAGFormat
from feast.infra.compute_engines.dag.node import DAGNode
from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue
from feast.infra.compute_engines.backends.base import DataFrameBackend
from feast.infra.compute_engines.local.local_node import LocalNode
from feast.infra.compute_engines.utils import (
create_offline_store_retrieval_job,
Expand Down
12 changes: 8 additions & 4 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ def _get_aggregate_operations(agg_specs) -> dict:
def _apply_aggregations_to_response(
response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
aggregations,
group_keys: List[str],
group_keys: Optional[List[str]],
mode: str,
) -> Union[pyarrow.Table, Dict[str, List[Any]]]:
"""
Expand All @@ -593,7 +593,7 @@ def _apply_aggregations_to_response(
Args:
response_data: Either a pyarrow.Table or dict of lists containing the data
aggregations: List of Aggregation objects to apply
group_keys: List of column names to group by
group_keys: List of column names to group by (optional)
mode: Transformation mode ("python", "pandas", or "substrait")

Returns:
Expand All @@ -619,7 +619,11 @@ def _apply_aggregations_to_response(
agg_ops = _get_aggregate_operations(aggregations)

# Apply aggregations using PandasBackend
result_df = backend.groupby_agg(df, group_keys, agg_ops)
if group_keys:
result_df = backend.groupby_agg(df, group_keys, agg_ops)
else:
# No grouping - aggregate over entire dataset
result_df = backend.groupby_agg(df, [], agg_ops)
Comment thread
HaoXuAI marked this conversation as resolved.

# Convert back to original format
if mode == "python":
Expand Down Expand Up @@ -693,7 +697,7 @@ def _augment_response_with_on_demand_transforms(
odfv.mode,
)

# Apply transformation
# Apply transformation. Note, aggregations and transformation configs are mutually exclusive
Comment thread
HaoXuAI marked this conversation as resolved.
Outdated
elif odfv.mode == "python":
if initial_response_dict is None:
initial_response_dict = initial_response.to_dict()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2025 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for OnDemandFeatureView aggregations in online serving."""

import pyarrow as pa

from feast.aggregation import Aggregation
from feast.utils import _apply_aggregations_to_response


def test_aggregation_python_mode():
    """Check a grouped sum when the response is a dict of lists ("python" mode).

    Four rows spread over two drivers are summed on ``trips`` and grouped by
    ``driver_id``; the result is expected back as a dict with one entry per
    driver and a ``sum_trips`` column.
    """
    sum_of_trips = Aggregation(column="trips", function="sum")
    response = {"driver_id": [1, 1, 2, 2], "trips": [10, 20, 15, 25]}

    aggregated = _apply_aggregations_to_response(
        response,
        [sum_of_trips],
        ["driver_id"],
        "python",
    )

    expected = {"driver_id": [1, 2], "sum_trips": [30, 40]}
    assert aggregated == expected


def test_aggregation_pandas_mode():
    """Check a grouped sum when the response is an Arrow table ("pandas" mode).

    The same driver/trips data as the dict-based test is passed as a
    ``pyarrow.Table``; the result must still be a ``pyarrow.Table`` holding
    one row per driver with the summed trips.
    """
    columns = {
        "driver_id": [1, 1, 2, 2],
        "trips": [10, 20, 15, 25],
    }
    arrow_input = pa.table(columns)
    sum_of_trips = Aggregation(column="trips", function="sum")

    aggregated = _apply_aggregations_to_response(
        arrow_input,
        [sum_of_trips],
        ["driver_id"],
        "pandas",
    )

    # The output container type must match the input container type.
    assert isinstance(aggregated, pa.Table)

    frame = aggregated.to_pandas()
    assert list(frame["driver_id"]) == [1, 2]
    assert list(frame["sum_trips"]) == [30, 40]


def test_multiple_aggregations():
    """Check that several aggregations over different columns apply in one call.

    ``trips`` is summed and ``revenue`` is averaged per ``driver_id``; each
    output column is verified individually.
    """
    response = {
        "driver_id": [1, 1, 2, 2],
        "trips": [10, 20, 15, 25],
        "revenue": [100.0, 200.0, 150.0, 250.0],
    }
    sum_of_trips = Aggregation(column="trips", function="sum")
    mean_of_revenue = Aggregation(column="revenue", function="mean")

    aggregated = _apply_aggregations_to_response(
        response,
        [sum_of_trips, mean_of_revenue],
        ["driver_id"],
        "python",
    )

    expected_columns = {
        "driver_id": [1, 2],
        "sum_trips": [30, 40],
        "mean_revenue": [150.0, 200.0],
    }
    for column_name, expected_values in expected_columns.items():
        assert aggregated[column_name] == expected_values


def test_no_aggregations_returns_original():
    """Check that an empty aggregation list yields data equal to the input."""
    response = {"driver_id": [1, 2], "trips": [10, 20]}
    no_aggregations = []

    outcome = _apply_aggregations_to_response(
        response,
        no_aggregations,
        ["driver_id"],
        "python",
    )

    assert outcome == response


def test_empty_data_returns_empty():
    """Check that aggregating a response with no rows yields data equal to the input."""
    empty_response = {"driver_id": [], "trips": []}
    sum_of_trips = Aggregation(column="trips", function="sum")

    outcome = _apply_aggregations_to_response(
        empty_response,
        [sum_of_trips],
        ["driver_id"],
        "python",
    )

    assert outcome == empty_response
Loading