Skip to content
Merged
Next Next commit
Refactor OnlineStoreConfig classes into owning modules
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jun 15, 2021
commit fe441c1d5938e5cc384c6185dc2ab237c91e4ce8
26 changes: 20 additions & 6 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ def __init__(self, provider_name):
super().__init__(f"Provider '{provider_name}' is not implemented")


class FeastProviderModuleImportError(Exception):
def __init__(self, module_name):
super().__init__(f"Could not import provider module '{module_name}'")
class FeastModuleImportError(Exception):
def __init__(self, module_name, module_type="provider"):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove the default module_type here, instead explicitly pass the module.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in #1657

super().__init__(f"Could not import {module_type} module '{module_name}'")


class FeastProviderClassImportError(Exception):
def __init__(self, module_name, class_name):
class FeastClassImportError(Exception):
def __init__(self, module_name, class_name, class_type="provider"):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in #1657

super().__init__(
f"Could not import provider '{class_name}' from module '{module_name}'"
f"Could not import {class_type} '{class_name}' from module '{module_name}'"
)


Expand All @@ -71,6 +71,20 @@ def __init__(self, offline_store_name: str, data_source_name: str):
)


class FeastOnlineStoreInvalidName(Exception):
def __init__(self, online_store_class_name: str):
super().__init__(
f"Online Store Class '{online_store_class_name}' should end with the string `OnlineStore`.'"
)


class FeastOnlineStoreConfigInvalidName(Exception):
def __init__(self, online_store_config_class_name: str):
super().__init__(
f"Online Store Config Class '{online_store_config_class_name}' should end with the string `OnlineStoreConfig`.'"
)


class FeastOnlineStoreUnsupportedDataSource(Exception):
def __init__(self, online_store_name: str, data_source_name: str):
super().__init__(
Expand Down
23 changes: 22 additions & 1 deletion sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
from feast.repo_config import RepoConfig, FeastConfigBaseModel
from pydantic import StrictStr, PositiveInt
from pydantic.typing import Literal

try:
from google.auth.exceptions import DefaultCredentialsError
Expand All @@ -40,6 +42,25 @@
]


class DatastoreOnlineStoreConfig(FeastConfigBaseModel):
""" Online store config for GCP Datastore """

type: Literal["datastore"] = "datastore"
""" Online store type selector"""

project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

namespace: Optional[StrictStr] = None
""" (optional) Datastore namespace """

write_concurrency: Optional[PositiveInt] = 40
""" (optional) Amount of threads to use when writing batches of feature rows into Datastore"""

write_batch_size: Optional[PositiveInt] = 50
""" (optional) Amount of feature rows per batch being written into Datastore"""


class DatastoreOnlineStore(OnlineStore):
"""
OnlineStore is an object used for all interaction between Feast and the service used for offline storage of
Expand Down
37 changes: 3 additions & 34 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import struct
from typing import Any, Dict, Set

import mmh3

from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import FeastOnlineStoreUnsupportedDataSource
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.repo_config import (
DatastoreOnlineStoreConfig,
OnlineStoreConfig,
RedisOnlineStoreConfig,
SqliteOnlineStoreConfig,
)


Expand All @@ -21,45 +16,19 @@ def get_online_store_from_config(
) -> OnlineStore:
"""Get the offline store from offline store config"""

if isinstance(online_store_config, SqliteOnlineStoreConfig):
if online_store_config.__repr_name__() == "SqliteOnlineStoreConfig":
from feast.infra.online_stores.sqlite import SqliteOnlineStore

return SqliteOnlineStore()
elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
elif online_store_config.__repr_name__() == "DatastoreOnlineStoreConfig":
from feast.infra.online_stores.datastore import DatastoreOnlineStore

return DatastoreOnlineStore()
elif isinstance(online_store_config, RedisOnlineStoreConfig):
from feast.infra.online_stores.redis import RedisOnlineStore

return RedisOnlineStore()
raise ValueError(f"Unsupported offline store config '{online_store_config}'")


SUPPORTED_SOURCES: Dict[Any, Set[Any]] = {
SqliteOnlineStoreConfig: {FileSource},
DatastoreOnlineStoreConfig: {BigQuerySource},
RedisOnlineStoreConfig: {FileSource, BigQuerySource},
}


def assert_online_store_supports_data_source(
online_store_config: OnlineStoreConfig, data_source: DataSource
):
supported_sources: Set[Any] = SUPPORTED_SOURCES.get(
online_store_config.__class__, set()
)
# This is needed because checking for `in` with Union types breaks mypy.
# https://github.com/python/mypy/issues/4954
# We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]`
# Once ^ is resolved.
if supported_sources:
for source in supported_sources:
if source == data_source.__class__:
return
raise FeastOnlineStoreUnsupportedDataSource(
online_store_config.type, data_source.__class__.__name__
)
raise ValueError(f"Unsupported online store config '{online_store_config}'")


def _redis_key(project: str, entity_key: EntityKeyProto):
Expand Down
14 changes: 13 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,19 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.repo_config import RepoConfig, FeastConfigBaseModel
from pydantic import StrictStr
from pydantic.schema import Literal


class SqliteOnlineStoreConfig(FeastConfigBaseModel):
""" Online store config for local (SQLite-based) store """

type: Literal["sqlite"] = "sqlite"
""" Online store type selector"""

path: StrictStr = "data/online.db"
""" (optional) Path to sqlite db """


class SqliteOnlineStore(OnlineStore):
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
# The original exception can be anything - either module not found,
# or any other kind of error happening during the module import time.
# So we should include the original error as well in the stack trace.
raise errors.FeastProviderModuleImportError(module_name) from e
raise errors.FeastModuleImportError(module_name) from e

# Try getting the provider class definition
try:
ProviderCls = getattr(module, class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastProviderClassImportError(
raise errors.FeastClassImportError(
module_name, class_name
) from None

Expand Down
101 changes: 61 additions & 40 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import importlib
from enum import Enum
from pathlib import Path
from typing import TypeVar, Generic, Any

import yaml
from pydantic import (
BaseModel,
PositiveInt,
StrictInt,
StrictStr,
ValidationError,
Expand All @@ -14,43 +15,35 @@
from pydantic.typing import Dict, Literal, Optional, Union

from feast.telemetry import log_exceptions
from feast import errors


# This dict exists so that:
# - existing values for the online store type in featurestore.yaml files continue to work in a backwards compatible way
# - first party and third party implementations can use the same class loading code path.
ONLINE_CONFIG_CLASS_FOR_TYPE = {
'sqlite': 'feast.infra.online_stores.sqlite.SqliteOnlineStore',
'datastore': 'feast.infra.online_stores.datastore.DatastoreOnlineStore'
}


class FeastBaseModel(BaseModel):
""" Feast Pydantic Configuration Class """

class Config:
arbitrary_types_allowed = True
extra = "forbid"


class SqliteOnlineStoreConfig(FeastBaseModel):
""" Online store config for local (SQLite-based) store """

type: Literal["sqlite"] = "sqlite"
""" Online store type selector"""
extra = "allow"
Comment thread
achals marked this conversation as resolved.
Outdated

path: StrictStr = "data/online.db"
""" (optional) Path to sqlite db """

class FeastConfigBaseModel(BaseModel):
""" Feast Pydantic Configuration Class """

class DatastoreOnlineStoreConfig(FeastBaseModel):
""" Online store config for GCP Datastore """

type: Literal["datastore"] = "datastore"
""" Online store type selector"""

project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

namespace: Optional[StrictStr] = None
""" (optional) Datastore namespace """
class Config:
arbitrary_types_allowed = True
extra = "forbid"

write_concurrency: Optional[PositiveInt] = 40
""" (optional) Amount of threads to use when writing batches of feature rows into Datastore"""

write_batch_size: Optional[PositiveInt] = 50
""" (optional) Amount of feature rows per batch being written into Datastore"""
OnlineT = TypeVar('OnlineT', bound=FeastConfigBaseModel)


class RedisType(str, Enum):
Expand All @@ -72,9 +65,7 @@ class RedisOnlineStoreConfig(FeastBaseModel):
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """


OnlineStoreConfig = Union[
DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig, RedisOnlineStoreConfig
]
OnlineStoreConfig = Union[RedisOnlineStoreConfig]


class FileOfflineStoreConfig(FeastBaseModel):
Expand Down Expand Up @@ -125,7 +116,7 @@ class RepoConfig(FeastBaseModel):
provider: StrictStr
""" str: local or gcp or redis """

online_store: OnlineStoreConfig = SqliteOnlineStoreConfig()
online_store: Any
Comment thread
woop marked this conversation as resolved.
""" OnlineStoreConfig: Online store configuration (optional depending on provider) """

offline_store: OfflineStoreConfig = FileOfflineStoreConfig()
Expand Down Expand Up @@ -168,22 +159,17 @@ def _validate_online_store_config(cls, values):

online_store_type = values["online_store"]["type"]

# Make sure the user hasn't provided the wrong type
assert online_store_type in ["datastore", "sqlite", "redis"]

# Validate the dict to ensure one of the union types match
try:
if online_store_type == "sqlite":
SqliteOnlineStoreConfig(**values["online_store"])
elif online_store_type == "datastore":
DatastoreOnlineStoreConfig(**values["online_store"])
elif online_store_type == "redis":
if online_store_type == "redis":
RedisOnlineStoreConfig(**values["online_store"])
else:
raise ValueError(f"Invalid online store type {online_store_type}")
online_config_class = get_online_config_from_type(online_store_type)
online_config_class(**values["online_store"])
except ValidationError as e:

raise ValidationError(
[ErrorWrapper(e, loc="online_store")], model=SqliteOnlineStoreConfig,
[ErrorWrapper(e, loc="online_store")], model=RepoConfig,
)

return values
Expand Down Expand Up @@ -246,6 +232,36 @@ def __repr__(self) -> str:
)


def get_online_config_from_type(online_store_type: str):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should also be in sdk/python/feast/infra/online_stores/helpers.py

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like leaving it here since it's more config-related. But happy to change if you feel strongly.

if online_store_type in ONLINE_CONFIG_CLASS_FOR_TYPE:
online_store_type = ONLINE_CONFIG_CLASS_FOR_TYPE[online_store_type]
module_name, class_name = online_store_type.rsplit(".", 1)

if not class_name.endswith('OnlineStore'):
raise errors.FeastOnlineStoreConfigInvalidName(class_name)
config_class_name = f"{class_name}Config"

# Try importing the module that contains the custom provider
try:
module = importlib.import_module(module_name)
except Exception as e:
# The original exception can be anything - either module not found,
# or any other kind of error happening during the module import time.
# So we should include the original error as well in the stack trace.
raise errors.FeastModuleImportError(module_name, module_type="OnlineStore") from e

# Try getting the provider class definition
try:
online_store_config_class = getattr(module, config_class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastClassImportError(
module_name, config_class_name, class_type="OnlineStoreConfig"
) from None
return online_store_config_class


def load_repo_config(repo_path: Path) -> RepoConfig:
config_path = repo_path / "feature_store.yaml"

Expand All @@ -254,6 +270,11 @@ def load_repo_config(repo_path: Path) -> RepoConfig:
try:
c = RepoConfig(**raw_config)
c.repo_path = repo_path
online_config_class = get_online_config_from_type(c.dict()['online_store']['type'])
c.online_store = online_config_class(**c.dict()['online_store'])
return c
except ValidationError as e:
raise FeastConfigError(e, config_path)



5 changes: 3 additions & 2 deletions sdk/python/tests/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
from tempfile import mkstemp

import pytest
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from pytest_lazyfixture import lazy_fixture
from utils.data_source_utils import (
from tests.utils.data_source_utils import (
prep_file_source,
simple_bq_source_using_query_arg,
simple_bq_source_using_table_ref_arg,
Expand All @@ -30,7 +31,7 @@
from feast.feature_store import FeatureStore
from feast.feature_view import FeatureView
from feast.protos.feast.types import Value_pb2 as ValueProto
from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig
from feast.repo_config import RepoConfig
from feast.value_type import ValueType


Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy as np
import pandas as pd
import pytest
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from google.cloud import bigquery
from pandas.testing import assert_frame_equal
from pytz import utc
Expand All @@ -24,7 +25,6 @@
from feast.repo_config import (
BigQueryOfflineStoreConfig,
RepoConfig,
SqliteOnlineStoreConfig,
)
from feast.value_type import ValueType

Expand Down
Loading