Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add support for Redis as online store
Signed-off-by: qooba <dev@qooba.net>
  • Loading branch information
qooba authored and woop committed Jun 9, 2021
commit d97aaea26e661f4c43ba5e39cb4598d978c922c6
5 changes: 4 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,10 @@ def get_online_features(
table, union_of_entity_keys, entity_name_to_join_key_map
)
read_rows = provider.online_read(
project=self.project, table=table, entity_keys=entity_keys,
project=self.project,
table=table,
entity_keys=entity_keys,
requested_features=requested_features,
)
for row_idx, read_row in enumerate(read_rows):
row_ts, feature_data = read_row
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def online_read(
project: str,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
client = self._initialize_client()

Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def online_read(
project: str,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:

conn = self._get_conn()
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def online_read(
project: str,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Read feature values given an Entity Key. This is a low level interface, not
Expand All @@ -144,6 +145,10 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
from feast.infra.gcp import GcpProvider

return GcpProvider(config)
elif config.provider == "redis":
from feast.infra.redis_provider import RedisProvider

return RedisProvider(config)
elif config.provider == "local":
from feast.infra.local import LocalProvider

Expand Down
209 changes: 209 additions & 0 deletions sdk/python/feast/infra/redis_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import mmh3
import pandas as pd
from redis import Redis
from rediscluster import RedisCluster

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
from feast.infra.provider import (
Provider,
RetrievalJob,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RedisOnlineStoreConfig, RepoConfig


class RedisProvider(Provider):
_db_path: Path

def __init__(self, config: RepoConfig):
assert isinstance(config.online_store, RedisOnlineStoreConfig)

def _get_client(self):
if os.environ["REDIS_TYPE"] == "REDIS_CLUSTER":
return RedisCluster(
host=os.environ["REDIS_HOST"],
port=os.environ["REDIS_PORT"],
decode_responses=True,
)
else:
return Redis(
host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"], db=0
)

def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
client = self._get_client()
# TODO

def teardown_infra(
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
# according to the repos_operations.py we can delete the whole project
client = self._get_client()
keys = client.keys("{project}:*")
client.unlink(*keys)

def online_write_batch(
self,
project: str,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
client = self._get_client()

entity_hset = {}
feature_view = table.name

for entity_key, values, timestamp, created_ts in data:
redis_key_bin = _redis_key(project, entity_key)
timestamp = utils.make_tzaware(timestamp).strftime("%Y-%m-%d %H:%M:%S")
entity_hset[f"_ts:{feature_view}"] = timestamp

if created_ts is not None:
created_ts = utils.make_tzaware(created_ts).strftime(
"%Y-%m-%d %H:%M:%S"
)
entity_hset[f"_created_ts:{feature_view}"] = created_ts

for feature_name, val in values.items():
f_key = _mmh3(f"{feature_view}:{feature_name}")
entity_hset[f_key] = val.SerializeToString()

client.hset(redis_key_bin, mapping=entity_hset)

def online_read(
self,
project: str,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:

client = self._get_client()
feature_view = table.name

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

for entity_key in entity_keys:
redis_key_bin = _redis_key(project, entity_key)
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
ts_key = f"_ts:{feature_view}"
hset_keys.append(ts_key)
values = client.hmget(redis_key_bin, hset_keys)

requested_features.append(ts_key)
res_val = dict(zip(requested_features, values))
res_ts = res_val.pop(ts_key)

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val

if not res:
result.append((None, None))
else:
result.append((res_ts, res))
return result

def materialize_single_feature_view(
self,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
) -> None:
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))

(
join_key_columns,
feature_name_columns,
event_timestamp_column,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

start_date = utils.make_tzaware(start_date)
end_date = utils.make_tzaware(end_date)

offline_store = get_offline_store_from_sources([feature_view.input])
table = offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(project, feature_view, rows_to_write, None)

feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

@staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
) -> RetrievalJob:
offline_store = get_offline_store_from_sources(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've moved to using explicitly defined offline stores https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/gcp.py#L47

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

[feature_view.input for feature_view in feature_views]
)
return offline_store.get_historical_features(
config=config,
feature_views=feature_views,
feature_refs=feature_refs,
entity_df=entity_df,
registry=registry,
project=project,
)


def _redis_key(project: str, entity_key: EntityKeyProto) -> str:
key = _mmh3(serialize_entity_key(entity_key))
return f"{project}:{key}"


def _mmh3(key: str) -> str:
return mmh3.hash_bytes(key).hex()
18 changes: 16 additions & 2 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ class DatastoreOnlineStoreConfig(FeastBaseModel):
""" (optional) Amount of feature rows per batch being written into Datastore"""


OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig]
class RedisOnlineStoreConfig(FeastBaseModel):
"""Online store config for Redis store"""

type: Literal["redis"] = "redis"
"""Online store type selector"""


OnlineStoreConfig = Union[
DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig, RedisOnlineStoreConfig
]


class FileOfflineStoreConfig(FeastBaseModel):
Expand Down Expand Up @@ -101,7 +110,7 @@ class RepoConfig(FeastBaseModel):
"""

provider: StrictStr
""" str: local or gcp """
""" str: local or gcp or redis """

online_store: OnlineStoreConfig = SqliteOnlineStoreConfig()
""" OnlineStoreConfig: Online store configuration (optional depending on provider) """
Expand Down Expand Up @@ -141,6 +150,9 @@ def _validate_online_store_config(cls, values):
values["online_store"]["type"] = "sqlite"
elif values["provider"] == "gcp":
values["online_store"]["type"] = "datastore"
elif values["provider"] == "redis":
values["online_store"]["type"] = "redis"


online_store_type = values["online_store"]["type"]

Expand All @@ -153,6 +165,8 @@ def _validate_online_store_config(cls, values):
SqliteOnlineStoreConfig(**values["online_store"])
elif online_store_type == "datastore":
DatastoreOnlineStoreConfig(**values["online_store"])
elif online_store_type == "redis":
RedisOnlineStoreConfig(**values["online_store"])
else:
raise ValueError(f"Invalid online store type {online_store_type}")
except ValidationError as e:
Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def online_read(
project: str,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
pass

Expand Down