-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: Offline Store historical features retrieval based on datetime range in dask #5717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
41b66fb
fa6059c
3ddcc3b
1ed85ce
039b009
497bce6
b28ea19
5a1b6c5
7cafbdc
973d7cd
e7700d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,6 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import uuid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from datetime import datetime, timezone | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from datetime import datetime, timedelta, timezone | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pathlib import Path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -37,7 +37,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from feast.on_demand_feature_view import OnDemandFeatureView | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from feast.repo_config import FeastConfigBaseModel, RepoConfig | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from feast.saved_dataset import SavedDatasetStorage | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from feast.utils import _get_requested_feature_views_to_features_dict | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from feast.utils import _get_requested_feature_views_to_features_dict, make_tzaware | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # DaskRetrievalJob will cast string objects to string[pyarrow] from dask version 2023.7.1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # This is not the desired behavior for our use case, so we set the convert-string option to False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -133,21 +133,56 @@ def get_historical_features( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| config: RepoConfig, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| feature_views: List[FeatureView], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| feature_refs: List[str], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| entity_df: Union[pd.DataFrame, str], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| entity_df: Optional[Union[pd.DataFrame, dd.DataFrame, str]], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| registry: BaseRegistry, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| project: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| full_feature_names: bool = False, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| **kwargs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> RetrievalJob: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert isinstance(config.offline_store, DaskOfflineStoreConfig) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for fv in feature_views: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert isinstance(fv.batch_source, FileSource) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not isinstance(entity_df, pd.DataFrame) and not isinstance( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| entity_df, dd.DataFrame | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Allow non-entity mode using start/end timestamps to enable bounded retrievals without an input entity_df. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # This synthesizes a minimal entity_df solely to drive the existing join and metadata plumbing without | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # incurring source scans here; actual pushdowns can be layered in follow-ups if needed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date: Optional[datetime] = kwargs.get("start_date", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end_date: Optional[datetime] = kwargs.get("end_date", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| non_entity_mode = entity_df is None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if non_entity_mode: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Default end_date to current time (UTC) to keep behavior predictable without extra parameters. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end_date = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| make_tzaware(end_date) if end_date else datetime.now(timezone.utc) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if start_date is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_ttl_seconds = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for fv in feature_views: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if fv.ttl and isinstance(fv.ttl, timedelta): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_ttl_seconds = max( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_ttl_seconds, int(fv.ttl.total_seconds()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if max_ttl_seconds > 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Keep default window bounded to avoid unbounded scans by default. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date = end_date - timedelta(days=30) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date = make_tzaware(start_date) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+159
to
+173
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | |
| if start_date is None: | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| start_date = end_date - timedelta(days=30) | |
| # Compute TTL-based lower bound for start_date. | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| ttl_lower_bound = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| ttl_lower_bound = end_date - timedelta(days=30) | |
| # If user provided start_date, use the max of user start_date and ttl_lower_bound. | |
| if start_date is not None: | |
| if start_date < ttl_lower_bound: | |
| import warnings | |
| warnings.warn( | |
| f"Provided start_date ({start_date}) is earlier than TTL-based lower bound ({ttl_lower_bound}). Overriding start_date to {ttl_lower_bound}." | |
| ) | |
| start_date = max(start_date, ttl_lower_bound) | |
| else: | |
| start_date = ttl_lower_bound |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you havent given start date and tz. Both matters very much. It should be:
start=start_date, end=end_date, freq="1s", tz=timezone.utc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If start_date is given you have to make it tzaware ?