Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add redis online provider
Signed-off-by: qooba <dev@qooba.net>
  • Loading branch information
qooba authored and woop committed Jun 9, 2021
commit a3c4f3bf464c416371b7f0617b52562357daccda
64 changes: 32 additions & 32 deletions sdk/python/feast/infra/redis_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
from feast.infra.provider import (
Provider,
Expand Down Expand Up @@ -47,7 +46,7 @@ def update_infra(
entities_to_keep: Sequence[Entity],
partial: bool,
):
client = self._get_client()
pass

def teardown_infra(
self,
Expand Down Expand Up @@ -80,9 +79,8 @@ def online_write_batch(

for entity_key, values, timestamp, created_ts in data:
redis_key_bin = _redis_key(project, entity_key)
timestamp = int(utils.make_tzaware(timestamp).timestamp())
ts = Timestamp()
ts.seconds = timestamp
ts.seconds = int(utils.make_tzaware(timestamp).timestamp())
entity_hset[f"_ts:{feature_view}"] = ts.SerializeToString()
entity_hset[f"_ex:{feature_view}"] = ex_str

Expand All @@ -105,31 +103,33 @@ def online_read(

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

for entity_key in entity_keys:
redis_key_bin = _redis_key(project, entity_key)
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
ts_key = f"_ts:{feature_view}"
hset_keys.append(ts_key)
values = client.hmget(redis_key_bin, hset_keys)
requested_features.append(ts_key)
res_val = dict(zip(requested_features, values))

res_ts = Timestamp()
ts_val = res_val.pop(ts_key)
if ts_val:
res_ts.ParseFromString(ts_val)

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
if val_bin:
val.ParseFromString(val_bin)
res[feature_name] = val

if not res:
result.append((None, None))
else:
result.append((res_ts, res))
if requested_features:
for entity_key in entity_keys:
redis_key_bin = _redis_key(project, entity_key)
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
ts_key = f"_ts:{feature_view}"
hset_keys.append(ts_key)
values = client.hmget(redis_key_bin, hset_keys)
requested_features.append(ts_key)
res_val = dict(zip(requested_features, values))

res_ts = Timestamp()
ts_val = res_val.pop(ts_key)
if ts_val:
res_ts.ParseFromString(ts_val)

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
if val_bin:
val.ParseFromString(val_bin)
res[feature_name] = val

if not res:
result.append((None, None))
else:
timestamp = datetime.fromtimestamp(res_ts.seconds)
result.append((timestamp, res))
return result

def materialize_single_feature_view(
Expand Down Expand Up @@ -188,7 +188,7 @@ def _get_cs(self):
startup_nodes = [
dict(zip(["host", "port"], c.split(":")))
for c in connection_string.split(",")
if not "=" in c
if "=" not in c
]
params = {}
for c in connection_string.split(","):
Expand Down Expand Up @@ -240,7 +240,7 @@ def get_historical_features(
)


def _redis_key(project: str, entity_key: EntityKeyProto) -> str:
def _redis_key(project: str, entity_key: EntityKeyProto):
redis_key = RedisKeyProto(
project=project,
entity_names=entity_key.join_keys,
Expand All @@ -250,7 +250,7 @@ def _redis_key(project: str, entity_key: EntityKeyProto) -> str:
return redis_key.SerializeToString()


def _mmh3(key: str) -> str:
def _mmh3(key: str):
"""
Calculate murmur3_32 hash which is equal to scala version which is using little endian:
https://stackoverflow.com/questions/29932956/murmur3-hash-different-result-between-python-and-java-implementation
Expand Down