Skip to content

Commit 7538f7e

Browse files
authored
Delete keys from Redis when tearing down online store (feast-dev#1965)
* Delete keys from redis when tearing down online store Signed-off-by: Achal Shah <achals@gmail.com> * add project Signed-off-by: Achal Shah <achals@gmail.com> * Update comments and update method as well Signed-off-by: Achal Shah <achals@gmail.com> * Update comments and update method as well Signed-off-by: Achal Shah <achals@gmail.com> * CR updates Signed-off-by: Achal Shah <achals@gmail.com> * Update comments and update method as well Signed-off-by: Achal Shah <achals@gmail.com> * docstring Signed-off-by: Achal Shah <achals@gmail.com>
1 parent a3e18f6 commit 7538f7e

File tree

4 files changed

+51
-8
lines changed

4 files changed

+51
-8
lines changed

sdk/python/feast/feature_table.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def name(self, name: str):
110110
self._name = name
111111

112112
@property
113-
def entities(self):
113+
def entities(self) -> List[str]:
114114
"""
115115
Returns the entities of this feature table
116116
"""

sdk/python/feast/infra/key_encoding_utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,22 @@ def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
1919
raise ValueError(f"Value type not supported for Firestore: {v}")
2020

2121

22+
def serialize_entity_key_prefix(entity_keys: List[str]) -> bytes:
23+
"""
24+
Serialize keys to a bytestring so it can be used to prefix-scan through items stored in the online store
25+
using serialize_entity_key.
26+
27+
This encoding is a partial implementation of serialize_entity_key, only operating on the keys of entities,
28+
and not the values.
29+
"""
30+
sorted_keys = sorted(entity_keys)
31+
output: List[bytes] = []
32+
for k in sorted_keys:
33+
output.append(struct.pack("<I", ValueType.STRING))
34+
output.append(k.encode("utf8"))
35+
return b"".join(output)
36+
37+
2238
def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
2339
"""
2440
Serialize entity key to a bytestring so it can be used as a lookup key in a hash table.

sdk/python/feast/infra/online_stores/helpers.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import mmh3
66

77
from feast import errors
8-
from feast.infra.key_encoding_utils import serialize_entity_key
8+
from feast.infra.key_encoding_utils import (
9+
serialize_entity_key,
10+
serialize_entity_key_prefix,
11+
)
912
from feast.infra.online_stores.online_store import OnlineStore
1013
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
1114

@@ -41,6 +44,10 @@ def _redis_key(project: str, entity_key: EntityKeyProto) -> bytes:
4144
return b"".join(key)
4245

4346

47+
def _redis_key_prefix(entity_keys: List[str]) -> bytes:
48+
return serialize_entity_key_prefix(entity_keys)
49+
50+
4451
def _mmh3(key: str):
4552
"""
4653
Calculate murmur3_32 hash which is equal to scala version which is using little endian:

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import json
15+
import logging
1516
from datetime import datetime
1617
from enum import Enum
1718
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
@@ -21,7 +22,7 @@
2122
from pydantic.typing import Literal
2223

2324
from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils
24-
from feast.infra.online_stores.helpers import _mmh3, _redis_key
25+
from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix
2526
from feast.infra.online_stores.online_store import OnlineStore
2627
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2728
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
@@ -36,6 +37,7 @@
3637
raise FeastExtrasDependencyImportError("redis", str(e))
3738

3839
EX_SECONDS = 253402300799
40+
logger = logging.getLogger(__name__)
3941

4042

4143
class RedisType(str, Enum):
@@ -60,6 +62,23 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
6062
class RedisOnlineStore(OnlineStore):
6163
_client: Optional[Union[Redis, RedisCluster]] = None
6264

65+
def delete_table_values(
66+
self, config: RepoConfig, table: Union[FeatureTable, FeatureView]
67+
):
68+
client = self._get_client(config.online_store)
69+
deleted_count = 0
70+
pipeline = client.pipeline()
71+
prefix = _redis_key_prefix(table.entities)
72+
73+
for _k in client.scan_iter(
74+
b"".join([prefix, b"*", config.project.encode("utf8")])
75+
):
76+
pipeline.delete(_k)
77+
deleted_count += 1
78+
pipeline.execute()
79+
80+
logger.debug(f"Deleted {deleted_count} keys for {table.name}")
81+
6382
def update(
6483
self,
6584
config: RepoConfig,
@@ -70,9 +89,10 @@ def update(
7089
partial: bool,
7190
):
7291
"""
73-
There's currently no setup done for Redis.
92+
We delete the keys in redis for tables/views being removed.
7493
"""
75-
pass
94+
for table in tables_to_delete:
95+
self.delete_table_values(config, table)
7696

7797
def teardown(
7898
self,
@@ -81,9 +101,10 @@ def teardown(
81101
entities: Sequence[Entity],
82102
):
83103
"""
84-
There's currently no teardown done for Redis.
104+
We delete the keys in redis for tables/views being removed.
85105
"""
86-
pass
106+
for table in tables:
107+
self.delete_table_values(config, table)
87108

88109
@staticmethod
89110
def _parse_connection_string(connection_string: str):
@@ -151,7 +172,6 @@ def online_write_batch(
151172
ex = Timestamp()
152173
ex.seconds = EX_SECONDS
153174
ex_str = ex.SerializeToString()
154-
155175
for entity_key, values, timestamp, created_ts in data:
156176
redis_key_bin = _redis_key(project, entity_key)
157177
ts = Timestamp()

0 commit comments

Comments
 (0)