Skip to content

Commit 38b28ca

Browse files
authored
feat: Add interfaces for batch materialization engine (feast-dev#2901)
* feat: Add scaffolding for batch materialization engine Signed-off-by: Achal Shah <achals@gmail.com> * fix tests Signed-off-by: Achal Shah <achals@gmail.com> * fix tests Signed-off-by: Achal Shah <achals@gmail.com> * a little better Signed-off-by: Achal Shah <achals@gmail.com> * a little better Signed-off-by: Achal Shah <achals@gmail.com> * docs Signed-off-by: Achal Shah <achals@gmail.com> * more api updates' Signed-off-by: Achal Shah <achals@gmail.com> * fix typos Signed-off-by: Achal Shah <achals@gmail.com> * make engine importable Signed-off-by: Achal Shah <achals@gmail.com> * style stuff Signed-off-by: Achal Shah <achals@gmail.com> * style stuff Signed-off-by: Achal Shah <achals@gmail.com>
1 parent eaf4022 commit 38b28ca

11 files changed

Lines changed: 635 additions & 264 deletions

File tree

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from .batch_materialization_engine import (
2+
BatchMaterializationEngine,
3+
MaterializationJob,
4+
MaterializationTask,
5+
)
6+
from .local_engine import LocalMaterializationEngine
7+
8+
# Names re-exported as the public API of this package.
__all__ = [
    "MaterializationJob",
    "MaterializationTask",
    "BatchMaterializationEngine",
    "LocalMaterializationEngine",
]
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import enum
2+
from abc import ABC, abstractmethod
3+
from dataclasses import dataclass
4+
from datetime import datetime
5+
from typing import Callable, List, Optional, Sequence, Union
6+
7+
from tqdm import tqdm
8+
9+
from feast.batch_feature_view import BatchFeatureView
10+
from feast.entity import Entity
11+
from feast.feature_view import FeatureView
12+
from feast.infra.offline_stores.offline_store import OfflineStore
13+
from feast.infra.online_stores.online_store import OnlineStore
14+
from feast.registry import BaseRegistry
15+
from feast.repo_config import RepoConfig
16+
from feast.stream_feature_view import StreamFeatureView
17+
18+
19+
@dataclass
class MaterializationTask:
    """
    One unit of materialization work: the data of a single feature view, bounded by a
    time window, that has to be moved from an offline store to an online store.
    """

    # Name of the project the feature view belongs to.
    project: str
    # Feature view whose data is to be materialized.
    feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
    # Start of the time window to materialize.
    start_time: datetime
    # End of the time window to materialize.
    end_time: datetime
    # Factory that takes an int and returns a tqdm progress bar
    # (the local engine calls it with the number of rows to write).
    tqdm_builder: Callable[[int], tqdm]
31+
32+
33+
class MaterializationJobStatus(enum.Enum):
    """States that a materialization job can report."""

    WAITING = 1
    RUNNING = 2
    AVAILABLE = 3
    ERROR = 4
    CANCELLING = 5
    CANCELLED = 6
    SUCCEEDED = 7
41+
42+
43+
class MaterializationJob(ABC):
    """
    Handle to a materialization run, either still in progress or already finished, that
    was started according to the definition of a materialization task.
    """

    # The task this job was created for.
    task: MaterializationTask

    @abstractmethod
    def status(self) -> MaterializationJobStatus:
        """Return the current status of the job."""

    @abstractmethod
    def error(self) -> Optional[BaseException]:
        """Return the exception associated with the job, or None if there is none."""

    @abstractmethod
    def should_be_retried(self) -> bool:
        """Return whether the job should be retried."""

    @abstractmethod
    def job_id(self) -> str:
        """Return the identifier of the job."""

    @abstractmethod
    def url(self) -> Optional[str]:
        """Return a URL for the job, or None if the job has none."""
70+
71+
72+
class BatchMaterializationEngine(ABC):
    """
    Interface for engines that materialize feature data from an offline store into an
    online store.
    """

    def __init__(
        self,
        *,
        repo_config: RepoConfig,
        offline_store: OfflineStore,
        online_store: OnlineStore,
        **kwargs,
    ):
        """
        Store the repo configuration and the two stores on the instance.

        All arguments are keyword-only. Additional keyword arguments are accepted but
        not used by this base implementation.
        """
        self.online_store = online_store
        self.offline_store = offline_store
        self.repo_config = repo_config

    @abstractmethod
    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        """
        Set up any infrastructure or resources the engine needs before materialization
        can run.
        """

    @abstractmethod
    def materialize(
        self, registry: BaseRegistry, tasks: List[MaterializationTask]
    ) -> List[MaterializationJob]:
        """
        Materialize data from the offline store to the online store for this feature repo.

        Args:
            registry: The feast registry containing the applied feature views.
            tasks: A list of individual materialization tasks.

        Returns:
            A list of materialization jobs representing each task.
        """

    @abstractmethod
    def teardown_infra(
        self,
        project: str,
        fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
        entities: Sequence[Entity],
    ):
        """
        Tear down any infrastructure or resources that were set up by ``update()``.
        """
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
from typing import Callable, List, Literal, Optional, Sequence, Union
4+
5+
from tqdm import tqdm
6+
7+
from feast.batch_feature_view import BatchFeatureView
8+
from feast.entity import Entity
9+
from feast.feature_view import FeatureView
10+
from feast.infra.offline_stores.offline_store import OfflineStore
11+
from feast.infra.online_stores.online_store import OnlineStore
12+
from feast.repo_config import FeastConfigBaseModel, RepoConfig
13+
from feast.stream_feature_view import StreamFeatureView
14+
15+
from ...registry import BaseRegistry
16+
from ...utils import (
17+
_convert_arrow_to_proto,
18+
_get_column_names,
19+
_run_pyarrow_field_mapping,
20+
)
21+
from .batch_materialization_engine import (
22+
BatchMaterializationEngine,
23+
MaterializationJob,
24+
MaterializationJobStatus,
25+
MaterializationTask,
26+
)
27+
28+
# Chunk size passed to ``table.to_batches()`` when the local engine splits the pulled
# Arrow table into batches for writing to the online store.
DEFAULT_BATCH_SIZE = 10_000
29+
30+
31+
class LocalMaterializationEngineConfig(FeastConfigBaseModel):
    """Batch Materialization Engine config for local in-process engine"""

    # The only accepted value, and the default, is the literal string "local".
    type: Literal["local"] = "local"
    """ Type selector"""
36+
37+
38+
class LocalMaterializationJob(MaterializationJob):
    """
    Job handle returned by the local, in-process materialization engine.

    The job is constructed with its status (and optional error) already known; the
    accessor methods simply report the values given at construction time.

    This is deliberately a plain class rather than a ``@dataclass``: the class declares
    no dataclass fields, so a generated ``__eq__`` would treat every pair of jobs as
    equal, the instances would become unhashable, and the generated ``__repr__`` would
    show no state.
    """

    def __init__(
        self,
        job_id: str,
        status: MaterializationJobStatus,
        error: Optional[BaseException] = None,
    ) -> None:
        """
        Args:
            job_id: Identifier reported by ``job_id()``.
            status: Status reported by ``status()``.
            error: Exception reported by ``error()``; None when there is no error.
        """
        super().__init__()
        self._job_id: str = job_id
        self._status: MaterializationJobStatus = status
        self._error: Optional[BaseException] = error

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(job_id={self._job_id!r}, "
            f"status={self._status!r}, error={self._error!r})"
        )

    def status(self) -> MaterializationJobStatus:
        """Return the status given at construction time."""
        return self._status

    def error(self) -> Optional[BaseException]:
        """Return the exception given at construction time, or None."""
        return self._error

    def should_be_retried(self) -> bool:
        """Always return False."""
        return False

    def job_id(self) -> str:
        """Return the identifier given at construction time."""
        return self._job_id

    def url(self) -> Optional[str]:
        """Always return None."""
        return None
65+
66+
67+
class LocalMaterializationEngine(BatchMaterializationEngine):
    """
    Materialization engine that runs inside the current process.

    For each task it pulls the latest rows from the offline store as an Arrow table and
    writes them to the online store in batches of ``DEFAULT_BATCH_SIZE``.
    """

    def __init__(
        self,
        *,
        repo_config: RepoConfig,
        offline_store: OfflineStore,
        online_store: OnlineStore,
        **kwargs,
    ):
        """Forward all arguments unchanged to the base engine constructor."""
        super().__init__(
            repo_config=repo_config,
            offline_store=offline_store,
            online_store=online_store,
            **kwargs,
        )

    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        """No-op: the local engine has nothing to set up."""
        pass

    def teardown_infra(
        self,
        project: str,
        fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
        entities: Sequence[Entity],
    ):
        """No-op: the local engine has nothing to tear down."""
        pass

    def materialize(
        self, registry: BaseRegistry, tasks: List[MaterializationTask]
    ) -> List[MaterializationJob]:
        """
        Run every task one after another and return one job per task, in task order.

        Args:
            registry: Registry used to look up the entities of each feature view.
            tasks: The materialization tasks to run.

        Returns:
            A list with one job per task.
        """
        return [
            self._materialize_one(
                registry,
                task.feature_view,
                task.start_time,
                task.end_time,
                task.project,
                task.tqdm_builder,
            )
            for task in tasks
        ]

    def _materialize_one(
        self,
        registry: BaseRegistry,
        feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
        start_date: datetime,
        end_date: datetime,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> MaterializationJob:
        """
        Materialize a single feature view for the given time window.

        Entity lookup and column-name resolution happen before the guarded section, so
        exceptions raised there propagate to the caller. Ordinary exceptions raised while
        pulling from the offline store or writing to the online store are captured in
        the returned job with status ERROR.

        Returns:
            A job with status SUCCEEDED, or a job with status ERROR carrying the
            exception that was raised.
        """
        entities = [
            registry.get_entity(entity_name, project)
            for entity_name in feature_view.entities
        ]

        (
            join_key_columns,
            feature_name_columns,
            timestamp_field,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        job_id = f"{feature_view.name}-{start_date}-{end_date}"

        try:
            offline_job = self.offline_store.pull_latest_from_table_or_query(
                config=self.repo_config,
                data_source=feature_view.batch_source,
                join_key_columns=join_key_columns,
                feature_name_columns=feature_name_columns,
                timestamp_field=timestamp_field,
                created_timestamp_column=created_timestamp_column,
                start_date=start_date,
                end_date=end_date,
            )

            table = offline_job.to_arrow()

            if feature_view.batch_source.field_mapping is not None:
                table = _run_pyarrow_field_mapping(
                    table, feature_view.batch_source.field_mapping
                )

            join_key_to_value_type = {
                entity.name: entity.dtype.to_value_type()
                for entity in feature_view.entity_columns
            }

            with tqdm_builder(table.num_rows) as pbar:
                for batch in table.to_batches(DEFAULT_BATCH_SIZE):
                    rows_to_write = _convert_arrow_to_proto(
                        batch, feature_view, join_key_to_value_type
                    )
                    self.online_store.online_write_batch(
                        self.repo_config,
                        feature_view,
                        rows_to_write,
                        pbar.update,
                    )
            return LocalMaterializationJob(
                job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
            )
        except Exception as e:
            # Catch Exception rather than BaseException so that KeyboardInterrupt and
            # SystemExit abort the run instead of being recorded as a failed job while
            # the remaining tasks keep executing.
            return LocalMaterializationJob(
                job_id=job_id, status=MaterializationJobStatus.ERROR, error=e
            )

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
3030
get_pyarrow_schema_from_batch_source,
3131
)
32-
from feast.infra.provider import (
33-
_get_requested_feature_views_to_features_dict,
34-
_run_dask_field_mapping,
35-
)
3632
from feast.registry import BaseRegistry
3733
from feast.repo_config import FeastConfigBaseModel, RepoConfig
3834
from feast.saved_dataset import SavedDatasetStorage
3935
from feast.usage import log_exceptions_and_usage
36+
from feast.utils import (
37+
_get_requested_feature_views_to_features_dict,
38+
_run_dask_field_mapping,
39+
)
4040

4141

4242
class FileOfflineStoreConfig(FeastConfigBaseModel):

sdk/python/feast/infra/offline_stores/offline_utils.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
from feast.feature_view import FeatureView
1818
from feast.importer import import_class
1919
from feast.infra.offline_stores.offline_store import OfflineStore
20-
from feast.infra.provider import _get_requested_feature_views_to_features_dict
2120
from feast.registry import BaseRegistry
2221
from feast.repo_config import RepoConfig
2322
from feast.type_map import feast_value_type_to_pa
24-
from feast.utils import to_naive_utc
23+
from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc
2524

2625
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
2726

0 commit comments

Comments
 (0)