|
7 | 7 | import pandas |
8 | 8 | import pyarrow |
9 | 9 | from jinja2 import BaseLoader, Environment |
| 10 | +from pandas import Timestamp |
10 | 11 | from pydantic import StrictStr |
11 | 12 | from pydantic.typing import Literal |
12 | 13 | from tenacity import retry, stop_after_delay, wait_fixed |
@@ -129,12 +130,16 @@ def get_historical_features( |
129 | 130 | full_feature_names=full_feature_names, |
130 | 131 | ) |
131 | 132 |
|
132 | | - # TODO: Infer min_timestamp and max_timestamp from entity_df |
| 133 | + # Infer min and max timestamps from entity_df to limit data read in BigQuery SQL query |
| 134 | + min_timestamp, max_timestamp = _get_entity_df_timestamp_bounds( |
| 135 | + client, str(table.reference), entity_df_event_timestamp_col |
| 136 | + ) |
| 137 | + |
133 | 138 | # Generate the BigQuery SQL query from the query context |
134 | 139 | query = build_point_in_time_query( |
135 | 140 | query_context, |
136 | | - min_timestamp=datetime.now() - timedelta(days=365), |
137 | | - max_timestamp=datetime.now() + timedelta(days=1), |
| 141 | + min_timestamp=min_timestamp, |
| 142 | + max_timestamp=max_timestamp, |
138 | 143 | left_table_query_string=str(table.reference), |
139 | 144 | entity_df_event_timestamp_col=entity_df_event_timestamp_col, |
140 | 145 | full_feature_names=full_feature_names, |
@@ -374,6 +379,28 @@ def _upload_entity_df_into_bigquery( |
374 | 379 | return table |
375 | 380 |
|
376 | 381 |
|
| 382 | +def _get_entity_df_timestamp_bounds( |
| 383 | + client: Client, entity_df_bq_table: str, event_timestamp_col: str, |
| 384 | +): |
| 385 | + |
| 386 | + boundary_df = ( |
| 387 | + client.query( |
| 388 | + f""" |
| 389 | + SELECT |
| 390 | + MIN({event_timestamp_col}) AS min_timestamp, |
| 391 | + MAX({event_timestamp_col}) AS max_timestamp |
| 392 | + FROM {entity_df_bq_table} |
| 393 | + """ |
| 394 | + ) |
| 395 | + .result() |
| 396 | + .to_dataframe() |
| 397 | + ) |
| 398 | + |
| 399 | + min_timestamp = boundary_df.loc[0, "min_timestamp"] |
| 400 | + max_timestamp = boundary_df.loc[0, "max_timestamp"] |
| 401 | + return min_timestamp, max_timestamp |
| 402 | + |
| 403 | + |
377 | 404 | def get_feature_view_query_context( |
378 | 405 | feature_refs: List[str], |
379 | 406 | feature_views: List[FeatureView], |
@@ -435,8 +462,8 @@ def get_feature_view_query_context( |
435 | 462 |
|
436 | 463 | def build_point_in_time_query( |
437 | 464 | feature_view_query_contexts: List[FeatureViewQueryContext], |
438 | | - min_timestamp: datetime, |
439 | | - max_timestamp: datetime, |
| 465 | + min_timestamp: Timestamp, |
| 466 | + max_timestamp: Timestamp, |
440 | 467 | left_table_query_string: str, |
441 | 468 | entity_df_event_timestamp_col: str, |
442 | 469 | full_feature_names: bool = False, |
@@ -533,6 +560,10 @@ def _get_bigquery_client(project: Optional[str] = None): |
533 | 560 | {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} |
534 | 561 | {% endfor %} |
535 | 562 | FROM {{ featureview.table_subquery }} |
| 563 | + WHERE {{ featureview.event_timestamp_column }} <= '{{max_timestamp}}' |
| 564 | + {% if featureview.ttl == 0 %}{% else %} |
| 565 | + AND {{ featureview.event_timestamp_column }} >= Timestamp_sub('{{min_timestamp}}', interval {{ featureview.ttl }} second) |
| 566 | + {% endif %} |
536 | 567 | ), |
537 | 568 |
|
538 | 569 | {{ featureview.name }}__base AS ( |
|
0 commit comments