Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/getting-started/concepts/batch-feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ driver_fv = BatchFeatureView(
Field(name="conv_rate", dtype=Float32),
],
aggregations=[
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)),
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1), name="total_conv_rate_1d"),
],
source=source,
)
Expand Down Expand Up @@ -144,6 +144,7 @@ See:
- `sink_source` is **required** when chaining views (i.e., `source` is another FeatureView or list of them).
- Schema fields must be consistent with `sink_source`, `batch_source.field_mapping` if field mappings exist.
- Aggregation logic must reference columns present in the raw source or transformed inputs.
- The output feature name for an aggregation defaults to `{function}_{column}` (e.g., `sum_conv_rate`). Use the `name` parameter to override it (e.g., `name="total_conv_rate_1d"`).

---

Expand Down
13 changes: 9 additions & 4 deletions docs/getting-started/concepts/tiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ customer_features = StreamFeatureView(
batch_source=file_source, # For historical data
),
aggregations=[
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)),
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)),
Aggregation(column="amount", function="std", time_window=timedelta(hours=1)),
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1), name="sum_amount_1h"),
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1), name="avg_amount_1h"),
Aggregation(column="amount", function="std", time_window=timedelta(hours=1), name="std_amount_1h"),
],
timestamp_field="event_timestamp",
online=True,
Expand All @@ -229,7 +229,12 @@ customer_features = StreamFeatureView(

### Key Parameters

- `aggregations`: List of time-windowed aggregations to compute
- `aggregations`: List of time-windowed aggregations to compute. Each `Aggregation` accepts:
- `column`: source column to aggregate
- `function`: aggregation function (`sum`, `avg`, `mean`, `min`, `max`, `count`, `std`)
- `time_window`: duration of the aggregation window
- `slide_interval`: hop/slide size (defaults to `time_window`)
- `name` *(optional)*: output feature name. Defaults to `{function}_{column}` (e.g., `sum_amount`). Set this to use a custom name (e.g., `name="sum_amount_1h"`).
- `timestamp_field`: Column name for timestamps (required when aggregations are specified)
- `enable_tiling`: Enable tiling optimization (default: `False`)
- Set to `True` for **streaming scenarios**
Expand Down
1 change: 1 addition & 0 deletions protos/feast/core/Aggregation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ message Aggregation {
string function = 2;
google.protobuf.Duration time_window = 3;
google.protobuf.Duration slide_interval = 4;
string name = 5;
}
23 changes: 22 additions & 1 deletion sdk/python/feast/aggregation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ class Aggregation:
function: str # Provided built in aggregations sum, max, min, count mean
time_window: timedelta # The time window for this aggregation.
slide_interval: timedelta # The sliding window for these aggregations
name: str # Optional override for the output feature name (defaults to {function}_{column})
"""

column: str
function: str
time_window: Optional[timedelta]
slide_interval: Optional[timedelta]
name: str

def __init__(
self,
column: Optional[str] = "",
function: Optional[str] = "",
time_window: Optional[timedelta] = None,
slide_interval: Optional[timedelta] = None,
name: Optional[str] = None,
):
self.column = column or ""
self.function = function or ""
Expand All @@ -42,6 +45,7 @@ def __init__(
self.slide_interval = self.time_window
else:
self.slide_interval = slide_interval
self.name = name or ""
Comment thread
nquinn408 marked this conversation as resolved.

def to_proto(self) -> AggregationProto:
window_duration = None
Expand All @@ -59,6 +63,7 @@ def to_proto(self) -> AggregationProto:
function=self.function,
time_window=window_duration,
slide_interval=slide_interval_duration,
name=self.name,
)

@classmethod
Expand All @@ -79,6 +84,7 @@ def from_proto(cls, agg_proto: AggregationProto):
function=agg_proto.function,
time_window=time_window,
slide_interval=slide_interval,
name=agg_proto.name or None,
)
return aggregation

Expand All @@ -91,11 +97,26 @@ def __eq__(self, other):
or self.function != other.function
or self.time_window != other.time_window
or self.slide_interval != other.slide_interval
or self.name != other.name
):
return False

return True

def resolved_name(self, time_window: Optional[timedelta] = None) -> str:
"""Return the output feature name for this aggregation.

If ``name`` is set it is returned as-is. Otherwise the name is
derived as ``{function}_{column}``, with ``_{seconds}s`` appended
when *time_window* is provided.
"""
if self.name:
return self.name
base = f"{self.function}_{self.column}"
if time_window is not None:
return f"{base}_{int(time_window.total_seconds())}s"
return base
Comment thread
nquinn408 marked this conversation as resolved.


def aggregation_specs_to_agg_ops(
agg_specs: Iterable[Any],
Expand All @@ -106,7 +127,7 @@ def aggregation_specs_to_agg_ops(
for agg in agg_specs:
if getattr(agg, "time_window", None) is not None:
raise ValueError(time_window_unsupported_error_message)
alias = f"{agg.function}_{agg.column}"
alias = getattr(agg, "name", None) or f"{agg.function}_{agg.column}"
agg_ops[alias] = (agg.function, agg.column)
return agg_ops

Expand Down
8 changes: 2 additions & 6 deletions sdk/python/feast/aggregation/tiling/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def apply_sawtooth_window_tiling(
ir_metadata_dict = {}

for agg in aggregations:
feature_name = (
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
)
feature_name = agg.resolved_name(window_size)
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)
ir_metadata_dict[feature_name] = metadata

Expand Down Expand Up @@ -161,9 +159,7 @@ def apply_sawtooth_window_tiling(

# Step 5: Compute final feature values from IRs (for algebraic aggs, just rename)
for agg in aggregations:
feature_name = (
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
)
feature_name = agg.resolved_name(window_size)
metadata = ir_metadata_dict[feature_name]

if metadata.type == "algebraic":
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/feast/aggregation/tiling/tile_subtraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ def convert_cumulative_to_windowed(

# Subtract previous tile values from current tile for each aggregation
for agg in aggregations:
feature_name = (
f"{agg.function}_{agg.column}_{int(window_size.total_seconds())}s"
)
feature_name = agg.resolved_name(window_size)
_, metadata = get_ir_metadata_for_aggregation(agg, feature_name)

if metadata.type == "algebraic":
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/feast/infra/compute_engines/ray/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,7 @@ def _execute_standard_aggregation(self, dataset: Dataset) -> DAGValue:
# Convert aggregations to Ray's groupby format
agg_dict = {}
for agg in self.aggregations:
feature_name = f"{agg.function}_{agg.column}"
if agg.time_window:
feature_name += f"_{int(agg.time_window.total_seconds())}s"
feature_name = agg.resolved_name(agg.time_window)

if agg.function == "count":
agg_dict[feature_name] = (agg.column, "count")
Expand Down
8 changes: 2 additions & 6 deletions sdk/python/feast/infra/compute_engines/spark/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue:
expected_columns = entity_keys + [self.timestamp_col]
for time_window, window_aggs in aggs_by_window.items():
for agg in window_aggs:
feature_name = f"{agg.function}_{agg.column}_{int(time_window.total_seconds())}s"
feature_name = agg.resolved_name(time_window)
if feature_name not in expected_columns:
expected_columns.append(feature_name)

Expand Down Expand Up @@ -372,11 +372,7 @@ def _execute_standard_aggregation(self, input_df: DataFrame) -> DAGValue:
agg_exprs = []
for agg in self.aggregations:
func = getattr(F, agg.function)
expr = func(agg.column).alias(
f"{agg.function}_{agg.column}_{int(agg.time_window.total_seconds())}s"
if agg.time_window
else f"{agg.function}_{agg.column}"
)
expr = func(agg.column).alias(agg.resolved_name(agg.time_window))
agg_exprs.append(expr)

if any(agg.time_window for agg in self.aggregations):
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/protos/feast/core/Aggregation_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion sdk/python/feast/protos/feast/core/Aggregation_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ class Aggregation(google.protobuf.message.Message):
FUNCTION_FIELD_NUMBER: builtins.int
TIME_WINDOW_FIELD_NUMBER: builtins.int
SLIDE_INTERVAL_FIELD_NUMBER: builtins.int
NAME_FIELD_NUMBER: builtins.int
column: builtins.str
function: builtins.str
name: builtins.str
@property
def time_window(self) -> google.protobuf.duration_pb2.Duration: ...
@property
Expand All @@ -35,8 +37,9 @@ class Aggregation(google.protobuf.message.Message):
function: builtins.str = ...,
time_window: google.protobuf.duration_pb2.Duration | None = ...,
slide_interval: google.protobuf.duration_pb2.Duration | None = ...,
name: builtins.str = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["slide_interval", b"slide_interval", "time_window", b"time_window"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["column", b"column", "function", b"function", "name", b"name", "slide_interval", b"slide_interval", "time_window", b"time_window"]) -> None: ...

global___Aggregation = Aggregation
58 changes: 56 additions & 2 deletions sdk/python/tests/unit/test_aggregation_ops.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from datetime import timedelta

import pytest

from feast.aggregation import aggregation_specs_to_agg_ops
from feast.aggregation import Aggregation, aggregation_specs_to_agg_ops


class DummyAggregation:
def __init__(self, *, function: str, column: str, time_window=None):
def __init__(self, *, function: str, column: str, time_window=None, name: str = ""):
self.function = function
self.column = column
self.time_window = time_window
self.name = name


def test_aggregation_specs_to_agg_ops_success():
Expand Down Expand Up @@ -42,3 +45,54 @@ def test_aggregation_specs_to_agg_ops_time_window_unsupported(error_message: str
agg_specs,
time_window_unsupported_error_message=error_message,
)


def test_aggregation_specs_to_agg_ops_custom_name():
agg_specs = [
DummyAggregation(
function="sum",
column="seconds_watched",
name="sum_seconds_watched_per_ad_1d",
),
]

agg_ops = aggregation_specs_to_agg_ops(
agg_specs,
time_window_unsupported_error_message="Time window aggregation is not supported.",
)

assert agg_ops == {
"sum_seconds_watched_per_ad_1d": ("sum", "seconds_watched"),
}


def test_aggregation_specs_to_agg_ops_mixed_names():
agg_specs = [
DummyAggregation(function="sum", column="trips", name="total_trips"),
DummyAggregation(function="mean", column="fare"),
]

agg_ops = aggregation_specs_to_agg_ops(
agg_specs,
time_window_unsupported_error_message="Time window aggregation is not supported.",
)

assert agg_ops == {
"total_trips": ("sum", "trips"),
"mean_fare": ("mean", "fare"),
}


def test_aggregation_round_trip_with_name():
agg = Aggregation(
column="seconds_watched",
function="sum",
time_window=timedelta(days=1),
name="sum_seconds_watched_per_ad_1d",
)
proto = agg.to_proto()
assert proto.name == "sum_seconds_watched_per_ad_1d"

restored = Aggregation.from_proto(proto)
assert restored.name == "sum_seconds_watched_per_ad_1d"
assert restored == agg
Loading