Skip to content

Commit 8df96a1

Browse files
authored
When staging entities dataset timestamps are being truncated to ms (#1207)
* floor event timestamp to ms Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * fix bq staging Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 0e13927 commit 8df96a1

3 files changed

Lines changed: 15 additions & 5 deletions

File tree

sdk/python/feast/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,12 @@ def get_historical_features(
979979
feature_refs, self.project
980980
)
981981

982+
assert all(ft.batch_source.created_timestamp_column for ft in feature_tables), (
983+
"All BatchSources attached to retrieved FeatureTables "
984+
"must have specified `created_timestamp_column` to be used in "
985+
"historical dataset generation."
986+
)
987+
982988
if output_location is None:
983989
output_location = os.path.join(
984990
self._config.get(opt.HISTORICAL_FEATURE_OUTPUT_LOCATION),

sdk/python/feast/staging/entities.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ def stage_entities_to_fs(
2929
entity_staging_uri = urlparse(os.path.join(staging_location, str(uuid.uuid4())))
3030
staging_client = get_staging_client(entity_staging_uri.scheme, config)
3131
with tempfile.NamedTemporaryFile() as df_export_path:
32+
# prevent casting ns -> ms exception inside pyarrow
33+
entity_source["event_timestamp"] = entity_source["event_timestamp"].dt.floor(
34+
"ms"
35+
)
36+
3237
entity_source.to_parquet(df_export_path.name)
3338
bucket = (
3439
None if entity_staging_uri.scheme == "file" else entity_staging_uri.netloc
@@ -70,6 +75,9 @@ def stage_entities_to_bq(
7075
f"_entities_{datetime.now():%Y%m%d%H%M%s}",
7176
)
7277

78+
# prevent casting ns -> ms exception inside pyarrow
79+
entity_source["event_timestamp"] = entity_source["event_timestamp"].dt.floor("ms")
80+
7381
load_job: bigquery.LoadJob = bq_client.load_table_from_dataframe(
7482
entity_source, destination
7583
)

tests/e2e/test_historical_features.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,7 @@ def read_parquet(uri):
3636

3737

3838
def generate_data():
39-
retrieval_date = (
40-
datetime.utcnow()
41-
.replace(hour=0, minute=0, second=0, microsecond=0)
42-
.replace(tzinfo=None)
43-
)
39+
retrieval_date = datetime.utcnow().replace(tzinfo=None)
4440
retrieval_outside_max_age_date = retrieval_date + timedelta(1)
4541
event_date = retrieval_date - timedelta(2)
4642
creation_date = retrieval_date - timedelta(1)

0 commit comments

Comments
 (0)