Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add redis online provider
Signed-off-by: qooba <dev@qooba.net>
  • Loading branch information
qooba authored and woop committed Jun 9, 2021
commit 6bea80d1a36ba9e2c91d5cade1030a7021a08b0f
24 changes: 14 additions & 10 deletions sdk/python/feast/infra/redis_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def teardown_infra(
) -> None:
# according to the repos_operations.py we can delete the whole project
client = self._get_client()
#keys = client.keys("{project}:*")
#client.unlink(*keys)
keys = client.keys("*{project}:*")
client.unlink(*keys)

def online_write_batch(
self,
Expand Down Expand Up @@ -108,15 +108,19 @@ def online_read(
ts_key = f"_ts:{feature_view}"
hset_keys.append(ts_key)
values = client.hmget(redis_key_bin, hset_keys)

requested_features.append(ts_key)
res_val = dict(zip(requested_features, values))
res_ts = res_val.pop(ts_key)

res_ts = Timestamp()
ts_val = res_val.pop(ts_key)
if ts_val:
res_ts.ParseFromString(ts_val)

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
val.FromString(val_bin)
if val_bin:
val.ParseFromString(val_bin)
res[feature_name] = val

if not res:
Expand Down Expand Up @@ -235,10 +239,10 @@ def get_historical_features(

def _redis_key(project: str, entity_key: EntityKeyProto) -> str:
redis_key = RedisKeyProto(
project=project,
entity_names=entity_key.join_keys,
entity_values=entity_key.entity_values,
)
project=project,
entity_names=entity_key.join_keys,
entity_values=entity_key.entity_values,
)
#key = _mmh3(serialize_entity_key(entity_key))
return redis_key.SerializeToString()

Expand All @@ -249,7 +253,7 @@ def _mmh3(key: str) -> str:
https://stackoverflow.com/questions/13141787/convert-decimal-int-to-little-endian-string-x-x
"""
key_hash = mmh3.hash(key,signed=False)
bytes.fromhex(struct.pack('<Q', key_hash).hex().rstrip('0'))
return bytes.fromhex(struct.pack('<Q', key_hash).hex().rstrip('0'))