Skip to content

Commit 469234a

Browse files
authored
Refactor OnlineStoreConfig classes into owning modules (feast-dev#1649)
* Refactor OnlineStoreConfig classes into owning modules Signed-off-by: Achal Shah <achals@gmail.com> * make format Signed-off-by: Achal Shah <achals@gmail.com> * Move redis too Signed-off-by: Achal Shah <achals@gmail.com> * update test_telemetery Signed-off-by: Achal Shah <achals@gmail.com> * add a create_repo_config method that should be called instead of RepoConfig ctor directly Signed-off-by: Achal Shah <achals@gmail.com> * fix the table reference in repo_operations Signed-off-by: Achal Shah <achals@gmail.com> * reuse create_repo_config Signed-off-by: Achal Shah <achals@gmail.com> Remove redis provider reference * CR comments Signed-off-by: Achal Shah <achals@gmail.com> * Remove create_repo_config in favor of __init__ Signed-off-by: Achal Shah <achals@gmail.com> * make format Signed-off-by: Achal Shah <achals@gmail.com> * Remove print statement Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 8e4bbe5 commit 469234a

13 files changed

Lines changed: 192 additions & 163 deletions

sdk/python/feast/errors.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ def __init__(self, provider_name):
4949
super().__init__(f"Provider '{provider_name}' is not implemented")
5050

5151

52-
class FeastProviderModuleImportError(Exception):
53-
def __init__(self, module_name):
54-
super().__init__(f"Could not import provider module '{module_name}'")
52+
class FeastModuleImportError(Exception):
53+
def __init__(self, module_name, module_type="provider"):
54+
super().__init__(f"Could not import {module_type} module '{module_name}'")
5555

5656

57-
class FeastProviderClassImportError(Exception):
58-
def __init__(self, module_name, class_name):
57+
class FeastClassImportError(Exception):
58+
def __init__(self, module_name, class_name, class_type="provider"):
5959
super().__init__(
60-
f"Could not import provider '{class_name}' from module '{module_name}'"
60+
f"Could not import {class_type} '{class_name}' from module '{module_name}'"
6161
)
6262

6363

@@ -78,6 +78,20 @@ def __init__(self, offline_store_name: str, data_source_name: str):
7878
)
7979

8080

81+
class FeastOnlineStoreInvalidName(Exception):
82+
def __init__(self, online_store_class_name: str):
83+
super().__init__(
84+
f"Online Store Class '{online_store_class_name}' should end with the string `OnlineStore`.'"
85+
)
86+
87+
88+
class FeastOnlineStoreConfigInvalidName(Exception):
89+
def __init__(self, online_store_config_class_name: str):
90+
super().__init__(
91+
f"Online Store Config Class '{online_store_config_class_name}' should end with the string `OnlineStoreConfig`.'"
92+
)
93+
94+
8195
class FeastOnlineStoreUnsupportedDataSource(Exception):
8296
def __init__(self, online_store_name: str, data_source_name: str):
8397
super().__init__(

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
1818

1919
import mmh3
20+
from pydantic import PositiveInt, StrictStr
21+
from pydantic.typing import Literal
2022

2123
from feast import Entity, FeatureTable, utils
2224
from feast.feature_view import FeatureView
2325
from feast.infra.key_encoding_utils import serialize_entity_key
2426
from feast.infra.online_stores.online_store import OnlineStore
2527
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2628
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
27-
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
29+
from feast.repo_config import FeastConfigBaseModel, RepoConfig
2830

2931
try:
3032
from google.auth.exceptions import DefaultCredentialsError
@@ -40,6 +42,25 @@
4042
]
4143

4244

45+
class DatastoreOnlineStoreConfig(FeastConfigBaseModel):
46+
""" Online store config for GCP Datastore """
47+
48+
type: Literal["datastore"] = "datastore"
49+
""" Online store type selector"""
50+
51+
project_id: Optional[StrictStr] = None
52+
""" (optional) GCP Project Id """
53+
54+
namespace: Optional[StrictStr] = None
55+
""" (optional) Datastore namespace """
56+
57+
write_concurrency: Optional[PositiveInt] = 40
58+
""" (optional) Amount of threads to use when writing batches of feature rows into Datastore"""
59+
60+
write_batch_size: Optional[PositiveInt] = 50
61+
""" (optional) Amount of feature rows per batch being written into Datastore"""
62+
63+
4364
class DatastoreOnlineStore(OnlineStore):
4465
"""
4566
OnlineStore is an object used for all interaction between Feast and the service used for offline storage of

sdk/python/feast/infra/online_stores/helpers.py

Lines changed: 27 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,41 @@
1+
import importlib
12
import struct
2-
from typing import Any, Dict, Set
3+
from typing import Any
34

45
import mmh3
56

6-
from feast.data_source import BigQuerySource, DataSource, FileSource
7-
from feast.errors import FeastOnlineStoreUnsupportedDataSource
7+
from feast import errors
88
from feast.infra.online_stores.online_store import OnlineStore
99
from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
1010
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
11-
from feast.repo_config import (
12-
DatastoreOnlineStoreConfig,
13-
OnlineStoreConfig,
14-
RedisOnlineStoreConfig,
15-
SqliteOnlineStoreConfig,
16-
)
1711

1812

19-
def get_online_store_from_config(
20-
online_store_config: OnlineStoreConfig,
21-
) -> OnlineStore:
13+
def get_online_store_from_config(online_store_config: Any,) -> OnlineStore:
2214
"""Get the offline store from offline store config"""
2315

24-
if isinstance(online_store_config, SqliteOnlineStoreConfig):
25-
from feast.infra.online_stores.sqlite import SqliteOnlineStore
26-
27-
return SqliteOnlineStore()
28-
elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
29-
from feast.infra.online_stores.datastore import DatastoreOnlineStore
30-
31-
return DatastoreOnlineStore()
32-
elif isinstance(online_store_config, RedisOnlineStoreConfig):
33-
from feast.infra.online_stores.redis import RedisOnlineStore
34-
35-
return RedisOnlineStore()
36-
raise ValueError(f"Unsupported offline store config '{online_store_config}'")
37-
38-
39-
SUPPORTED_SOURCES: Dict[Any, Set[Any]] = {
40-
SqliteOnlineStoreConfig: {FileSource},
41-
DatastoreOnlineStoreConfig: {BigQuerySource},
42-
RedisOnlineStoreConfig: {FileSource, BigQuerySource},
43-
}
44-
45-
46-
def assert_online_store_supports_data_source(
47-
online_store_config: OnlineStoreConfig, data_source: DataSource
48-
):
49-
supported_sources: Set[Any] = SUPPORTED_SOURCES.get(
50-
online_store_config.__class__, set()
51-
)
52-
# This is needed because checking for `in` with Union types breaks mypy.
53-
# https://github.com/python/mypy/issues/4954
54-
# We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]`
55-
# Once ^ is resolved.
56-
if supported_sources:
57-
for source in supported_sources:
58-
if source == data_source.__class__:
59-
return
60-
raise FeastOnlineStoreUnsupportedDataSource(
61-
online_store_config.type, data_source.__class__.__name__
62-
)
16+
module_name = online_store_config.__module__
17+
qualified_name = type(online_store_config).__name__
18+
store_class_name = qualified_name.replace("Config", "")
19+
try:
20+
module = importlib.import_module(module_name)
21+
except Exception as e:
22+
# The original exception can be anything - either module not found,
23+
# or any other kind of error happening during the module import time.
24+
# So we should include the original error as well in the stack trace.
25+
raise errors.FeastModuleImportError(
26+
module_name, module_type="OnlineStore"
27+
) from e
28+
29+
# Try getting the provider class definition
30+
try:
31+
online_store_class = getattr(module, store_class_name)
32+
except AttributeError:
33+
# This can only be one type of error, when class_name attribute does not exist in the module
34+
# So we don't have to include the original exception here
35+
raise errors.FeastClassImportError(
36+
module_name, store_class_name, class_type="OnlineStore"
37+
) from None
38+
return online_store_class()
6339

6440

6541
def _redis_key(project: str, entity_key: EntityKeyProto):

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313
# limitations under the License.
1414
import json
1515
from datetime import datetime
16+
from enum import Enum
1617
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
1718

1819
from google.protobuf.timestamp_pb2 import Timestamp
20+
from pydantic import StrictStr
21+
from pydantic.typing import Literal
1922

2023
from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils
2124
from feast.infra.online_stores.helpers import _mmh3, _redis_key
2225
from feast.infra.online_stores.online_store import OnlineStore
2326
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2427
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
25-
from feast.repo_config import RedisOnlineStoreConfig, RedisType
28+
from feast.repo_config import FeastConfigBaseModel
2629

2730
try:
2831
from redis import Redis
@@ -35,6 +38,25 @@
3538
EX_SECONDS = 253402300799
3639

3740

41+
class RedisType(str, Enum):
42+
redis = "redis"
43+
redis_cluster = "redis_cluster"
44+
45+
46+
class RedisOnlineStoreConfig(FeastConfigBaseModel):
47+
"""Online store config for Redis store"""
48+
49+
type: Literal["redis"] = "redis"
50+
"""Online store type selector"""
51+
52+
redis_type: RedisType = RedisType.redis
53+
"""Redis type: redis or redis_cluster"""
54+
55+
connection_string: StrictStr = "localhost:6379"
56+
"""Connection string containing the host, port, and configuration parameters for Redis
57+
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """
58+
59+
3860
class RedisOnlineStore(OnlineStore):
3961
_client: Optional[Union[Redis, RedisCluster]] = None
4062

@@ -99,7 +121,6 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
99121
startup_nodes, kwargs = self._parse_connection_string(
100122
online_store_config.connection_string
101123
)
102-
print(f"Startup nodes: {startup_nodes}, {kwargs}")
103124
if online_store_config.type == RedisType.redis_cluster:
104125
kwargs["startup_nodes"] = startup_nodes
105126
self._client = RedisCluster(**kwargs)

sdk/python/feast/infra/online_stores/sqlite.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,26 @@
1919
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
2020

2121
import pytz
22+
from pydantic import StrictStr
23+
from pydantic.schema import Literal
2224

2325
from feast import Entity, FeatureTable
2426
from feast.feature_view import FeatureView
2527
from feast.infra.key_encoding_utils import serialize_entity_key
2628
from feast.infra.online_stores.online_store import OnlineStore
2729
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2830
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
29-
from feast.repo_config import RepoConfig
31+
from feast.repo_config import FeastConfigBaseModel, RepoConfig
32+
33+
34+
class SqliteOnlineStoreConfig(FeastConfigBaseModel):
35+
""" Online store config for local (SQLite-based) store """
36+
37+
type: Literal["sqlite"] = "sqlite"
38+
""" Online store type selector"""
39+
40+
path: StrictStr = "data/online.db"
41+
""" (optional) Path to sqlite db """
3042

3143

3244
class SqliteOnlineStore(OnlineStore):
@@ -65,6 +77,7 @@ def online_write_batch(
6577
],
6678
progress: Optional[Callable[[int], Any]],
6779
) -> None:
80+
6881
conn = self._get_conn(config)
6982

7083
project = config.project

sdk/python/feast/infra/provider.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,17 +163,15 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
163163
# The original exception can be anything - either module not found,
164164
# or any other kind of error happening during the module import time.
165165
# So we should include the original error as well in the stack trace.
166-
raise errors.FeastProviderModuleImportError(module_name) from e
166+
raise errors.FeastModuleImportError(module_name) from e
167167

168168
# Try getting the provider class definition
169169
try:
170170
ProviderCls = getattr(module, class_name)
171171
except AttributeError:
172172
# This can only be one type of error, when class_name attribute does not exist in the module
173173
# So we don't have to include the original exception here
174-
raise errors.FeastProviderClassImportError(
175-
module_name, class_name
176-
) from None
174+
raise errors.FeastClassImportError(module_name, class_name) from None
177175

178176
return ProviderCls(config, repo_path)
179177

0 commit comments

Comments
 (0)