
Commit 6d7678f

Author: Tsotne Tabidze

Fix timezone issue in materialize & materialize_incremental (feast-dev#1439)

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
1 parent 9e5377c · commit 6d7678f

8 files changed: 212 additions & 173 deletions
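Background on the bug class: Python refuses to compare tz-naive and tz-aware datetimes, and force-tagging every parsed timestamp with UTC (the old CLI behavior) mislabels values that carried another offset. A minimal illustration, not taken from the commit itself:

    from datetime import datetime
    from pytz import utc

    naive = datetime(2021, 4, 12, 10, 0)                 # no tzinfo
    aware = datetime(2021, 4, 12, 10, 0, tzinfo=utc)

    try:
        _ = naive < aware                                # mixing the two raises
    except TypeError as err:
        print(err)   # can't compare offset-naive and offset-aware datetimes

    # The commit's approach: normalize once, assuming tz-naive means UTC
    normalized = naive.replace(tzinfo=utc) if naive.tzinfo is None else naive
    assert normalized == aware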

sdk/python/feast/cli.py (2 additions & 3 deletions)

@@ -22,7 +22,6 @@
 import click
 import pkg_resources
 import yaml
-from pytz import utc

 from feast.client import Client
 from feast.config import Config
@@ -425,8 +424,8 @@ def materialize_command(start_ts: str, end_ts: str, repo_path: str, views: List[
     store = FeatureStore(repo_path=repo_path)
     store.materialize(
         feature_views=None if not views else views,
-        start_date=datetime.fromisoformat(start_ts).replace(tzinfo=utc),
-        end_date=datetime.fromisoformat(end_ts).replace(tzinfo=utc),
+        start_date=datetime.fromisoformat(start_ts),
+        end_date=datetime.fromisoformat(end_ts),
     )
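Note on the CLI change: datetime.fromisoformat() preserves an explicit offset in the input string, so the old .replace(tzinfo=utc) would silently relabel (not convert) such timestamps; normalization of genuinely naive values now happens once inside FeatureStore via utils.make_tzaware. A sketch of the hazard that was removed:

    from datetime import datetime
    from pytz import utc

    ts = datetime.fromisoformat("2021-04-12T10:00:00+02:00")
    print(ts.tzinfo)        # UTC+02:00 -- the user's offset survives parsing

    # The old code rewrote the zone without adjusting the clock time,
    # shifting the instant by two hours:
    clobbered = ts.replace(tzinfo=utc)
    print(clobbered - ts)   # 2:00:00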

sdk/python/feast/feature_store.py (5 additions & 0 deletions)

@@ -19,6 +19,7 @@
 import pandas as pd
 import pyarrow

+from feast import utils
 from feast.entity import Entity
 from feast.feature_view import FeatureView
 from feast.infra.provider import Provider, RetrievalJob, get_provider
@@ -359,6 +360,10 @@ def _materialize_single_feature_view(
             event_timestamp_column,
             created_timestamp_column,
         ) = _run_reverse_field_mapping(feature_view)
+
+        start_date = utils.make_tzaware(start_date)
+        end_date = utils.make_tzaware(end_date)
+
         provider = self._get_provider()
         table = provider.pull_latest_from_table_or_query(
             feature_view.input,
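With both bounds funneled through utils.make_tzaware here, callers may pass naive and aware datetimes interchangeably. A hypothetical call (the repo path and view name are made up) mirroring the new test below:

    from datetime import datetime, timedelta
    from pytz import utc
    from feast.feature_store import FeatureStore

    store = FeatureStore(repo_path=".")        # hypothetical feature repo
    now = datetime.utcnow()
    store.materialize(
        feature_views=["driver_stats"],        # hypothetical view name
        start_date=(now - timedelta(hours=5)).replace(tzinfo=utc),  # tz-aware
        end_date=now - timedelta(hours=2),                          # tz-naive
    )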

sdk/python/feast/feature_view.py (5 additions & 1 deletion)

@@ -17,6 +17,7 @@
 from google.protobuf.duration_pb2 import Duration
 from google.protobuf.timestamp_pb2 import Timestamp

+from feast import utils
 from feast.data_source import BigQuerySource, DataSource, FileSource
 from feast.feature import Feature
 from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
@@ -166,7 +167,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):

         for interval in feature_view_proto.meta.materialization_intervals:
             feature_view.materialization_intervals.append(
-                (interval.start_time.ToDatetime(), interval.end_time.ToDatetime())
+                (
+                    utils.make_tzaware(interval.start_time.ToDatetime()),
+                    utils.make_tzaware(interval.end_time.ToDatetime()),
+                )
             )

         return feature_view
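The wrapping above is needed because protobuf's Timestamp.ToDatetime() returns a tz-naive datetime (the instant in UTC with tzinfo stripped), which would otherwise leak naive values into materialization_intervals. A quick demonstration:

    from datetime import datetime
    from google.protobuf.timestamp_pb2 import Timestamp

    ts = Timestamp()
    ts.FromDatetime(datetime(2021, 4, 12, 10, 0))  # naive input is taken as UTC

    dt = ts.ToDatetime()
    print(dt.tzinfo)   # None -- naive on the way back out, hence make_tzaware()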

sdk/python/feast/infra/gcp.py (6 additions & 15 deletions)

@@ -10,9 +10,8 @@
 import pyarrow
 from google.cloud import bigquery
 from jinja2 import BaseLoader, Environment
-from pytz import utc

-from feast import FeatureTable
+from feast import FeatureTable, utils
 from feast.data_source import BigQuerySource, DataSource
 from feast.feature_view import FeatureView
 from feast.infra.key_encoding_utils import serialize_entity_key
@@ -252,14 +251,14 @@ def _write_minibatch(

        entity = client.get(key)
        if entity is not None:
-           if entity["event_ts"] > _make_tzaware(timestamp):
+           if entity["event_ts"] > utils.make_tzaware(timestamp):
                # Do not overwrite feature values computed from fresher data
                continue
            elif (
-               entity["event_ts"] == _make_tzaware(timestamp)
+               entity["event_ts"] == utils.make_tzaware(timestamp)
                and created_ts is not None
                and entity["created_ts"] is not None
-               and entity["created_ts"] > _make_tzaware(created_ts)
+               and entity["created_ts"] > utils.make_tzaware(created_ts)
            ):
                # Do not overwrite feature values computed from the same data, but
                # computed later than this one
@@ -273,9 +272,9 @@ def _write_minibatch(
            values={
                k: v.SerializeToString() for k, v in features.items()
            },
-           event_ts=_make_tzaware(timestamp),
+           event_ts=utils.make_tzaware(timestamp),
            created_ts=(
-               _make_tzaware(created_ts)
+               utils.make_tzaware(created_ts)
                if created_ts is not None
                else None
            ),
@@ -316,14 +315,6 @@ def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
     return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()


-def _make_tzaware(t: datetime):
-    """ We assume tz-naive datetimes are UTC """
-    if t.tzinfo is None:
-        return t.replace(tzinfo=utc)
-    else:
-        return t
-
-
 class BigQueryRetrievalJob(RetrievalJob):
     def __init__(self, query):
         self.query = query
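This file's change is a pure refactor: the private _make_tzaware helper moves to feast.utils so every call site shares one definition. The overwrite guard it supports is unchanged; restated as a standalone sketch (hypothetical function, not the module's API):

    from datetime import datetime
    from typing import Optional
    from pytz import utc

    def _tzaware(t: datetime) -> datetime:
        return t.replace(tzinfo=utc) if t.tzinfo is None else t

    def should_skip_write(
        stored_event_ts: datetime,
        stored_created_ts: Optional[datetime],
        new_event_ts: datetime,
        new_created_ts: Optional[datetime],
    ) -> bool:
        # Skip if the stored row came from fresher data, or from the same
        # data but was computed later than the incoming row.
        if stored_event_ts > _tzaware(new_event_ts):
            return True
        return (
            stored_event_ts == _tzaware(new_event_ts)
            and new_created_ts is not None
            and stored_created_ts is not None
            and stored_created_ts > _tzaware(new_created_ts)
        )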

sdk/python/feast/infra/local_sqlite.py (7 additions & 0 deletions)

@@ -169,6 +169,13 @@ def pull_latest_from_table_or_query(
 ) -> pyarrow.Table:
     assert isinstance(data_source, FileSource)
     source_df = pd.read_parquet(data_source.path)
+    # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
+    source_df[event_timestamp_column] = source_df[event_timestamp_column].apply(
+        lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
+    )
+    source_df[created_timestamp_column] = source_df[created_timestamp_column].apply(
+        lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
+    )

     ts_columns = (
         [event_timestamp_column, created_timestamp_column]
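Why element-wise apply rather than a dtype-level tz_localize: a parquet column holding a mix of naive and aware stamps comes back as an object column of individual pd.Timestamp values, so the zone has to be fixed per element. A toy reproduction (illustrative only):

    import pandas as pd
    import pytz

    df = pd.DataFrame({
        "ts": [
            pd.Timestamp("2021-04-12 10:00"),                      # tz-naive
            pd.Timestamp("2021-04-12 10:00", tz="Europe/Berlin"),  # tz-aware
        ]
    })
    print(df["ts"].dtype)   # object -- mixed zones block a single datetime64 dtype

    # Same normalization as the diff: default naive entries to UTC
    df["ts"] = df["ts"].apply(
        lambda x: x if x.tz is not None else x.replace(tzinfo=pytz.utc)
    )
    print([t.tz for t in df["ts"]])   # every element now carries a zone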

sdk/python/feast/utils.py (new file, 11 additions & 0 deletions)

@@ -0,0 +1,11 @@
+from datetime import datetime
+
+from pytz import utc
+
+
+def make_tzaware(t: datetime):
+    """ We assume tz-naive datetimes are UTC """
+    if t.tzinfo is None:
+        return t.replace(tzinfo=utc)
+    else:
+        return t
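make_tzaware is idempotent: aware inputs pass through unchanged and naive inputs are labeled UTC (relabeled, not converted, which is exactly right under the naive-means-UTC convention). For example:

    from datetime import datetime
    from feast.utils import make_tzaware

    naive = datetime(2021, 4, 12, 10, 0)
    aware = make_tzaware(naive)
    print(aware.tzinfo)                   # UTC

    assert make_tzaware(aware) is aware   # already-aware values are untouched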

New test file (176 additions & 0 deletions)

@@ -0,0 +1,176 @@
+import contextlib
+import tempfile
+import time
+import uuid
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Iterator, Tuple, Union
+
+import pandas as pd
+import pytest
+from google.cloud import bigquery
+from pytz import timezone, utc
+
+from feast.data_format import ParquetFormat
+from feast.data_source import BigQuerySource, FileSource
+from feast.feature import Feature
+from feast.feature_store import FeatureStore
+from feast.feature_view import FeatureView
+from feast.repo_config import LocalOnlineStoreConfig, OnlineStoreConfig, RepoConfig
+from feast.value_type import ValueType
+
+
+def create_dataset() -> pd.DataFrame:
+    now = datetime.utcnow()
+    ts = pd.Timestamp(now).round("ms")
+    data = {
+        "id": [1, 2, 1, 3, 3],
+        "value": [0.1, 0.2, 0.3, 4, 5],
+        "ts_1": [
+            ts - timedelta(hours=4),
+            ts,
+            ts - timedelta(hours=3),
+            # Use different time zones to test tz-naive -> tz-aware conversion
+            (ts - timedelta(hours=4))
+            .replace(tzinfo=utc)
+            .astimezone(tz=timezone("Europe/Berlin")),
+            (ts - timedelta(hours=1))
+            .replace(tzinfo=utc)
+            .astimezone(tz=timezone("US/Pacific")),
+        ],
+        "created_ts": [ts, ts, ts, ts, ts],
+    }
+    return pd.DataFrame.from_dict(data)
+
+
+def get_feature_view(data_source: Union[FileSource, BigQuerySource]) -> FeatureView:
+    return FeatureView(
+        name="test_bq_correctness",
+        entities=["driver_id"],
+        features=[Feature("value", ValueType.FLOAT)],
+        ttl=timedelta(days=5),
+        input=data_source,
+    )
+
+
+# bq_source_type must be one of "query" and "table"
+@contextlib.contextmanager
+def prep_bq_fs_and_fv(
+    bq_source_type: str,
+) -> Iterator[Tuple[FeatureStore, FeatureView]]:
+    client = bigquery.Client()
+    gcp_project = client.project
+    bigquery_dataset = "test_ingestion"
+    dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
+    client.create_dataset(dataset, exists_ok=True)
+    dataset.default_table_expiration_ms = (
+        1000 * 60 * 60 * 24 * 14
+    )  # 2 weeks in milliseconds
+    client.update_dataset(dataset, ["default_table_expiration_ms"])
+
+    df = create_dataset()
+
+    job_config = bigquery.LoadJobConfig()
+    table_ref = f"{gcp_project}.{bigquery_dataset}.{bq_source_type}_correctness_{int(time.time())}"
+    query = f"SELECT * FROM `{table_ref}`"
+    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
+    job.result()
+
+    bigquery_source = BigQuerySource(
+        table_ref=table_ref if bq_source_type == "table" else None,
+        query=query if bq_source_type == "query" else None,
+        event_timestamp_column="ts",
+        created_timestamp_column="created_ts",
+        date_partition_column="",
+        field_mapping={"ts_1": "ts", "id": "driver_id"},
+    )
+
+    fv = get_feature_view(bigquery_source)
+    with tempfile.TemporaryDirectory() as repo_dir_name:
+        config = RepoConfig(
+            registry=str(Path(repo_dir_name) / "registry.db"),
+            project=f"test_bq_correctness_{uuid.uuid4()}",
+            provider="gcp",
+        )
+        fs = FeatureStore(config=config)
+        fs.apply([fv])
+
+        yield fs, fv
+
+
+@contextlib.contextmanager
+def prep_local_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
+    with tempfile.NamedTemporaryFile(suffix=".parquet") as f:
+        df = create_dataset()
+        f.close()
+        df.to_parquet(f.name)
+        file_source = FileSource(
+            file_format=ParquetFormat(),
+            file_url=f"file://{f.name}",
+            event_timestamp_column="ts",
+            created_timestamp_column="created_ts",
+            date_partition_column="",
+            field_mapping={"ts_1": "ts", "id": "driver_id"},
+        )
+        fv = get_feature_view(file_source)
+        with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name:
+            config = RepoConfig(
+                registry=str(Path(repo_dir_name) / "registry.db"),
+                project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}",
+                provider="local",
+                online_store=OnlineStoreConfig(
+                    local=LocalOnlineStoreConfig(
+                        path=str(Path(data_dir_name) / "online_store.db")
+                    )
+                ),
+            )
+            fs = FeatureStore(config=config)
+            fs.apply([fv])
+
+            yield fs, fv
+
+
+def run_materialization_test(fs: FeatureStore, fv: FeatureView) -> None:
+    now = datetime.utcnow()
+    # Run materialize()
+    # use both tz-naive & tz-aware timestamps to test that they're both correctly handled
+    start_date = (now - timedelta(hours=5)).replace(tzinfo=utc)
+    end_date = now - timedelta(hours=2)
+    fs.materialize([fv.name], start_date, end_date)
+
+    # check result of materialize()
+    response_dict = fs.get_online_features(
+        [f"{fv.name}:value"], [{"driver_id": 1}]
+    ).to_dict()
+    assert abs(response_dict[f"{fv.name}__value"][0] - 0.3) < 1e-6
+
+    # check prior value for materialize_incremental()
+    response_dict = fs.get_online_features(
+        [f"{fv.name}:value"], [{"driver_id": 3}]
+    ).to_dict()
+    assert abs(response_dict[f"{fv.name}__value"][0] - 4) < 1e-6
+
+    # run materialize_incremental()
+    fs.materialize_incremental(
+        [fv.name], now - timedelta(seconds=0),
+    )
+
+    # check result of materialize_incremental()
+    response_dict = fs.get_online_features(
+        [f"{fv.name}:value"], [{"driver_id": 3}]
+    ).to_dict()
+    assert abs(response_dict[f"{fv.name}__value"][0] - 5) < 1e-6
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "bq_source_type", ["query", "table"],
+)
+def test_bq_materialization(bq_source_type: str):
+    with prep_bq_fs_and_fv(bq_source_type) as (fs, fv):
+        run_materialization_test(fs, fv)
+
+
+def test_local_materialization():
+    with prep_local_fs_and_fv() as (fs, fv):
+        run_materialization_test(fs, fv)
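Running these: test_local_materialization needs no cloud resources, while test_bq_materialization is marked @pytest.mark.integration and requires GCP credentials plus BigQuery access. Assuming a standard pytest setup in this repo, the integration tests can be excluded from plain runs with pytest -m "not integration"; the exact invocation depends on the repo's pytest configuration.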
