diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index fac3439b911..948cfcf1ff0 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -161,6 +161,14 @@ def pull_latest_from_table_or_query( project=project_id, location=config.offline_store.location, ) + timestamp_filter = get_timestamp_filter_sql( + start_date, + end_date, + timestamp_field, + date_partition_column=data_source.date_partition_column, + quote_fields=False, + cast_style="timestamp_func", + ) query = f""" SELECT {field_string} @@ -169,7 +177,7 @@ def pull_latest_from_table_or_query( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') + WHERE {timestamp_filter} ) WHERE _feast_row = 1 """ @@ -216,6 +224,7 @@ def pull_all_from_table_or_query( start_date, end_date, timestamp_field, + date_partition_column=data_source.date_partition_column, quote_fields=False, cast_style="timestamp_func", ) @@ -929,6 +938,12 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] {% if featureview.ttl == 0 %}{% else %} AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} + {% if featureview.date_partition_column %} + AND {{ featureview.date_partition_column | backticks }} <= '{{ featureview.max_event_timestamp[:10] }}' + {% if featureview.min_event_timestamp %} + AND {{ featureview.date_partition_column | backticks }} >= '{{ featureview.min_event_timestamp[:10] }}' + {% endif %} + {% endif %} ), {{ featureview.name }}__base AS ( diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 1dda7af928f..69e42e3fd09 100644 --- 
a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -34,6 +34,7 @@ def __init__( table: Optional[str] = None, created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, + date_partition_column: Optional[str] = None, query: Optional[str] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, @@ -52,6 +53,7 @@ def __init__( created_timestamp_column (optional): Timestamp column when row was created, used for deduplicating rows. field_mapping (optional): A dictionary mapping of column names in this data source to feature names in a feature table or view. Only used for feature columns, not entities or timestamp columns. + date_partition_column (optional): Date column the table is partitioned on (e.g. _PARTITIONDATE); used to add a date-range filter for partition pruning. query (optional): The query to be executed to obtain the features. Exactly one of 'table' and 'query' must be specified. description (optional): A human-readable description. 
@@ -78,6 +80,7 @@ def __init__( timestamp_field=timestamp_field, created_timestamp_column=created_timestamp_column, field_mapping=field_mapping, + date_partition_column=date_partition_column, description=description, tags=tags, owner=owner, @@ -117,6 +120,7 @@ def from_proto(data_source: DataSourceProto): table=data_source.bigquery_options.table, timestamp_field=data_source.timestamp_field, created_timestamp_column=data_source.created_timestamp_column, + date_partition_column=data_source.date_partition_column, query=data_source.bigquery_options.query, description=data_source.description, tags=dict(data_source.tags), @@ -134,6 +138,7 @@ def _to_proto_impl(self) -> DataSourceProto: owner=self.owner, timestamp_field=self.timestamp_field, created_timestamp_column=self.created_timestamp_column, + date_partition_column=self.date_partition_column, ) return data_source_proto
diff --git a/sdk/python/tests/benchmarks/test_bigquery_partition_pruning_benchmark.py b/sdk/python/tests/benchmarks/test_bigquery_partition_pruning_benchmark.py
new file mode 100644
index 00000000000..00123e79664
--- /dev/null
+++ b/sdk/python/tests/benchmarks/test_bigquery_partition_pruning_benchmark.py
@@ -0,0 +1,156 @@
+"""Dry-run benchmark for BigQuery partition pruning.
+
+Builds the ``pull_all_from_table_or_query`` SQL for one table twice, with and
+without ``date_partition_column`` on the source, and compares the bytes that a
+BigQuery dry run reports for each query. The test is skipped when a required
+environment variable is missing.
+
+Environment variables:
+  FEAST_BQ_BENCH_PROJECT: BigQuery project to run dry-run queries in
+  FEAST_BQ_BENCH_TABLE: BigQuery table in Feast format: project:dataset.table
+  FEAST_BQ_BENCH_TIMESTAMP_FIELD: event timestamp column name used by Feast
+  FEAST_BQ_BENCH_PARTITION_COLUMN: partition column to prune (e.g. _PARTITIONDATE)
+  FEAST_BQ_BENCH_LOCATION: optional BigQuery location
+  FEAST_BQ_BENCH_START: optional ISO datetime (e.g. 2026-01-01T00:00:00+00:00);
+    defaults to 7 days before the end of the range
+  FEAST_BQ_BENCH_END: optional ISO datetime; defaults to the current UTC time
+  FEAST_BQ_BENCH_REQUIRE_REDUCTION: if truthy, requires strict byte reduction
+"""
+
+import os
+from datetime import datetime, timedelta, timezone
+from typing import Optional
+from unittest.mock import Mock, patch
+
+import pytest
+
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStore,
+    BigQueryOfflineStoreConfig,
+)
+from feast.infra.offline_stores.bigquery_source import BigQuerySource
+from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
+from feast.repo_config import RepoConfig
+
+
+def _required_env(name: str) -> str:
+    """Return env var ``name``; skip the current test if it is unset or empty."""
+    val = os.environ.get(name)
+    if not val:
+        pytest.skip(f"Missing env var {name}")
+    return val
+
+
+def _optional_iso_datetime(name: str) -> Optional[datetime]:
+    """Parse env var ``name`` as an ISO datetime; None if it is unset or empty."""
+    val = os.environ.get(name)
+    if not val:
+        return None
+    # Rewrite "Z" (UTC designator) as an explicit "+00:00" offset before parsing.
+    return datetime.fromisoformat(val.replace("Z", "+00:00"))
+
+
+def _estimate_bytes_processed(project: str, location: Optional[str], sql: str) -> int:
+    """Return the bytes a BigQuery dry run reports ``sql`` would process."""
+    try:
+        from google.cloud import bigquery
+    except ImportError as e:
+        # Skip only when the client library is absent; other errors should surface.
+        pytest.skip(str(e))
+    client = bigquery.Client(project=project, location=location)
+    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
+    job = client.query(sql, job_config=job_config)
+    # ``or 0`` guards against a missing (falsy) byte count on the job.
+    return int(job.total_bytes_processed or 0)
+
+
+@pytest.mark.benchmark(group="bigquery_partition_pruning")
+@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
+def test_bigquery_partition_pruning_bytes_processed(
+    mock_get_bigquery_client, benchmark
+):
+    """Check that the partition filter does not increase dry-run bytes processed."""
+    # Feast's own BigQuery client is mocked; only the generated SQL is used.
+    mock_get_bigquery_client.return_value = Mock()
+
+    project = _required_env("FEAST_BQ_BENCH_PROJECT")
+    table = _required_env("FEAST_BQ_BENCH_TABLE")
+    timestamp_field = _required_env("FEAST_BQ_BENCH_TIMESTAMP_FIELD")
+    partition_column = _required_env("FEAST_BQ_BENCH_PARTITION_COLUMN")
+    location = os.environ.get("FEAST_BQ_BENCH_LOCATION")
+
+    end = _optional_iso_datetime("FEAST_BQ_BENCH_END")
+    if end is None:
+        end = datetime.now(tz=timezone.utc).replace(microsecond=0)
+    start = _optional_iso_datetime("FEAST_BQ_BENCH_START")
+    if start is None:
+        start = end - timedelta(days=7)
+
+    repo_config = RepoConfig(
+        registry="gs://ml-test/repo/registry.db",
+        project="bench",
+        provider="gcp",
+        online_store=SqliteOnlineStoreConfig(type="sqlite"),
+        offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"),
+    )
+
+    source_without_partition = BigQuerySource(
+        table=table,
+        timestamp_field=timestamp_field,
+    )
+    source_with_partition = BigQuerySource(
+        table=table,
+        timestamp_field=timestamp_field,
+        date_partition_column=partition_column,
+    )
+
+    job_without = BigQueryOfflineStore.pull_all_from_table_or_query(
+        config=repo_config,
+        data_source=source_without_partition,
+        join_key_columns=[],
+        feature_name_columns=[],
+        timestamp_field=timestamp_field,
+        start_date=start,
+        end_date=end,
+    )
+    job_with = BigQueryOfflineStore.pull_all_from_table_or_query(
+        config=repo_config,
+        data_source=source_with_partition,
+        join_key_columns=[],
+        feature_name_columns=[],
+        timestamp_field=timestamp_field,
+        start_date=start,
+        end_date=end,
+    )
+
+    sql_without = job_without.to_sql()
+    sql_with = job_with.to_sql()
+
+    def measure():
+        bytes_without = _estimate_bytes_processed(project, location, sql_without)
+        bytes_with = _estimate_bytes_processed(project, location, sql_with)
+        return bytes_without, bytes_with
+
+    bytes_without, bytes_with = benchmark(measure)
+    benchmark.extra_info["total_bytes_processed_without_partition_filter"] = (
+        bytes_without
+    )
+    benchmark.extra_info["total_bytes_processed_with_partition_filter"] = bytes_with
+    if bytes_without > 0:
+        benchmark.extra_info["bytes_ratio_with_over_without"] = (
+            bytes_with / bytes_without
+        )
+
+    require_reduction = os.environ.get(
+        "FEAST_BQ_BENCH_REQUIRE_REDUCTION", ""
+    ).lower() in ("1", "true", "yes")
+    if require_reduction:
+        assert bytes_with < bytes_without, (
+            "partition filter did not reduce bytes processed: "
+            f"{bytes_with} >= {bytes_without}"
+        )
+    else:
+        assert bytes_with <= bytes_without, (
+            "partition filter increased bytes processed: "
+            f"{bytes_with} > {bytes_without}"
+        )
diff --git a/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py b/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py index 662be20b316..7dbf06e94a8 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py @@ -1,3 +1,4 @@ +from datetime import datetime, timezone from
unittest.mock import Mock, patch import pandas as pd @@ -5,9 +6,11 @@ import pytest from feast.infra.offline_stores.bigquery import ( + BigQueryOfflineStore, BigQueryOfflineStoreConfig, BigQueryRetrievalJob, ) +from feast.infra.offline_stores.bigquery_source import BigQuerySource from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast.repo_config import RepoConfig @@ -82,3 +85,66 @@ def test_to_arrow_timeout(self, big_query_result): self.retrieval_job._execute_query.assert_called_once_with( query=self.query, timeout=30 )
+
+
+def _partition_pruning_inputs():
+    """Return the repo config and date-partitioned source shared by the tests."""
+    repo_config = RepoConfig(
+        registry="gs://ml-test/repo/registry.db",
+        project="test",
+        provider="gcp",
+        online_store=SqliteOnlineStoreConfig(type="sqlite"),
+        offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"),
+    )
+    partitioned_source = BigQuerySource(
+        table="project:dataset.table",
+        timestamp_field="event_timestamp",
+        date_partition_column="partition_date",
+    )
+    return repo_config, partitioned_source
+
+
+def _assert_timestamp_and_partition_filters(sql: str) -> None:
+    """Assert ``sql`` has the event-timestamp range and both partition bounds."""
+    expected_fragments = (
+        "event_timestamp BETWEEN TIMESTAMP('2021-01-01T00:00:00+00:00') AND TIMESTAMP('2021-01-02T00:00:00+00:00')",
+        "partition_date >= '2021-01-01'",
+        "partition_date <= '2021-01-02'",
+    )
+    for fragment in expected_fragments:
+        assert fragment in sql
+
+
+@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
+def test_pull_latest_from_table_or_query_partition_pruning(mock_get_bigquery_client):
+    """The latest-row query filters on timestamp and partition column."""
+    mock_get_bigquery_client.return_value = Mock()
+    repo_config, partitioned_source = _partition_pruning_inputs()
+    job = BigQueryOfflineStore.pull_latest_from_table_or_query(
+        config=repo_config,
+        data_source=partitioned_source,
+        join_key_columns=["driver_id"],
+        feature_name_columns=["feature1"],
+        timestamp_field="event_timestamp",
+        created_timestamp_column=None,
+        start_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
+        end_date=datetime(2021, 1, 2, tzinfo=timezone.utc),
+    )
+    _assert_timestamp_and_partition_filters(job.to_sql())
+
+
+@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
+def test_pull_all_from_table_or_query_partition_pruning(mock_get_bigquery_client):
+    """The pull-all query filters on timestamp and partition column."""
+    mock_get_bigquery_client.return_value = Mock()
+    repo_config, partitioned_source = _partition_pruning_inputs()
+    job = BigQueryOfflineStore.pull_all_from_table_or_query(
+        config=repo_config,
+        data_source=partitioned_source,
+        join_key_columns=["driver_id"],
+        feature_name_columns=["feature1"],
+        timestamp_field="event_timestamp",
+        start_date=datetime(2021, 1, 1, tzinfo=timezone.utc),
+        end_date=datetime(2021, 1, 2, tzinfo=timezone.utc),
+    )
+    _assert_timestamp_and_partition_filters(job.to_sql())