Skip to content

Commit 7189d22

Browse files
committed
chore: Created Aggregation interface
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent ecee4f6 commit 7189d22

File tree

8 files changed

+116
-99
lines changed

8 files changed

+116
-99
lines changed

sdk/python/feast/aggregation.py

Lines changed: 7 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,10 @@
1-
from datetime import timedelta
2-
from typing import Optional
1+
"""
2+
Aggregation module for Feast.
33
4-
from google.protobuf.duration_pb2 import Duration
5-
from typeguard import typechecked
4+
DEPRECATED: This module has been moved to feast.aggregation.
5+
This file provides backward compatibility.
6+
"""
67

7-
from feast.protos.feast.core.Aggregation_pb2 import Aggregation as AggregationProto
8+
from feast.aggregation import Aggregation
89

9-
10-
@typechecked
11-
class Aggregation:
12-
"""
13-
NOTE: Feast-handled aggregations are not yet supported. This class provides a way to register user-defined aggregations.
14-
15-
Attributes:
16-
column: str # Column name of the feature we are aggregating.
17-
function: str # Provided built in aggregations sum, max, min, count mean
18-
time_window: timedelta # The time window for this aggregation.
19-
slide_interval: timedelta # The sliding window for these aggregations
20-
"""
21-
22-
column: str
23-
function: str
24-
time_window: Optional[timedelta]
25-
slide_interval: Optional[timedelta]
26-
27-
def __init__(
28-
self,
29-
column: Optional[str] = "",
30-
function: Optional[str] = "",
31-
time_window: Optional[timedelta] = None,
32-
slide_interval: Optional[timedelta] = None,
33-
):
34-
self.column = column or ""
35-
self.function = function or ""
36-
self.time_window = time_window
37-
if not slide_interval:
38-
self.slide_interval = self.time_window
39-
else:
40-
self.slide_interval = slide_interval
41-
42-
def to_proto(self) -> AggregationProto:
43-
window_duration = None
44-
if self.time_window is not None:
45-
window_duration = Duration()
46-
window_duration.FromTimedelta(self.time_window)
47-
48-
slide_interval_duration = None
49-
if self.slide_interval is not None:
50-
slide_interval_duration = Duration()
51-
slide_interval_duration.FromTimedelta(self.slide_interval)
52-
53-
return AggregationProto(
54-
column=self.column,
55-
function=self.function,
56-
time_window=window_duration,
57-
slide_interval=slide_interval_duration,
58-
)
59-
60-
@classmethod
61-
def from_proto(cls, agg_proto: AggregationProto):
62-
time_window = (
63-
timedelta(days=0)
64-
if agg_proto.time_window.ToNanoseconds() == 0
65-
else agg_proto.time_window.ToTimedelta()
66-
)
67-
68-
slide_interval = (
69-
timedelta(days=0)
70-
if agg_proto.slide_interval.ToNanoseconds() == 0
71-
else agg_proto.slide_interval.ToTimedelta()
72-
)
73-
aggregation = cls(
74-
column=agg_proto.column,
75-
function=agg_proto.function,
76-
time_window=time_window,
77-
slide_interval=slide_interval,
78-
)
79-
return aggregation
80-
81-
def __eq__(self, other):
82-
if not isinstance(other, Aggregation):
83-
raise TypeError("Comparisons should only involve Aggregations.")
84-
85-
if (
86-
self.column != other.column
87-
or self.function != other.function
88-
or self.time_window != other.time_window
89-
or self.slide_interval != other.slide_interval
90-
):
91-
return False
92-
93-
return True
10+
__all__ = ["Aggregation"]
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""
2+
Aggregation module for Feast.
3+
"""
4+
5+
from datetime import timedelta
6+
from typing import Optional
7+
8+
from google.protobuf.duration_pb2 import Duration
9+
from typeguard import typechecked
10+
11+
from feast.protos.feast.core.Aggregation_pb2 import Aggregation as AggregationProto
12+
13+
14+
@typechecked
15+
class Aggregation:
16+
"""
17+
NOTE: Feast-handled aggregations are not yet supported. This class provides a way to register user-defined aggregations.
18+
19+
Attributes:
20+
column: str # Column name of the feature we are aggregating.
21+
function: str # Provided built in aggregations sum, max, min, count mean
22+
time_window: timedelta # The time window for this aggregation.
23+
slide_interval: timedelta # The sliding window for these aggregations
24+
"""
25+
26+
column: str
27+
function: str
28+
time_window: Optional[timedelta]
29+
slide_interval: Optional[timedelta]
30+
31+
def __init__(
32+
self,
33+
column: Optional[str] = "",
34+
function: Optional[str] = "",
35+
time_window: Optional[timedelta] = None,
36+
slide_interval: Optional[timedelta] = None,
37+
):
38+
self.column = column or ""
39+
self.function = function or ""
40+
self.time_window = time_window
41+
if not slide_interval:
42+
self.slide_interval = self.time_window
43+
else:
44+
self.slide_interval = slide_interval
45+
46+
def to_proto(self) -> AggregationProto:
47+
window_duration = None
48+
if self.time_window is not None:
49+
window_duration = Duration()
50+
window_duration.FromTimedelta(self.time_window)
51+
52+
slide_interval_duration = None
53+
if self.slide_interval is not None:
54+
slide_interval_duration = Duration()
55+
slide_interval_duration.FromTimedelta(self.slide_interval)
56+
57+
return AggregationProto(
58+
column=self.column,
59+
function=self.function,
60+
time_window=window_duration,
61+
slide_interval=slide_interval_duration,
62+
)
63+
64+
@classmethod
65+
def from_proto(cls, agg_proto: AggregationProto):
66+
time_window = (
67+
timedelta(days=0)
68+
if agg_proto.time_window.ToNanoseconds() == 0
69+
else agg_proto.time_window.ToTimedelta()
70+
)
71+
72+
slide_interval = (
73+
timedelta(days=0)
74+
if agg_proto.slide_interval.ToNanoseconds() == 0
75+
else agg_proto.slide_interval.ToTimedelta()
76+
)
77+
aggregation = cls(
78+
column=agg_proto.column,
79+
function=agg_proto.function,
80+
time_window=time_window,
81+
slide_interval=slide_interval,
82+
)
83+
return aggregation
84+
85+
def __eq__(self, other):
86+
if not isinstance(other, Aggregation):
87+
raise TypeError("Comparisons should only involve Aggregations.")
88+
89+
if (
90+
self.column != other.column
91+
or self.function != other.function
92+
or self.time_window != other.time_window
93+
or self.slide_interval != other.slide_interval
94+
):
95+
return False
96+
97+
return True
98+
99+
100+
__all__ = ["Aggregation"]

sdk/python/feast/infra/tiling/__init__.py renamed to sdk/python/feast/aggregation/tiling/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
4. Engine nodes: Convert back to engine format (e.g., from_pandas(), createDataFrame())
1212
"""
1313

14-
from feast.infra.tiling.base import IRMetadata, get_ir_metadata_for_aggregation
15-
from feast.infra.tiling.orchestrator import apply_sawtooth_window_tiling
16-
from feast.infra.tiling.tile_subtraction import (
14+
from feast.aggregation.tiling.base import IRMetadata, get_ir_metadata_for_aggregation
15+
from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling
16+
from feast.aggregation.tiling.tile_subtraction import (
1717
convert_cumulative_to_windowed,
1818
deduplicate_keep_latest,
1919
)
File renamed without changes.

sdk/python/feast/infra/tiling/orchestrator.py renamed to sdk/python/feast/aggregation/tiling/orchestrator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import pandas as pd
1313

1414
from feast.aggregation import Aggregation
15-
from feast.infra.tiling.base import get_ir_metadata_for_aggregation
15+
from feast.aggregation.tiling.base import get_ir_metadata_for_aggregation
1616

1717

1818
def apply_sawtooth_window_tiling(

sdk/python/feast/infra/tiling/tile_subtraction.py renamed to sdk/python/feast/aggregation/tiling/tile_subtraction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import pandas as pd
1212

1313
from feast.aggregation import Aggregation
14-
from feast.infra.tiling.base import get_ir_metadata_for_aggregation
14+
from feast.aggregation.tiling.base import get_ir_metadata_for_aggregation
1515

1616

1717
def convert_cumulative_to_windowed(

sdk/python/feast/infra/compute_engines/ray/nodes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,8 @@ def _execute_tiled_aggregation(self, dataset: Dataset) -> DAGValue:
360360
3. Convert to windowed aggregations
361361
4. Convert pandas → Ray Dataset
362362
"""
363-
from feast.infra.tiling.orchestrator import apply_sawtooth_window_tiling
364-
from feast.infra.tiling.tile_subtraction import (
363+
from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling
364+
from feast.aggregation.tiling.tile_subtraction import (
365365
convert_cumulative_to_windowed,
366366
deduplicate_keep_latest,
367367
)

sdk/python/feast/infra/compute_engines/spark/nodes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ def _execute_tiled_aggregation(self, input_df: DataFrame) -> DAGValue:
146146
)
147147
aggs_by_window[agg.time_window].append(agg)
148148

149-
from feast.infra.tiling.orchestrator import apply_sawtooth_window_tiling
150-
from feast.infra.tiling.tile_subtraction import (
149+
from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling
150+
from feast.aggregation.tiling.tile_subtraction import (
151151
convert_cumulative_to_windowed,
152152
deduplicate_keep_latest,
153153
)

0 commit comments

Comments
 (0)