Skip to content

Commit 4d7aada

Browse files
authored
Infer entity dataframe event timestamp column (#1495)
* Infer entity df event timestamp Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Simplify inference of BQ Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Address comment Signed-off-by: Jacob Klegar <jacob@tecton.ai>
1 parent 1efc53b commit 4d7aada

File tree

5 files changed

+169
-88
lines changed

5 files changed

+169
-88
lines changed

sdk/python/feast/driver_test_data.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pandas as pd
66
from pytz import FixedOffset, timezone, utc
77

8-
from feast.infra.provider import ENTITY_DF_EVENT_TIMESTAMP_COL
8+
from feast.infra.provider import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
99

1010

1111
class EventTimestampType(Enum):
@@ -27,7 +27,12 @@ def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampTyp
2727

2828

2929
def create_orders_df(
30-
customers, drivers, start_date, end_date, order_count
30+
customers,
31+
drivers,
32+
start_date,
33+
end_date,
34+
order_count,
35+
infer_event_timestamp_col=False,
3136
) -> pd.DataFrame:
3237
"""
3338
Example df generated by this function:
@@ -45,19 +50,38 @@ def create_orders_df(
4550
df["customer_id"] = np.random.choice(customers, order_count)
4651
df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32)
4752

48-
df[ENTITY_DF_EVENT_TIMESTAMP_COL] = [
49-
_convert_event_timestamp(
50-
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
51-
EventTimestampType(idx % 4),
53+
if infer_event_timestamp_col:
54+
df["e_ts"] = [
55+
_convert_event_timestamp(
56+
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
57+
EventTimestampType(3),
58+
)
59+
for idx, dt in enumerate(
60+
pd.date_range(start=start_date, end=end_date, periods=order_count)
61+
)
62+
]
63+
df.sort_values(
64+
by=["e_ts", "order_id", "driver_id", "customer_id"], inplace=True,
5265
)
53-
for idx, dt in enumerate(
54-
pd.date_range(start=start_date, end=end_date, periods=order_count)
66+
else:
67+
df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
68+
_convert_event_timestamp(
69+
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
70+
EventTimestampType(idx % 4),
71+
)
72+
for idx, dt in enumerate(
73+
pd.date_range(start=start_date, end=end_date, periods=order_count)
74+
)
75+
]
76+
df.sort_values(
77+
by=[
78+
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
79+
"order_id",
80+
"driver_id",
81+
"customer_id",
82+
],
83+
inplace=True,
5584
)
56-
]
57-
df.sort_values(
58-
by=[ENTITY_DF_EVENT_TIMESTAMP_COL, "order_id", "driver_id", "customer_id"],
59-
inplace=True,
60-
)
6185
return df
6286

6387

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from feast.feature_view import FeatureView
1515
from feast.infra.offline_stores.offline_store import OfflineStore
1616
from feast.infra.provider import (
17+
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
1718
RetrievalJob,
1819
_get_requested_feature_views_to_features_dict,
1920
)
@@ -79,12 +80,19 @@ def get_historical_features(
7980
client = _get_bigquery_client()
8081

8182
if type(entity_df) is str:
82-
entity_df_sql_table = f"({entity_df})"
83+
entity_df_job = client.query(entity_df)
84+
entity_df_result = entity_df_job.result() # also starts job
85+
86+
entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query(
87+
entity_df_result
88+
)
89+
90+
entity_df_sql_table = f"`{entity_df_job.destination.project}.{entity_df_job.destination.dataset_id}.{entity_df_job.destination.table_id}`"
8391
elif isinstance(entity_df, pandas.DataFrame):
84-
if "event_timestamp" not in entity_df.columns:
85-
raise ValueError(
86-
"Please provide an entity_df with a column named event_timestamp representing the time of events."
87-
)
92+
entity_df_event_timestamp_col = _infer_event_timestamp_from_dataframe(
93+
entity_df
94+
)
95+
8896
table_id = _upload_entity_df_into_bigquery(
8997
config.project, entity_df, client
9098
)
@@ -107,12 +115,55 @@ def get_historical_features(
107115
min_timestamp=datetime.now() - timedelta(days=365),
108116
max_timestamp=datetime.now() + timedelta(days=1),
109117
left_table_query_string=entity_df_sql_table,
118+
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
110119
)
111120

112121
job = BigQueryRetrievalJob(query=query, client=client)
113122
return job
114123

115124

125+
def _infer_event_timestamp_from_bigquery_query(entity_df_result) -> str:
    """Determine which column of a BigQuery query result holds event timestamps.

    Preference order:
      1. A column literally named ``DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL``.
      2. Otherwise, the single TIMESTAMP-typed column, if exactly one exists
         (its use is announced via ``print`` so users know what was inferred).

    Raises:
        ValueError: if no default-named column exists and the schema does not
            contain exactly one TIMESTAMP column.
    """
    schema_fields = list(entity_df_result.schema)

    # Fast path: the conventional column name is present, use it as-is.
    field_names = {schema_field.name for schema_field in schema_fields}
    if DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL in field_names:
        return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL

    # Fall back to inference: only unambiguous (exactly one TIMESTAMP column)
    # results are accepted.
    timestamp_fields = [
        schema_field
        for schema_field in schema_fields
        if schema_field.field_type == "TIMESTAMP"
    ]
    if len(timestamp_fields) != 1:
        raise ValueError(
            f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
        )

    inferred_col = timestamp_fields[0].name
    print(
        f"Using {inferred_col} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
    )
    return inferred_col
147+
148+
149+
def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str:
    """Determine which column of an entity DataFrame holds event timestamps.

    Preference order:
      1. A column literally named ``DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL``.
      2. Otherwise, the single datetime-dtyped column (naive or tz-aware), if
         exactly one exists; its use is announced via ``print``.

    Raises:
        ValueError: if no default-named column exists and the DataFrame does
            not contain exactly one datetime column.
    """
    # Fast path: the conventional column name is present, use it as-is.
    if DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL in entity_df.columns:
        return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL

    # Fall back to inference over dtypes; both tz-naive ("datetime") and
    # tz-aware ("datetimetz") columns are candidates.
    candidate_cols = entity_df.select_dtypes(include=["datetime", "datetimetz"]).columns
    if len(candidate_cols) != 1:
        raise ValueError(
            f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
        )

    inferred_col = candidate_cols[0]
    print(
        f"Using {inferred_col} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
    )
    return inferred_col
165+
166+
116167
class BigQueryRetrievalJob(RetrievalJob):
117168
def __init__(self, query, client):
118169
self.query = query
@@ -230,6 +281,7 @@ def build_point_in_time_query(
230281
min_timestamp: datetime,
231282
max_timestamp: datetime,
232283
left_table_query_string: str,
284+
entity_df_event_timestamp_col: str,
233285
):
234286
"""Build point-in-time query between each feature view table and the entity dataframe"""
235287
template = Environment(loader=BaseLoader()).from_string(
@@ -241,6 +293,7 @@ def build_point_in_time_query(
241293
"min_timestamp": min_timestamp,
242294
"max_timestamp": max_timestamp,
243295
"left_table_query_string": left_table_query_string,
296+
"entity_df_event_timestamp_col": entity_df_event_timestamp_col,
244297
"featureviews": [asdict(context) for context in feature_view_query_contexts],
245298
}
246299

@@ -292,7 +345,7 @@ def _get_bigquery_client():
292345
-- unique identifier for each row in the entity dataset.
293346
row_number,
294347
-- event_timestamp contains the timestamps to join onto
295-
event_timestamp,
348+
{{entity_df_event_timestamp_col}} AS event_timestamp,
296349
-- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
297350
NULL as {{ featureview.name }}_feature_timestamp,
298351
-- created timestamp of the feature at the corresponding feature_timestamp
@@ -373,7 +426,7 @@ def _get_bigquery_client():
373426
/*
374427
Joins the outputs of multiple time travel joins to a single table.
375428
*/
376-
SELECT edf.event_timestamp as event_timestamp, * EXCEPT (row_number, event_timestamp) FROM entity_dataframe edf
429+
SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (row_number, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf
377430
{% for featureview in featureviews %}
378431
LEFT JOIN (
379432
SELECT
@@ -384,5 +437,5 @@ def _get_bigquery_client():
384437
FROM {{ featureview.name }}__deduped
385438
) USING (row_number)
386439
{% endfor %}
387-
ORDER BY event_timestamp
440+
ORDER BY {{entity_df_event_timestamp_col}}
388441
"""

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from feast.feature_view import FeatureView
1010
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
1111
from feast.infra.provider import (
12-
ENTITY_DF_EVENT_TIMESTAMP_COL,
12+
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
1313
_get_requested_feature_views_to_features_dict,
1414
_run_field_mapping,
1515
)
@@ -44,10 +44,20 @@ def get_historical_features(
4444
raise ValueError(
4545
f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
4646
)
47-
if ENTITY_DF_EVENT_TIMESTAMP_COL not in entity_df.columns:
48-
raise ValueError(
49-
f"Please provide an entity_df with a column named {ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
50-
)
47+
entity_df_event_timestamp_col = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL # local modifiable copy of global variable
48+
if entity_df_event_timestamp_col not in entity_df.columns:
49+
datetime_columns = entity_df.select_dtypes(
50+
include=["datetime", "datetimetz"]
51+
).columns
52+
if len(datetime_columns) == 1:
53+
print(
54+
f"Using {datetime_columns[0]} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
55+
)
56+
entity_df_event_timestamp_col = datetime_columns[0]
57+
else:
58+
raise ValueError(
59+
f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
60+
)
5161

5262
feature_views_to_features = _get_requested_feature_views_to_features_dict(
5363
feature_refs, feature_views
@@ -57,22 +67,22 @@ def get_historical_features(
5767
def evaluate_historical_retrieval():
5868

5969
# Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC
60-
entity_df[ENTITY_DF_EVENT_TIMESTAMP_COL] = entity_df[
61-
ENTITY_DF_EVENT_TIMESTAMP_COL
70+
entity_df[entity_df_event_timestamp_col] = entity_df[
71+
entity_df_event_timestamp_col
6272
].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc))
6373

6474
# Create a copy of entity_df to prevent modifying the original
6575
entity_df_with_features = entity_df.copy()
6676

6777
# Convert event timestamp column to datetime and normalize time zone to UTC
6878
# This is necessary to avoid issues with pd.merge_asof
69-
entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL] = pd.to_datetime(
70-
entity_df_with_features[ENTITY_DF_EVENT_TIMESTAMP_COL], utc=True
79+
entity_df_with_features[entity_df_event_timestamp_col] = pd.to_datetime(
80+
entity_df_with_features[entity_df_event_timestamp_col], utc=True
7181
)
7282

7383
# Sort event timestamp values
7484
entity_df_with_features = entity_df_with_features.sort_values(
75-
ENTITY_DF_EVENT_TIMESTAMP_COL
85+
entity_df_event_timestamp_col
7686
)
7787

7888
# Load feature view data from sources and join them incrementally
@@ -153,14 +163,14 @@ def evaluate_historical_retrieval():
153163
entity_df_with_features = pd.merge_asof(
154164
entity_df_with_features,
155165
df_to_join,
156-
left_on=ENTITY_DF_EVENT_TIMESTAMP_COL,
166+
left_on=entity_df_event_timestamp_col,
157167
right_on=event_timestamp_column,
158168
by=right_entity_columns,
159169
tolerance=feature_view.ttl,
160170
)
161171

162172
# Remove right (feature table/view) event_timestamp column.
163-
if event_timestamp_column != ENTITY_DF_EVENT_TIMESTAMP_COL:
173+
if event_timestamp_column != entity_df_event_timestamp_col:
164174
entity_df_with_features.drop(
165175
columns=[event_timestamp_column], inplace=True
166176
)
@@ -170,9 +180,9 @@ def evaluate_historical_retrieval():
170180

171181
# Move "datetime" column to front
172182
current_cols = entity_df_with_features.columns.tolist()
173-
current_cols.remove(ENTITY_DF_EVENT_TIMESTAMP_COL)
183+
current_cols.remove(entity_df_event_timestamp_col)
174184
entity_df_with_features = entity_df_with_features[
175-
[ENTITY_DF_EVENT_TIMESTAMP_COL] + current_cols
185+
[entity_df_event_timestamp_col] + current_cols
176186
]
177187

178188
return entity_df_with_features

sdk/python/feast/infra/provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from feast.repo_config import RepoConfig
1919
from feast.type_map import python_value_to_proto_value
2020

21-
ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
21+
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
2222

2323

2424
class Provider(abc.ABC):

0 commit comments

Comments
 (0)