Skip to content

Commit f959e1e

Browse files
Vitaly Sergeyevadchia
authored andcommitted
feat: Key ttl setting for redis online store (#2341)
* key ttl setting for redis online store Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com> * Optional typing Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>
1 parent dec8dea commit f959e1e

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
6868
"""Connection string containing the host, port, and configuration parameters for Redis
6969
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """
7070

71+
key_ttl_seconds: Optional[int] = None
72+
"""(Optional) redis key bin ttl (in seconds) for expiring entities"""
73+
7174

7275
class RedisOnlineStore(OnlineStore):
7376
_client: Optional[Union[Redis, RedisCluster]] = None
@@ -227,9 +230,11 @@ def online_write_batch(
227230
entity_hset[f_key] = val.SerializeToString()
228231

229232
pipe.hset(redis_key_bin, mapping=entity_hset)
230-
# TODO: support expiring the entity / features in Redis
231-
# otherwise entity features remain in redis until cleaned up in separate process
232-
# client.expire redis_key_bin based a ttl setting
233+
234+
if online_store_config.key_ttl_seconds:
235+
pipe.expire(
236+
name=redis_key_bin, time=online_store_config.key_ttl_seconds
237+
)
233238
results = pipe.execute()
234239
if progress:
235240
progress(len(results))

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,67 @@
3333
from tests.utils.data_source_utils import prep_file_source
3434

3535

36+
@pytest.mark.integration
37+
def test_entity_ttl_online_store(local_redis_environment, universal_data_sources):
38+
if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
39+
return
40+
fs = local_redis_environment.feature_store
41+
# setting ttl setting in online store to 1 second
42+
fs.config.online_store.key_ttl_seconds = 1
43+
entities, datasets, data_sources = universal_data_sources
44+
driver_hourly_stats = create_driver_hourly_stats_feature_view(
45+
data_sources["driver"]
46+
)
47+
driver_entity = driver()
48+
49+
# Register Feature View and Entity
50+
fs.apply([driver_hourly_stats, driver_entity])
51+
52+
# fake data to ingest into Online Store
53+
data = {
54+
"driver_id": [1],
55+
"conv_rate": [0.5],
56+
"acc_rate": [0.6],
57+
"avg_daily_trips": [4],
58+
"event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
59+
"created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
60+
}
61+
df_ingest = pd.DataFrame(data)
62+
63+
# directly ingest data into the Online Store
64+
fs.write_to_online_store("driver_stats", df_ingest)
65+
66+
# assert the right data is in the Online Store
67+
df = fs.get_online_features(
68+
features=[
69+
"driver_stats:avg_daily_trips",
70+
"driver_stats:acc_rate",
71+
"driver_stats:conv_rate",
72+
],
73+
entity_rows=[{"driver": 1}],
74+
).to_df()
75+
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(4)
76+
assertpy.assert_that(df["acc_rate"].iloc[0]).is_close_to(0.6, 1e-6)
77+
assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.5, 1e-6)
78+
79+
# simulate time passing for testing ttl
80+
time.sleep(1)
81+
82+
# retrieve the same entity again
83+
df = fs.get_online_features(
84+
features=[
85+
"driver_stats:avg_daily_trips",
86+
"driver_stats:acc_rate",
87+
"driver_stats:conv_rate",
88+
],
89+
entity_rows=[{"driver": 1}],
90+
).to_df()
91+
# assert that the entity features expired in the online store
92+
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_none()
93+
assertpy.assert_that(df["acc_rate"].iloc[0]).is_none()
94+
assertpy.assert_that(df["conv_rate"].iloc[0]).is_none()
95+
96+
3697
# TODO: make this work with all universal (all online store types)
3798
@pytest.mark.integration
3899
def test_write_to_online_store_event_check(local_redis_environment):

0 commit comments

Comments
 (0)