Skip to content

Commit 6a39899

Browse files
author
hao-xu5
committed
fix linting
Signed-off-by: hao-xu5 <hxu44@apple.com>
1 parent 5ccd4ea commit 6a39899

File tree

5 files changed

+101
-8
lines changed

5 files changed

+101
-8
lines changed

sdk/python/feast/infra/compute_engines/local/compute.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
MaterializationTask,
1313
)
1414
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
15-
from feast.infra.compute_engines.base import ComputeEngine
16-
from feast.infra.compute_engines.dag.context import ExecutionContext
1715
from feast.infra.compute_engines.backends.base import DataFrameBackend
1816
from feast.infra.compute_engines.backends.factory import BackendFactory
17+
from feast.infra.compute_engines.base import ComputeEngine
18+
from feast.infra.compute_engines.dag.context import ExecutionContext
1919
from feast.infra.compute_engines.local.feature_builder import LocalFeatureBuilder
2020
from feast.infra.compute_engines.local.job import (
2121
LocalMaterializationJob,

sdk/python/feast/infra/compute_engines/local/feature_builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
from feast.infra.common.materialization_job import MaterializationTask
44
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
5-
from feast.infra.compute_engines.feature_builder import FeatureBuilder
65
from feast.infra.compute_engines.backends.base import DataFrameBackend
6+
from feast.infra.compute_engines.feature_builder import FeatureBuilder
77
from feast.infra.compute_engines.local.nodes import (
88
LocalAggregationNode,
99
LocalDedupNode,

sdk/python/feast/infra/compute_engines/local/nodes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55

66
from feast import BatchFeatureView, StreamFeatureView
77
from feast.data_source import DataSource
8+
from feast.infra.compute_engines.backends.base import DataFrameBackend
89
from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
910
from feast.infra.compute_engines.dag.model import DAGFormat
1011
from feast.infra.compute_engines.dag.node import DAGNode
1112
from feast.infra.compute_engines.local.arrow_table_value import ArrowTableValue
12-
from feast.infra.compute_engines.backends.base import DataFrameBackend
1313
from feast.infra.compute_engines.local.local_node import LocalNode
1414
from feast.infra.compute_engines.utils import (
1515
create_offline_store_retrieval_job,

sdk/python/feast/utils.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ def _get_aggregate_operations(agg_specs) -> dict:
584584
def _apply_aggregations_to_response(
585585
response_data: Union[pyarrow.Table, Dict[str, List[Any]]],
586586
aggregations,
587-
group_keys: List[str],
587+
group_keys: Optional[List[str]],
588588
mode: str,
589589
) -> Union[pyarrow.Table, Dict[str, List[Any]]]:
590590
"""
@@ -593,7 +593,7 @@ def _apply_aggregations_to_response(
593593
Args:
594594
response_data: Either a pyarrow.Table or dict of lists containing the data
595595
aggregations: List of Aggregation objects to apply
596-
group_keys: List of column names to group by
596+
group_keys: List of column names to group by (optional)
597597
mode: Transformation mode ("python", "pandas", or "substrait")
598598
599599
Returns:
@@ -619,7 +619,11 @@ def _apply_aggregations_to_response(
619619
agg_ops = _get_aggregate_operations(aggregations)
620620

621621
# Apply aggregations using PandasBackend
622-
result_df = backend.groupby_agg(df, group_keys, agg_ops)
622+
if group_keys:
623+
result_df = backend.groupby_agg(df, group_keys, agg_ops)
624+
else:
625+
# No grouping - aggregate over entire dataset
626+
result_df = backend.groupby_agg(df, [], agg_ops)
623627

624628
# Convert back to original format
625629
if mode == "python":
@@ -693,7 +697,7 @@ def _augment_response_with_on_demand_transforms(
693697
odfv.mode,
694698
)
695699

696-
# Apply transformation
700+
# Apply transformation. Note, aggregations and transformation configs are mutually exclusive
697701
elif odfv.mode == "python":
698702
if initial_response_dict is None:
699703
initial_response_dict = initial_response.to_dict()
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Copyright 2025 The Feast Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for OnDemandFeatureView aggregations in online serving."""
16+
17+
import pyarrow as pa
18+
19+
from feast.aggregation import Aggregation
20+
from feast.utils import _apply_aggregations_to_response
21+
22+
23+
def test_aggregation_python_mode():
24+
"""Test aggregations in Python mode (dict format)."""
25+
data = {
26+
"driver_id": [1, 1, 2, 2],
27+
"trips": [10, 20, 15, 25],
28+
}
29+
aggs = [Aggregation(column="trips", function="sum")]
30+
31+
result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python")
32+
33+
assert result == {"driver_id": [1, 2], "sum_trips": [30, 40]}
34+
35+
36+
def test_aggregation_pandas_mode():
37+
"""Test aggregations in Pandas mode (Arrow table format)."""
38+
table = pa.table(
39+
{
40+
"driver_id": [1, 1, 2, 2],
41+
"trips": [10, 20, 15, 25],
42+
}
43+
)
44+
aggs = [Aggregation(column="trips", function="sum")]
45+
46+
result = _apply_aggregations_to_response(table, aggs, ["driver_id"], "pandas")
47+
48+
assert isinstance(result, pa.Table)
49+
result_df = result.to_pandas()
50+
assert list(result_df["driver_id"]) == [1, 2]
51+
assert list(result_df["sum_trips"]) == [30, 40]
52+
53+
54+
def test_multiple_aggregations():
55+
"""Test multiple aggregation functions."""
56+
data = {
57+
"driver_id": [1, 1, 2, 2],
58+
"trips": [10, 20, 15, 25],
59+
"revenue": [100.0, 200.0, 150.0, 250.0],
60+
}
61+
aggs = [
62+
Aggregation(column="trips", function="sum"),
63+
Aggregation(column="revenue", function="mean"),
64+
]
65+
66+
result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python")
67+
68+
assert result["driver_id"] == [1, 2]
69+
assert result["sum_trips"] == [30, 40]
70+
assert result["mean_revenue"] == [150.0, 200.0]
71+
72+
73+
def test_no_aggregations_returns_original():
74+
"""Test that no aggregations returns original data."""
75+
data = {"driver_id": [1, 2], "trips": [10, 20]}
76+
77+
result = _apply_aggregations_to_response(data, [], ["driver_id"], "python")
78+
79+
assert result == data
80+
81+
82+
def test_empty_data_returns_empty():
83+
"""Test that empty data returns empty result."""
84+
data = {"driver_id": [], "trips": []}
85+
aggs = [Aggregation(column="trips", function="sum")]
86+
87+
result = _apply_aggregations_to_response(data, aggs, ["driver_id"], "python")
88+
89+
assert result == data

0 commit comments

Comments
 (0)