Skip to content

Commit 6b33786

Browse files
committed
Add redis online provider - integration tests
Signed-off-by: qooba <dev@qooba.net>
1 parent f625b15 commit 6b33786

File tree

1 file changed

+38
-29
lines changed

1 file changed

+38
-29
lines changed

sdk/python/feast/infra/redis_provider.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,15 @@ def teardown_infra(
5656
) -> None:
5757
# according to the repos_operations.py we can delete the whole project
5858
client = self._get_client()
59-
keys = client.keys("*{project}:*")
60-
client.unlink(*keys)
59+
60+
tables_join_keys = [[e for e in t.entities] for t in tables]
61+
for table_join_keys in tables_join_keys:
62+
redis_key_bin = _redis_key(
63+
project, EntityKeyProto(join_keys=table_join_keys)
64+
)
65+
keys = client.keys(f"{redis_key_bin}*")
66+
if keys:
67+
client.unlink(*keys)
6168

6269
def online_write_batch(
6370
self,
@@ -103,33 +110,35 @@ def online_read(
103110

104111
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
105112

106-
if requested_features:
107-
for entity_key in entity_keys:
108-
redis_key_bin = _redis_key(project, entity_key)
109-
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
110-
ts_key = f"_ts:{feature_view}"
111-
hset_keys.append(ts_key)
112-
values = client.hmget(redis_key_bin, hset_keys)
113-
requested_features.append(ts_key)
114-
res_val = dict(zip(requested_features, values))
115-
116-
res_ts = Timestamp()
117-
ts_val = res_val.pop(ts_key)
118-
if ts_val:
119-
res_ts.ParseFromString(ts_val)
120-
121-
res = {}
122-
for feature_name, val_bin in res_val.items():
123-
val = ValueProto()
124-
if val_bin:
125-
val.ParseFromString(val_bin)
126-
res[feature_name] = val
127-
128-
if not res:
129-
result.append((None, None))
130-
else:
131-
timestamp = datetime.fromtimestamp(res_ts.seconds)
132-
result.append((timestamp, res))
113+
if not requested_features:
114+
requested_features = [f.name for f in table.features]
115+
116+
for entity_key in entity_keys:
117+
redis_key_bin = _redis_key(project, entity_key)
118+
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
119+
ts_key = f"_ts:{feature_view}"
120+
hset_keys.append(ts_key)
121+
values = client.hmget(redis_key_bin, hset_keys)
122+
requested_features.append(ts_key)
123+
res_val = dict(zip(requested_features, values))
124+
125+
res_ts = Timestamp()
126+
ts_val = res_val.pop(ts_key)
127+
if ts_val:
128+
res_ts.ParseFromString(ts_val)
129+
130+
res = {}
131+
for feature_name, val_bin in res_val.items():
132+
val = ValueProto()
133+
if val_bin:
134+
val.ParseFromString(val_bin)
135+
res[feature_name] = val
136+
137+
if not res:
138+
result.append((None, None))
139+
else:
140+
timestamp = datetime.fromtimestamp(res_ts.seconds)
141+
result.append((timestamp, res))
133142
return result
134143

135144
def materialize_single_feature_view(

0 commit comments

Comments
 (0)