@@ -56,8 +56,15 @@ def teardown_infra(
5656 ) -> None :
5757 # according to the repos_operations.py we can delete the whole project
5858 client = self ._get_client ()
59- keys = client .keys ("*{project}:*" )
60- client .unlink (* keys )
59+
60+ tables_join_keys = [[e for e in t .entities ] for t in tables ]
61+ for table_join_keys in tables_join_keys :
62+ redis_key_bin = _redis_key (
63+ project , EntityKeyProto (join_keys = table_join_keys )
64+ )
65+ keys = client .keys (f"{ redis_key_bin } *" )
66+ if keys :
67+ client .unlink (* keys )
6168
6269 def online_write_batch (
6370 self ,
@@ -103,33 +110,35 @@ def online_read(
103110
104111 result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
105112
106- if requested_features :
107- for entity_key in entity_keys :
108- redis_key_bin = _redis_key (project , entity_key )
109- hset_keys = [_mmh3 (f"{ feature_view } :{ k } " ) for k in requested_features ]
110- ts_key = f"_ts:{ feature_view } "
111- hset_keys .append (ts_key )
112- values = client .hmget (redis_key_bin , hset_keys )
113- requested_features .append (ts_key )
114- res_val = dict (zip (requested_features , values ))
115-
116- res_ts = Timestamp ()
117- ts_val = res_val .pop (ts_key )
118- if ts_val :
119- res_ts .ParseFromString (ts_val )
120-
121- res = {}
122- for feature_name , val_bin in res_val .items ():
123- val = ValueProto ()
124- if val_bin :
125- val .ParseFromString (val_bin )
126- res [feature_name ] = val
127-
128- if not res :
129- result .append ((None , None ))
130- else :
131- timestamp = datetime .fromtimestamp (res_ts .seconds )
132- result .append ((timestamp , res ))
113+ if not requested_features :
114+ requested_features = [f .name for f in table .features ]
115+
116+ for entity_key in entity_keys :
117+ redis_key_bin = _redis_key (project , entity_key )
118+ hset_keys = [_mmh3 (f"{ feature_view } :{ k } " ) for k in requested_features ]
119+ ts_key = f"_ts:{ feature_view } "
120+ hset_keys .append (ts_key )
121+ values = client .hmget (redis_key_bin , hset_keys )
122+ requested_features .append (ts_key )
123+ res_val = dict (zip (requested_features , values ))
124+
125+ res_ts = Timestamp ()
126+ ts_val = res_val .pop (ts_key )
127+ if ts_val :
128+ res_ts .ParseFromString (ts_val )
129+
130+ res = {}
131+ for feature_name , val_bin in res_val .items ():
132+ val = ValueProto ()
133+ if val_bin :
134+ val .ParseFromString (val_bin )
135+ res [feature_name ] = val
136+
137+ if not res :
138+ result .append ((None , None ))
139+ else :
140+ timestamp = datetime .fromtimestamp (res_ts .seconds )
141+ result .append ((timestamp , res ))
133142 return result
134143
135144 def materialize_single_feature_view (
0 commit comments