Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
entity_df in get_historical_features should be tzaware
Signed-off-by: Abhin Chhabra <abhin.chhabra@shopify.com>
  • Loading branch information
chhabrakadabra authored and kevjumba committed Jul 18, 2022
commit 831665f7cc0b89b8750825624577d7fac7dad47a
10 changes: 5 additions & 5 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,17 +1057,17 @@ def get_historical_features(

# Check that the right request data is present in the entity_df
if type(entity_df) == pd.DataFrame:
entity_pd_df = cast(pd.DataFrame, entity_df)
entity_df = utils.make_df_tzaware(cast(pd.DataFrame, entity_df))
for fv in request_feature_views:
for feature in fv.features:
if feature.name not in entity_pd_df.columns:
if feature.name not in entity_df.columns:
raise RequestDataNotFoundInEntityDfException(
feature_name=feature.name, feature_view_name=fv.name
)
for odfv in on_demand_feature_views:
odfv_request_data_schema = odfv.get_request_data_schema()
for feature_name in odfv_request_data_schema.keys():
if feature_name not in entity_pd_df.columns:
if feature_name not in entity_df.columns:
raise RequestDataNotFoundInEntityDfException(
feature_name=feature_name, feature_view_name=odfv.name,
)
Expand Down Expand Up @@ -2273,7 +2273,7 @@ def _teardown_go_server(self):

@log_exceptions_and_usage
def write_logged_features(
self, logs: Union[pa.Table, Path], source: Union[FeatureService]
self, logs: Union[pa.Table, Path], source: FeatureService
):
"""
Write logs produced by a source (currently only feature service is supported as a source)
Expand Down Expand Up @@ -2302,7 +2302,7 @@ def write_logged_features(
@log_exceptions_and_usage
def validate_logged_features(
self,
source: Union[FeatureService],
source: FeatureService,
start: datetime,
end: datetime,
reference: ValidationReference,
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ def make_tzaware(t: datetime) -> datetime:
return t


def make_df_tzaware(t: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *t* in which every datetime column is timezone-aware (UTC).

    Non-datetime columns are passed through untouched, and the caller's
    dataframe is never mutated — all work happens on a fresh copy.
    """
    result = t.copy()  # never modify the incoming dataframe in place
    datetime_columns = (
        column
        for column in result.columns
        if pd.api.types.is_datetime64_any_dtype(result[column])
    )
    for column in datetime_columns:
        # to_datetime with utc=True localizes naive values and converts
        # aware ones, so the column always ends up UTC-aware.
        result[column] = pd.to_datetime(result[column], utc=True)
    return result


def to_naive_utc(ts: datetime) -> datetime:
if ts.tzinfo is None:
return ts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
BigQueryLoggingDestination,
SavedDatasetBigQueryStorage,
)
from feast.utils import make_df_tzaware
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
Expand Down Expand Up @@ -77,9 +78,7 @@ def create_data_source(
# Make all datetime columns timezone aware. This should be the behaviour of
# `BigQueryOfflineStore.offline_write_batch`, but since we're bypassing that API here, we should follow the same
# rule. The schema of this initial dataframe determines the schema in the newly created BigQuery table.
for column in df.columns:
if pd.api.types.is_datetime64_any_dtype(df[column]):
df[column] = pd.to_datetime(df[column], utc=True)
df = make_df_tzaware(df)
job = self.client.load_table_from_dataframe(df, destination_name)
job.result()

Expand Down