Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use RetrievalJob instead of creating a new OfflineJob object
Signed-off-by: Matt Delacour <matt.delacour@shopify.com>
  • Loading branch information
Matt Delacour committed Jun 28, 2021
commit fa5551bade595f2b5a84e49de4a530ec34721ab1
27 changes: 7 additions & 20 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import (
OfflineJob,
OfflineStore,
RetrievalJob,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
_get_requested_feature_views_to_features_dict,
Expand Down Expand Up @@ -63,7 +59,7 @@ def pull_latest_from_table_or_query(
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> OfflineJob:
) -> RetrievalJob:
assert isinstance(data_source, BigQuerySource)
from_expression = data_source.get_table_query_string()

Expand All @@ -78,6 +74,7 @@ def pull_latest_from_table_or_query(
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)

client = _get_bigquery_client(project=config.offline_store.project_id)
query = f"""
SELECT {field_string}
FROM (
Expand All @@ -88,8 +85,7 @@ def pull_latest_from_table_or_query(
)
WHERE _feast_row = 1
"""

return BigQueryOfflineJob(query=query, config=config)
return BigQueryRetrievalJob(query=query, client=client, config=config)

@staticmethod
def get_historical_features(
Expand Down Expand Up @@ -225,15 +221,6 @@ def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str:
)


class BigQueryOfflineJob(OfflineJob):
    """Offline job that holds a SQL query and a BigQuery client to run it with."""

    def __init__(self, query: str, config: RepoConfig):
        """Store the query and create a client for the configured project.

        Args:
            query: SQL text to be executed when the result is requested.
            config: Repo configuration; its ``offline_store.project_id`` is
                passed to ``_get_bigquery_client``.
        """
        self.query = query
        project_id = config.offline_store.project_id
        self.client = _get_bigquery_client(project=project_id)

    def to_table(self) -> pyarrow.Table:
        """Submit the stored query and return its result via ``to_arrow()``."""
        query_job = self.client.query(self.query)
        return query_job.to_arrow()


class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query, client, config):
self.query = query
Expand Down Expand Up @@ -269,9 +256,6 @@ def _block_until_done():
if not job_config:
today = date.today().strftime("%Y%m%d")
rand_id = str(uuid.uuid4())[:7]
dataset_project = (
self.config.offline_store.project_id or self.client.project
)
path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
job_config = bigquery.QueryJobConfig(destination=path)

Expand All @@ -291,6 +275,9 @@ def _block_until_done():
print(f"Done writing to '{job_config.destination}'.")
return str(job_config.destination)

def to_table(self) -> pyarrow.Table:
    """Submit ``self.query`` through ``self.client`` and return the result via ``to_arrow()``."""
    query_job = self.client.query(self.query)
    return query_job.to_arrow()


@dataclass(frozen=True)
class FeatureViewQueryContext:
Expand Down
30 changes: 9 additions & 21 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
from feast.data_source import DataSource, FileSource
from feast.errors import FeastJoinKeysDuringMaterialization
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import (
OfflineJob,
OfflineStore,
RetrievalJob,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
_get_requested_feature_views_to_features_dict,
Expand All @@ -30,19 +26,6 @@ class FileOfflineStoreConfig(FeastConfigBaseModel):
""" Offline store type selector"""


class FileOfflineJob(OfflineJob):
    """Offline job that defers its work to a stored callable."""

    def __init__(self, evaluation_function: Callable):
        """Initialize a lazy historical retrieval job"""
        # Nothing is computed here: the callable is only kept for later use
        # by to_table().
        self.evaluation_function = evaluation_function

    def to_table(self):
        """Run the stored callable and convert its dataframe to a pyarrow Table."""
        # The evaluation function is invoked only now, at the moment the
        # result is actually requested.
        return pyarrow.Table.from_pandas(self.evaluation_function())


class FileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
"""Initialize a lazy historical retrieval job"""
Expand All @@ -55,6 +38,11 @@ def to_df(self):
df = self.evaluation_function()
return df

def to_table(self):
    """Run the stored evaluation function and return its dataframe as a pyarrow Table."""
    # Evaluation is deferred until this call so the dataframe is built
    # only when the result is actually needed.
    return pyarrow.Table.from_pandas(self.evaluation_function())


class FileOfflineStore(OfflineStore):
@staticmethod
Expand All @@ -65,7 +53,7 @@ def get_historical_features(
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
) -> FileRetrievalJob:
) -> RetrievalJob:
if not isinstance(entity_df, pd.DataFrame):
raise ValueError(
f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
Expand Down Expand Up @@ -230,7 +218,7 @@ def pull_latest_from_table_or_query(
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> FileOfflineJob:
) -> RetrievalJob:
assert isinstance(data_source, FileSource)

# Create lazy function that is only called from the RetrievalJob object
Expand Down Expand Up @@ -274,4 +262,4 @@ def evaluate_offline_job():
)
return last_values_df[columns_to_extract]

return FileOfflineJob(evaluation_function=evaluate_offline_job)
return FileRetrievalJob(evaluation_function=evaluate_offline_job)
10 changes: 3 additions & 7 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,13 @@ class RetrievalJob(ABC):
"""RetrievalJob is used to manage the execution of a historical feature retrieval"""

@abstractmethod
def to_df(self):
def to_df(self) -> pd.DataFrame:
"""Return dataset as Pandas DataFrame synchronously"""
pass


class OfflineJob(ABC):
"""OfflineJob is used to manage the execution of a specific logic of the offline store"""

@abstractmethod
def to_table(self) -> pyarrow.Table:
"""Return dataset as Pandas DataFrame synchronously"""
"""Return dataset as pyarrow Table synchronously"""
pass


Expand All @@ -59,7 +55,7 @@ def pull_latest_from_table_or_query(
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> OfflineJob:
) -> RetrievalJob:
"""
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
have all already been mapped to column names of the source table and those column names are the values passed
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import List, Optional, Union

import pandas as pd
import pyarrow
from pydantic import StrictStr
from pydantic.typing import Literal

Expand Down Expand Up @@ -38,14 +37,15 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
class RedshiftOfflineStore(OfflineStore):
@staticmethod
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> pyarrow.Table:
) -> RetrievalJob:
pass

@staticmethod
Expand Down