Skip to content

Commit 558e28e

Browse files
Anarion-zuo authored and Shizoqua committed
feat: Utilize date partition column in BigQuery (feast-dev#6076)
Signed-off-by: Shizoqua <hr.lanreshittu@gmail.com>
1 parent d205a80 commit 558e28e

4 files changed

Lines changed: 226 additions & 1 deletion

File tree

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ def pull_latest_from_table_or_query(
161161
project=project_id,
162162
location=config.offline_store.location,
163163
)
164+
timestamp_filter = get_timestamp_filter_sql(
165+
start_date,
166+
end_date,
167+
timestamp_field,
168+
date_partition_column=data_source.date_partition_column,
169+
quote_fields=False,
170+
cast_style="timestamp_func",
171+
)
164172
query = f"""
165173
SELECT
166174
{field_string}
@@ -169,7 +177,7 @@ def pull_latest_from_table_or_query(
169177
SELECT {field_string},
170178
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
171179
FROM {from_expression}
172-
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
180+
WHERE {timestamp_filter}
173181
)
174182
WHERE _feast_row = 1
175183
"""
@@ -216,6 +224,7 @@ def pull_all_from_table_or_query(
216224
start_date,
217225
end_date,
218226
timestamp_field,
227+
date_partition_column=data_source.date_partition_column,
219228
quote_fields=False,
220229
cast_style="timestamp_func",
221230
)
@@ -929,6 +938,12 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
929938
{% if featureview.ttl == 0 %}{% else %}
930939
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
931940
{% endif %}
941+
{% if featureview.date_partition_column %}
942+
AND {{ featureview.date_partition_column | backticks }} <= '{{ featureview.max_event_timestamp[:10] }}'
943+
{% if featureview.min_event_timestamp %}
944+
AND {{ featureview.date_partition_column | backticks }} >= '{{ featureview.min_event_timestamp[:10] }}'
945+
{% endif %}
946+
{% endif %}
932947
),
933948
934949
{{ featureview.name }}__base AS (

sdk/python/feast/infra/offline_stores/bigquery_source.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def __init__(
3434
table: Optional[str] = None,
3535
created_timestamp_column: Optional[str] = "",
3636
field_mapping: Optional[Dict[str, str]] = None,
37+
date_partition_column: Optional[str] = None,
3738
query: Optional[str] = None,
3839
description: Optional[str] = "",
3940
tags: Optional[Dict[str, str]] = None,
@@ -52,6 +53,7 @@ def __init__(
5253
created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows.
5354
field_mapping (optional): A dictionary mapping of column names in this data source to feature names in a feature table
5455
or view. Only used for feature columns, not entities or timestamp columns.
56+
date_partition_column (optional): Timestamp column used for partitioning.
5557
query (optional): The query to be executed to obtain the features. Exactly one of 'table'
5658
and 'query' must be specified.
5759
description (optional): A human-readable description.
@@ -78,6 +80,7 @@ def __init__(
7880
timestamp_field=timestamp_field,
7981
created_timestamp_column=created_timestamp_column,
8082
field_mapping=field_mapping,
83+
date_partition_column=date_partition_column,
8184
description=description,
8285
tags=tags,
8386
owner=owner,
@@ -117,6 +120,7 @@ def from_proto(data_source: DataSourceProto):
117120
table=data_source.bigquery_options.table,
118121
timestamp_field=data_source.timestamp_field,
119122
created_timestamp_column=data_source.created_timestamp_column,
123+
date_partition_column=data_source.date_partition_column,
120124
query=data_source.bigquery_options.query,
121125
description=data_source.description,
122126
tags=dict(data_source.tags),
@@ -134,6 +138,7 @@ def _to_proto_impl(self) -> DataSourceProto:
134138
owner=self.owner,
135139
timestamp_field=self.timestamp_field,
136140
created_timestamp_column=self.created_timestamp_column,
141+
date_partition_column=self.date_partition_column,
137142
)
138143

139144
return data_source_proto
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import os
2+
from datetime import datetime, timedelta, timezone
3+
from unittest.mock import Mock, patch
4+
5+
import pytest
6+
7+
from feast.infra.offline_stores.bigquery import (
8+
BigQueryOfflineStore,
9+
BigQueryOfflineStoreConfig,
10+
)
11+
from feast.infra.offline_stores.bigquery_source import BigQuerySource
12+
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
13+
from feast.repo_config import RepoConfig
14+
15+
__doc__ = """
16+
Environment variables:
17+
FEAST_BQ_BENCH_PROJECT: BigQuery project to run dry-run queries in
18+
FEAST_BQ_BENCH_TABLE: BigQuery table in Feast format: project:dataset.table
19+
FEAST_BQ_BENCH_TIMESTAMP_FIELD: event timestamp column name used by Feast
20+
FEAST_BQ_BENCH_PARTITION_COLUMN: partition column to prune (e.g. _PARTITIONDATE)
21+
FEAST_BQ_BENCH_LOCATION: optional BigQuery location
22+
FEAST_BQ_BENCH_START: optional ISO datetime (e.g. 2026-01-01T00:00:00+00:00)
23+
FEAST_BQ_BENCH_END: optional ISO datetime
24+
FEAST_BQ_BENCH_REQUIRE_REDUCTION: if truthy, requires strict byte reduction
25+
"""
26+
27+
28+
def _required_env(name: str) -> str:
29+
val = os.environ.get(name)
30+
if not val:
31+
pytest.skip(f"Missing env var {name}")
32+
return val
33+
34+
35+
def _optional_iso_datetime(name: str) -> datetime | None:
36+
val = os.environ.get(name)
37+
if not val:
38+
return None
39+
return datetime.fromisoformat(val.replace("Z", "+00:00"))
40+
41+
42+
def _estimate_bytes_processed(project: str, location: "str | None", sql: str) -> int:
    """Dry-run *sql* in BigQuery and return the estimated total bytes processed.

    The *location* annotation is quoted: ``str | None`` is evaluated at
    definition time and raises TypeError on Python < 3.10.

    Skips the test when the google-cloud-bigquery client library is not
    installed. The dry-run never executes the query, so the call is free;
    the query cache is disabled so the estimate reflects a real scan.
    """
    try:
        from google.cloud import bigquery
    except Exception as e:
        pytest.skip(str(e))
    client = bigquery.Client(project=project, location=location)
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    job = client.query(sql, job_config=job_config)
    # total_bytes_processed can be None for some job types; normalize to 0.
    return int(job.total_bytes_processed or 0)
51+
52+
53+
@pytest.mark.benchmark(group="bigquery_partition_pruning")
@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
def test_bigquery_partition_pruning_bytes_processed(
    mock_get_bigquery_client, benchmark
):
    """Compare BigQuery dry-run byte estimates for the pull_all SQL with and
    without a date_partition_column configured on the source."""
    # Query construction would otherwise open a real BigQuery client.
    mock_get_bigquery_client.return_value = Mock()

    # Required benchmark inputs; the test is skipped when any is missing.
    project = _required_env("FEAST_BQ_BENCH_PROJECT")
    table = _required_env("FEAST_BQ_BENCH_TABLE")
    timestamp_field = _required_env("FEAST_BQ_BENCH_TIMESTAMP_FIELD")
    partition_column = _required_env("FEAST_BQ_BENCH_PARTITION_COLUMN")
    location = os.environ.get("FEAST_BQ_BENCH_LOCATION")

    # Time window: defaults to the last 7 days ending "now" (UTC, whole seconds).
    end = _optional_iso_datetime("FEAST_BQ_BENCH_END") or datetime.now(
        tz=timezone.utc
    ).replace(microsecond=0)
    start = _optional_iso_datetime("FEAST_BQ_BENCH_START") or end - timedelta(days=7)

    config = RepoConfig(
        registry="gs://ml-test/repo/registry.db",
        project="bench",
        provider="gcp",
        online_store=SqliteOnlineStoreConfig(type="sqlite"),
        offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"),
    )

    # Two sources over the same table, differing only in the partition column.
    plain_source = BigQuerySource(
        table=table,
        timestamp_field=timestamp_field,
    )
    pruned_source = BigQuerySource(
        table=table,
        timestamp_field=timestamp_field,
        date_partition_column=partition_column,
    )

    def build_sql(source):
        # Render the pull_all retrieval job for *source* to SQL.
        job = BigQueryOfflineStore.pull_all_from_table_or_query(
            config=config,
            data_source=source,
            join_key_columns=[],
            feature_name_columns=[],
            timestamp_field=timestamp_field,
            start_date=start,
            end_date=end,
        )
        return job.to_sql()

    sql_without = build_sql(plain_source)
    sql_with = build_sql(pruned_source)

    def measure():
        return (
            _estimate_bytes_processed(project, location, sql_without),
            _estimate_bytes_processed(project, location, sql_with),
        )

    bytes_without, bytes_with = benchmark(measure)
    benchmark.extra_info["total_bytes_processed_without_partition_filter"] = (
        bytes_without
    )
    benchmark.extra_info["total_bytes_processed_with_partition_filter"] = bytes_with
    if bytes_without > 0:
        benchmark.extra_info["bytes_ratio_with_over_without"] = (
            bytes_with / bytes_without
        )

    # Strict reduction is opt-in: unpartitioned tables may legitimately tie.
    require_reduction = os.environ.get(
        "FEAST_BQ_BENCH_REQUIRE_REDUCTION", ""
    ).lower() in ("1", "true", "yes")
    if require_reduction:
        assert bytes_with < bytes_without
    else:
        assert bytes_with <= bytes_without

sdk/python/tests/unit/infra/offline_stores/test_bigquery.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1+
from datetime import datetime, timezone
12
from unittest.mock import Mock, patch
23

34
import pandas as pd
45
import pyarrow
56
import pytest
67

78
from feast.infra.offline_stores.bigquery import (
9+
BigQueryOfflineStore,
810
BigQueryOfflineStoreConfig,
911
BigQueryRetrievalJob,
1012
)
13+
from feast.infra.offline_stores.bigquery_source import BigQuerySource
1114
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
1215
from feast.repo_config import RepoConfig
1316

@@ -82,3 +85,70 @@ def test_to_arrow_timeout(self, big_query_result):
8285
self.retrieval_job._execute_query.assert_called_once_with(
8386
query=self.query, timeout=30
8487
)
88+
89+
90+
@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
def test_pull_latest_from_table_or_query_partition_pruning(mock_get_bigquery_client):
    """pull_latest SQL keeps the timestamp BETWEEN filter and adds pruning
    predicates on the configured date_partition_column."""
    mock_get_bigquery_client.return_value = Mock()

    config = RepoConfig(
        registry="gs://ml-test/repo/registry.db",
        project="test",
        provider="gcp",
        online_store=SqliteOnlineStoreConfig(type="sqlite"),
        offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"),
    )
    source = BigQuerySource(
        table="project:dataset.table",
        timestamp_field="event_timestamp",
        date_partition_column="partition_date",
    )

    job = BigQueryOfflineStore.pull_latest_from_table_or_query(
        config=config,
        data_source=source,
        join_key_columns=["driver_id"],
        feature_name_columns=["feature1"],
        timestamp_field="event_timestamp",
        created_timestamp_column=None,
        start_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
        end_date=datetime(2021, 1, 2, tzinfo=timezone.utc),
    )
    sql = job.to_sql()

    expected_between = (
        "event_timestamp BETWEEN TIMESTAMP('2021-01-01T00:00:00+00:00')"
        " AND TIMESTAMP('2021-01-02T00:00:00+00:00')"
    )
    assert expected_between in sql
    # Partition pruning uses date-only bounds derived from start/end.
    assert "partition_date >= '2021-01-01'" in sql
    assert "partition_date <= '2021-01-02'" in sql
122+
123+
124+
@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
def test_pull_all_from_table_or_query_partition_pruning(mock_get_bigquery_client):
    """pull_all SQL keeps the timestamp BETWEEN filter and adds pruning
    predicates on the configured date_partition_column."""
    mock_get_bigquery_client.return_value = Mock()

    config = RepoConfig(
        registry="gs://ml-test/repo/registry.db",
        project="test",
        provider="gcp",
        online_store=SqliteOnlineStoreConfig(type="sqlite"),
        offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"),
    )
    source = BigQuerySource(
        table="project:dataset.table",
        timestamp_field="event_timestamp",
        date_partition_column="partition_date",
    )

    job = BigQueryOfflineStore.pull_all_from_table_or_query(
        config=config,
        data_source=source,
        join_key_columns=["driver_id"],
        feature_name_columns=["feature1"],
        timestamp_field="event_timestamp",
        start_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
        end_date=datetime(2021, 1, 2, tzinfo=timezone.utc),
    )
    sql = job.to_sql()

    expected_between = (
        "event_timestamp BETWEEN TIMESTAMP('2021-01-01T00:00:00+00:00')"
        " AND TIMESTAMP('2021-01-02T00:00:00+00:00')"
    )
    assert expected_between in sql
    # Partition pruning uses date-only bounds derived from start/end.
    assert "partition_date >= '2021-01-01'" in sql
    assert "partition_date <= '2021-01-02'" in sql

0 commit comments

Comments
 (0)