Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add redis provider
Signed-off-by: qooba <dev@qooba.net>
  • Loading branch information
qooba authored and woop committed Jun 9, 2021
commit e165f003a81ad386359626c2e5a5d55602befd60
56 changes: 43 additions & 13 deletions sdk/python/feast/infra/redis_provider.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis.py for the file name is fine, for consistency.

import os
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -32,18 +33,6 @@ class RedisProvider(Provider):
def __init__(self, config: RepoConfig):
    """Check that the repo config carries a Redis online store configuration.

    Args:
        config: Repository configuration whose ``online_store`` attribute
            is expected to be a ``RedisOnlineStoreConfig``.

    Raises:
        AssertionError: If ``config.online_store`` is not a
            ``RedisOnlineStoreConfig`` (only while assertions are enabled).
    """
    assert isinstance(config.online_store, RedisOnlineStoreConfig)

def _get_client(self):
    """Build a Redis client from the ``REDIS_*`` environment variables.

    ``REDIS_TYPE`` selects the client class: the value ``"REDIS_CLUSTER"``
    yields a ``RedisCluster`` created with ``decode_responses=True``; any
    other value yields a plain ``Redis`` client bound to database 0. Host
    and port are taken verbatim (as strings) from ``REDIS_HOST`` and
    ``REDIS_PORT``.

    Raises:
        KeyError: If ``REDIS_TYPE``, ``REDIS_HOST`` or ``REDIS_PORT`` is
            not set.
    """
    # Lookup order (type, host, port) is kept so that a missing variable
    # surfaces as the same KeyError as before.
    wants_cluster = os.environ["REDIS_TYPE"] == "REDIS_CLUSTER"
    redis_host = os.environ["REDIS_HOST"]
    redis_port = os.environ["REDIS_PORT"]

    if not wants_cluster:
        return Redis(host=redis_host, port=redis_port, db=0)

    return RedisCluster(
        host=redis_host,
        port=redis_port,
        decode_responses=True,
    )

def update_infra(
self,
project: str,
Expand All @@ -54,7 +43,6 @@ def update_infra(
partial: bool,
):
client = self._get_client()
# TODO

def teardown_infra(
self,
Expand Down Expand Up @@ -178,6 +166,48 @@ def materialize_single_feature_view(
feature_view.materialization_intervals.append((start_date, end_date))
registry.apply_feature_view(feature_view, project)

def _get_cs(self):
    """
    Parse the Redis connection string held in ``REDIS_CONNECTION_STRING``.

    The value is a comma-separated list that mixes ``host:port`` node
    entries with ``key=value`` client options:

    for RedisCluster:
        redis1:6379,redis2:6379,decode_responses=true,skip_full_coverage_check=true,ssl=true,password=...
    for Redis:
        redis_master:6379,db=0,ssl=true,password=...

    Returns:
        A ``(startup_nodes, params)`` tuple. ``startup_nodes`` is a list of
        ``{"host": ..., "port": ...}`` dicts in the order given (both values
        are kept as strings). ``params`` maps each option name to its value;
        a value that parses as JSON (``true``, ``0``, ...) is decoded, any
        other value is kept as the raw string.

    Raises:
        KeyError: If ``REDIS_CONNECTION_STRING`` is not set.
    """
    connection_string = os.environ["REDIS_CONNECTION_STRING"]

    startup_nodes = []
    params = {}
    for segment in connection_string.split(","):
        if not segment:
            # A trailing or doubled comma must not produce a node with an
            # empty host.
            continue

        if "=" not in segment:
            startup_nodes.append(
                dict(zip(["host", "port"], segment.split(":")))
            )
            continue

        # Split on the FIRST "=" only: option values such as base64
        # passwords may themselves contain "=" and must survive intact.
        key, _, raw_value = segment.partition("=")
        try:
            value = json.loads(raw_value)
        except json.JSONDecodeError:
            # Not JSON (e.g. a plain password) - pass the text through.
            value = raw_value
        params[key] = value

    return startup_nodes, params

def _get_client(self):
"""
Creates the Redis client RedisCluster or Redis depending on configuration
"""
startup_nodes, kwargs = self._get_cs()

if os.environ["REDIS_TYPE"] == "REDIS_CLUSTER":
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should certainly be moving to using feature_store.yaml exclusively for configuration, and not environment variables. It will be too hard to keep track of these configuration backdoors.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does feature_store.yaml support provider-specific configuration? If it does, I agree that using it may be better than environment variables, except the connection string, of course. I assume the project YAML is supposed to be stored with the project in git, etc., so you wouldn't add connection details there?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is the goal. Also, I think we can add environment variable loading when parsing the feature_store.yaml file, or optionally make it possible to set values inside the file using env vars. That way we keep track of all options in a single place, and loading of env vars happens during initialization, not in a place deeply nested in the code.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's a good idea! I agree that it's best to have a single place that all of the configuration options are visible.

kwargs["startup_nodes"] = startup_nodes
return RedisCluster(**kwargs)
else:
kwargs["host"] = startup_nodes[0]["host"]
kwargs["port"] = startup_nodes[0]["port"]
return Redis(**kwargs)

@staticmethod
def get_historical_features(
config: RepoConfig,
Expand Down