Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 181 additions & 149 deletions docs/getting-started/concepts/feature-retrieval.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/how-to-guides/running-feast-in-production.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ This supports pushing feature values into Feast to both online or offline stores
## 3. How to use Feast for model training

### 3.1. Generating training data
> For more details, see [feature retrieval](../getting-started/concepts/feature-retrieval.md#retrieving-historical-features-for-training-data-or-batch-scoring)

After we've defined our features and data sources in the repository, we can generate training datasets. We highly recommend you use a `FeatureService` to version the features that go into a specific model version.

1. The first thing we need to do in our training code is to create a `FeatureStore` object with a path to the registry.
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,17 @@ def __init__(self, features: Any):
super().__init__(
f"Invalid `features` parameter type {type(features)}. Expected one of List[str] and FeatureService."
)


class EntitySQLEmptyResults(Exception):
def __init__(self, entity_sql: str):
super().__init__(
f"No entity values found from the specified SQL query to generate the entity dataframe: {entity_sql}."
)


class EntityDFNotDateTime(Exception):
def __init__(self):
super().__init__(
"The entity dataframe specified does not have the timestamp field as a datetime."
)
9 changes: 9 additions & 0 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from feast.errors import (
BigQueryJobCancelled,
BigQueryJobStillRunning,
EntityDFNotDateTime,
EntitySQLEmptyResults,
FeastProviderLoginError,
InvalidEntityType,
)
Expand Down Expand Up @@ -665,6 +667,13 @@ def _get_entity_df_event_timestamp_range(
res.get("min"),
res.get("max"),
)
if (
entity_df_event_timestamp_range[0] is None
or entity_df_event_timestamp_range[1] is None
):
raise EntitySQLEmptyResults(entity_df)
if type(entity_df_event_timestamp_range[0]) != datetime:
raise EntityDFNotDateTime()
elif isinstance(entity_df, pd.DataFrame):
entity_df_event_timestamp = entity_df.loc[
:, entity_df_event_timestamp_col
Expand Down
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from feast import OnDemandFeatureView
from feast.data_source import DataSource
from feast.errors import InvalidEntityType
from feast.errors import EntitySQLEmptyResults, InvalidEntityType
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
from feast.infra.offline_stores import offline_utils
Expand Down Expand Up @@ -574,6 +574,11 @@ def _get_entity_df_event_timestamp_range(
results = execute_snowflake_statement(snowflake_conn, query).fetchall()

entity_df_event_timestamp_range = cast(Tuple[datetime, datetime], results[0])
if (
entity_df_event_timestamp_range[0] is None
or entity_df_event_timestamp_range[1] is None
):
raise EntitySQLEmptyResults(entity_df)
else:
raise InvalidEntityType(type(entity_df))

Expand Down
59 changes: 56 additions & 3 deletions sdk/python/feast/templates/aws/feature_repo/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import random
import subprocess
from datetime import datetime
from datetime import datetime, timedelta

import pandas as pd
from pytz import utc

from feast import FeatureStore
from feast.data_source import PushMode
Expand All @@ -18,6 +20,16 @@ def run_demo():
print("\n--- Historical features for batch scoring ---")
fetch_historical_features_entity_df(store, for_batch_scoring=True)

print(
"\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---"
)
fetch_historical_features_entity_sql(store, for_batch_scoring=False)

print(
"\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---"
)
fetch_historical_features_entity_sql(store, for_batch_scoring=True)

print("\n--- Load features into online store ---")
store.materialize_incremental(end_date=datetime.now())

Expand All @@ -43,8 +55,8 @@ def run_demo():
datetime.now(),
],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000],
"acc_rate": [1.0 + random.random()],
"avg_daily_trips": [int(1000 * random.random())],
}
)
print(event_df)
Expand All @@ -57,6 +69,47 @@ def run_demo():
subprocess.run(["feast", "teardown"])


def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring):
end_date = (
datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc)
)
start_date = (end_date - timedelta(days=60)).astimezone(tz=utc)
# For batch scoring, we want the latest timestamps
if for_batch_scoring:
print(
"Generating a list of all unique entities in a time window for batch scoring"
)
# We use a group by since we want all distinct driver_ids.
entity_sql = f"""
SELECT
driver_id,
GETDATE() as event_timestamp
FROM {store.get_data_source("feast_driver_hourly_stats").get_table_query_string()}
WHERE event_timestamp BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
GROUP BY driver_id
"""
else:
print("Generating training data for all entities in a time window")
# We don't need a group by if we want to generate training data
entity_sql = f"""
SELECT
driver_id,
event_timestamp
FROM {store.get_data_source("feast_driver_hourly_stats").get_table_query_string()}
WHERE event_timestamp BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
"""

training_df = store.get_historical_features(
entity_df=entity_sql,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
).to_df()
print(training_df.head())


def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
# for all entities in the offline store instead
Expand Down
47 changes: 47 additions & 0 deletions sdk/python/feast/templates/gcp/feature_repo/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ def run_demo():
print("\n--- Historical features for batch scoring ---")
fetch_historical_features_entity_df(store, for_batch_scoring=True)

print(
"\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---"
)
fetch_historical_features_entity_sql(store, for_batch_scoring=False)

print(
"\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---"
)
fetch_historical_features_entity_sql(store, for_batch_scoring=True)

print("\n--- Load features into online store ---")
store.materialize_incremental(end_date=datetime.now())

Expand Down Expand Up @@ -60,6 +70,43 @@ def run_demo():
subprocess.run(["feast", "teardown"])


def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring):
# For batch scoring, we want the latest timestamps
if for_batch_scoring:
print(
"Generating a list of all unique entities in a time window for batch scoring"
)
# We use a group by since we want all distinct driver_ids.
entity_sql = f"""
SELECT
driver_id,
CURRENT_TIMESTAMP() as event_timestamp
FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()}
WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31'
GROUP BY driver_id
"""
else:
print("Generating training data for all entities in a time window")
# We don't need a group by if we want to generate training data
entity_sql = f"""
SELECT
driver_id,
event_timestamp
FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()}
WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31'
"""

training_df = store.get_historical_features(
entity_df=entity_sql,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
).to_df()
print(training_df.head())


def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
# for all entities in the offline store instead
Expand Down
59 changes: 56 additions & 3 deletions sdk/python/feast/templates/snowflake/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import random
import subprocess
from datetime import datetime
from datetime import datetime, timedelta

import pandas as pd
from pytz import utc

from feast import FeatureStore
from feast.data_source import PushMode
Expand All @@ -19,6 +21,16 @@ def run_demo():
print("\n--- Historical features for batch scoring ---")
fetch_historical_features_entity_df(store, for_batch_scoring=True)

print(
"\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---"
)
fetch_historical_features_entity_sql(store, for_batch_scoring=False)

print(
"\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---"
)
fetch_historical_features_entity_sql(store, for_batch_scoring=True)

print("\n--- Load features into online store ---")
store.materialize_incremental(end_date=datetime.now())

Expand All @@ -44,8 +56,8 @@ def run_demo():
datetime.now(),
],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000],
"acc_rate": [1.0 + random.random()],
"avg_daily_trips": [int(1000 * random.random())],
}
)
print(event_df)
Expand All @@ -59,6 +71,47 @@ def run_demo():
subprocess.run(command, shell=True)


def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring):
end_date = (
datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc)
)
start_date = (end_date - timedelta(days=60)).astimezone(tz=utc)
# For batch scoring, we want the latest timestamps
if for_batch_scoring:
print(
"Generating a list of all unique entities in a time window for batch scoring"
)
# We use a group by since we want all distinct driver_ids.
entity_sql = f"""
SELECT
"driver_id",
CURRENT_TIMESTAMP() as "event_timestamp"
FROM {store.list_data_sources()[-1].get_table_query_string()}
WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}'
GROUP BY "driver_id"
"""
else:
print("Generating training data for all entities in a time window")
# We don't need a group by if we want to generate training data
entity_sql = f"""
SELECT
"driver_id",
"event_timestamp"
FROM {store.list_data_sources()[-1].get_table_query_string()}
WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}'
"""

training_df = store.get_historical_features(
entity_df=entity_sql,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
).to_df()
print(training_df.head())


def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
# for all entities in the offline store instead
Expand Down