Skip to content

Commit 236a108

Browse files
author
Vitaly Sergeyev
authored
feat: Key ttl setting for redis online store (feast-dev#2341)
* key ttl setting for redis online store Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * Optional typing Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>
1 parent 4e47686 commit 236a108

2 files changed

Lines changed: 69 additions & 3 deletions

File tree

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
6868
"""Connection string containing the host, port, and configuration parameters for Redis
6969
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """
7070

71+
key_ttl_seconds: Optional[int] = None
72+
"""(Optional) redis key bin ttl (in seconds) for expiring entities"""
73+
7174

7275
class RedisOnlineStore(OnlineStore):
7376
_client: Optional[Union[Redis, RedisCluster]] = None
@@ -227,9 +230,11 @@ def online_write_batch(
227230
entity_hset[f_key] = val.SerializeToString()
228231

229232
pipe.hset(redis_key_bin, mapping=entity_hset)
230-
# TODO: support expiring the entity / features in Redis
231-
# otherwise entity features remain in redis until cleaned up in separate process
232-
# client.expire redis_key_bin based a ttl setting
233+
234+
if online_store_config.key_ttl_seconds:
235+
pipe.expire(
236+
name=redis_key_bin, time=online_store_config.key_ttl_seconds
237+
)
233238
results = pipe.execute()
234239
if progress:
235240
progress(len(results))

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,67 @@
3535
from tests.utils.data_source_utils import prep_file_source
3636

3737

38+
@pytest.mark.integration
39+
def test_entity_ttl_online_store(local_redis_environment, universal_data_sources):
40+
if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
41+
return
42+
fs = local_redis_environment.feature_store
43+
# setting ttl setting in online store to 1 second
44+
fs.config.online_store.key_ttl_seconds = 1
45+
entities, datasets, data_sources = universal_data_sources
46+
driver_hourly_stats = create_driver_hourly_stats_feature_view(
47+
data_sources["driver"]
48+
)
49+
driver_entity = driver()
50+
51+
# Register Feature View and Entity
52+
fs.apply([driver_hourly_stats, driver_entity])
53+
54+
# fake data to ingest into Online Store
55+
data = {
56+
"driver_id": [1],
57+
"conv_rate": [0.5],
58+
"acc_rate": [0.6],
59+
"avg_daily_trips": [4],
60+
"event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
61+
"created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
62+
}
63+
df_ingest = pd.DataFrame(data)
64+
65+
# directly ingest data into the Online Store
66+
fs.write_to_online_store("driver_stats", df_ingest)
67+
68+
# assert the right data is in the Online Store
69+
df = fs.get_online_features(
70+
features=[
71+
"driver_stats:avg_daily_trips",
72+
"driver_stats:acc_rate",
73+
"driver_stats:conv_rate",
74+
],
75+
entity_rows=[{"driver": 1}],
76+
).to_df()
77+
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(4)
78+
assertpy.assert_that(df["acc_rate"].iloc[0]).is_close_to(0.6, 1e-6)
79+
assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.5, 1e-6)
80+
81+
# simulate time passing for testing ttl
82+
time.sleep(1)
83+
84+
# retrieve the same entity again
85+
df = fs.get_online_features(
86+
features=[
87+
"driver_stats:avg_daily_trips",
88+
"driver_stats:acc_rate",
89+
"driver_stats:conv_rate",
90+
],
91+
entity_rows=[{"driver": 1}],
92+
).to_df()
93+
# assert that the entity features expired in the online store
94+
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_none()
95+
assertpy.assert_that(df["acc_rate"].iloc[0]).is_none()
96+
assertpy.assert_that(df["conv_rate"].iloc[0]).is_none()
97+
98+
3899
# TODO: make this work with all universal (all online store types)
39100
@pytest.mark.integration
40101
def test_write_to_online_store_event_check(local_redis_environment):

0 commit comments

Comments
 (0)