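"""Materialization correctness tests for Feast.

These tests load a small driver dataset into a local Parquet file or a BigQuery
table/query source, run ``FeatureStore.materialize()`` and
``materialize_incremental()``, and verify the values returned by
``get_online_features()``.
"""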
import contextlib
import tempfile
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterator, Tuple, Union

import pandas as pd
import pytest
from google.cloud import bigquery
from pytz import timezone, utc

from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource
from feast.feature import Feature
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.repo_config import LocalOnlineStoreConfig, OnlineStoreConfig, RepoConfig
from feast.value_type import ValueType


def create_dataset() -> pd.DataFrame:
    """Build a small driver dataset with ids, values, and event timestamps."""
    now = datetime.utcnow()
    ts = pd.Timestamp(now).round("ms")
    data = {
        "id": [1, 2, 1, 3, 3],
        "value": [0.1, 0.2, 0.3, 4, 5],
        "ts_1": [
            ts - timedelta(hours=4),
            ts,
            ts - timedelta(hours=3),
            # Use different time zones to test tz-naive -> tz-aware conversion
            (ts - timedelta(hours=4))
            .replace(tzinfo=utc)
            .astimezone(tz=timezone("Europe/Berlin")),
            (ts - timedelta(hours=1))
            .replace(tzinfo=utc)
            .astimezone(tz=timezone("US/Pacific")),
        ],
        "created_ts": [ts, ts, ts, ts, ts],
    }
    return pd.DataFrame.from_dict(data)


def get_feature_view(data_source: Union[FileSource, BigQuerySource]) -> FeatureView:
    """Build the FeatureView under test on top of the given data source."""
    return FeatureView(
        name="test_bq_correctness",
        entities=["driver_id"],
        features=[Feature("value", ValueType.FLOAT)],
        ttl=timedelta(days=5),
        input=data_source,
    )


# bq_source_type must be one of "query" or "table"
@contextlib.contextmanager
def prep_bq_fs_and_fv(
    bq_source_type: str,
) -> Iterator[Tuple[FeatureStore, FeatureView]]:
    client = bigquery.Client()
    gcp_project = client.project
    bigquery_dataset = "test_ingestion"
    dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
    client.create_dataset(dataset, exists_ok=True)
    dataset.default_table_expiration_ms = (
        1000 * 60 * 60 * 24 * 14
    )  # 2 weeks in milliseconds
    client.update_dataset(dataset, ["default_table_expiration_ms"])

    df = create_dataset()

    job_config = bigquery.LoadJobConfig()
    table_ref = f"{gcp_project}.{bigquery_dataset}.{bq_source_type}_correctness_{int(time.time())}"
    query = f"SELECT * FROM `{table_ref}`"
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

    bigquery_source = BigQuerySource(
        table_ref=table_ref if bq_source_type == "table" else None,
        query=query if bq_source_type == "query" else None,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
        date_partition_column="",
        field_mapping={"ts_1": "ts", "id": "driver_id"},
    )

    fv = get_feature_view(bigquery_source)
    with tempfile.TemporaryDirectory() as repo_dir_name:
        config = RepoConfig(
            registry=str(Path(repo_dir_name) / "registry.db"),
            project=f"test_bq_correctness_{uuid.uuid4()}",
            provider="gcp",
        )
        fs = FeatureStore(config=config)
        fs.apply([fv])

        yield fs, fv
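        # No explicit teardown for the BigQuery table: it is left to expire via
        # the dataset's default table expiration configured above.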


@contextlib.contextmanager
def prep_local_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
    with tempfile.NamedTemporaryFile(suffix=".parquet") as f:
        df = create_dataset()
        # Close the handle first (this also removes the empty temp file) so
        # that to_parquet() can write a fresh Parquet file to the same path.
        f.close()
        df.to_parquet(f.name)
        file_source = FileSource(
            file_format=ParquetFormat(),
            file_url=f"file://{f.name}",
            event_timestamp_column="ts",
            created_timestamp_column="created_ts",
            date_partition_column="",
            field_mapping={"ts_1": "ts", "id": "driver_id"},
        )
        fv = get_feature_view(file_source)
        with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name:
            config = RepoConfig(
                registry=str(Path(repo_dir_name) / "registry.db"),
                project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}",
                provider="local",
                online_store=OnlineStoreConfig(
                    local=LocalOnlineStoreConfig(
                        path=str(Path(data_dir_name) / "online_store.db")
                    )
                ),
            )
            fs = FeatureStore(config=config)
            fs.apply([fv])

            yield fs, fv


def run_materialization_test(fs: FeatureStore, fv: FeatureView) -> None:
    now = datetime.utcnow()
    # Run materialize(); mix a tz-aware start with a tz-naive end to check that
    # both kinds of timestamps are handled correctly.
    start_date = (now - timedelta(hours=5)).replace(tzinfo=utc)
    end_date = now - timedelta(hours=2)
    fs.materialize([fv.name], start_date, end_date)
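    # Within the [now - 5h, now - 2h] window, driver 1 has rows at now - 4h
    # (0.1) and now - 3h (0.3), so its latest value is 0.3; driver 3 only has
    # its now - 4h row (value 4) inside the window.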

    # Check the result of materialize().
    response_dict = fs.get_online_features(
        [f"{fv.name}:value"], [{"driver_id": 1}]
    ).to_dict()
    assert abs(response_dict[f"{fv.name}__value"][0] - 0.3) < 1e-6

    # Check the value served for driver 3 before materialize_incremental() runs.
    response_dict = fs.get_online_features(
        [f"{fv.name}:value"], [{"driver_id": 3}]
    ).to_dict()
    assert abs(response_dict[f"{fv.name}__value"][0] - 4) < 1e-6

    # Run materialize_incremental() up to now.
    fs.materialize_incremental([fv.name], now)
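    # The incremental run extends materialization up to `now`, which pulls in
    # driver 3's now - 1h row, so its served value becomes 5.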

    # Check the result of materialize_incremental().
    response_dict = fs.get_online_features(
        [f"{fv.name}:value"], [{"driver_id": 3}]
    ).to_dict()
    assert abs(response_dict[f"{fv.name}__value"][0] - 5) < 1e-6


@pytest.mark.integration
@pytest.mark.parametrize("bq_source_type", ["query", "table"])
def test_bq_materialization(bq_source_type: str):
    with prep_bq_fs_and_fv(bq_source_type) as (fs, fv):
        run_materialization_test(fs, fv)


def test_local_materialization():
    with prep_local_fs_and_fv() as (fs, fv):
        run_materialization_test(fs, fv)
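

# The BigQuery tests are marked `integration` and need GCP credentials plus
# permission to create the `test_ingestion` dataset; assuming the marker is
# registered in the project's pytest configuration, they can be selected with
# `pytest -m integration`, while `pytest -k local` runs only the local test.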