1414from feast .feature_view import FeatureView
1515from feast .infra .offline_stores .offline_store import OfflineStore
1616from feast .infra .provider import (
17+ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL ,
1718 RetrievalJob ,
1819 _get_requested_feature_views_to_features_dict ,
1920)
@@ -79,12 +80,19 @@ def get_historical_features(
7980 client = _get_bigquery_client ()
8081
8182 if type (entity_df ) is str :
82- entity_df_sql_table = f"({ entity_df } )"
83+ entity_df_job = client .query (entity_df )
84+ entity_df_result = entity_df_job .result () # also starts job
85+
86+ entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query (
87+ entity_df_result
88+ )
89+
90+ entity_df_sql_table = f"`{ entity_df_job .destination .project } .{ entity_df_job .destination .dataset_id } .{ entity_df_job .destination .table_id } `"
8391 elif isinstance (entity_df , pandas .DataFrame ):
84- if "event_timestamp" not in entity_df . columns :
85- raise ValueError (
86- "Please provide an entity_df with a column named event_timestamp representing the time of events."
87- )
92+ entity_df_event_timestamp_col = _infer_event_timestamp_from_dataframe (
93+ entity_df
94+ )
95+
8896 table_id = _upload_entity_df_into_bigquery (
8997 config .project , entity_df , client
9098 )
@@ -107,12 +115,55 @@ def get_historical_features(
107115 min_timestamp = datetime .now () - timedelta (days = 365 ),
108116 max_timestamp = datetime .now () + timedelta (days = 1 ),
109117 left_table_query_string = entity_df_sql_table ,
118+ entity_df_event_timestamp_col = entity_df_event_timestamp_col ,
110119 )
111120
112121 job = BigQueryRetrievalJob (query = query , client = client )
113122 return job
114123
115124
125+ def _infer_event_timestamp_from_bigquery_query (entity_df_result ) -> str :
126+ if any (
127+ schema_field .name == DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
128+ for schema_field in entity_df_result .schema
129+ ):
130+ return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
131+ else :
132+ datetime_columns = list (
133+ filter (
134+ lambda schema_field : schema_field .field_type == "TIMESTAMP" ,
135+ entity_df_result .schema ,
136+ )
137+ )
138+ if len (datetime_columns ) == 1 :
139+ print (
140+ f"Using { datetime_columns [0 ].name } as the event timestamp. To specify a column explicitly, please name it { DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL } ."
141+ )
142+ return datetime_columns [0 ].name
143+ else :
144+ raise ValueError (
145+ f"Please provide an entity_df with a column named { DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL } representing the time of events."
146+ )
147+
148+
149+ def _infer_event_timestamp_from_dataframe (entity_df : pandas .DataFrame ) -> str :
150+ if DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL in entity_df .columns :
151+ return DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
152+ else :
153+ datetime_columns = entity_df .select_dtypes (
154+ include = ["datetime" , "datetimetz" ]
155+ ).columns
156+ if len (datetime_columns ) == 1 :
157+ print (
158+ f"Using { datetime_columns [0 ]} as the event timestamp. To specify a column explicitly, please name it { DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL } ."
159+ )
160+ return datetime_columns [0 ]
161+ else :
162+ raise ValueError (
163+ f"Please provide an entity_df with a column named { DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL } representing the time of events."
164+ )
165+
166+
116167class BigQueryRetrievalJob (RetrievalJob ):
117168 def __init__ (self , query , client ):
118169 self .query = query
@@ -230,6 +281,7 @@ def build_point_in_time_query(
230281 min_timestamp : datetime ,
231282 max_timestamp : datetime ,
232283 left_table_query_string : str ,
284+ entity_df_event_timestamp_col : str ,
233285):
234286 """Build point-in-time query between each feature view table and the entity dataframe"""
235287 template = Environment (loader = BaseLoader ()).from_string (
@@ -241,6 +293,7 @@ def build_point_in_time_query(
241293 "min_timestamp" : min_timestamp ,
242294 "max_timestamp" : max_timestamp ,
243295 "left_table_query_string" : left_table_query_string ,
296+ "entity_df_event_timestamp_col" : entity_df_event_timestamp_col ,
244297 "featureviews" : [asdict (context ) for context in feature_view_query_contexts ],
245298 }
246299
@@ -292,7 +345,7 @@ def _get_bigquery_client():
292345 -- unique identifier for each row in the entity dataset.
293346 row_number,
294347 -- event_timestamp contains the timestamps to join onto
295- event_timestamp,
348+ {{entity_df_event_timestamp_col}} AS event_timestamp,
296349 -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
297350 NULL as {{ featureview.name }}_feature_timestamp,
298351 -- created timestamp of the feature at the corresponding feature_timestamp
@@ -373,7 +426,7 @@ def _get_bigquery_client():
373426/*
374427 Joins the outputs of multiple time travel joins to a single table.
375428 */
376- SELECT edf.event_timestamp as event_timestamp , * EXCEPT (row_number, event_timestamp ) FROM entity_dataframe edf
429+ SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}} , * EXCEPT (row_number, {{entity_df_event_timestamp_col}} ) FROM entity_dataframe edf
377430{% for featureview in featureviews %}
378431LEFT JOIN (
379432 SELECT
@@ -384,5 +437,5 @@ def _get_bigquery_client():
384437 FROM {{ featureview.name }}__deduped
385438) USING (row_number)
386439{% endfor %}
387- ORDER BY event_timestamp
440+ ORDER BY {{entity_df_event_timestamp_col}}
388441"""
0 commit comments