From af0e9b66595b16a7d9a898d4c3279ecc48acb519 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 25 Jan 2022 15:31:25 +0700 Subject: [PATCH 1/5] Delete entity from redis when the last attached feature view is deleted Signed-off-by: pyalex --- sdk/python/feast/infra/online_stores/redis.py | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 493c6ab462..752ed7d009 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -72,11 +72,11 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel): class RedisOnlineStore(OnlineStore): _client: Optional[Union[Redis, RedisCluster]] = None - def delete_table_values(self, config: RepoConfig, table: FeatureView): + def delete_entity_values(self, config: RepoConfig, join_keys: List[str]): client = self._get_client(config.online_store) deleted_count = 0 pipeline = client.pipeline() - prefix = _redis_key_prefix(table.entities) + prefix = _redis_key_prefix(join_keys) for _k in client.scan_iter( b"".join([prefix, b"*", config.project.encode("utf8")]) @@ -85,7 +85,7 @@ def delete_table_values(self, config: RepoConfig, table: FeatureView): deleted_count += 1 pipeline.execute() - logger.debug(f"Deleted {deleted_count} keys for {table.name}") + logger.debug(f"Deleted {deleted_count} rows for entity {', '.join(join_keys)}") @log_exceptions_and_usage(online_store="redis") def update( @@ -98,10 +98,16 @@ def update( partial: bool, ): """ - We delete the keys in redis for tables/views being removed. + Look for join_keys (list of entities) that are not in use anymore + (usually this happens when the last feature view that was using specific compound key is deleted) + and remove all features attached to this "join_keys". 
""" - for table in tables_to_delete: - self.delete_table_values(config, table) + join_keys_to_keep = set(tuple(table.entities) for table in tables_to_keep) + + join_keys_to_delete = set(tuple(table.entities) for table in tables_to_delete) + + for join_keys in join_keys_to_delete - join_keys_to_keep: + self.delete_entity_values(config, list(join_keys)) def teardown( self, @@ -112,8 +118,10 @@ def teardown( """ We delete the keys in redis for tables/views being removed. """ - for table in tables: - self.delete_table_values(config, table) + join_keys_to_delete = set(tuple(table.entities) for table in tables) + + for join_keys in join_keys_to_delete: + self.delete_entity_values(config, list(join_keys)) @staticmethod def _parse_connection_string(connection_string: str): From e444b1e8c4eb0dc9ef67e12a53ea0992fe397ef8 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 26 Jan 2022 16:28:05 +0700 Subject: [PATCH 2/5] Delete entity key from Redis only when all attached feature views are gone Signed-off-by: pyalex --- .../online_store/test_universal_online.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index b23c68033e..f483d54f6b 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -28,6 +28,7 @@ ) from tests.integration.feature_repos.universal.feature_views import ( create_driver_hourly_stats_feature_view, + driver_feature_view, ) from tests.utils.data_source_utils import prep_file_source @@ -503,6 +504,79 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name ) +@pytest.mark.integration +@pytest.mark.universal +def test_online_store_cleanup(environment, universal_data_sources): + """ + Some online store implementations (like Redis) keep features from different features views + but with 
common entities together. + This might end up with deletion of all features attached to the entity, + when only one feature view was the deletion target (see https://github.com/feast-dev/feast/issues/2150). + + Plan: + 1. Register two feature views with common entity "driver" + 2. Materialize data + 3. Check if features are available (via online retrieval) + 4. Delete one feature view + 5. Check that features for the other are still available + 6. Delete another feature view (and create again) + 7. Verify that features for both feature views were deleted + """ + fs = environment.feature_store + entities, datasets, data_sources = universal_data_sources + driver_stats_fv = construct_universal_feature_views(data_sources)["driver"] + + df = pd.DataFrame( + { + "ts_1": [environment.end_date] * len(entities["driver"]), + "created_ts": [environment.end_date] * len(entities["driver"]), + "driver_id": entities["driver"], + "value": np.random.random(size=len(entities["driver"])), + } + ) + + ds = environment.data_source_creator.create_data_source( + df, destination_name="simple_driver_dataset" + ) + + simple_driver_fv = driver_feature_view( + data_source=ds, name="test_universal_online_simple_driver" + ) + + fs.apply([driver(), simple_driver_fv, driver_stats_fv]) + + fs.materialize( + environment.start_date - timedelta(days=1), + environment.end_date + timedelta(days=1), + ) + expected_values = df.sort_values(by="driver_id") + + features = [f"{simple_driver_fv.name}:value"] + entity_rows = [{"driver": driver_id} for driver_id in sorted(entities["driver"])] + + online_features = fs.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + assert np.allclose(expected_values["value"], online_features["value"]) + + fs.apply( + objects=[simple_driver_fv], objects_to_delete=[driver_stats_fv], partial=False + ) + + online_features = fs.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + assert np.allclose(expected_values["value"], 
online_features["value"]) + + fs.apply(objects=[], objects_to_delete=[simple_driver_fv], partial=False) + fs.apply([simple_driver_fv]) + + online_features = fs.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + assert all(v is None for v in online_features["value"]) + + def response_feature_name(feature: str, full_feature_names: bool) -> str: if ( feature in {"current_balance", "avg_passenger_count", "lifetime_trip_count"} From 912460565c1d31563fe04b4991478945911cac23 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 26 Jan 2022 16:36:26 +0700 Subject: [PATCH 3/5] make lint happy Signed-off-by: pyalex --- sdk/python/feast/feature_store.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c4d03fd01c..c75fad8ab5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -69,6 +69,7 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, + FieldStatusValue, GetOnlineFeaturesResponse, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -1349,7 +1350,7 @@ def _read_from_online_store( provider: Provider, requested_features: List[str], table: FeatureView, - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: + ) -> List[Tuple[List[Timestamp], List[FieldStatusValue], List[Value]]]: """ Read and process data from the OnlineStore for a given FeatureView. 
This method guarentees that the order of the data in each element of the @@ -1403,9 +1404,7 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] - ] + Tuple[Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value]] ], indexes: Iterable[Iterable[int]], online_features_response: GetOnlineFeaturesResponse, From cb3f1622e336070b4064b3620675939e552d827f Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 26 Jan 2022 16:48:11 +0700 Subject: [PATCH 4/5] make lint happy Signed-off-by: pyalex --- sdk/python/feast/feature_store.py | 7 ++++--- .../tests/integration/feature_repos/repo_configuration.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c75fad8ab5..c4d03fd01c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -69,7 +69,6 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, - FieldStatusValue, GetOnlineFeaturesResponse, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -1350,7 +1349,7 @@ def _read_from_online_store( provider: Provider, requested_features: List[str], table: FeatureView, - ) -> List[Tuple[List[Timestamp], List[FieldStatusValue], List[Value]]]: + ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: """ Read and process data from the OnlineStore for a given FeatureView. 
This method guarentees that the order of the data in each element of the @@ -1404,7 +1403,9 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( feature_data: Iterable[ - Tuple[Iterable[Timestamp], Iterable[FieldStatusValue], Iterable[Value]] + Tuple[ + Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + ] ], indexes: Iterable[Iterable[int]], online_features_response: GetOnlineFeaturesResponse, diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 45044574e0..7f3c6d67f9 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -289,7 +289,7 @@ def construct_test_environment( feature_server = None registry = RegistryConfig( path=str(Path(repo_dir_name) / "registry.db"), cache_ttl_seconds=1, - ) + ) # type: ignore config = RepoConfig( registry=registry, From 5145f5e95764c2153d43b73f01b5eaed677cd6d9 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 26 Jan 2022 16:53:07 +0700 Subject: [PATCH 5/5] one more try with mypy Signed-off-by: pyalex --- .../tests/integration/feature_repos/repo_configuration.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 7f3c6d67f9..e1f4f0317c 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -7,7 +7,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import pandas as pd import yaml @@ -283,13 +283,15 @@ def construct_test_environment( 
execution_role_name="arn:aws:iam::402087665549:role/lambda_execution_role", ) - registry = f"s3://feast-integration-tests/registries/{project}/registry.db" + registry = ( + f"s3://feast-integration-tests/registries/{project}/registry.db" + ) # type: Union[str, RegistryConfig] else: # Note: even if it's a local feature server, the repo config does not have this configured feature_server = None registry = RegistryConfig( path=str(Path(repo_dir_name) / "registry.db"), cache_ttl_seconds=1, - ) # type: ignore + ) config = RepoConfig( registry=registry,