Skip to content

Commit 8a91fc1

Browse files
committed
Merge branch 'master' into achal/feature-service-api
2 parents 52a6507 + 0d2179d commit 8a91fc1

3 files changed

Lines changed: 66 additions & 7 deletions

File tree

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import pandas
88
import pyarrow
99
from jinja2 import BaseLoader, Environment
10+
from pandas import Timestamp
1011
from pydantic import StrictStr
1112
from pydantic.typing import Literal
1213
from tenacity import retry, stop_after_delay, wait_fixed
@@ -129,12 +130,16 @@ def get_historical_features(
129130
full_feature_names=full_feature_names,
130131
)
131132

132-
# TODO: Infer min_timestamp and max_timestamp from entity_df
133+
# Infer min and max timestamps from entity_df to limit data read in BigQuery SQL query
134+
min_timestamp, max_timestamp = _get_entity_df_timestamp_bounds(
135+
client, str(table.reference), entity_df_event_timestamp_col
136+
)
137+
133138
# Generate the BigQuery SQL query from the query context
134139
query = build_point_in_time_query(
135140
query_context,
136-
min_timestamp=datetime.now() - timedelta(days=365),
137-
max_timestamp=datetime.now() + timedelta(days=1),
141+
min_timestamp=min_timestamp,
142+
max_timestamp=max_timestamp,
138143
left_table_query_string=str(table.reference),
139144
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
140145
full_feature_names=full_feature_names,
@@ -374,6 +379,28 @@ def _upload_entity_df_into_bigquery(
374379
return table
375380

376381

382+
def _get_entity_df_timestamp_bounds(
    client: Client, entity_df_bq_table: str, event_timestamp_col: str,
):
    """Compute the min and max event timestamps of a staged entity dataframe.

    Runs a single MIN/MAX aggregation over the entity dataframe that was
    previously uploaded to BigQuery, so the point-in-time join query can
    restrict the feature data it reads.

    Args:
        client: BigQuery client used to run the aggregation query.
        entity_df_bq_table: Fully-qualified name of the BigQuery table holding
            the uploaded entity dataframe (e.g. "project.dataset.table").
        event_timestamp_col: Name of the event timestamp column to aggregate.

    Returns:
        A ``(min_timestamp, max_timestamp)`` pair of pandas Timestamps.

    Raises:
        ValueError: If the entity table is empty (both bounds are null),
            since NaT bounds would render the generated SQL meaningless.
    """
    # NOTE(review): the table and column names are interpolated directly into
    # the SQL string. BigQuery query parameters cannot bind identifiers and
    # both values are produced internally, but confirm they can never be
    # user-controlled before exposing this path further.
    boundary_df = (
        client.query(
            f"""
            SELECT
                MIN({event_timestamp_col}) AS min_timestamp,
                MAX({event_timestamp_col}) AS max_timestamp
            FROM {entity_df_bq_table}
            """
        )
        .result()
        .to_dataframe()
    )

    min_timestamp = boundary_df.loc[0, "min_timestamp"]
    max_timestamp = boundary_df.loc[0, "max_timestamp"]
    # An empty entity dataframe yields NaT for both aggregates; fail loudly
    # instead of letting NaT propagate into the point-in-time query, which
    # would silently return no rows.
    if pandas.isnull(min_timestamp) or pandas.isnull(max_timestamp):
        raise ValueError(
            f"Could not infer timestamp bounds from {entity_df_bq_table}: "
            "the entity dataframe appears to be empty."
        )
    return min_timestamp, max_timestamp
402+
403+
377404
def get_feature_view_query_context(
378405
feature_refs: List[str],
379406
feature_views: List[FeatureView],
@@ -435,8 +462,8 @@ def get_feature_view_query_context(
435462

436463
def build_point_in_time_query(
437464
feature_view_query_contexts: List[FeatureViewQueryContext],
438-
min_timestamp: datetime,
439-
max_timestamp: datetime,
465+
min_timestamp: Timestamp,
466+
max_timestamp: Timestamp,
440467
left_table_query_string: str,
441468
entity_df_event_timestamp_col: str,
442469
full_feature_names: bool = False,
@@ -533,6 +560,10 @@ def _get_bigquery_client(project: Optional[str] = None):
533560
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
534561
{% endfor %}
535562
FROM {{ featureview.table_subquery }}
563+
WHERE {{ featureview.event_timestamp_column }} <= '{{max_timestamp}}'
564+
{% if featureview.ttl == 0 %}{% else %}
565+
AND {{ featureview.event_timestamp_column }} >= Timestamp_sub('{{min_timestamp}}', interval {{ featureview.ttl }} second)
566+
{% endif %}
536567
),
537568
538569
{{ featureview.name }}__base AS (

sdk/python/feast/type_map.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def python_type_to_feast_value_type(
128128
for item in list_items:
129129
if isinstance(item, ProtoValue):
130130
current_item_value_type = _proto_str_to_value_type(
131-
item.WhichOneof("val")
131+
str(item.WhichOneof("val"))
132132
)
133133
else:
134134
# Get the type from the current item, only one level deep

sdk/python/tests/test_historical_retrieval.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
from feast.feature import Feature
2222
from feast.feature_store import FeatureStore, _validate_feature_refs
2323
from feast.feature_view import FeatureView
24-
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
24+
from feast.infra.offline_stores.bigquery import (
25+
BigQueryOfflineStoreConfig,
26+
_get_entity_df_timestamp_bounds,
27+
)
2528
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
2629
from feast.infra.provider import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
2730
from feast.value_type import ValueType
@@ -595,6 +598,31 @@ def test_historical_features_from_bigquery_sources(
595598
)
596599

597600

601+
@pytest.mark.integration
def test_timestamp_bound_inference_from_entity_df_using_bigquery():
    """Check that _get_entity_df_timestamp_bounds agrees with the min/max of
    the entity dataframe's event timestamp column after staging to BigQuery."""
    base_date = datetime.now().replace(microsecond=0, second=0, minute=0)
    (_, _, _, entity_df, base_date) = generate_entities(
        base_date, infer_event_timestamp_col=True
    )

    table_id = "foo.table_id"
    stage_orders_bigquery(entity_df, table_id)

    client = bigquery.Client()
    table = client.get_table(table=table_id)

    # Ensure that the table expires after some time
    table.expires = datetime.utcnow() + timedelta(minutes=30)
    client.update_table(table, ["expires"])

    min_ts, max_ts = _get_entity_df_timestamp_bounds(
        client, str(table.reference), "e_ts"
    )

    # Compare in a single timezone to avoid naive/aware mismatches.
    expected_min = min(entity_df["e_ts"]).astimezone("UTC")
    expected_max = max(entity_df["e_ts"]).astimezone("UTC")
    assert min_ts.astimezone("UTC") == expected_min
    assert max_ts.astimezone("UTC") == expected_max
624+
625+
598626
def test_feature_name_collision_on_historical_retrieval():
599627

600628
# _validate_feature_refs is the function that checks for colliding feature names

0 commit comments

Comments (0)