|
15 | 15 | from datetime import timedelta |
16 | 16 | from tempfile import mkstemp |
17 | 17 |
|
| 18 | +import pandas as pd |
18 | 19 | import pytest |
19 | 20 | from pytest_lazyfixture import lazy_fixture |
20 | 21 |
|
|
23 | 24 | from feast.entity import Entity |
24 | 25 | from feast.feature import Feature |
25 | 26 | from feast.feature_view import FeatureView |
| 27 | +from feast.on_demand_feature_view import RequestDataSource, on_demand_feature_view |
26 | 28 | from feast.protos.feast.types import Value_pb2 as ValueProto |
27 | 29 | from feast.registry import Registry |
28 | 30 | from feast.repo_config import RegistryConfig |
@@ -231,6 +233,126 @@ def test_apply_feature_view_success(test_registry): |
231 | 233 | test_registry._get_registry_proto() |
232 | 234 |
|
233 | 235 |
|
def _assert_modified_odfv1(odfv):
    """Assert that ``odfv`` matches the re-applied ``odfv1`` definition.

    Each condition is its own ``assert`` so a failure names the exact
    attribute that diverged instead of reporting one opaque conjunction.
    """
    assert odfv.name == "odfv1"
    assert odfv.features[0].name == "odfv1_my_feature_1"
    # This is the dtype that was changed (STRING -> FLOAT) before re-applying.
    assert odfv.features[0].dtype == ValueType.FLOAT
    assert odfv.features[1].name == "odfv1_my_feature_2"
    assert odfv.features[1].dtype == ValueType.INT32

    # The request data schema must still be the one declared by request_source.
    request_schema = odfv.get_request_data_schema()
    assert list(request_schema.keys())[0] == "my_input_1"
    assert list(request_schema.values())[0] == ValueType.INT32


def _assert_untouched_fv1(feature_view):
    """Assert that ``feature_view`` still matches the original ``fv1``."""
    assert feature_view.name == "my_feature_view_1"
    assert feature_view.features[0].name == "fs1_my_feature_1"
    assert feature_view.features[0].dtype == ValueType.INT64
    assert feature_view.entities[0] == "fs1_my_entity_1"


@pytest.mark.parametrize(
    "test_registry", [lazy_fixture("local_registry")],
)
def test_modify_feature_views_success(test_registry):
    """Re-apply an on-demand feature view and check what the registry holds.

    Registers a regular feature view and an on-demand feature view, then
    applies a second on-demand feature view with the same name but a changed
    feature dtype. The test verifies that the registry lists exactly one
    on-demand feature view carrying the new dtype, that the regular feature
    view reads back with its original values, and that reloading the registry
    after ``teardown()`` raises ``FileNotFoundError``.
    """
    # Create Feature Views
    batch_source = FileSource(
        file_format=ParquetFormat(),
        path="file://feast/*",
        event_timestamp_column="ts_col",
        created_timestamp_column="timestamp",
        date_partition_column="date_partition_col",
    )

    request_source = RequestDataSource(
        name="request_source", schema={"my_input_1": ValueType.INT32}
    )

    fv1 = FeatureView(
        name="my_feature_view_1",
        features=[Feature(name="fs1_my_feature_1", dtype=ValueType.INT64)],
        entities=["fs1_my_entity_1"],
        tags={"team": "matchmaking"},
        batch_source=batch_source,
        ttl=timedelta(minutes=5),
    )

    @on_demand_feature_view(
        features=[
            Feature(name="odfv1_my_feature_1", dtype=ValueType.STRING),
            Feature(name="odfv1_my_feature_2", dtype=ValueType.INT32),
        ],
        inputs={"request_source": request_source},
    )
    def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame:
        data = pd.DataFrame()
        data["odfv1_my_feature_1"] = feature_df["my_input_1"].astype("category")
        data["odfv1_my_feature_2"] = feature_df["my_input_1"].astype("int32")
        return data

    project = "project"

    # Register Feature Views
    test_registry.apply_feature_view(odfv1, project)
    test_registry.apply_feature_view(fv1, project)

    # Modify odfv by changing a single feature dtype. The function is
    # deliberately redefined under the same name ``odfv1``; the assertions
    # below expect the stored view to be named "odfv1".
    @on_demand_feature_view(
        features=[
            Feature(name="odfv1_my_feature_1", dtype=ValueType.FLOAT),
            Feature(name="odfv1_my_feature_2", dtype=ValueType.INT32),
        ],
        inputs={"request_source": request_source},
    )
    def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame:
        data = pd.DataFrame()
        data["odfv1_my_feature_1"] = feature_df["my_input_1"].astype("float")
        data["odfv1_my_feature_2"] = feature_df["my_input_1"].astype("int32")
        return data

    # Apply the modified odfv
    test_registry.apply_feature_view(odfv1, project)

    # Check odfv: listing must show a single view, i.e. the second apply
    # replaced the first definition rather than adding another entry.
    on_demand_feature_views = test_registry.list_on_demand_feature_views(project)
    assert len(on_demand_feature_views) == 1
    _assert_modified_odfv1(on_demand_feature_views[0])

    # Fetching by name must return the same modified definition.
    _assert_modified_odfv1(
        test_registry.get_on_demand_feature_view("odfv1", project)
    )

    # Make sure fv1 is untouched, both when listed and when fetched by name
    feature_views = test_registry.list_feature_views(project)
    assert len(feature_views) == 1
    _assert_untouched_fv1(feature_views[0])

    _assert_untouched_fv1(
        test_registry.get_feature_view("my_feature_view_1", project)
    )

    test_registry.teardown()

    # Will try to reload registry, which will fail because the file has been deleted
    with pytest.raises(FileNotFoundError):
        test_registry._get_registry_proto()
| 354 | + |
| 355 | + |
234 | 356 | @pytest.mark.integration |
235 | 357 | @pytest.mark.parametrize( |
236 | 358 | "test_registry", [lazy_fixture("gcs_registry"), lazy_fixture("s3_registry")], |
|
0 commit comments