Skip to content

Commit 8117ec4

Browse files
Vperiodtntkathole
authored and committed
added utility function
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com>
rh-pre-commit.version: 2.3.2
rh-pre-commit.check-secrets: ENABLED
1 parent 38c72fe commit 8117ec4

File tree

8 files changed

+80
-160
lines changed

8 files changed

+80
-160
lines changed

sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@
3131
from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig
3232
from feast.infra.utils.clickhouse.connection_utils import get_client
3333
from feast.saved_dataset import SavedDatasetStorage
34-
from feast.utils import _utc_now, make_tzaware
34+
from feast.utils import compute_non_entity_date_range
3535

3636

3737
class ClickhouseOfflineStoreConfig(ClickhouseConfig):
@@ -56,12 +56,11 @@ def get_historical_features(
5656

5757
# Handle non-entity retrieval mode
5858
if entity_df is None:
59-
end_date = kwargs.get("end_date", None)
60-
if end_date is None:
61-
end_date = _utc_now()
62-
else:
63-
end_date = make_tzaware(end_date)
64-
59+
start_date, end_date = compute_non_entity_date_range(
60+
feature_views,
61+
start_date=kwargs.get("start_date"),
62+
end_date=kwargs.get("end_date"),
63+
)
6564
entity_df = pd.DataFrame({"event_timestamp": [end_date]})
6665

6766
entity_schema = _get_entity_schema(entity_df, config)

sdk/python/feast/infra/offline_stores/contrib/oracle_offline_store/oracle.py

Lines changed: 4 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime, timedelta, timezone
1+
from datetime import datetime
22
from pathlib import Path
33
from typing import Any, Callable, List, Literal, Optional, Union
44

@@ -24,6 +24,7 @@
2424
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
2525
from feast.infra.registry.base_registry import BaseRegistry
2626
from feast.repo_config import FeastConfigBaseModel, RepoConfig
27+
from feast.utils import compute_non_entity_date_range
2728

2829

2930
def get_ibis_connection(config: RepoConfig):
@@ -153,35 +154,6 @@ def _validate_connection_params(self):
153154
return self
154155

155156

156-
def _resolve_date_range(
157-
start_date: Optional[datetime],
158-
end_date: Optional[datetime],
159-
feature_views: List[FeatureView],
160-
) -> tuple:
161-
"""Resolve start/end dates to a UTC-aware range, using TTL as a fallback window."""
162-
if end_date is None:
163-
end_date = datetime.now(tz=timezone.utc)
164-
elif end_date.tzinfo is None:
165-
end_date = end_date.replace(tzinfo=timezone.utc)
166-
167-
if start_date is None:
168-
max_ttl_seconds = max(
169-
(
170-
int(fv.ttl.total_seconds())
171-
for fv in feature_views
172-
if fv.ttl and isinstance(fv.ttl, timedelta)
173-
),
174-
default=0,
175-
)
176-
start_date = end_date - timedelta(
177-
seconds=max_ttl_seconds if max_ttl_seconds > 0 else 30 * 86400
178-
)
179-
elif start_date.tzinfo is None:
180-
start_date = start_date.replace(tzinfo=timezone.utc)
181-
182-
return start_date, end_date
183-
184-
185157
def _build_entity_df_from_feature_sources(
186158
con,
187159
feature_views: List[FeatureView],
@@ -254,10 +226,10 @@ def get_historical_features(
254226

255227
# Handle non-entity retrieval mode (start_date/end_date only)
256228
if entity_df is None:
257-
start_date, end_date = _resolve_date_range(
229+
start_date, end_date = compute_non_entity_date_range(
230+
feature_views,
258231
start_date=kwargs.get("start_date"),
259232
end_date=kwargs.get("end_date"),
260-
feature_views=feature_views,
261233
)
262234
entity_df = _build_entity_df_from_feature_sources(
263235
con, feature_views, start_date, end_date

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py

Lines changed: 9 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
import contextlib
22
from dataclasses import asdict
3-
from datetime import datetime, timedelta, timezone
3+
from datetime import datetime, timezone
44
from enum import Enum
55
from typing import (
66
Any,
@@ -46,7 +46,7 @@
4646
from feast.repo_config import RepoConfig
4747
from feast.saved_dataset import SavedDatasetStorage
4848
from feast.type_map import pg_type_code_to_arrow
49-
from feast.utils import _utc_now, make_tzaware
49+
from feast.utils import compute_non_entity_date_range
5050

5151
from .postgres_source import PostgreSQLSource
5252

@@ -129,36 +129,16 @@ def get_historical_features(
129129
assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig)
130130
for fv in feature_views:
131131
assert isinstance(fv.batch_source, PostgreSQLSource)
132-
start_date: Optional[datetime] = kwargs.get("start_date", None)
133-
end_date: Optional[datetime] = kwargs.get("end_date", None)
132+
start_date: Optional[datetime] = kwargs.get("start_date")
133+
end_date: Optional[datetime] = kwargs.get("end_date")
134134

135135
# Handle non-entity retrieval mode
136136
if entity_df is None:
137-
# Default to current time if end_date not provided
138-
if end_date is None:
139-
end_date = _utc_now()
140-
else:
141-
end_date = make_tzaware(end_date)
142-
143-
# Calculate start_date from TTL if not provided
144-
145-
if start_date is None:
146-
# Find the maximum TTL across all feature views to ensure we capture enough data
147-
max_ttl_seconds = 0
148-
for fv in feature_views:
149-
if fv.ttl and isinstance(fv.ttl, timedelta):
150-
ttl_seconds = int(fv.ttl.total_seconds())
151-
max_ttl_seconds = max(max_ttl_seconds, ttl_seconds)
152-
153-
if max_ttl_seconds > 0:
154-
# Start from (end_date - max_ttl) to ensure we capture all relevant features
155-
start_date = end_date - timedelta(seconds=max_ttl_seconds)
156-
else:
157-
# If no TTL is set, default to 30 days before end_date
158-
start_date = end_date - timedelta(days=30)
159-
else:
160-
start_date = make_tzaware(start_date)
161-
137+
start_date, end_date = compute_non_entity_date_range(
138+
feature_views,
139+
start_date=start_date,
140+
end_date=end_date,
141+
)
162142
entity_df = pd.DataFrame({"event_timestamp": [end_date]})
163143

164144
entity_schema = _get_entity_schema(entity_df, config)

sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py

Lines changed: 11 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
import logging
22
import os
33
import uuid
4-
from datetime import datetime, timedelta
4+
from datetime import datetime
55
from pathlib import Path
66
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
77

@@ -59,7 +59,12 @@
5959
feast_value_type_to_pandas_type,
6060
pa_to_feast_value_type,
6161
)
62-
from feast.utils import _get_column_names, make_df_tzaware, make_tzaware
62+
from feast.utils import (
63+
_get_column_names,
64+
compute_non_entity_date_range,
65+
make_df_tzaware,
66+
make_tzaware,
67+
)
6368

6469
logger = logging.getLogger(__name__)
6570
# Remote storage URI schemes supported by the Ray offline store
@@ -1203,37 +1208,6 @@ def schema(self) -> pa.Schema:
12031208
return pa.Table.from_pandas(df).schema
12041209

12051210

1206-
def _compute_non_entity_dates_ray(
1207-
feature_views: List[FeatureView],
1208-
start_date_opt: Optional[datetime],
1209-
end_date_opt: Optional[datetime],
1210-
) -> Tuple[datetime, datetime]:
1211-
# Why: derive bounded time window when no entity_df is provided using explicit dates or max TTL fallback
1212-
end_date = (
1213-
make_tzaware(end_date_opt) if end_date_opt else make_tzaware(datetime.utcnow())
1214-
)
1215-
if start_date_opt is None:
1216-
max_ttl_seconds = 0
1217-
for fv in feature_views:
1218-
if getattr(fv, "ttl", None):
1219-
try:
1220-
ttl_val = fv.ttl
1221-
if isinstance(ttl_val, timedelta):
1222-
max_ttl_seconds = max(
1223-
max_ttl_seconds, int(ttl_val.total_seconds())
1224-
)
1225-
except Exception:
1226-
pass
1227-
start_date = (
1228-
end_date - timedelta(seconds=max_ttl_seconds)
1229-
if max_ttl_seconds > 0
1230-
else end_date - timedelta(days=30)
1231-
)
1232-
else:
1233-
start_date = make_tzaware(start_date_opt)
1234-
return start_date, end_date
1235-
1236-
12371211
def _make_filter_range(timestamp_field: str, start_date: datetime, end_date: datetime):
12381212
# Why: factory function for time-range filtering in Ray map_batches
12391213
def _filter_range(batch: pd.DataFrame) -> pd.Series:
@@ -2067,8 +2041,10 @@ def get_historical_features(
20672041
# Non-entity mode: derive entity set from feature sources within a bounded time window
20682042
# Preserves distinct (entity_keys, event_timestamp) combinations for proper PIT joins
20692043
# This handles cases where multiple transactions per entity ID exist
2070-
start_date, end_date = _compute_non_entity_dates_ray(
2071-
feature_views, kwargs.get("start_date"), kwargs.get("end_date")
2044+
start_date, end_date = compute_non_entity_date_range(
2045+
feature_views,
2046+
start_date=kwargs.get("start_date"),
2047+
end_date=kwargs.get("end_date"),
20722048
)
20732049
per_view_entity_ds: List[Dataset] = []
20742050
all_join_keys: List[str] = []

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 7 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
import uuid
44
import warnings
55
from dataclasses import asdict, dataclass
6-
from datetime import datetime, timedelta, timezone
6+
from datetime import datetime, timezone
77
from typing import (
88
TYPE_CHECKING,
99
Any,
@@ -52,7 +52,7 @@
5252
from feast.repo_config import FeastConfigBaseModel, RepoConfig
5353
from feast.saved_dataset import SavedDatasetStorage
5454
from feast.type_map import spark_schema_to_np_dtypes
55-
from feast.utils import _get_fields_with_aliases
55+
from feast.utils import _get_fields_with_aliases, compute_non_entity_date_range
5656

5757
# Make sure spark warning doesn't raise more than once.
5858
warnings.simplefilter("once", RuntimeWarning)
@@ -183,8 +183,11 @@ def get_historical_features(
183183
# This makes date-range retrievals possible without enumerating entities upfront; sources remain bounded by time.
184184
non_entity_mode = entity_df is None
185185
if non_entity_mode:
186-
# Why: derive bounded time window without requiring entities; uses max TTL fallback to constrain scans.
187-
start_date, end_date = _compute_non_entity_dates(feature_views, kwargs)
186+
start_date, end_date = compute_non_entity_date_range(
187+
feature_views,
188+
start_date=kwargs.get("start_date"),
189+
end_date=kwargs.get("end_date"),
190+
)
188191
entity_df_event_timestamp_range = (start_date, end_date)
189192

190193
# Build query contexts so we can reuse entity names and per-view table info consistently.
@@ -619,29 +622,6 @@ def get_spark_session_or_start_new_with_repoconfig(
619622
return spark_session
620623

621624

622-
def _compute_non_entity_dates(
623-
feature_views: List[FeatureView], kwargs: Dict[str, Any]
624-
) -> Tuple[datetime, datetime]:
625-
# Why: bounds the scan window when no entity_df is provided using explicit dates or max TTL fallback.
626-
start_date_opt = cast(Optional[datetime], kwargs.get("start_date"))
627-
end_date_opt = cast(Optional[datetime], kwargs.get("end_date"))
628-
end_date: datetime = end_date_opt or datetime.now(timezone.utc)
629-
630-
if start_date_opt is None:
631-
max_ttl_seconds = 0
632-
for fv in feature_views:
633-
if fv.ttl and isinstance(fv.ttl, timedelta):
634-
max_ttl_seconds = max(max_ttl_seconds, int(fv.ttl.total_seconds()))
635-
start_date: datetime = (
636-
end_date - timedelta(seconds=max_ttl_seconds)
637-
if max_ttl_seconds > 0
638-
else end_date - timedelta(days=30)
639-
)
640-
else:
641-
start_date = start_date_opt
642-
return (start_date, end_date)
643-
644-
645625
def _gather_all_entities(
646626
fv_query_contexts: List[offline_utils.FeatureViewQueryContext],
647627
) -> List[str]:

sdk/python/feast/infra/offline_stores/dask.py

Lines changed: 9 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
import os
22
import uuid
3-
from datetime import datetime, timedelta, timezone
3+
from datetime import datetime, timezone
44
from pathlib import Path
55
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
66

@@ -37,7 +37,10 @@
3737
from feast.on_demand_feature_view import OnDemandFeatureView
3838
from feast.repo_config import FeastConfigBaseModel, RepoConfig
3939
from feast.saved_dataset import SavedDatasetStorage
40-
from feast.utils import _get_requested_feature_views_to_features_dict, make_tzaware
40+
from feast.utils import (
41+
_get_requested_feature_views_to_features_dict,
42+
compute_non_entity_date_range,
43+
)
4144

4245
# DaskRetrievalJob will cast string objects to string[pyarrow] from dask version 2023.7.1
4346
# This is not the desired behavior for our use case, so we set the convert-string option to False
@@ -143,36 +146,14 @@ def get_historical_features(
143146
for fv in feature_views:
144147
assert isinstance(fv.batch_source, FileSource)
145148

146-
# Allow non-entity mode using start/end timestamps to enable bounded retrievals without an input entity_df.
147-
# This synthesizes a minimal entity_df solely to drive the existing join and metadata plumbing without
148-
# incurring source scans here; actual pushdowns can be layered in follow-ups if needed.
149-
start_date: Optional[datetime] = kwargs.get("start_date", None)
150-
end_date: Optional[datetime] = kwargs.get("end_date", None)
151149
non_entity_mode = entity_df is None
152150

153151
if non_entity_mode:
154-
# Default end_date to current time (UTC) to keep behavior predictable without extra parameters.
155-
end_date = (
156-
make_tzaware(end_date) if end_date else datetime.now(timezone.utc)
152+
start_date, end_date = compute_non_entity_date_range(
153+
feature_views,
154+
start_date=kwargs.get("start_date"),
155+
end_date=kwargs.get("end_date"),
157156
)
158-
159-
# When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back.
160-
if start_date is None:
161-
max_ttl_seconds = 0
162-
for fv in feature_views:
163-
if fv.ttl and isinstance(fv.ttl, timedelta):
164-
max_ttl_seconds = max(
165-
max_ttl_seconds, int(fv.ttl.total_seconds())
166-
)
167-
if max_ttl_seconds > 0:
168-
start_date = end_date - timedelta(seconds=max_ttl_seconds)
169-
else:
170-
# Keep default window bounded to avoid unbounded scans by default.
171-
start_date = end_date - timedelta(days=30)
172-
start_date = make_tzaware(start_date)
173-
174-
# Minimal synthetic entity_df: one timestamp row; join keys are not materialized here on purpose to avoid
175-
# accidental dependence on specific feature view schemas at this layer.
176157
entity_df = pd.DataFrame(
177158
{DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}
178159
)

sdk/python/feast/utils.py

Lines changed: 33 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44
import typing
55
import warnings
66
from collections import Counter, defaultdict
7-
from datetime import datetime, timezone
7+
from datetime import datetime, timedelta, timezone
88
from pathlib import Path
99
from typing import (
1010
Any,
@@ -72,6 +72,38 @@ def make_tzaware(t: datetime) -> datetime:
7272
return t
7373

7474

75+
def compute_non_entity_date_range(
76+
feature_views: List["FeatureView"],
77+
start_date: Optional[datetime] = None,
78+
end_date: Optional[datetime] = None,
79+
default_window_days: int = 30,
80+
) -> Tuple[datetime, datetime]:
81+
82+
if end_date is None:
83+
end_date = datetime.now(tz=timezone.utc)
84+
else:
85+
end_date = make_tzaware(end_date)
86+
87+
if start_date is None:
88+
max_ttl_seconds = max(
89+
(
90+
int(fv.ttl.total_seconds())
91+
for fv in feature_views
92+
if fv.ttl and isinstance(fv.ttl, timedelta)
93+
),
94+
default=0,
95+
)
96+
start_date = end_date - timedelta(
97+
seconds=max_ttl_seconds
98+
if max_ttl_seconds > 0
99+
else default_window_days * 86400
100+
)
101+
else:
102+
start_date = make_tzaware(start_date)
103+
104+
return start_date, end_date
105+
106+
75107
def make_df_tzaware(t: pd.DataFrame) -> pd.DataFrame:
76108
"""Make all datetime type columns tzaware; leave everything else intact."""
77109
df = t.copy() # don't modify incoming dataframe inplace

0 commit comments

Comments (0)