Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make engine importable
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 6, 2022
commit ff680a927bd4c781ac0965d67989ae7f4449efb8
38 changes: 31 additions & 7 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pyarrow as pa
from tqdm import tqdm

from feast import importer
from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_logging import FeatureServiceLoggingSource
Expand Down Expand Up @@ -72,13 +73,36 @@ def batch_engine(self) -> BatchMaterializationEngine:
if self._batch_engine:
return self._batch_engine
else:
from feast.infra.materialization import LocalMaterializationEngine

_batch_engine = LocalMaterializationEngine(
repo_config=self.repo_config,
offline_store=self.offline_store,
online_store=self.online_store,
)
engine_config = self.repo_config.batch_engine_config
config_is_dict = False
if isinstance(engine_config, str):
engine_config_type = engine_config
elif isinstance(engine_config, Dict):
if "type" not in engine_config:
raise ValueError("engine_config needs to have a `type` specified.")
engine_config_type = engine_config["type"]
config_is_dict = True
else:
raise RuntimeError(f"Invalid config type specified for batch_engine: {type(engine_config)}")

if engine_config_type in BATCH_ENGINE_CLASS_FOR_TYPE:
engine_config_type = BATCH_ENGINE_CLASS_FOR_TYPE[engine_config_type]
engine_module, engine_class = engine_config_type.rsplit('.', 1)
engine_class = importer.import_class(engine_module, engine_class)

if config_is_dict:
_batch_engine = engine_class(
repo_config=self.repo_config,
offline_store=self.offline_store,
online_store=self.online_store,
**engine_config
)
else:
_batch_engine = engine_class(
repo_config=self.repo_config,
offline_store=self.offline_store,
online_store=self.online_store,
)
self._batch_engine = _batch_engine
return _batch_engine

Expand Down