Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions sdk/python/feast/driver_test_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
# This module generates dummy data to be used for tests and examples.
from enum import Enum

import numpy as np
import pandas as pd
from pytz import FixedOffset, timezone, utc

from feast.infra.provider import ENTITY_DF_EVENT_TIMESTAMP_COL


class EventTimestampType(Enum):
    """Time zone flavours used when generating dummy event timestamps.

    The integer values are contiguous from 0 so that a member can be picked
    by index, e.g. ``EventTimestampType(idx % 4)``.
    """

    # Timestamp is handed back exactly as it was passed in.
    TZ_NAIVE = 0
    # Timestamp gets ``tzinfo`` set to UTC.
    TZ_AWARE_UTC = 1
    # Timestamp is labelled UTC, then converted to a fixed-offset zone.
    TZ_AWARE_FIXED_OFFSET = 2
    # Timestamp is labelled UTC, then converted to the US/Pacific zone.
    TZ_AWARE_US_PACIFIC = 3


def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampType):
    """Return ``event_timestamp`` expressed in the time zone flavour ``t``.

    Args:
        event_timestamp: The timestamp to convert. For every flavour except
            ``TZ_NAIVE`` its ``tzinfo`` is first replaced with UTC (the
            wall-clock fields are relabelled, not shifted).
        t: The flavour to produce.

    Returns:
        * ``TZ_NAIVE``: ``event_timestamp`` itself, untouched.
        * ``TZ_AWARE_UTC``: the timestamp with ``tzinfo`` set to UTC.
        * ``TZ_AWARE_FIXED_OFFSET``: the UTC timestamp converted to
          ``FixedOffset(60)``.
        * ``TZ_AWARE_US_PACIFIC``: the UTC timestamp converted to
          ``US/Pacific``.

    Raises:
        ValueError: If ``t`` is not one of the flavours above. Without this
            the function would fall off the end and silently return ``None``.
    """
    if t == EventTimestampType.TZ_NAIVE:
        # No tzinfo is added or stripped here; the caller's value is reused.
        return event_timestamp
    elif t == EventTimestampType.TZ_AWARE_UTC:
        return event_timestamp.replace(tzinfo=utc)
    elif t == EventTimestampType.TZ_AWARE_FIXED_OFFSET:
        return event_timestamp.replace(tzinfo=utc).astimezone(FixedOffset(60))
    elif t == EventTimestampType.TZ_AWARE_US_PACIFIC:
        return event_timestamp.replace(tzinfo=utc).astimezone(timezone("US/Pacific"))

    # Reached only for values that match no member (e.g. a bare int).
    raise ValueError(f"Unsupported event timestamp type: {t!r}")


def create_orders_df(
customers, drivers, start_date, end_date, order_count
) -> pd.DataFrame:
Expand All @@ -23,9 +44,15 @@ def create_orders_df(
df["driver_id"] = np.random.choice(drivers, order_count)
df["customer_id"] = np.random.choice(customers, order_count)
df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32)

df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(start=start_date, end=end_date, periods=order_count)
_convert_event_timestamp(
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
EventTimestampType(idx % 4),
)
for idx, dt in enumerate(
pd.date_range(start=start_date, end=end_date, periods=order_count)
)
]
df.sort_values(
by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"],
Expand Down
16 changes: 13 additions & 3 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,20 @@ def evaluate_historical_retrieval():
entity_df[ENTITY_DF_EVENT_TIMESTAMP_COL] = entity_df[
ENTITY_DF_EVENT_TIMESTAMP_COL
].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc))
# Sort entity dataframe prior to join, and create a copy to prevent modifying the original
entity_df_with_features = entity_df.sort_values(

# Create a copy of entity_df to prevent modifying the original
entity_df_with_features = entity_df.copy()

# Convert event timestamp column to datetime and normalize time zone to UTC
# This is necessary to avoid issues with pd.merge_asof
entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL] = pd.to_datetime(
entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL], utc=True
)

# Sort event timestamp values
entity_df_with_features = entity_df_with_features.sort_values(
ENTITY_DF_EVENT_TIMESTAMP_COL
).copy()
)

# Load feature view data from sources and join them incrementally
for feature_view, features in feature_views_to_features.items():
Expand Down
123 changes: 72 additions & 51 deletions sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import pytest
from google.cloud import bigquery
from pandas.testing import assert_frame_equal
from pytz import utc

import feast.driver_test_data as driver_data
from feast import utils
from feast.data_source import BigQuerySource, FileSource
from feast.entity import Entity
from feast.feature import Feature
Expand Down Expand Up @@ -98,74 +100,93 @@ def create_customer_daily_profile_feature_view(source):
return customer_profile_feature_view


def convert_timestamp_records_to_utc(records, column):
    """Convert the given column of the pandas records to UTC timestamps.

    Each record's ``column`` value is passed through ``utils.make_tzaware``
    (presumably attaching a time zone to naive values -- confirm against
    ``feast.utils``) and then converted with ``astimezone(utc)``.

    The records are updated in place and the same list is returned.
    """
    for row in records:
        aware_value = utils.make_tzaware(row[column])
        row[column] = aware_value.astimezone(utc)
    return records


def find_asof_record(records, ts_key, ts_start, ts_end, filter_key, filter_value):
    """Find the latest record in the given time range that passes the filter.

    A record qualifies when ``record[filter_key] == filter_value`` and
    ``ts_start <= record[ts_key] <= ts_end`` (both bounds inclusive). Among
    qualifying records the one with the greatest ``record[ts_key]`` is
    returned; on a tie the one appearing first in ``records`` wins. An empty
    dict is returned when nothing qualifies.
    """
    in_window = (
        candidate
        for candidate in records
        if candidate[filter_key] == filter_value
        and ts_start <= candidate[ts_key] <= ts_end
    )
    # max() keeps the first of several equal maxima, matching a strict "<" scan.
    return max(in_window, key=lambda candidate: candidate[ts_key], default={})


def get_expected_training_df(
customer_df: pd.DataFrame,
customer_fv: FeatureView,
driver_df: pd.DataFrame,
driver_fv: FeatureView,
orders_df: pd.DataFrame,
):
expected_orders_df = orders_df.copy().sort_values(ENTITY_DF_EVENT_TIMESTAMP_COL)
expected_drivers_df = driver_df.copy().sort_values(
driver_fv.input.event_timestamp_column
# Convert all pandas dataframes into records with UTC timestamps
order_records = convert_timestamp_records_to_utc(
orders_df.to_dict("records"), "event_timestamp"
)
expected_orders_with_drivers = pd.merge_asof(
expected_orders_df,
expected_drivers_df[
[
driver_fv.input.event_timestamp_column,
"driver_id",
"conv_rate",
"avg_daily_trips",
]
],
left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
right_on=driver_fv.input.event_timestamp_column,
by=["driver_id"],
tolerance=driver_fv.ttl,
driver_records = convert_timestamp_records_to_utc(
driver_df.to_dict("records"), driver_fv.input.event_timestamp_column
)

expected_orders_with_drivers.drop(
columns=[driver_fv.input.event_timestamp_column], inplace=True
customer_records = convert_timestamp_records_to_utc(
customer_df.to_dict("records"), customer_fv.input.event_timestamp_column
)

expected_customers_df = customer_df.copy().sort_values(
[customer_fv.input.event_timestamp_column]
)
expected_df = pd.merge_asof(
expected_orders_with_drivers,
expected_customers_df[
[
customer_fv.input.event_timestamp_column,
"customer_id",
"current_balance",
"avg_passenger_count",
"lifetime_trip_count",
]
],
left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
right_on=customer_fv.input.event_timestamp_column,
by=["customer_id"],
tolerance=customer_fv.ttl,
)
expected_df.drop(columns=[driver_fv.input.event_timestamp_column], inplace=True)
# Manually do point-in-time join of orders to drivers and customers records
for order_record in order_records:
driver_record = find_asof_record(
driver_records,
ts_key=driver_fv.input.event_timestamp_column,
ts_start=order_record["event_timestamp"] - driver_fv.ttl,
ts_end=order_record["event_timestamp"],
filter_key="driver_id",
filter_value=order_record["driver_id"],
)
customer_record = find_asof_record(
customer_records,
ts_key=customer_fv.input.event_timestamp_column,
ts_start=order_record["event_timestamp"] - customer_fv.ttl,
ts_end=order_record["event_timestamp"],
filter_key="customer_id",
filter_value=order_record["customer_id"],
)
order_record.update(
{
f"driver_stats__{k}": driver_record.get(k, None)
for k in ("conv_rate", "avg_daily_trips")
}
)
order_record.update(
{
f"customer_profile__{k}": customer_record.get(k, None)
for k in (
"current_balance",
"avg_passenger_count",
"lifetime_trip_count",
)
}
)

# Convert records back to pandas dataframe
expected_df = pd.DataFrame(order_records)

# Move the event timestamp column (ENTITY_DF_EVENT_TIMESTAMP_COL) to the front
current_cols = expected_df.columns.tolist()
current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL)
expected_df = expected_df[[ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols]

# Rename columns to have double underscore
expected_df.rename(
inplace=True,
columns={
"conv_rate": "driver_stats__conv_rate",
"avg_daily_trips": "driver_stats__avg_daily_trips",
"current_balance": "customer_profile__current_balance",
"avg_passenger_count": "customer_profile__avg_passenger_count",
"lifetime_trip_count": "customer_profile__lifetime_trip_count",
},
)
# Cast some columns to expected types, since we lose information when converting pandas DFs into Python objects.
expected_df["order_is_success"] = expected_df["order_is_success"].astype("int32")
expected_df["customer_profile__current_balance"] = expected_df[
"customer_profile__current_balance"
].astype("float32")
expected_df["customer_profile__avg_passenger_count"] = expected_df[
"customer_profile__avg_passenger_count"
].astype("float32")

return expected_df


Expand Down