Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
more api updates'
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 5, 2022
commit 5833556863f225bdde05c01ad1cd4e2d671bbcf0
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
Expand Down Expand Up @@ -29,6 +30,16 @@ class MaterializationTask:
tqdm_builder: Callable[[int], tqdm]


@enum.unique
class MaterializationJobStatus(enum.Enum):
    """Closed set of states reported by a materialization job's ``status()``.

    ``@enum.unique`` makes a duplicated numeric value fail at class-creation
    time instead of silently turning the new name into an alias of an
    existing member.
    """

    WAITING = 1
    RUNNING = 2
    AVAILABLE = 3
    ERROR = 4
    CANCELLING = 5
    CANCELLED = 6
    SUCCEEDED = 7


class MaterializationJob(ABC):
"""
MaterializationJob represents an ongoing or executed process that materializes data as per the
Comment thread
achals marked this conversation as resolved.
Outdated
Expand All @@ -38,7 +49,11 @@ class MaterializationJob(ABC):
task: MaterializationTask

@abstractmethod
def status(self) -> str:
def status(self) -> MaterializationJobStatus:
...

@abstractmethod
def error(self) -> Optional[BaseException]:
...

@abstractmethod
Expand Down
100 changes: 57 additions & 43 deletions sdk/python/feast/infra/materialization/local_engine.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
from typing import Callable, List, Literal, Optional, Sequence, Union

import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import DUMMY_ENTITY_ID, FeatureView
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView
from feast.type_map import python_values_to_proto_values
from feast.value_type import ValueType

from ...registry import BaseRegistry
from ...utils import (
Expand All @@ -27,6 +21,7 @@
from .batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)

Expand All @@ -40,13 +35,24 @@ class LocalMaterializationEngineConfig(FeastConfigBaseModel):
""" Type selector"""


@dataclass
class LocalMaterializationJob(MaterializationJob):
def __init__(
    self,
    job_id: str,
    status: MaterializationJobStatus,
    error: Optional[BaseException] = None,
) -> None:
    """Store the identifier and outcome of a local materialization run.

    Args:
        job_id: Identifier of this job.
        status: State that ``status()`` will report.
        error: Exception that ``error()`` will report; defaults to ``None``.
    """
    super().__init__()
    self._job_id: str = job_id
    self._status: MaterializationJobStatus = status
    self._error: Optional[BaseException] = error

def status(self) -> MaterializationJobStatus:
    """Return the status that was stored on this job when it was constructed."""
    # Single definition only: a second, later ``status`` returning the literal
    # "success" would shadow this one and break the enum-typed contract.
    return self._status
def error(self) -> Optional[BaseException]:
    """Return the exception stored on this job, or ``None`` if none was stored."""
    stored_error = self._error
    return stored_error

def should_be_retried(self) -> bool:
    """Report whether this job should be retried; always ``False`` here."""
    retry = False
    return retry
Expand Down Expand Up @@ -133,39 +139,47 @@ def _materialize_one(
created_timestamp_column,
) = _get_column_names(feature_view, entities)

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

table = offline_job.to_arrow()
job_id = f"{feature_view.name}-{start_date}-{end_date}"

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
try:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
table = offline_job.to_arrow()

with tqdm_builder(table.num_rows) as pbar:
for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)
self.online_store.online_write_batch(
self.repo_config,
feature_view,
rows_to_write,
lambda x: pbar.update(x),
)
job_id = f"{feature_view.name}-{start_date}-{end_date}"
return LocalMaterializationJob(job_id=job_id)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

with tqdm_builder(table.num_rows) as pbar:
for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
)
self.online_store.online_write_batch(
self.repo_config,
feature_view,
rows_to_write,
lambda x: pbar.update(x),
)
return LocalMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
)
except BaseException as e:
return LocalMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.ERROR, error=e
)
10 changes: 9 additions & 1 deletion sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.materialization import BatchMaterializationEngine, MaterializationTask
from feast.infra.materialization.batch_materialization_engine import (
MaterializationJobStatus,
)
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
Expand Down Expand Up @@ -202,7 +205,12 @@ def materialize_single_feature_view(
end_time=end_date,
tqdm_builder=tqdm_builder,
)
self.batch_engine.materialize(registry, [task])
jobs = self.batch_engine.materialize(registry, [task])
assert len(jobs) == 1
if jobs[0].status() == MaterializationJobStatus.ERROR and jobs[0].error():
e = jobs[0].error()
assert e
raise e

def get_historical_features(
self,
Expand Down