diff --git a/docs/getting-started/concepts/feature-retrieval.md b/docs/getting-started/concepts/feature-retrieval.md index f4462d06900..fd216fc71f5 100644 --- a/docs/getting-started/concepts/feature-retrieval.md +++ b/docs/getting-started/concepts/feature-retrieval.md @@ -17,41 +17,112 @@ Each of these retrieval mechanisms accept: Before beginning, you need to instantiate a local `FeatureStore` object that knows how to parse the registry (see [more details](https://docs.feast.dev/getting-started/concepts/registry)) -
+For code examples of how the below work, inspect the generated repository from `feast init -t [YOUR TEMPLATE]` (`gcp`, `snowflake`, and `aws` are the most fully fleshed). -How to: generate training data +## Concepts +Before diving into how to retrieve features, we need to understand some high level concepts in Feast. -Feast abstracts away point-in-time join complexities with the `get_historical_features` API. +### Feature Services -It expects an **entity dataframe (or SQL query to retrieve a list of entities)** and a **list of feature references (or a feature service)** +A feature service is an object that represents a logical group of features from one or more [feature views](feature-view.md#feature-view). Feature Services allows features from within a feature view to be used as needed by an ML model. Users can expect to create one feature service per model version, allowing for tracking of the features used by models. -#### **Option 1: using feature references (to pick individual features when exploring data)** +{% tabs %} +{% tab title="driver_trips_feature_service.py" %} +```python +from driver_ratings_feature_view import driver_ratings_fv +from driver_trips_feature_view import driver_stats_fv + +driver_stats_fs = FeatureService( + name="driver_activity", + features=[driver_stats_fv, driver_ratings_fv[["lifetime_rating"]]] +) +``` +{% endtab %} +{% endtabs %} + +Feature services are used during + +* The generation of training datasets when querying feature views in order to find historical feature values. A single training dataset may consist of features from multiple feature views. +* Retrieval of features for batch scoring from the offline store (e.g. with an entity dataframe where all timestamps are `now()`) +* Retrieval of features from the online store for online inference (with smaller batch sizes). The features retrieved from the online store may also belong to multiple feature views. + +{% hint style="info" %} +Applying a feature service does not result in an actual service being deployed. +{% endhint %} + +Feature services enable referencing all or some features from a feature view. + +Retrieving from the online store with a feature service ```python -entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1002, 1003, 1004, 1001], - "event_timestamp": [ - datetime(2021, 4, 12, 10, 59, 42), - datetime(2021, 4, 12, 8, 12, 10), - datetime(2021, 4, 12, 16, 40, 26), - datetime(2021, 4, 12, 15, 1, 12), - datetime.now() - ] - } +from feast import FeatureStore +feature_store = FeatureStore('.') # Initialize the feature store + +feature_service = feature_store.get_feature_service("driver_activity") +features = feature_store.get_online_features( + features=feature_service, entity_rows=[entity_dict] ) -training_df = store.get_historical_features( - entity_df=entity_df, +``` + +Retrieving from the offline store with a feature service + +```python +from feast import FeatureStore +feature_store = FeatureStore('.') # Initialize the feature store + +feature_service = feature_store.get_feature_service("driver_activity") +feature_store.get_historical_features(features=feature_service, entity_df=entity_df) +``` + +### Feature References + +This mechanism of retrieving features is only recommended as you're experimenting. Once you want to launch experiments or serve models, feature services are recommended. + +Feature references uniquely identify feature values in Feast. The structure of a feature reference in string form is as follows: `:` + +Feature references are used for the retrieval of features from Feast: + +```python +online_features = fs.get_online_features( features=[ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "driver_daily_features:daily_miles_driven" + 'driver_locations:lon', + 'drivers_activity:trips_today' ], -).to_df() -print(training_df.head()) + entity_rows=[ + # {join_key: entity_value} + {'driver': 'driver_1001'} + ] +) ``` -#### Option 2: using feature services (to version models) +It is possible to retrieve features from multiple feature views with a single request, and Feast is able to join features from multiple tables in order to build a training dataset. However, it is not possible to reference (or retrieve) features from multiple projects at the same time. + +{% hint style="info" %} +Note, if you're using [Feature views without entities](feature-view.md#feature-views-without-entities), then those features can be added here without additional entity values in the `entity_rows` parameter. +{% endhint %} + +### Event timestamp + +The timestamp on which an event occurred, as found in a feature view's data source. The event timestamp describes the event time at which a feature was observed or generated. + +Event timestamps are used during point-in-time joins to ensure that the latest feature values are joined from feature views onto entity rows. Event timestamps are also used to ensure that old feature values aren't served to models during online serving. + +### Dataset + +A dataset is a collection of rows that is produced by a historical retrieval from Feast in order to train a model. A dataset is produced by a join from one or more feature views onto an entity dataframe. Therefore, a dataset may consist of features from multiple feature views. + +**Dataset vs Feature View:** Feature views contain the schema of data and a reference to where data can be found (through its data source). Datasets are the actual data manifestation of querying those data sources. + +**Dataset vs Data Source:** Datasets are the output of historical retrieval, whereas data sources are the inputs. One or more data sources can be used in the creation of a dataset. + +## Retrieving historical features (for training data or batch scoring) +Feast abstracts away point-in-time join complexities with the `get_historical_features` API. + +We go through the major steps, and also show example code. Note that the quickstart templates generally have end-to-end working examples for all these cases. + +
+ +Full example: generate training data ```python entity_df = pd.DataFrame.from_dict( @@ -77,60 +148,118 @@ print(training_df.head())
-How to: retrieve offline features for batch scoring +Full example: retrieve offline features for batch scoring The main difference here compared to training data generation is how to handle timestamps in the entity dataframe. You want to pass in the **current time** to get the latest feature values for all your entities. -#### Option 1: fetching features with entity dataframe - ```python from feast import FeatureStore -import pandas as pd store = FeatureStore(repo_path=".") # Get the latest feature values for unique entities -entity_df = pd.DataFrame.from_dict({"driver_id": [1001, 1002, 1003, 1004, 1005],}) -entity_df["event_timestamp"] = pd.to_datetime("now", utc=True) +entity_sql = f""" + SELECT + driver_id, + CURRENT_TIMESTAMP() as event_timestamp + FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()} + WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31' + GROUP BY driver_id +""" batch_scoring_features = store.get_historical_features( - entity_df=entity_df, features=store.get_feature_service("model_v2"), + entity_df=entity_sql, + features=store.get_feature_service("model_v2"), ).to_df() # predictions = model.predict(batch_scoring_features) ``` -#### Option 2: fetching features using a SQL query to generate entities +
-```python -from feast import FeatureStore +### Step 1: Specifying Features +Feast accepts either: +- [feature services](feature-retrieval.md#feature-services), which group features needed for a model version +- [feature references](feature-retrieval.md#feature-references) -store = FeatureStore(repo_path=".") +### Example: querying a feature service (recommended) +```python +training_df = store.get_historical_features( + entity_df=entity_df, + features=store.get_feature_service("model_v1"), +).to_df() +``` -# Get the latest feature values for unique entities -batch_scoring_features = store.get_historical_features( - entity_df=""" - SELECT - user_id, - CURRENT_TIME() as event_timestamp - FROM entity_source_table - WHERE user_last_active_time BETWEEN '2019-01-01' and '2020-12-31' - GROUP BY user_id - """ - , - features=store.get_feature_service("model_v2"), +### Example: querying a list of feature references +```python +training_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_daily_features:daily_miles_driven" + ], ).to_df() -# predictions = model.predict(batch_scoring_features) ``` +### Step 2: Specifying Entities +Feast accepts either a **Pandas dataframe** as the entity dataframe (including entity keys and timestamps) or a **SQL query** to generate the entities. -
+Both approaches must specify the full **entity key** needed as well as the **timestamps**. Feast then joins features onto this dataframe. -
+### Example: entity dataframe for generating training data +```python +entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002, 1003, 1004, 1001], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + datetime(2021, 4, 12, 15, 1, 12), + datetime.now() + ] + } +) +training_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_daily_features:daily_miles_driven" + ], +).to_df() +``` -How to: retrieve online features for real-time model inference (Python SDK) +### Example: entity SQL query for generating training data +You can also pass a SQL string to generate the above dataframe. This is useful for getting all entities in a timeframe from some data source. + +```python +entity_sql = f""" + SELECT + driver_id, + event_timestamp + FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()} + WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31' +""" +training_df = store.get_historical_features( + entity_df=entity_sql, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_daily_features:daily_miles_driven" + ], +).to_df() +``` +## Retrieving online features (for model inference) Feast will ensure the latest feature values for registered features are available. At retrieval time, you need to supply a list of **entities** and the corresponding **features** to be retrieved. Similar to `get_historical_features`, we recommend using feature services as a mechanism for grouping features in a model version. _Note: unlike `get_historical_features`, the `entity_rows` **do not need timestamps** since you only want one feature value per entity key._ +There are several options for retrieving online features: through the SDK, or through a feature server + +
+ +Full example: retrieve online features for real-time model inference (Python SDK) + ```python from feast import RepoConfig, FeatureStore from feast.repo_config import RegistryConfig @@ -160,11 +289,7 @@ features = store.get_online_features(
-How to: retrieve online features for real-time model inference (Feature Server) - -Feast will ensure the latest feature values for registered features are available. At retrieval time, you need to supply a list of **entities** and the corresponding **features** to be retrieved. Similar to `get_historical_features`, we recommend using feature services as a mechanism for grouping features in a model version. - -_Note: unlike `get_historical_features`, the `entity_rows` **do not need timestamps** since you only want one feature value per entity key._ +Full example: retrieve online features for real-time model inference (Feature Server) This approach requires you to deploy a feature server (see [Python feature server](../../reference/feature-servers/python-feature-server)). @@ -183,96 +308,3 @@ print(json.dumps(r.json(), indent=4, sort_keys=True)) ```
- -## Feature Services - -A feature service is an object that represents a logical group of features from one or more [feature views](feature-view.md#feature-view). Feature Services allows features from within a feature view to be used as needed by an ML model. Users can expect to create one feature service per model version, allowing for tracking of the features used by models. - -{% tabs %} -{% tab title="driver_trips_feature_service.py" %} -```python -from driver_ratings_feature_view import driver_ratings_fv -from driver_trips_feature_view import driver_stats_fv - -driver_stats_fs = FeatureService( - name="driver_activity", - features=[driver_stats_fv, driver_ratings_fv[["lifetime_rating"]]] -) -``` -{% endtab %} -{% endtabs %} - -Feature services are used during - -* The generation of training datasets when querying feature views in order to find historical feature values. A single training dataset may consist of features from multiple feature views. -* Retrieval of features for batch scoring from the offline store (e.g. with an entity dataframe where all timestamps are `now()`) -* Retrieval of features from the online store for online inference (with smaller batch sizes). The features retrieved from the online store may also belong to multiple feature views. - -{% hint style="info" %} -Applying a feature service does not result in an actual service being deployed. -{% endhint %} - -Feature services enable referencing all or some features from a feature view. - -Retrieving from the online store with a feature service - -```python -from feast import FeatureStore -feature_store = FeatureStore('.') # Initialize the feature store - -feature_service = feature_store.get_feature_service("driver_activity") -features = feature_store.get_online_features( - features=feature_service, entity_rows=[entity_dict] -) -``` - -Retrieving from the offline store with a feature service - -```python -from feast import FeatureStore -feature_store = FeatureStore('.') # Initialize the feature store - -feature_service = feature_store.get_feature_service("driver_activity") -feature_store.get_historical_features(features=feature_service, entity_df=entity_df) -``` - -## Feature References - -This mechanism of retrieving features is only recommended as you're experimenting. Once you want to launch experiments or serve models, feature services are recommended. - -Feature references uniquely identify feature values in Feast. The structure of a feature reference in string form is as follows: `:` - -Feature references are used for the retrieval of features from Feast: - -```python -online_features = fs.get_online_features( - features=[ - 'driver_locations:lon', - 'drivers_activity:trips_today' - ], - entity_rows=[ - # {join_key: entity_value} - {'driver': 'driver_1001'} - ] -) -``` - -It is possible to retrieve features from multiple feature views with a single request, and Feast is able to join features from multiple tables in order to build a training dataset. However, it is not possible to reference (or retrieve) features from multiple projects at the same time. - -{% hint style="info" %} -Note, if you're using [Feature views without entities](feature-view.md#feature-views-without-entities), then those features can be added here without additional entity values in the `entity_rows` parameter. -{% endhint %} - -## Event timestamp - -The timestamp on which an event occurred, as found in a feature view's data source. The event timestamp describes the event time at which a feature was observed or generated. - -Event timestamps are used during point-in-time joins to ensure that the latest feature values are joined from feature views onto entity rows. Event timestamps are also used to ensure that old feature values aren't served to models during online serving. - -## Dataset - -A dataset is a collection of rows that is produced by a historical retrieval from Feast in order to train a model. A dataset is produced by a join from one or more feature views onto an entity dataframe. Therefore, a dataset may consist of features from multiple feature views. - -**Dataset vs Feature View:** Feature views contain the schema of data and a reference to where data can be found (through its data source). Datasets are the actual data manifestation of querying those data sources. - -**Dataset vs Data Source:** Datasets are the output of historical retrieval, whereas data sources are the inputs. One or more data sources can be used in the creation of a dataset. diff --git a/docs/how-to-guides/running-feast-in-production.md b/docs/how-to-guides/running-feast-in-production.md index 95f3e99581e..ef903b68c4b 100644 --- a/docs/how-to-guides/running-feast-in-production.md +++ b/docs/how-to-guides/running-feast-in-production.md @@ -132,6 +132,8 @@ This supports pushing feature values into Feast to both online or offline stores ## 3. How to use Feast for model training ### 3.1. Generating training data +> For more details, see [feature retrieval](../getting-started/concepts/feature-retrieval.md#retrieving-historical-features-for-training-data-or-batch-scoring) + After we've defined our features and data sources in the repository, we can generate training datasets. We highly recommend you use a `FeatureService` to version the features that go into a specific model version. 1. The first thing we need to do in our training code is to create a `FeatureStore` object with a path to the registry. diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 15fda6ac7d0..834df0e5c48 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -384,3 +384,17 @@ def __init__(self, features: Any): super().__init__( f"Invalid `features` parameter type {type(features)}. Expected one of List[str] and FeatureService." ) + + +class EntitySQLEmptyResults(Exception): + def __init__(self, entity_sql: str): + super().__init__( + f"No entity values found from the specified SQL query to generate the entity dataframe: {entity_sql}." + ) + + +class EntityDFNotDateTime(Exception): + def __init__(self): + super().__init__( + "The entity dataframe specified does not have the timestamp field as a datetime." + ) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 8b2773fb657..c570b8d38ab 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -28,6 +28,8 @@ from feast.errors import ( BigQueryJobCancelled, BigQueryJobStillRunning, + EntityDFNotDateTime, + EntitySQLEmptyResults, FeastProviderLoginError, InvalidEntityType, ) @@ -665,6 +667,13 @@ def _get_entity_df_event_timestamp_range( res.get("min"), res.get("max"), ) + if ( + entity_df_event_timestamp_range[0] is None + or entity_df_event_timestamp_range[1] is None + ): + raise EntitySQLEmptyResults(entity_df) + if type(entity_df_event_timestamp_range[0]) != datetime: + raise EntityDFNotDateTime() elif isinstance(entity_df, pd.DataFrame): entity_df_event_timestamp = entity_df.loc[ :, entity_df_event_timestamp_col diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index c835eb939ff..2d621de50ff 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -25,7 +25,7 @@ from feast import OnDemandFeatureView from feast.data_source import DataSource -from feast.errors import InvalidEntityType +from feast.errors import EntitySQLEmptyResults, InvalidEntityType from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils @@ -574,6 +574,11 @@ def _get_entity_df_event_timestamp_range( results = execute_snowflake_statement(snowflake_conn, query).fetchall() entity_df_event_timestamp_range = cast(Tuple[datetime, datetime], results[0]) + if ( + entity_df_event_timestamp_range[0] is None + or entity_df_event_timestamp_range[1] is None + ): + raise EntitySQLEmptyResults(entity_df) else: raise InvalidEntityType(type(entity_df)) diff --git a/sdk/python/feast/templates/aws/feature_repo/test_workflow.py b/sdk/python/feast/templates/aws/feature_repo/test_workflow.py index 494b1914f92..59ac1f0ee73 100644 --- a/sdk/python/feast/templates/aws/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/aws/feature_repo/test_workflow.py @@ -1,7 +1,9 @@ +import random import subprocess -from datetime import datetime +from datetime import datetime, timedelta import pandas as pd +from pytz import utc from feast import FeatureStore from feast.data_source import PushMode @@ -18,6 +20,16 @@ def run_demo(): print("\n--- Historical features for batch scoring ---") fetch_historical_features_entity_df(store, for_batch_scoring=True) + print( + "\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---" + ) + fetch_historical_features_entity_sql(store, for_batch_scoring=False) + + print( + "\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---" + ) + fetch_historical_features_entity_sql(store, for_batch_scoring=True) + print("\n--- Load features into online store ---") store.materialize_incremental(end_date=datetime.now()) @@ -43,8 +55,8 @@ def run_demo(): datetime.now(), ], "conv_rate": [1.0], - "acc_rate": [1.0], - "avg_daily_trips": [1000], + "acc_rate": [1.0 + random.random()], + "avg_daily_trips": [int(1000 * random.random())], } ) print(event_df) @@ -57,6 +69,47 @@ def run_demo(): subprocess.run(["feast", "teardown"]) +def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring): + end_date = ( + datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc) + ) + start_date = (end_date - timedelta(days=60)).astimezone(tz=utc) + # For batch scoring, we want the latest timestamps + if for_batch_scoring: + print( + "Generating a list of all unique entities in a time window for batch scoring" + ) + # We use a group by since we want all distinct driver_ids. + entity_sql = f""" + SELECT + driver_id, + GETDATE() as event_timestamp + FROM {store.get_data_source("feast_driver_hourly_stats").get_table_query_string()} + WHERE event_timestamp BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + GROUP BY driver_id + """ + else: + print("Generating training data for all entities in a time window") + # We don't need a group by if we want to generate training data + entity_sql = f""" + SELECT + driver_id, + event_timestamp + FROM {store.get_data_source("feast_driver_hourly_stats").get_table_query_string()} + WHERE event_timestamp BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}' + """ + + training_df = store.get_historical_features( + entity_df=entity_sql, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + ).to_df() + print(training_df.head()) + + def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool): # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve # for all entities in the offline store instead diff --git a/sdk/python/feast/templates/gcp/feature_repo/test_workflow.py b/sdk/python/feast/templates/gcp/feature_repo/test_workflow.py index 51ce1eb05be..95ca080012b 100644 --- a/sdk/python/feast/templates/gcp/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/gcp/feature_repo/test_workflow.py @@ -19,6 +19,16 @@ def run_demo(): print("\n--- Historical features for batch scoring ---") fetch_historical_features_entity_df(store, for_batch_scoring=True) + print( + "\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---" + ) + fetch_historical_features_entity_sql(store, for_batch_scoring=False) + + print( + "\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---" + ) + fetch_historical_features_entity_sql(store, for_batch_scoring=True) + print("\n--- Load features into online store ---") store.materialize_incremental(end_date=datetime.now()) @@ -60,6 +70,43 @@ def run_demo(): subprocess.run(["feast", "teardown"]) +def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring): + # For batch scoring, we want the latest timestamps + if for_batch_scoring: + print( + "Generating a list of all unique entities in a time window for batch scoring" + ) + # We use a group by since we want all distinct driver_ids. + entity_sql = f""" + SELECT + driver_id, + CURRENT_TIMESTAMP() as event_timestamp + FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()} + WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31' + GROUP BY driver_id + """ + else: + print("Generating training data for all entities in a time window") + # We don't need a group by if we want to generate training data + entity_sql = f""" + SELECT + driver_id, + event_timestamp + FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()} + WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31' + """ + + training_df = store.get_historical_features( + entity_df=entity_sql, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + ).to_df() + print(training_df.head()) + + def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool): # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve # for all entities in the offline store instead diff --git a/sdk/python/feast/templates/snowflake/test_workflow.py b/sdk/python/feast/templates/snowflake/test_workflow.py index adeed279af5..904d1e1f3e5 100644 --- a/sdk/python/feast/templates/snowflake/test_workflow.py +++ b/sdk/python/feast/templates/snowflake/test_workflow.py @@ -1,7 +1,9 @@ +import random import subprocess -from datetime import datetime +from datetime import datetime, timedelta import pandas as pd +from pytz import utc from feast import FeatureStore from feast.data_source import PushMode @@ -19,6 +21,16 @@ def run_demo(): print("\n--- Historical features for batch scoring ---") fetch_historical_features_entity_df(store, for_batch_scoring=True) + print( + "\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---" + ) + fetch_historical_features_entity_sql(store, for_batch_scoring=False) + + print( + "\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---" + ) + fetch_historical_features_entity_sql(store, for_batch_scoring=True) + print("\n--- Load features into online store ---") store.materialize_incremental(end_date=datetime.now()) @@ -44,8 +56,8 @@ def run_demo(): datetime.now(), ], "conv_rate": [1.0], - "acc_rate": [1.0], - "avg_daily_trips": [1000], + "acc_rate": [1.0 + random.random()], + "avg_daily_trips": [int(1000 * random.random())], } ) print(event_df) @@ -59,6 +71,47 @@ def run_demo(): subprocess.run(command, shell=True) +def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring): + end_date = ( + datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc) + ) + start_date = (end_date - timedelta(days=60)).astimezone(tz=utc) + # For batch scoring, we want the latest timestamps + if for_batch_scoring: + print( + "Generating a list of all unique entities in a time window for batch scoring" + ) + # We use a group by since we want all distinct driver_ids. + entity_sql = f""" + SELECT + "driver_id", + CURRENT_TIMESTAMP() as "event_timestamp" + FROM {store.list_data_sources()[-1].get_table_query_string()} + WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}' + GROUP BY "driver_id" + """ + else: + print("Generating training data for all entities in a time window") + # We don't need a group by if we want to generate training data + entity_sql = f""" + SELECT + "driver_id", + "event_timestamp" + FROM {store.list_data_sources()[-1].get_table_query_string()} + WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}' + """ + + training_df = store.get_historical_features( + entity_df=entity_sql, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + ).to_df() + print(training_df.head()) + + def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool): # Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve # for all entities in the offline store instead