From 6cc7e75bcde31a760df48b9955786e53788dd142 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 1 Jul 2022 14:05:31 -0700 Subject: [PATCH 01/11] feat: Add scaffolding for batch materialization engine Signed-off-by: Achal Shah --- .../feast/infra/materialization/__init__.py | 13 + .../batch_materialization_engine.py | 61 ++++ .../infra/materialization/local_engine.py | 279 ++++++++++++++++++ .../feast/infra/passthrough_provider.py | 68 ++--- sdk/python/feast/repo_config.py | 29 ++ 5 files changed, 399 insertions(+), 51 deletions(-) create mode 100644 sdk/python/feast/infra/materialization/__init__.py create mode 100644 sdk/python/feast/infra/materialization/batch_materialization_engine.py create mode 100644 sdk/python/feast/infra/materialization/local_engine.py diff --git a/sdk/python/feast/infra/materialization/__init__.py b/sdk/python/feast/infra/materialization/__init__.py new file mode 100644 index 0000000000..65ae02e455 --- /dev/null +++ b/sdk/python/feast/infra/materialization/__init__.py @@ -0,0 +1,13 @@ +from batch_materialization_engine import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationTask, +) +from local_engine import LocalMaterializationEngine + +__all__ = [ + "MaterializationJob", + "MaterializationTask", + "BatchMaterializationEngine", + "LocalMaterializationEngine", +] diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py new file mode 100644 index 0000000000..76ddb7dce3 --- /dev/null +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -0,0 +1,61 @@ +import dataclasses +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Callable, List, Optional + +from tqdm import tqdm + +from feast import RepoConfig +from feast.base_feature_view import BaseFeatureView +from feast.infra.offline_stores.offline_store import OfflineStore +from 
feast.infra.online_stores.online_store import OnlineStore +from feast.registry import BaseRegistry + + +@dataclasses.dataclass +class MaterializationTask: + project: str + feature_view: BaseFeatureView + start_time: datetime + end_time: datetime + tqdm_builder: Callable[[int], tqdm] + + +class MaterializationJob(ABC): + task: MaterializationTask + + @abstractmethod + def status(self) -> str: + ... + + @abstractmethod + def should_be_retried(self) -> str: + ... + + @abstractmethod + def job_id(self) -> str: + ... + + @abstractmethod + def url(self) -> Optional[str]: + ... + + +class BatchMaterializationEngine(ABC): + def __init__( + self, + *, + repo_config: RepoConfig, + registry: BaseRegistry, + offline_store: OfflineStore, + online_store: OnlineStore, + **kwargs, + ): + self.repo_config = repo_config + self.registry = registry + self.offline_store = offline_store + self.online_store = online_store + + @abstractmethod + def materialize(self, tasks: List[MaterializationTask]) -> List[MaterializationJob]: + ... 
diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py new file mode 100644 index 0000000000..e549383a52 --- /dev/null +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -0,0 +1,279 @@ +from datetime import datetime +from typing import Callable, Dict, List, Literal, Optional, Tuple, Union + +import dask as dd +import pandas as pd +import pyarrow as pa +from tqdm import tqdm + +from feast import Entity, FeatureView, RepoConfig, ValueType +from feast.feature_view import DUMMY_ENTITY_ID +from feast.infra.materialization import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationTask, +) +from feast.infra.offline_stores.offline_store import OfflineStore +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.type_map import python_values_to_proto_values + + +class LocalMaterializationJob(MaterializationJob): + def __init__(self) -> None: + super().__init__() + + def status(self) -> str: + pass + + def should_be_retried(self) -> str: + pass + + def job_id(self) -> str: + pass + + def url(self) -> Optional[str]: + pass + + +DEFAULT_BATCH_SIZE = 10_000 + + +class LocalMaterializationEngineConfig(FeastConfigBaseModel): + """Batch Materialization Engine config for local in-process engine""" + + type: Literal["local"] = "local" + """ Type selector""" + + +class LocalMaterializationEngine(BatchMaterializationEngine): + def __init__( + self, + *, + repo_config: RepoConfig, + offline_store: OfflineStore, + online_store: OnlineStore, + **kwargs, + ): + super().__init__( + repo_config=repo_config, + offline_store=offline_store, + online_store=online_store, + **kwargs, + ) + + def materialize(self, tasks: List[MaterializationTask]) -> List[MaterializationJob]: + 
return [] + + def materialize_one( + self, + feature_view: FeatureView, + start_date: datetime, + end_date: datetime, + project: str, + tqdm_builder: Callable[[int], tqdm], + ): + entities = [] + for entity_name in feature_view.entities: + entities.append(self.registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + offline_job = self.offline_store.pull_latest_from_table_or_query( + config=self.repo_config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + table = offline_job.to_arrow() + + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + + with tqdm_builder(table.num_rows) as pbar: + for batch in table.to_batches(DEFAULT_BATCH_SIZE): + rows_to_write = _convert_arrow_to_proto( + batch, feature_view, join_key_to_value_type + ) + self.online_store.online_write_batch( + self.repo_config, + feature_view, + rows_to_write, + lambda x: pbar.update(x), + ) + + +def _get_column_names( + feature_view: FeatureView, entities: List[Entity] +) -> Tuple[List[str], List[str], str, Optional[str]]: + """ + If a field mapping exists, run it in reverse on the join keys, + feature names, event timestamp column, and created timestamp column + to get the names of the relevant columns in the offline feature store table. 
+ + Returns: + Tuple containing the list of reverse-mapped join_keys, + reverse-mapped feature names, reverse-mapped event timestamp column, + and reverse-mapped created timestamp column that will be passed into + the query to the offline store. + """ + # if we have mapped fields, use the original field names in the call to the offline store + timestamp_field = feature_view.batch_source.timestamp_field + feature_names = [feature.name for feature in feature_view.features] + created_timestamp_column = feature_view.batch_source.created_timestamp_column + join_keys = [ + entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID + ] + if feature_view.batch_source.field_mapping is not None: + reverse_field_mapping = { + v: k for k, v in feature_view.batch_source.field_mapping.items() + } + timestamp_field = ( + reverse_field_mapping[timestamp_field] + if timestamp_field in reverse_field_mapping.keys() + else timestamp_field + ) + created_timestamp_column = ( + reverse_field_mapping[created_timestamp_column] + if created_timestamp_column + and created_timestamp_column in reverse_field_mapping.keys() + else created_timestamp_column + ) + join_keys = [ + reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col + for col in join_keys + ] + feature_names = [ + reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col + for col in feature_names + ] + + # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to + # their final column names via the `field_mapping` field of the source. 
+ feature_names = [ + name + for name in feature_names + if name not in join_keys + and name != timestamp_field + and name != created_timestamp_column + ] + return ( + join_keys, + feature_names, + timestamp_field, + created_timestamp_column, + ) + + +def _run_field_mapping(table: pa.Table, field_mapping: Dict[str, str],) -> pa.Table: + # run field mapping in the forward direction + cols = table.column_names + mapped_cols = [ + field_mapping[col] if col in field_mapping.keys() else col for col in cols + ] + table = table.rename_columns(mapped_cols) + return table + + +def _run_dask_field_mapping( + table: dd.DataFrame, field_mapping: Dict[str, str], +): + if field_mapping: + # run field mapping in the forward direction + table = table.rename(columns=field_mapping) + table = table.persist() + + return table + + +def _coerce_datetime(ts): + """ + Depending on underlying time resolution, arrow to_pydict() sometimes returns pd + timestamp type (for nanosecond resolution), and sometimes you get standard python datetime + (for microsecond resolution). + While pd timestamp class is a subclass of python datetime, it doesn't always behave the + same way. We convert it to normal datetime so that consumers downstream don't have to deal + with these quirks. + """ + if isinstance(ts, pd.Timestamp): + return ts.to_pydatetime() + else: + return ts + + +def _convert_arrow_to_proto( + table: Union[pa.Table, pa.RecordBatch], + feature_view: FeatureView, + join_keys: Dict[str, ValueType], +) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: + # Avoid ChunkedArrays which guarentees `zero_copy_only` availiable. 
+ if isinstance(table, pa.Table): + table = table.to_batches()[0] + + columns = [ + (field.name, field.dtype.to_value_type()) for field in feature_view.features + ] + list(join_keys.items()) + + proto_values_by_column = { + column: python_values_to_proto_values( + table.column(column).to_numpy(zero_copy_only=False), value_type + ) + for column, value_type in columns + } + + entity_keys = [ + EntityKeyProto( + join_keys=join_keys, + entity_values=[proto_values_by_column[k][idx] for k in join_keys], + ) + for idx in range(table.num_rows) + ] + + # Serialize the features per row + feature_dict = { + feature.name: proto_values_by_column[feature.name] + for feature in feature_view.features + } + features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] + + # Convert event_timestamps + event_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column(feature_view.batch_source.timestamp_field).to_numpy( + zero_copy_only=False + ) + ) + ] + + # Convert created_timestamps if they exist + if feature_view.batch_source.created_timestamp_column: + created_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column( + feature_view.batch_source.created_timestamp_column + ).to_numpy(zero_copy_only=False) + ) + ] + else: + created_timestamps = [None] * table.num_rows + + return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 8c6dd831dd..f65abe27ed 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -2,7 +2,6 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas as pd -import pyarrow import pyarrow as pa from tqdm import tqdm @@ -10,15 +9,11 @@ from feast.entity import Entity from feast.feature_logging import FeatureServiceLoggingSource from feast.feature_view import 
FeatureView +from feast.infra.materialization import BatchMaterializationEngine, MaterializationTask from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config -from feast.infra.provider import ( - Provider, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) +from feast.infra.provider import Provider, _convert_arrow_to_proto, _run_field_mapping from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry @@ -41,6 +36,7 @@ def __init__(self, config: RepoConfig): self.repo_config = config self._offline_store = None self._online_store = None + self._batch_engine = None @property def online_store(self): @@ -58,6 +54,12 @@ def offline_store(self): ) return self._offline_store + @property + def batch_engine(self) -> BatchMaterializationEngine: + if not self._batch_engine and self.repo_config.batch_engine: + self._batch_engine = self.repo_config.batch_engine + return self._batch_engine + def update_infra( self, project: str, @@ -165,50 +167,14 @@ def materialize_single_feature_view( tqdm_builder: Callable[[int], tqdm], ) -> None: set_usage_attribute("provider", self.__class__.__name__) - - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) - - ( - join_key_columns, - feature_name_columns, - timestamp_field, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - offline_job = self.offline_store.pull_latest_from_table_or_query( - config=config, - data_source=feature_view.batch_source, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - timestamp_field=timestamp_field, - created_timestamp_column=created_timestamp_column, - start_date=start_date, 
- end_date=end_date, + task = MaterializationTask( + project=project, + feature_view=feature_view, + start_time=start_date, + end_time=end_date, + tqdm_builder=tqdm_builder, ) - - table = offline_job.to_arrow() - - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - - join_key_to_value_type = { - entity.name: entity.dtype.to_value_type() - for entity in feature_view.entity_columns - } - - with tqdm_builder(table.num_rows) as pbar: - for batch in table.to_batches(DEFAULT_BATCH_SIZE): - rows_to_write = _convert_arrow_to_proto( - batch, feature_view, join_key_to_value_type - ) - self.online_write_batch( - self.repo_config, - feature_view, - rows_to_write, - lambda x: pbar.update(x), - ) + self.batch_engine.materialize([task]) def get_historical_features( self, @@ -260,7 +226,7 @@ def retrieve_saved_dataset( def write_feature_service_logs( self, feature_service: FeatureService, - logs: Union[pyarrow.Table, str], + logs: Union[pa.Table, str], config: RepoConfig, registry: BaseRegistry, ): diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b7cf1683dc..086865d54c 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -63,6 +63,11 @@ } +BATCH_ENGINE_CLASS_FOR_TYPE = { + "local": "feast.infra.materialization.LocalMaterializationEngine", +} + + class FeastBaseModel(BaseModel): """Feast Pydantic Configuration Class""" @@ -120,6 +125,9 @@ class RepoConfig(FeastBaseModel): _offline_config: Any = Field(alias="offline_store") """ OfflineStoreConfig: Offline store configuration (optional depending on provider) """ + _batch_engine_config: Any = Field(alias="batch_engine") + """ BatchMaterializationEngine: Batch materialization configuration (optional depending on provider)""" + feature_server: Optional[Any] """ FeatureServerConfig: Feature server configuration (optional depending on provider) """ @@ -155,6 +163,13 @@ def __init__(self, 
**data: Any): elif data["provider"] == "aws": self._online_config = "dynamodb" + self._batch_engine = None + if "batch_engine" in data: + self._batch_engine_config = data["batch_engine"] + else: + # Defaults to using local in-process materialization engine. + self._batch_engine_config = "local" + if isinstance(self.feature_server, Dict): self.feature_server = get_feature_server_config_from_type( self.feature_server["type"] @@ -195,6 +210,20 @@ def online_store(self): return self._online_store + @property + def batch_engine(self): + assert self._batch_engine_config == "local" + if not self._batch_engine: + from feast.infra.materialization import LocalMaterializationEngine + + self._batch_engine = LocalMaterializationEngine( + repo_config=self, + offline_store=self.offline_store, + online_store=self.online_store, + ) + + return self._batch_engine + @root_validator(pre=True) @log_exceptions def _validate_online_store_config(cls, values): From 165643675ae866a700e88d57d7d811a7f35c0d81 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 11:11:59 -0700 Subject: [PATCH 02/11] fix tests Signed-off-by: Achal Shah --- sdk/python/feast/infra/materialization/__init__.py | 4 ++-- .../infra/materialization/batch_materialization_engine.py | 2 -- sdk/python/feast/infra/materialization/local_engine.py | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/materialization/__init__.py b/sdk/python/feast/infra/materialization/__init__.py index 65ae02e455..6be653b26e 100644 --- a/sdk/python/feast/infra/materialization/__init__.py +++ b/sdk/python/feast/infra/materialization/__init__.py @@ -1,9 +1,9 @@ -from batch_materialization_engine import ( +from .batch_materialization_engine import ( BatchMaterializationEngine, MaterializationJob, MaterializationTask, ) -from local_engine import LocalMaterializationEngine +from .local_engine import LocalMaterializationEngine __all__ = [ "MaterializationJob", diff --git 
a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index 76ddb7dce3..657d225fb3 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -46,13 +46,11 @@ def __init__( self, *, repo_config: RepoConfig, - registry: BaseRegistry, offline_store: OfflineStore, online_store: OnlineStore, **kwargs, ): self.repo_config = repo_config - self.registry = registry self.offline_store = offline_store self.online_store = online_store diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py index e549383a52..1a8cf44510 100644 --- a/sdk/python/feast/infra/materialization/local_engine.py +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -1,14 +1,14 @@ from datetime import datetime from typing import Callable, Dict, List, Literal, Optional, Tuple, Union -import dask as dd +import dask.dataframe as dd import pandas as pd import pyarrow as pa from tqdm import tqdm from feast import Entity, FeatureView, RepoConfig, ValueType from feast.feature_view import DUMMY_ENTITY_ID -from feast.infra.materialization import ( +from .batch_materialization_engine import ( BatchMaterializationEngine, MaterializationJob, MaterializationTask, @@ -19,6 +19,7 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel from feast.type_map import python_values_to_proto_values +from ...registry import BaseRegistry class LocalMaterializationJob(MaterializationJob): From c16734b4d38f3592520740d8a7a5561bbf5bef54 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 12:35:48 -0700 Subject: [PATCH 03/11] fix tests Signed-off-by: Achal Shah --- .../batch_materialization_engine.py | 7 +- .../infra/materialization/local_engine.py | 66 +++++++++++-------- 
.../feast/infra/passthrough_provider.py | 23 +++++-- sdk/python/feast/repo_config.py | 20 +----- 4 files changed, 63 insertions(+), 53 deletions(-) diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index 657d225fb3..cc9ff5d626 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -9,7 +9,6 @@ from feast.base_feature_view import BaseFeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore -from feast.registry import BaseRegistry @dataclasses.dataclass @@ -29,7 +28,7 @@ def status(self) -> str: ... @abstractmethod - def should_be_retried(self) -> str: + def should_be_retried(self) -> bool: ... @abstractmethod @@ -55,5 +54,7 @@ def __init__( self.online_store = online_store @abstractmethod - def materialize(self, tasks: List[MaterializationTask]) -> List[MaterializationJob]: + def materialize( + self, registry, tasks: List[MaterializationTask] + ) -> List[MaterializationJob]: ... 
diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py index 1a8cf44510..00ca69bc2f 100644 --- a/sdk/python/feast/infra/materialization/local_engine.py +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Callable, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union import dask.dataframe as dd import pandas as pd @@ -8,36 +8,18 @@ from feast import Entity, FeatureView, RepoConfig, ValueType from feast.feature_view import DUMMY_ENTITY_ID -from .batch_materialization_engine import ( - BatchMaterializationEngine, - MaterializationJob, - MaterializationTask, -) from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel from feast.type_map import python_values_to_proto_values -from ...registry import BaseRegistry - - -class LocalMaterializationJob(MaterializationJob): - def __init__(self) -> None: - super().__init__() - - def status(self) -> str: - pass - - def should_be_retried(self) -> str: - pass - - def job_id(self) -> str: - pass - - def url(self) -> Optional[str]: - pass +from .batch_materialization_engine import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationTask, +) DEFAULT_BATCH_SIZE = 10_000 @@ -49,6 +31,20 @@ class LocalMaterializationEngineConfig(FeastConfigBaseModel): """ Type selector""" +class LocalMaterializationJob(MaterializationJob): + def status(self) -> str: + return "success" + + def should_be_retried(self) -> bool: + return False + + def job_id(self) -> str: + return "" + + def url(self) -> Optional[str]: + return None + + class 
LocalMaterializationEngine(BatchMaterializationEngine): def __init__( self, @@ -65,12 +61,25 @@ def __init__( **kwargs, ) - def materialize(self, tasks: List[MaterializationTask]) -> List[MaterializationJob]: - return [] + def materialize( + self, registry, tasks: List[MaterializationTask] + ) -> List[MaterializationJob]: + return [ + self.materialize_one( + registry, + task.feature_view, + task.start_time, + task.end_time, + task.project, + task.tqdm_builder, + ) + for task in tasks + ] def materialize_one( self, - feature_view: FeatureView, + registry, + feature_view: Any, # TODO (achals): This should be typed more narrowly start_date: datetime, end_date: datetime, project: str, @@ -78,7 +87,7 @@ def materialize_one( ): entities = [] for entity_name in feature_view.entities: - entities.append(self.registry.get_entity(entity_name, project)) + entities.append(registry.get_entity(entity_name, project)) ( join_key_columns, @@ -119,6 +128,7 @@ def materialize_one( rows_to_write, lambda x: pbar.update(x), ) + return LocalMaterializationJob() def _get_column_names( diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index f65abe27ed..ea069de7a9 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -24,6 +24,10 @@ DEFAULT_BATCH_SIZE = 10_000 +BATCH_ENGINE_CLASS_FOR_TYPE = { + "local": "feast.infra.materialization.LocalMaterializationEngine", +} + class PassthroughProvider(Provider): """ @@ -36,7 +40,7 @@ def __init__(self, config: RepoConfig): self.repo_config = config self._offline_store = None self._online_store = None - self._batch_engine = None + self._batch_engine: Optional[BatchMaterializationEngine] = None @property def online_store(self): @@ -56,9 +60,18 @@ def offline_store(self): @property def batch_engine(self) -> BatchMaterializationEngine: - if not self._batch_engine and self.repo_config.batch_engine: - self._batch_engine = 
self.repo_config.batch_engine - return self._batch_engine + if self._batch_engine: + return self._batch_engine + else: + from feast.infra.materialization import LocalMaterializationEngine + + _batch_engine = LocalMaterializationEngine( + repo_config=self.repo_config, + offline_store=self.offline_store, + online_store=self.online_store, + ) + self._batch_engine = _batch_engine + return _batch_engine def update_infra( self, @@ -174,7 +187,7 @@ def materialize_single_feature_view( end_time=end_date, tqdm_builder=tqdm_builder, ) - self.batch_engine.materialize([task]) + self.batch_engine.materialize(registry, [task]) def get_historical_features( self, diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 086865d54c..aa802e9d2c 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -125,7 +125,7 @@ class RepoConfig(FeastBaseModel): _offline_config: Any = Field(alias="offline_store") """ OfflineStoreConfig: Offline store configuration (optional depending on provider) """ - _batch_engine_config: Any = Field(alias="batch_engine") + batch_engine_config: Any = Field(alias="batch_engine") """ BatchMaterializationEngine: Batch materialization configuration (optional depending on provider)""" feature_server: Optional[Any] @@ -165,10 +165,10 @@ def __init__(self, **data: Any): self._batch_engine = None if "batch_engine" in data: - self._batch_engine_config = data["batch_engine"] + self.batch_engine_config = data["batch_engine"] else: # Defaults to using local in-process materialization engine. 
- self._batch_engine_config = "local" + self.batch_engine_config = "local" if isinstance(self.feature_server, Dict): self.feature_server = get_feature_server_config_from_type( @@ -210,20 +210,6 @@ def online_store(self): return self._online_store - @property - def batch_engine(self): - assert self._batch_engine_config == "local" - if not self._batch_engine: - from feast.infra.materialization import LocalMaterializationEngine - - self._batch_engine = LocalMaterializationEngine( - repo_config=self, - offline_store=self.offline_store, - online_store=self.online_store, - ) - - return self._batch_engine - @root_validator(pre=True) @log_exceptions def _validate_online_store_config(cls, values): From ceedbabcc7ec7445ddefc0036f02e6e2d05bca70 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 13:16:18 -0700 Subject: [PATCH 04/11] a little better Signed-off-by: Achal Shah --- .../batch_materialization_engine.py | 9 ++++---- .../infra/materialization/local_engine.py | 22 ++++++++++++++----- .../feast/infra/passthrough_provider.py | 7 +++++- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index cc9ff5d626..d6a8ca237d 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -1,20 +1,21 @@ import dataclasses from abc import ABC, abstractmethod from datetime import datetime -from typing import Callable, List, Optional +from typing import Callable, List, Optional, Union from tqdm import tqdm -from feast import RepoConfig -from feast.base_feature_view import BaseFeatureView +from feast.batch_feature_view import BatchFeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore +from feast.repo_config import RepoConfig +from 
feast.stream_feature_view import StreamFeatureView
 
 
 @dataclasses.dataclass
 class MaterializationTask:
     project: str
-    feature_view: BaseFeatureView
+    feature_view: Union[BatchFeatureView, StreamFeatureView]
     start_time: datetime
     end_time: datetime
     tqdm_builder: Callable[[int], tqdm]
diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py
index 00ca69bc2f..89a521a7c9 100644
--- a/sdk/python/feast/infra/materialization/local_engine.py
+++ b/sdk/python/feast/infra/materialization/local_engine.py
@@ -1,12 +1,19 @@
 from datetime import datetime
-from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
+from typing import Callable, Dict, List, Literal, Optional, Tuple, Union
 
 import dask.dataframe as dd
 import pandas as pd
 import pyarrow as pa
 from tqdm import tqdm
 
-from feast import Entity, FeatureView, RepoConfig, ValueType
+from feast import (
+    BatchFeatureView,
+    Entity,
+    FeatureView,
+    RepoConfig,
+    StreamFeatureView,
+    ValueType,
+)
 from feast.feature_view import DUMMY_ENTITY_ID
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
@@ -32,6 +39,10 @@ class LocalMaterializationEngineConfig(FeastConfigBaseModel):
 
 
 class LocalMaterializationJob(MaterializationJob):
+    def __init__(self, job_id: str) -> None:
+        super().__init__()
+        self._job_id: str = job_id
+
     def status(self) -> str:
         return "success"
 
@@ -39,7 +50,7 @@ def should_be_retried(self) -> bool:
         return False
 
     def job_id(self) -> str:
-        return ""
+        return self._job_id
 
     def url(self) -> Optional[str]:
         return None
@@ -79,7 +90,7 @@ def materialize(
 
     def materialize_one(
         self,
         registry,
-        feature_view: Any,  # TODO (achals): This should be typed more narrowly
+        feature_view: Union[BatchFeatureView, StreamFeatureView],
         start_date: datetime,
         end_date: datetime,
         project: str,
@@ -128,7 +139,8 @@ def materialize_one(
                 rows_to_write,
                 lambda x: pbar.update(x),
             )
-
return LocalMaterializationJob() + job_id = f"{feature_view.name}-{start_date}-{end_date}" + return LocalMaterializationJob(job_id=job_id) def _get_column_names( diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index ea069de7a9..705eee9a25 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -5,9 +5,10 @@ import pyarrow as pa from tqdm import tqdm -from feast import FeatureService +from feast.batch_feature_view import BatchFeatureView from feast.entity import Entity from feast.feature_logging import FeatureServiceLoggingSource +from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.materialization import BatchMaterializationEngine, MaterializationTask from feast.infra.offline_stores.offline_store import RetrievalJob @@ -19,6 +20,7 @@ from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset +from feast.stream_feature_view import StreamFeatureView from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute from feast.utils import make_tzaware @@ -180,6 +182,9 @@ def materialize_single_feature_view( tqdm_builder: Callable[[int], tqdm], ) -> None: set_usage_attribute("provider", self.__class__.__name__) + assert isinstance(feature_view, BatchFeatureView) or isinstance( + feature_view, StreamFeatureView + ) task = MaterializationTask( project=project, feature_view=feature_view, From 37fa0811456566c4041188db0ad005906bedb7be Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 14:23:01 -0700 Subject: [PATCH 05/11] a little better Signed-off-by: Achal Shah --- .../batch_materialization_engine.py | 30 ++++++++++++- .../infra/materialization/local_engine.py | 44 ++++++++++++++----- .../feast/infra/passthrough_provider.py | 8 ++-- 3 files changed, 65 insertions(+), 17 deletions(-) diff --git 
a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index d6a8ca237d..69f00ebe5e 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -1,11 +1,13 @@ import dataclasses from abc import ABC, abstractmethod from datetime import datetime -from typing import Callable, List, Optional, Union +from typing import Callable, List, Optional, Sequence, Union from tqdm import tqdm from feast.batch_feature_view import BatchFeatureView +from feast.entity import Entity +from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore from feast.repo_config import RepoConfig @@ -15,7 +17,7 @@ @dataclasses.dataclass class MaterializationTask: project: str - feature_view: Union[BatchFeatureView, StreamFeatureView] + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView] start_time: datetime end_time: datetime tqdm_builder: Callable[[int], tqdm] @@ -54,8 +56,32 @@ def __init__( self.offline_store = offline_store self.online_store = online_store + @abstractmethod + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + ... + @abstractmethod def materialize( self, registry, tasks: List[MaterializationTask] ) -> List[MaterializationJob]: ... + + @abstractmethod + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + ... 
diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py index 89a521a7c9..24e3e2a0eb 100644 --- a/sdk/python/feast/infra/materialization/local_engine.py +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -1,26 +1,22 @@ from datetime import datetime -from typing import Callable, Dict, List, Literal, Optional, Tuple, Union +from typing import Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union import dask.dataframe as dd import pandas as pd import pyarrow as pa from tqdm import tqdm -from feast import ( - BatchFeatureView, - Entity, - FeatureView, - RepoConfig, - StreamFeatureView, - ValueType, -) -from feast.feature_view import DUMMY_ENTITY_ID +from feast.batch_feature_view import BatchFeatureView +from feast.entity import Entity +from feast.feature_view import DUMMY_ENTITY_ID, FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.repo_config import FeastConfigBaseModel +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.stream_feature_view import StreamFeatureView from feast.type_map import python_values_to_proto_values +from feast.value_type import ValueType from .batch_materialization_engine import ( BatchMaterializationEngine, @@ -57,6 +53,30 @@ def url(self) -> Optional[str]: class LocalMaterializationEngine(BatchMaterializationEngine): + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + # Nothing to set up. 
+ pass + + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + # Nothing to tear down. + pass + def __init__( self, *, @@ -90,7 +110,7 @@ def materialize( def materialize_one( self, registry, - feature_view: Union[BatchFeatureView, StreamFeatureView], + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], start_date: datetime, end_date: datetime, project: str, diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 705eee9a25..49ce35a251 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -182,9 +182,11 @@ def materialize_single_feature_view( tqdm_builder: Callable[[int], tqdm], ) -> None: set_usage_attribute("provider", self.__class__.__name__) - assert isinstance(feature_view, BatchFeatureView) or isinstance( - feature_view, StreamFeatureView - ) + assert ( + isinstance(feature_view, BatchFeatureView) + or isinstance(feature_view, StreamFeatureView) + or isinstance(feature_view, FeatureView) + ), f"Unexpected type for {feature_view.name}: {type(feature_view)}" task = MaterializationTask( project=project, feature_view=feature_view, From 795e65a2866c3b1279eccc6e21082e8c92fc2207 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 14:54:33 -0700 Subject: [PATCH 06/11] docs Signed-off-by: Achal Shah --- .../batch_materialization_engine.py | 30 ++- .../infra/materialization/local_engine.py | 177 ++------------- sdk/python/feast/infra/offline_stores/file.py | 8 +- .../infra/offline_stores/offline_utils.py | 3 +- .../feast/infra/passthrough_provider.py | 16 +- sdk/python/feast/infra/provider.py | 202 +---------------- sdk/python/feast/repo_config.py | 5 - sdk/python/feast/utils.py | 214 ++++++++++++++++++ sdk/python/tests/unit/infra/test_provider.py | 2 +- .../tests/utils/online_write_benchmark.py | 2 +- 10 files 
changed, 272 insertions(+), 387 deletions(-) diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index 69f00ebe5e..4d6fd73392 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -1,5 +1,5 @@ -import dataclasses from abc import ABC, abstractmethod +from dataclasses import dataclass from datetime import datetime from typing import Callable, List, Optional, Sequence, Union @@ -10,12 +10,18 @@ from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore +from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView -@dataclasses.dataclass +@dataclass class MaterializationTask: + """ + A MaterializationTask represents a unit of data that needs to be materialized from an + offline store to an online store. + """ + project: str feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView] start_time: datetime @@ -24,6 +30,11 @@ class MaterializationTask: class MaterializationJob(ABC): + """ + MaterializationJob represents an ongoing or executed process that's materialization data as per the + definition of a materialization task. + """ + task: MaterializationTask @abstractmethod @@ -69,12 +80,21 @@ def update( entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], ): - ... 
+ """This method ensures that any necessary infrastructure or resources needed by the + engine are set up ahead of materialization.""" @abstractmethod def materialize( - self, registry, tasks: List[MaterializationTask] + self, registry: BaseRegistry, tasks: List[MaterializationTask] ) -> List[MaterializationJob]: + """ + Materialize data from the offline store to the online store for this feature repo. + Args: + registry: The feast registry containing the applied feature views. + tasks: A list of individual materialization tasks. + Returns: + A list of materialization jobs representing each task. + """ ... @abstractmethod @@ -84,4 +104,4 @@ def teardown_infra( fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], entities: Sequence[Entity], ): - ... + """This method ensures that any infrastructure or resources set up by ``update()``are torn down.""" diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py index 24e3e2a0eb..6bc2d94255 100644 --- a/sdk/python/feast/infra/materialization/local_engine.py +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -18,6 +18,12 @@ from feast.type_map import python_values_to_proto_values from feast.value_type import ValueType +from ...registry import BaseRegistry +from ...utils import ( + _convert_arrow_to_proto, + _get_column_names, + _run_pyarrow_field_mapping, +) from .batch_materialization_engine import ( BatchMaterializationEngine, MaterializationJob, @@ -46,7 +52,7 @@ def should_be_retried(self) -> bool: return False def job_id(self) -> str: - return self.job_id() + return self._job_id def url(self) -> Optional[str]: return None @@ -96,7 +102,7 @@ def materialize( self, registry, tasks: List[MaterializationTask] ) -> List[MaterializationJob]: return [ - self.materialize_one( + self._materialize_one( registry, task.feature_view, task.start_time, @@ -107,9 +113,9 @@ def materialize( for task in tasks ] - def materialize_one( + 
def _materialize_one( self, - registry, + registry: BaseRegistry, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], start_date: datetime, end_date: datetime, @@ -141,7 +147,9 @@ def materialize_one( table = offline_job.to_arrow() if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) join_key_to_value_type = { entity.name: entity.dtype.to_value_type() @@ -161,162 +169,3 @@ def materialize_one( ) job_id = f"{feature_view.name}-{start_date}-{end_date}" return LocalMaterializationJob(job_id=job_id) - - -def _get_column_names( - feature_view: FeatureView, entities: List[Entity] -) -> Tuple[List[str], List[str], str, Optional[str]]: - """ - If a field mapping exists, run it in reverse on the join keys, - feature names, event timestamp column, and created timestamp column - to get the names of the relevant columns in the offline feature store table. - - Returns: - Tuple containing the list of reverse-mapped join_keys, - reverse-mapped feature names, reverse-mapped event timestamp column, - and reverse-mapped created timestamp column that will be passed into - the query to the offline store. 
- """ - # if we have mapped fields, use the original field names in the call to the offline store - timestamp_field = feature_view.batch_source.timestamp_field - feature_names = [feature.name for feature in feature_view.features] - created_timestamp_column = feature_view.batch_source.created_timestamp_column - join_keys = [ - entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID - ] - if feature_view.batch_source.field_mapping is not None: - reverse_field_mapping = { - v: k for k, v in feature_view.batch_source.field_mapping.items() - } - timestamp_field = ( - reverse_field_mapping[timestamp_field] - if timestamp_field in reverse_field_mapping.keys() - else timestamp_field - ) - created_timestamp_column = ( - reverse_field_mapping[created_timestamp_column] - if created_timestamp_column - and created_timestamp_column in reverse_field_mapping.keys() - else created_timestamp_column - ) - join_keys = [ - reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col - for col in join_keys - ] - feature_names = [ - reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col - for col in feature_names - ] - - # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to - # their final column names via the `field_mapping` field of the source. 
- feature_names = [ - name - for name in feature_names - if name not in join_keys - and name != timestamp_field - and name != created_timestamp_column - ] - return ( - join_keys, - feature_names, - timestamp_field, - created_timestamp_column, - ) - - -def _run_field_mapping(table: pa.Table, field_mapping: Dict[str, str],) -> pa.Table: - # run field mapping in the forward direction - cols = table.column_names - mapped_cols = [ - field_mapping[col] if col in field_mapping.keys() else col for col in cols - ] - table = table.rename_columns(mapped_cols) - return table - - -def _run_dask_field_mapping( - table: dd.DataFrame, field_mapping: Dict[str, str], -): - if field_mapping: - # run field mapping in the forward direction - table = table.rename(columns=field_mapping) - table = table.persist() - - return table - - -def _coerce_datetime(ts): - """ - Depending on underlying time resolution, arrow to_pydict() sometimes returns pd - timestamp type (for nanosecond resolution), and sometimes you get standard python datetime - (for microsecond resolution). - While pd timestamp class is a subclass of python datetime, it doesn't always behave the - same way. We convert it to normal datetime so that consumers downstream don't have to deal - with these quirks. - """ - if isinstance(ts, pd.Timestamp): - return ts.to_pydatetime() - else: - return ts - - -def _convert_arrow_to_proto( - table: Union[pa.Table, pa.RecordBatch], - feature_view: FeatureView, - join_keys: Dict[str, ValueType], -) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: - # Avoid ChunkedArrays which guarentees `zero_copy_only` availiable. 
- if isinstance(table, pa.Table): - table = table.to_batches()[0] - - columns = [ - (field.name, field.dtype.to_value_type()) for field in feature_view.features - ] + list(join_keys.items()) - - proto_values_by_column = { - column: python_values_to_proto_values( - table.column(column).to_numpy(zero_copy_only=False), value_type - ) - for column, value_type in columns - } - - entity_keys = [ - EntityKeyProto( - join_keys=join_keys, - entity_values=[proto_values_by_column[k][idx] for k in join_keys], - ) - for idx in range(table.num_rows) - ] - - # Serialize the features per row - feature_dict = { - feature.name: proto_values_by_column[feature.name] - for feature in feature_view.features - } - features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] - - # Convert event_timestamps - event_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column(feature_view.batch_source.timestamp_field).to_numpy( - zero_copy_only=False - ) - ) - ] - - # Convert created_timestamps if they exist - if feature_view.batch_source.created_timestamp_column: - created_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column( - feature_view.batch_source.created_timestamp_column - ).to_numpy(zero_copy_only=False) - ) - ] - else: - created_timestamps = [None] * table.num_rows - - return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 10012c2d80..d60d468174 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -29,14 +29,14 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, get_pyarrow_schema_from_batch_source, ) -from feast.infra.provider import ( - _get_requested_feature_views_to_features_dict, - _run_dask_field_mapping, -) from feast.registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from 
feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage +from feast.utils import ( + _get_requested_feature_views_to_features_dict, + _run_dask_field_mapping, +) class FileOfflineStoreConfig(FeastConfigBaseModel): diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index abe8d4e4e5..8b963a864b 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -17,11 +17,10 @@ from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.offline_stores.offline_store import OfflineStore -from feast.infra.provider import _get_requested_feature_views_to_features_dict from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.type_map import feast_value_type_to_pa -from feast.utils import to_naive_utc +from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 49ce35a251..25712040ac 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -14,7 +14,7 @@ from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config -from feast.infra.provider import Provider, _convert_arrow_to_proto, _run_field_mapping +from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry @@ -22,7 +22,11 @@ from feast.saved_dataset import SavedDataset from 
feast.stream_feature_view import StreamFeatureView from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute -from feast.utils import make_tzaware +from feast.utils import ( + _convert_arrow_to_proto, + _run_pyarrow_field_mapping, + make_tzaware, +) DEFAULT_BATCH_SIZE = 10_000 @@ -154,7 +158,9 @@ def ingest_df( table = pa.Table.from_pandas(df) if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) join_keys = {entity.join_key: entity.value_type for entity in entities} rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) @@ -167,7 +173,9 @@ def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table) set_usage_attribute("provider", self.__class__.__name__) if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) self.offline_write_batch(self.repo_config, feature_view, table, None) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index d2e37e69db..9695e4d736 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,29 +1,24 @@ import abc -from collections import defaultdict from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import dask.dataframe as dd import pandas as pd import pyarrow from tqdm import tqdm from feast import FeatureService, errors from feast.entity import Entity -from feast.feature_view import DUMMY_ENTITY_ID, FeatureView +from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.infra_object import Infra from feast.infra.offline_stores.offline_store import 
RetrievalJob -from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset -from feast.type_map import python_values_to_proto_values -from feast.value_type import ValueType PROVIDERS_CLASS_FOR_TYPE = { "gcp": "feast.infra.gcp.GcpProvider", @@ -252,198 +247,3 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: cls = import_class(module_name, class_name, "Provider") return cls(config) - - -def _get_requested_feature_views_to_features_dict( - feature_refs: List[str], - feature_views: List[FeatureView], - on_demand_feature_views: List[OnDemandFeatureView], -) -> Tuple[Dict[FeatureView, List[str]], Dict[OnDemandFeatureView, List[str]]]: - """Create a dict of FeatureView -> List[Feature] for all requested features. 
- Set full_feature_names to True to have feature names prefixed by their feature view name.""" - - feature_views_to_feature_map: Dict[FeatureView, List[str]] = defaultdict(list) - on_demand_feature_views_to_feature_map: Dict[ - OnDemandFeatureView, List[str] - ] = defaultdict(list) - - for ref in feature_refs: - ref_parts = ref.split(":") - feature_view_from_ref = ref_parts[0] - feature_from_ref = ref_parts[1] - - found = False - for fv in feature_views: - if fv.projection.name_to_use() == feature_view_from_ref: - found = True - feature_views_to_feature_map[fv].append(feature_from_ref) - for odfv in on_demand_feature_views: - if odfv.projection.name_to_use() == feature_view_from_ref: - found = True - on_demand_feature_views_to_feature_map[odfv].append(feature_from_ref) - - if not found: - raise ValueError(f"Could not find feature view from reference {ref}") - - return feature_views_to_feature_map, on_demand_feature_views_to_feature_map - - -def _get_column_names( - feature_view: FeatureView, entities: List[Entity] -) -> Tuple[List[str], List[str], str, Optional[str]]: - """ - If a field mapping exists, run it in reverse on the join keys, - feature names, event timestamp column, and created timestamp column - to get the names of the relevant columns in the offline feature store table. - - Returns: - Tuple containing the list of reverse-mapped join_keys, - reverse-mapped feature names, reverse-mapped event timestamp column, - and reverse-mapped created timestamp column that will be passed into - the query to the offline store. 
- """ - # if we have mapped fields, use the original field names in the call to the offline store - timestamp_field = feature_view.batch_source.timestamp_field - feature_names = [feature.name for feature in feature_view.features] - created_timestamp_column = feature_view.batch_source.created_timestamp_column - join_keys = [ - entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID - ] - if feature_view.batch_source.field_mapping is not None: - reverse_field_mapping = { - v: k for k, v in feature_view.batch_source.field_mapping.items() - } - timestamp_field = ( - reverse_field_mapping[timestamp_field] - if timestamp_field in reverse_field_mapping.keys() - else timestamp_field - ) - created_timestamp_column = ( - reverse_field_mapping[created_timestamp_column] - if created_timestamp_column - and created_timestamp_column in reverse_field_mapping.keys() - else created_timestamp_column - ) - join_keys = [ - reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col - for col in join_keys - ] - feature_names = [ - reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col - for col in feature_names - ] - - # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to - # their final column names via the `field_mapping` field of the source. 
- feature_names = [ - name - for name in feature_names - if name not in join_keys - and name != timestamp_field - and name != created_timestamp_column - ] - return ( - join_keys, - feature_names, - timestamp_field, - created_timestamp_column, - ) - - -def _run_field_mapping( - table: pyarrow.Table, field_mapping: Dict[str, str], -) -> pyarrow.Table: - # run field mapping in the forward direction - cols = table.column_names - mapped_cols = [ - field_mapping[col] if col in field_mapping.keys() else col for col in cols - ] - table = table.rename_columns(mapped_cols) - return table - - -def _run_dask_field_mapping( - table: dd.DataFrame, field_mapping: Dict[str, str], -): - if field_mapping: - # run field mapping in the forward direction - table = table.rename(columns=field_mapping) - table = table.persist() - - return table - - -def _coerce_datetime(ts): - """ - Depending on underlying time resolution, arrow to_pydict() sometimes returns pd - timestamp type (for nanosecond resolution), and sometimes you get standard python datetime - (for microsecond resolution). - While pd timestamp class is a subclass of python datetime, it doesn't always behave the - same way. We convert it to normal datetime so that consumers downstream don't have to deal - with these quirks. - """ - if isinstance(ts, pd.Timestamp): - return ts.to_pydatetime() - else: - return ts - - -def _convert_arrow_to_proto( - table: Union[pyarrow.Table, pyarrow.RecordBatch], - feature_view: FeatureView, - join_keys: Dict[str, ValueType], -) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: - # Avoid ChunkedArrays which guarentees `zero_copy_only` availiable. 
- if isinstance(table, pyarrow.Table): - table = table.to_batches()[0] - - columns = [ - (field.name, field.dtype.to_value_type()) for field in feature_view.features - ] + list(join_keys.items()) - - proto_values_by_column = { - column: python_values_to_proto_values( - table.column(column).to_numpy(zero_copy_only=False), value_type - ) - for column, value_type in columns - } - - entity_keys = [ - EntityKeyProto( - join_keys=join_keys, - entity_values=[proto_values_by_column[k][idx] for k in join_keys], - ) - for idx in range(table.num_rows) - ] - - # Serialize the features per row - feature_dict = { - feature.name: proto_values_by_column[feature.name] - for feature in feature_view.features - } - features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] - - # Convert event_timestamps - event_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column(feature_view.batch_source.timestamp_field).to_numpy( - zero_copy_only=False - ) - ) - ] - - # Convert created_timestamps if they exist - if feature_view.batch_source.created_timestamp_column: - created_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column( - feature_view.batch_source.created_timestamp_column - ).to_numpy(zero_copy_only=False) - ) - ] - else: - created_timestamps = [None] * table.num_rows - - return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index aa802e9d2c..f315023ee1 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -63,11 +63,6 @@ } -BATCH_ENGINE_CLASS_FOR_TYPE = { - "local": "feast.infra.materialization.LocalMaterializationEngine", -} - - class FeastBaseModel(BaseModel): """Feast Pydantic Configuration Class""" diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index a40f423c53..e24471ad26 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1,8 
+1,24 @@ +import typing +from collections import defaultdict from datetime import datetime +from typing import Dict, List, Optional, Tuple, Union +import pandas as pd +import pyarrow +from dask import dataframe as dd from dateutil.tz import tzlocal from pytz import utc +from feast.entity import Entity +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.type_map import python_values_to_proto_values +from feast.value_type import ValueType + +if typing.TYPE_CHECKING: + from feast.feature_view import FeatureView + from feast.on_demand_feature_view import OnDemandFeatureView + def make_tzaware(t: datetime) -> datetime: """We assume tz-naive datetimes are UTC""" @@ -24,3 +40,201 @@ def maybe_local_tz(t: datetime) -> datetime: return t.replace(tzinfo=tzlocal()) else: return t + + +def _get_requested_feature_views_to_features_dict( + feature_refs: List[str], + feature_views: List["FeatureView"], + on_demand_feature_views: List["OnDemandFeatureView"], +) -> Tuple[Dict["FeatureView", List[str]], Dict["OnDemandFeatureView", List[str]]]: + """Create a dict of FeatureView -> List[Feature] for all requested features. 
+ Set full_feature_names to True to have feature names prefixed by their feature view name.""" + + feature_views_to_feature_map: Dict["FeatureView", List[str]] = defaultdict(list) + on_demand_feature_views_to_feature_map: Dict[ + "OnDemandFeatureView", List[str] + ] = defaultdict(list) + + for ref in feature_refs: + ref_parts = ref.split(":") + feature_view_from_ref = ref_parts[0] + feature_from_ref = ref_parts[1] + + found = False + for fv in feature_views: + if fv.projection.name_to_use() == feature_view_from_ref: + found = True + feature_views_to_feature_map[fv].append(feature_from_ref) + for odfv in on_demand_feature_views: + if odfv.projection.name_to_use() == feature_view_from_ref: + found = True + on_demand_feature_views_to_feature_map[odfv].append(feature_from_ref) + + if not found: + raise ValueError(f"Could not find feature view from reference {ref}") + + return feature_views_to_feature_map, on_demand_feature_views_to_feature_map + + +def _get_column_names( + feature_view: "FeatureView", entities: List[Entity] +) -> Tuple[List[str], List[str], str, Optional[str]]: + """ + If a field mapping exists, run it in reverse on the join keys, + feature names, event timestamp column, and created timestamp column + to get the names of the relevant columns in the offline feature store table. + + Returns: + Tuple containing the list of reverse-mapped join_keys, + reverse-mapped feature names, reverse-mapped event timestamp column, + and reverse-mapped created timestamp column that will be passed into + the query to the offline store. 
+ """ + # if we have mapped fields, use the original field names in the call to the offline store + timestamp_field = feature_view.batch_source.timestamp_field + feature_names = [feature.name for feature in feature_view.features] + created_timestamp_column = feature_view.batch_source.created_timestamp_column + + from feast.feature_view import DUMMY_ENTITY_ID + + join_keys = [ + entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID + ] + if feature_view.batch_source.field_mapping is not None: + reverse_field_mapping = { + v: k for k, v in feature_view.batch_source.field_mapping.items() + } + timestamp_field = ( + reverse_field_mapping[timestamp_field] + if timestamp_field in reverse_field_mapping.keys() + else timestamp_field + ) + created_timestamp_column = ( + reverse_field_mapping[created_timestamp_column] + if created_timestamp_column + and created_timestamp_column in reverse_field_mapping.keys() + else created_timestamp_column + ) + join_keys = [ + reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col + for col in join_keys + ] + feature_names = [ + reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col + for col in feature_names + ] + + # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to + # their final column names via the `field_mapping` field of the source. 
+ feature_names = [ + name + for name in feature_names + if name not in join_keys + and name != timestamp_field + and name != created_timestamp_column + ] + return ( + join_keys, + feature_names, + timestamp_field, + created_timestamp_column, + ) + + +def _run_pyarrow_field_mapping( + table: pyarrow.Table, field_mapping: Dict[str, str], +) -> pyarrow.Table: + # run field mapping in the forward direction + cols = table.column_names + mapped_cols = [ + field_mapping[col] if col in field_mapping.keys() else col for col in cols + ] + table = table.rename_columns(mapped_cols) + return table + + +def _run_dask_field_mapping( + table: dd.DataFrame, field_mapping: Dict[str, str], +): + if field_mapping: + # run field mapping in the forward direction + table = table.rename(columns=field_mapping) + table = table.persist() + + return table + + +def _coerce_datetime(ts): + """ + Depending on underlying time resolution, arrow to_pydict() sometimes returns pd + timestamp type (for nanosecond resolution), and sometimes you get standard python datetime + (for microsecond resolution). + While pd timestamp class is a subclass of python datetime, it doesn't always behave the + same way. We convert it to normal datetime so that consumers downstream don't have to deal + with these quirks. + """ + if isinstance(ts, pd.Timestamp): + return ts.to_pydatetime() + else: + return ts + + +def _convert_arrow_to_proto( + table: Union[pyarrow.Table, pyarrow.RecordBatch], + feature_view: "FeatureView", + join_keys: Dict[str, ValueType], +) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: + # Avoid ChunkedArrays which guarentees `zero_copy_only` availiable. 
+ if isinstance(table, pyarrow.Table): + table = table.to_batches()[0] + + columns = [ + (field.name, field.dtype.to_value_type()) for field in feature_view.features + ] + list(join_keys.items()) + + proto_values_by_column = { + column: python_values_to_proto_values( + table.column(column).to_numpy(zero_copy_only=False), value_type + ) + for column, value_type in columns + } + + entity_keys = [ + EntityKeyProto( + join_keys=join_keys, + entity_values=[proto_values_by_column[k][idx] for k in join_keys], + ) + for idx in range(table.num_rows) + ] + + # Serialize the features per row + feature_dict = { + feature.name: proto_values_by_column[feature.name] + for feature in feature_view.features + } + features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] + + # Convert event_timestamps + event_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column(feature_view.batch_source.timestamp_field).to_numpy( + zero_copy_only=False + ) + ) + ] + + # Convert created_timestamps if they exist + if feature_view.batch_source.created_timestamp_column: + created_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column( + feature_view.batch_source.created_timestamp_column + ).to_numpy(zero_copy_only=False) + ) + ] + else: + created_timestamps = [None] * table.num_rows + + return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/tests/unit/infra/test_provider.py b/sdk/python/tests/unit/infra/test_provider.py index 5ed5603b03..217a1361b4 100644 --- a/sdk/python/tests/unit/infra/test_provider.py +++ b/sdk/python/tests/unit/infra/test_provider.py @@ -18,8 +18,8 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field -from feast.infra.provider import _get_column_names from feast.types import String +from feast.utils import _get_column_names def test_get_column_names_preserves_feature_ordering(): diff --git 
a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index 9f2f8ba60d..8a138f41db 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -14,9 +14,9 @@ from feast.feature_store import FeatureStore from feast.feature_view import FeatureView from feast.field import Field -from feast.infra.provider import _convert_arrow_to_proto from feast.repo_config import RepoConfig from feast.types import Float32, Int32 +from feast.utils import _convert_arrow_to_proto def create_driver_hourly_stats_feature_view(source): From 5833556863f225bdde05c01ad1cd4e2d671bbcf0 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 15:27:21 -0700 Subject: [PATCH 07/11] more api updates' Signed-off-by: Achal Shah --- .../batch_materialization_engine.py | 17 ++- .../infra/materialization/local_engine.py | 100 ++++++++++-------- .../feast/infra/passthrough_provider.py | 10 +- 3 files changed, 82 insertions(+), 45 deletions(-) diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index 4d6fd73392..4f8da5466f 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -1,3 +1,4 @@ +import enum from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime @@ -29,6 +30,16 @@ class MaterializationTask: tqdm_builder: Callable[[int], tqdm] +class MaterializationJobStatus(enum.Enum): + WAITING = 1 + RUNNING = 2 + AVAILABLE = 3 + ERROR = 4 + CANCELLING = 5 + CANCELLED = 6 + SUCCEEDED = 7 + + class MaterializationJob(ABC): """ MaterializationJob represents an ongoing or executed process that's materialization data as per the @@ -38,7 +49,11 @@ class MaterializationJob(ABC): task: MaterializationTask @abstractmethod - def status(self) -> str: + 
def status(self) -> MaterializationJobStatus: + ... + + @abstractmethod + def error(self) -> Optional[BaseException]: ... @abstractmethod diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py index 6bc2d94255..4f775981ef 100644 --- a/sdk/python/feast/infra/materialization/local_engine.py +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -1,22 +1,16 @@ +from dataclasses import dataclass from datetime import datetime -from typing import Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union +from typing import Callable, List, Literal, Optional, Sequence, Union -import dask.dataframe as dd -import pandas as pd -import pyarrow as pa from tqdm import tqdm from feast.batch_feature_view import BatchFeatureView from feast.entity import Entity -from feast.feature_view import DUMMY_ENTITY_ID, FeatureView +from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore from feast.infra.online_stores.online_store import OnlineStore -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.stream_feature_view import StreamFeatureView -from feast.type_map import python_values_to_proto_values -from feast.value_type import ValueType from ...registry import BaseRegistry from ...utils import ( @@ -27,6 +21,7 @@ from .batch_materialization_engine import ( BatchMaterializationEngine, MaterializationJob, + MaterializationJobStatus, MaterializationTask, ) @@ -40,13 +35,24 @@ class LocalMaterializationEngineConfig(FeastConfigBaseModel): """ Type selector""" +@dataclass class LocalMaterializationJob(MaterializationJob): - def __init__(self, job_id: str) -> None: + def __init__( + self, + job_id: str, + status: MaterializationJobStatus, + error: Optional[BaseException] = None, + ) -> None: 
super().__init__() self._job_id: str = job_id + self._status: MaterializationJobStatus = status + self._error: Optional[BaseException] = error + + def status(self) -> MaterializationJobStatus: + return self._status - def status(self) -> str: - return "success" + def error(self) -> Optional[BaseException]: + return self._error def should_be_retried(self) -> bool: return False @@ -133,39 +139,47 @@ def _materialize_one( created_timestamp_column, ) = _get_column_names(feature_view, entities) - offline_job = self.offline_store.pull_latest_from_table_or_query( - config=self.repo_config, - data_source=feature_view.batch_source, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - timestamp_field=timestamp_field, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, - ) - - table = offline_job.to_arrow() + job_id = f"{feature_view.name}-{start_date}-{end_date}" - if feature_view.batch_source.field_mapping is not None: - table = _run_pyarrow_field_mapping( - table, feature_view.batch_source.field_mapping + try: + offline_job = self.offline_store.pull_latest_from_table_or_query( + config=self.repo_config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, ) - join_key_to_value_type = { - entity.name: entity.dtype.to_value_type() - for entity in feature_view.entity_columns - } + table = offline_job.to_arrow() - with tqdm_builder(table.num_rows) as pbar: - for batch in table.to_batches(DEFAULT_BATCH_SIZE): - rows_to_write = _convert_arrow_to_proto( - batch, feature_view, join_key_to_value_type + if feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping ) - self.online_store.online_write_batch( - self.repo_config, - 
feature_view, - rows_to_write, - lambda x: pbar.update(x), - ) - job_id = f"{feature_view.name}-{start_date}-{end_date}" - return LocalMaterializationJob(job_id=job_id) + + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + + with tqdm_builder(table.num_rows) as pbar: + for batch in table.to_batches(DEFAULT_BATCH_SIZE): + rows_to_write = _convert_arrow_to_proto( + batch, feature_view, join_key_to_value_type + ) + self.online_store.online_write_batch( + self.repo_config, + feature_view, + rows_to_write, + lambda x: pbar.update(x), + ) + return LocalMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + ) + except BaseException as e: + return LocalMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.ERROR, error=e + ) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 25712040ac..e67f4e558c 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -11,6 +11,9 @@ from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.materialization import BatchMaterializationEngine, MaterializationTask +from feast.infra.materialization.batch_materialization_engine import ( + MaterializationJobStatus, +) from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config @@ -202,7 +205,12 @@ def materialize_single_feature_view( end_time=end_date, tqdm_builder=tqdm_builder, ) - self.batch_engine.materialize(registry, [task]) + jobs = self.batch_engine.materialize(registry, [task]) + assert len(jobs) == 1 + if jobs[0].status() == MaterializationJobStatus.ERROR and jobs[0].error(): + e = jobs[0].error() + assert e + raise e def get_historical_features( 
self, From 5d1af3396884a250bce1b21abff9f506638e9971 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 22:08:45 -0700 Subject: [PATCH 08/11] fix typos Signed-off-by: Achal Shah --- sdk/python/feast/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index e24471ad26..9f18da38cd 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -184,7 +184,7 @@ def _convert_arrow_to_proto( feature_view: "FeatureView", join_keys: Dict[str, ValueType], ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: - # Avoid ChunkedArrays which guarentees `zero_copy_only` availiable. + # Avoid ChunkedArrays which guarantees `zero_copy_only` available. if isinstance(table, pyarrow.Table): table = table.to_batches()[0] From ff680a927bd4c781ac0965d67989ae7f4449efb8 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 5 Jul 2022 22:21:05 -0700 Subject: [PATCH 09/11] make engine importable Signed-off-by: Achal Shah --- .../feast/infra/passthrough_provider.py | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index e67f4e558c..d9981ecabe 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -5,6 +5,7 @@ import pyarrow as pa from tqdm import tqdm +from feast import importer from feast.batch_feature_view import BatchFeatureView from feast.entity import Entity from feast.feature_logging import FeatureServiceLoggingSource @@ -72,13 +73,36 @@ def batch_engine(self) -> BatchMaterializationEngine: if self._batch_engine: return self._batch_engine else: - from feast.infra.materialization import LocalMaterializationEngine - - _batch_engine = LocalMaterializationEngine( - repo_config=self.repo_config, - offline_store=self.offline_store, - online_store=self.online_store, - ) + 
engine_config = self.repo_config.batch_engine_config + config_is_dict = False + if isinstance(engine_config, str): + engine_config_type = engine_config + elif isinstance(engine_config, Dict): + if "type" not in engine_config: + raise ValueError("engine_config needs to have a `type` specified.") + engine_config_type = engine_config["type"] + config_is_dict = True + else: + raise RuntimeError(f"Invalid config type specified for batch_engine: {type(engine_config)}") + + if engine_config_type in BATCH_ENGINE_CLASS_FOR_TYPE: + engine_config_type = BATCH_ENGINE_CLASS_FOR_TYPE[engine_config_type] + engine_module, engine_class = engine_config_type.rsplit('.', 1) + engine_class = importer.import_class(engine_module, engine_class) + + if config_is_dict: + _batch_engine = engine_class( + repo_config=self.repo_config, + offline_store=self.offline_store, + online_store=self.online_store, + **engine_config + ) + else: + _batch_engine = engine_class( + repo_config=self.repo_config, + offline_store=self.offline_store, + online_store=self.online_store, + ) self._batch_engine = _batch_engine return _batch_engine From 11b6da04d21b595a1ac84eb83a08945b2e1c5d6b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 6 Jul 2022 09:05:07 -0700 Subject: [PATCH 10/11] style stuff Signed-off-by: Achal Shah --- .../infra/materialization/batch_materialization_engine.py | 2 +- sdk/python/feast/infra/passthrough_provider.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py index 4f8da5466f..773c685d6e 100644 --- a/sdk/python/feast/infra/materialization/batch_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -42,7 +42,7 @@ class MaterializationJobStatus(enum.Enum): class MaterializationJob(ABC): """ - MaterializationJob represents an ongoing or executed process that's 
materialization data as per the + MaterializationJob represents an ongoing or executed process that materializes data as per the definition of a materialization task. """ diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index d9981ecabe..ef29098020 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -83,11 +83,13 @@ def batch_engine(self) -> BatchMaterializationEngine: engine_config_type = engine_config["type"] config_is_dict = True else: - raise RuntimeError(f"Invalid config type specified for batch_engine: {type(engine_config)}") + raise RuntimeError( + f"Invalid config type specified for batch_engine: {type(engine_config)}" + ) if engine_config_type in BATCH_ENGINE_CLASS_FOR_TYPE: engine_config_type = BATCH_ENGINE_CLASS_FOR_TYPE[engine_config_type] - engine_module, engine_class = engine_config_type.rsplit('.', 1) + engine_module, engine_class = engine_config_type.rsplit(".", 1) engine_class = importer.import_class(engine_module, engine_class) if config_is_dict: @@ -95,7 +97,7 @@ def batch_engine(self) -> BatchMaterializationEngine: repo_config=self.repo_config, offline_store=self.offline_store, online_store=self.online_store, - **engine_config + **engine_config, ) else: _batch_engine = engine_class( From 2e9e7a8a034c7d6fe1d984fe4e2c1cec3c5670ee Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 6 Jul 2022 09:12:04 -0700 Subject: [PATCH 11/11] style stuff Signed-off-by: Achal Shah --- sdk/python/feast/infra/passthrough_provider.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index ef29098020..181d46a5a8 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -89,8 +89,8 @@ def batch_engine(self) -> BatchMaterializationEngine: if engine_config_type in 
BATCH_ENGINE_CLASS_FOR_TYPE: engine_config_type = BATCH_ENGINE_CLASS_FOR_TYPE[engine_config_type] - engine_module, engine_class = engine_config_type.rsplit(".", 1) - engine_class = importer.import_class(engine_module, engine_class) + engine_module, engine_class_name = engine_config_type.rsplit(".", 1) + engine_class = importer.import_class(engine_module, engine_class_name) if config_is_dict: _batch_engine = engine_class(