diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py
index 0b09f5df430..d8a16417837 100644
--- a/sdk/python/feast/infra/passthrough_provider.py
+++ b/sdk/python/feast/infra/passthrough_provider.py
@@ -37,7 +37,7 @@ class PassthroughProvider(Provider):
     """
-    The Passthrough provider delegates all operations to the underlying online and offline stores.
+    The passthrough provider delegates all operations to the underlying online and offline stores.
     """
 
     def __init__(self, config: RepoConfig):
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 086c9ec6b3d..e99a09a9e2e 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -1,4 +1,4 @@
-import abc
+from abc import ABC, abstractmethod
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
@@ -27,12 +27,18 @@
 }
 
 
-class Provider(abc.ABC):
-    @abc.abstractmethod
+class Provider(ABC):
+    """
+    A provider defines an implementation of a feature store object. It orchestrates the various
+    components of a feature store, such as the offline store, online store, and materialization
+    engine. It is configured through a RepoConfig object.
+    """
+
+    @abstractmethod
     def __init__(self, config: RepoConfig):
-        ...
+        pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def update_infra(
         self,
         project: str,
@@ -43,22 +49,20 @@ def update_infra(
         partial: bool,
     ):
         """
-        Reconcile cloud resources with the objects declared in the feature repo.
+        Reconciles cloud resources with the specified set of Feast objects.
 
         Args:
-            project: Project to which tables belong
-            tables_to_delete: Tables that were deleted from the feature repo, so provider needs to
-                clean up the corresponding cloud resources.
-            tables_to_keep: Tables that are still in the feature repo. Depending on implementation,
-                provider may or may not need to update the corresponding resources.
-            entities_to_delete: Entities that were deleted from the feature repo, so provider needs to
-                clean up the corresponding cloud resources.
-            entities_to_keep: Entities that are still in the feature repo. Depending on implementation,
-                provider may or may not need to update the corresponding resources.
-            partial: if true, then tables_to_delete and tables_to_keep are *not* exhaustive lists.
-                There may be other tables that are not touched by this update.
+            project: Feast project to which the objects belong.
+            tables_to_delete: Feature views whose corresponding infrastructure should be deleted.
+            tables_to_keep: Feature views whose corresponding infrastructure should not be deleted, and
+                may need to be updated.
+            entities_to_delete: Entities whose corresponding infrastructure should be deleted.
+            entities_to_keep: Entities whose corresponding infrastructure should not be deleted, and
+                may need to be updated.
+            partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
+                infrastructure corresponding to other feature views should not be touched.
         """
-        ...
+        pass
 
     def plan_infra(
         self, config: RepoConfig, desired_registry_proto: RegistryProto
@@ -72,7 +76,7 @@ def plan_infra(
         """
         return Infra()
 
-    @abc.abstractmethod
+    @abstractmethod
     def teardown_infra(
         self,
         project: str,
@@ -80,16 +84,16 @@ def teardown_infra(
         entities: Sequence[Entity],
     ):
         """
-        Tear down all cloud resources for a repo.
+        Tears down all cloud resources for the specified set of Feast objects.
 
         Args:
-            project: Feast project to which tables belong
-            tables: Tables that are declared in the feature repo.
-            entities: Entities that are declared in the feature repo.
+            project: Feast project to which the objects belong.
+            tables: Feature views whose corresponding infrastructure should be deleted.
+            entities: Entities whose corresponding infrastructure should be deleted.
         """
-        ...
+        pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def online_write_batch(
         self,
         config: RepoConfig,
@@ -100,21 +104,20 @@ def online_write_batch(
         progress: Optional[Callable[[int], Any]],
     ) -> None:
         """
-        Write a batch of feature rows to the online store. This is a low level interface, not
-        expected to be used by the users directly.
+        Writes a batch of feature rows to the online store.
 
         If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
 
         Args:
-            config: The RepoConfig for the current FeatureStore.
-            table: Feast FeatureView
-            data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key,
-                a dict containing feature values, an event timestamp for the row, and
-                the created timestamp for the row if it exists.
-            progress: Optional function to be called once every mini-batch of rows is written to
-                the online store. Can be used to display progress.
+            config: The config for the current feature store.
+            table: Feature view to which these feature rows correspond.
+            data: A list of quadruplets containing feature data. Each quadruplet contains an entity
+                key, a dict containing feature values, an event timestamp for the row, and the created
+                timestamp for the row if it exists.
+            progress: Function to be called once a batch of rows is written to the online store, used
+                to show progress.
         """
-        ...
+        pass
 
     def ingest_df(
         self,
@@ -123,7 +126,12 @@ def ingest_df(
         df: pd.DataFrame,
     ):
         """
-        Ingests a DataFrame directly into the online store
+        Persists a dataframe to the online store.
+
+        Args:
+            feature_view: The feature view to which the dataframe corresponds.
+            entities: The entities that are referenced by the dataframe.
+            df: The dataframe to be persisted.
         """
         pass
 
@@ -133,11 +141,15 @@ def ingest_df_to_offline_store(
         df: pyarrow.Table,
     ):
         """
-        Ingests a DataFrame directly into the offline store
+        Persists a dataframe to the offline store.
+
+        Args:
+            feature_view: The feature view to which the dataframe corresponds.
+            df: The dataframe to be persisted.
         """
         pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def materialize_single_feature_view(
         self,
         config: RepoConfig,
@@ -148,9 +160,21 @@ def materialize_single_feature_view(
         project: str,
         tqdm_builder: Callable[[int], tqdm],
     ) -> None:
+        """
+        Writes the latest feature values in the specified time range to the online store.
+
+        Args:
+            config: The config for the current feature store.
+            feature_view: The feature view to materialize.
+            start_date: The start of the time range.
+            end_date: The end of the time range.
+            registry: The registry for the current feature store.
+            project: Feast project to which the objects belong.
+            tqdm_builder: A function to monitor the progress of materialization.
+        """
         pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def get_historical_features(
         self,
         config: RepoConfig,
@@ -161,9 +185,28 @@ def get_historical_features(
         project: str,
         full_feature_names: bool,
     ) -> RetrievalJob:
+        """
+        Retrieves the point-in-time correct historical feature values for the specified entity rows.
+
+        Args:
+            config: The config for the current feature store.
+            feature_views: A list containing all feature views that are referenced in the entity rows.
+            feature_refs: The features to be retrieved.
+            entity_df: A collection of rows containing all entity columns on which features need to be joined,
+                as well as the timestamp column used for point-in-time joins. Either a pandas dataframe or a
+                SQL query can be provided.
+            registry: The registry for the current feature store.
+            project: Feast project to which the feature views belong.
+            full_feature_names: If True, feature names will be prefixed with the corresponding feature view name,
+                changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions"
+                changes to "customer_fv__daily_transactions").
+
+        Returns:
+            A RetrievalJob that can be executed to get the features.
+        """
         pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def online_read(
         self,
         config: RepoConfig,
@@ -172,32 +215,38 @@ def online_read(
         requested_features: List[str] = None,
     ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
         """
-        Read feature values given an Entity Key. This is a low level interface, not
-        expected to be used by the users directly.
+        Reads feature values for the given entity keys.
+
+        Args:
+            config: The config for the current feature store.
+            table: The feature view whose feature values should be read.
+            entity_keys: The list of entity keys for which feature values should be read.
+            requested_features: The list of features that should be read.
 
         Returns:
-            Data is returned as a list, one item per entity key. Each item in the list is a tuple
-            of event_ts for the row, and the feature data as a dict from feature names to values.
-            Values are returned as Value proto message.
+            A list of the same length as entity_keys. Each item in the list is a tuple where the first
+            item is the event timestamp for the row, and the second item is a dict mapping feature names
+            to values, which are returned in proto format.
         """
-        ...
+        pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def retrieve_saved_dataset(
         self, config: RepoConfig, dataset: SavedDataset
     ) -> RetrievalJob:
         """
-        Read saved dataset from offline store.
-        All parameters for retrieval (like path, datetime boundaries, column names for both keys and features, etc)
-        are determined from SavedDataset object.
+        Reads a saved dataset.
 
-        Returns:
-            RetrievalJob object, which is lazy wrapper for actual query performed under the hood.
+        Args:
+            config: The config for the current feature store.
+            dataset: A SavedDataset object containing all parameters necessary for retrieving the dataset.
+            Returns:
+            A RetrievalJob that can be executed to get the saved dataset.
 
         """
-        ...
+        pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def write_feature_service_logs(
         self,
         feature_service: FeatureService,
@@ -206,16 +255,20 @@ def write_feature_service_logs(
         registry: BaseRegistry,
     ):
         """
-        Write features and entities logged by a feature server to an offline store.
+        Writes features and entities logged by a feature server to the offline store.
 
-        Schema of logs table is being inferred from the provided feature service.
-        Only feature services with configured logging are accepted.
+        The schema of the logs table is inferred from the specified feature service. Only feature
+        services with configured logging are accepted.
 
-        Logs dataset can be passed as Arrow Table or path to parquet directory.
+        Args:
+            feature_service: The feature service to be logged.
+            logs: The logs, either as an arrow table or as a path to a parquet directory.
+            config: The config for the current feature store.
+            registry: The registry for the current feature store.
         """
-        ...
+        pass
 
-    @abc.abstractmethod
+    @abstractmethod
     def retrieve_feature_service_logs(
         self,
         feature_service: FeatureService,
@@ -225,14 +278,19 @@ def retrieve_feature_service_logs(
         registry: BaseRegistry,
     ) -> RetrievalJob:
         """
-        Read logged features from an offline store for a given time window [from, to).
-        Target table is determined based on logging configuration from the feature service.
+        Reads logged features for the specified time window.
 
-        Returns:
-            RetrievalJob object, which wraps the query to the offline store.
+        Args:
+            feature_service: The feature service whose logs should be retrieved.
+            start_date: The start of the window.
+            end_date: The end of the window.
+            config: The config for the current feature store.
+            registry: The registry for the current feature store.
+
+        Returns:
+            A RetrievalJob that can be executed to get the feature service logs.
         """
-        ...
+        pass
 
     def get_feature_server_endpoint(self) -> Optional[str]:
         """Returns endpoint for the feature server, if it exists."""
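
For reviewers, here is a minimal sketch of how a downstream plugin author might consume the interface documented above, by subclassing the PassthroughProvider from the first file in this diff and delegating back to it. `MyCustomProvider` and the override behavior are hypothetical; only the method signatures and the RepoConfig/FeatureView/Entity types are taken from the diff.

```python
# Hypothetical custom provider; signatures follow the Provider interface above.
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig


class MyCustomProvider(PassthroughProvider):
    """Delegates to the configured online/offline stores, adding custom hooks."""

    def __init__(self, config: RepoConfig):
        super().__init__(config)

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        # Run any custom infrastructure setup here, then fall back to the
        # passthrough behavior so the underlying stores are still reconciled.
        super().update_infra(
            project,
            tables_to_delete,
            tables_to_keep,
            entities_to_delete,
            entities_to_keep,
            partial,
        )

    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        # Rows could be validated or transformed here before being written
        # to the online store by the passthrough implementation.
        super().online_write_batch(config, table, data, progress)
```

Assuming Feast's usual plugin wiring, a class like this would be referenced by module path from the `provider` field of `feature_store.yaml` (e.g. `provider: feast_custom_provider.MyCustomProvider`); that wiring is outside the scope of this diff.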