|
| 1 | +import importlib |
1 | 2 | import struct |
2 | | -from typing import Any, Dict, Set |
| 3 | +from typing import Any |
3 | 4 |
|
4 | 5 | import mmh3 |
5 | 6 |
|
6 | | -from feast.data_source import BigQuerySource, DataSource, FileSource |
7 | | -from feast.errors import FeastOnlineStoreUnsupportedDataSource |
| 7 | +from feast import errors |
8 | 8 | from feast.infra.online_stores.online_store import OnlineStore |
9 | 9 | from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto |
10 | 10 | from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto |
11 | | -from feast.repo_config import ( |
12 | | - DatastoreOnlineStoreConfig, |
13 | | - OnlineStoreConfig, |
14 | | - RedisOnlineStoreConfig, |
15 | | - SqliteOnlineStoreConfig, |
16 | | -) |
17 | 11 |
|
18 | 12 |
|
19 | | -def get_online_store_from_config( |
20 | | - online_store_config: OnlineStoreConfig, |
21 | | -) -> OnlineStore: |
| 13 | +def get_online_store_from_config(online_store_config: Any,) -> OnlineStore: |
22 | 14 | """Get the offline store from offline store config""" |
23 | 15 |
|
24 | | - if isinstance(online_store_config, SqliteOnlineStoreConfig): |
25 | | - from feast.infra.online_stores.sqlite import SqliteOnlineStore |
26 | | - |
27 | | - return SqliteOnlineStore() |
28 | | - elif isinstance(online_store_config, DatastoreOnlineStoreConfig): |
29 | | - from feast.infra.online_stores.datastore import DatastoreOnlineStore |
30 | | - |
31 | | - return DatastoreOnlineStore() |
32 | | - elif isinstance(online_store_config, RedisOnlineStoreConfig): |
33 | | - from feast.infra.online_stores.redis import RedisOnlineStore |
34 | | - |
35 | | - return RedisOnlineStore() |
36 | | - raise ValueError(f"Unsupported offline store config '{online_store_config}'") |
37 | | - |
38 | | - |
39 | | -SUPPORTED_SOURCES: Dict[Any, Set[Any]] = { |
40 | | - SqliteOnlineStoreConfig: {FileSource}, |
41 | | - DatastoreOnlineStoreConfig: {BigQuerySource}, |
42 | | - RedisOnlineStoreConfig: {FileSource, BigQuerySource}, |
43 | | -} |
44 | | - |
45 | | - |
46 | | -def assert_online_store_supports_data_source( |
47 | | - online_store_config: OnlineStoreConfig, data_source: DataSource |
48 | | -): |
49 | | - supported_sources: Set[Any] = SUPPORTED_SOURCES.get( |
50 | | - online_store_config.__class__, set() |
51 | | - ) |
52 | | - # This is needed because checking for `in` with Union types breaks mypy. |
53 | | - # https://github.com/python/mypy/issues/4954 |
54 | | - # We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]` |
55 | | - # Once ^ is resolved. |
56 | | - if supported_sources: |
57 | | - for source in supported_sources: |
58 | | - if source == data_source.__class__: |
59 | | - return |
60 | | - raise FeastOnlineStoreUnsupportedDataSource( |
61 | | - online_store_config.type, data_source.__class__.__name__ |
62 | | - ) |
| 16 | + module_name = online_store_config.__module__ |
| 17 | + qualified_name = type(online_store_config).__name__ |
| 18 | + store_class_name = qualified_name.replace("Config", "") |
| 19 | + try: |
| 20 | + module = importlib.import_module(module_name) |
| 21 | + except Exception as e: |
| 22 | + # The original exception can be anything - either module not found, |
| 23 | + # or any other kind of error happening during the module import time. |
| 24 | + # So we should include the original error as well in the stack trace. |
| 25 | + raise errors.FeastModuleImportError( |
| 26 | + module_name, module_type="OnlineStore" |
| 27 | + ) from e |
| 28 | + |
| 29 | + # Try getting the provider class definition |
| 30 | + try: |
| 31 | + online_store_class = getattr(module, store_class_name) |
| 32 | + except AttributeError: |
| 33 | + # This can only be one type of error, when class_name attribute does not exist in the module |
| 34 | + # So we don't have to include the original exception here |
| 35 | + raise errors.FeastClassImportError( |
| 36 | + module_name, store_class_name, class_type="OnlineStore" |
| 37 | + ) from None |
| 38 | + return online_store_class() |
63 | 39 |
|
64 | 40 |
|
65 | 41 | def _redis_key(project: str, entity_key: EntityKeyProto): |
|
0 commit comments