Skip to content

Commit 6589f15

Browse files
chore: Add feature repo for version 0.19 (feast-dev#2659)
* Use `join_keys` instead of `join_key` when instantiating entity Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Switch from `batch_source` to `source` Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Clean up test Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Add example feature repo for version 0.19 Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Add test for 0.19 example feature repo Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent bb72b7c commit 6589f15

5 files changed

Lines changed: 106 additions & 41 deletions

File tree

sdk/python/feast/entity.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def from_proto(cls, entity_proto: EntityProto):
180180
entity = cls(
181181
name=entity_proto.spec.name,
182182
value_type=ValueType(entity_proto.spec.value_type),
183-
join_key=entity_proto.spec.join_key,
183+
join_keys=[entity_proto.spec.join_key],
184184
description=entity_proto.spec.description,
185185
tags=entity_proto.spec.tags,
186186
owner=entity_proto.spec.owner,

sdk/python/tests/example_repos/example_feature_repo_2.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
created_timestamp_column="created",
1010
)
1111

12-
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
12+
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
1313

1414

1515
driver_hourly_stats_view = FeatureView(
@@ -22,7 +22,7 @@
2222
Field(name="avg_daily_trips", dtype=Int64),
2323
],
2424
online=True,
25-
batch_source=driver_hourly_stats,
25+
source=driver_hourly_stats,
2626
tags={},
2727
)
2828

@@ -43,6 +43,6 @@
4343
Field(name="avg_ride_length", dtype=Float32),
4444
],
4545
online=True,
46-
batch_source=global_daily_stats,
46+
source=global_daily_stats,
4747
tags={},
4848
)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from datetime import timedelta
2+
3+
from feast import Entity, Feature, FeatureView, FileSource, ValueType
4+
5+
driver_hourly_stats = FileSource(
6+
path="%PARQUET_PATH%", # placeholder to be replaced by the test
7+
event_timestamp_column="event_timestamp", # Changed to `timestamp_field` in 0.20
8+
created_timestamp_column="created",
9+
)
10+
11+
driver = Entity(
12+
name="driver_id",
13+
value_type=ValueType.INT64,
14+
description="driver id",
15+
join_key="driver_id", # Changed to `join_keys` in 0.20
16+
)
17+
18+
19+
driver_hourly_stats_view = FeatureView(
20+
name="driver_hourly_stats",
21+
entities=["driver_id"],
22+
ttl=timedelta(days=1),
23+
features=[ # Changed to `schema` in 0.20
24+
Feature(name="conv_rate", dtype=ValueType.FLOAT), # Changed to `Field` in 0.20
25+
Feature(name="acc_rate", dtype=ValueType.FLOAT),
26+
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
27+
],
28+
online=True,
29+
batch_source=driver_hourly_stats, # Changed to `source` in 0.20
30+
tags={},
31+
)
32+
33+
34+
global_daily_stats = FileSource(
35+
path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test
36+
event_timestamp_column="event_timestamp", # Changed to `timestamp_field` in 0.20
37+
created_timestamp_column="created",
38+
)
39+
40+
41+
global_stats_feature_view = FeatureView(
42+
name="global_daily_stats",
43+
entities=[],
44+
ttl=timedelta(days=1),
45+
features=[ # Changed to `schema` in 0.20
46+
Feature(name="num_rides", dtype=ValueType.INT32), # Changed to `Field` in 0.20
47+
Feature(name="avg_ride_length", dtype=ValueType.FLOAT),
48+
],
49+
online=True,
50+
batch_source=global_daily_stats, # Changed to `source` in 0.20
51+
tags={},
52+
)

sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,6 @@
2929
Field(name="avg_daily_trips", dtype=Int64),
3030
],
3131
online=True,
32-
batch_source=driver_hourly_stats,
32+
source=driver_hourly_stats,
3333
tags={},
3434
)

sdk/python/tests/integration/online_store/test_e2e_local.py

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
import pandas as pd
77
from pytz import utc
88

9-
import feast.driver_test_data as driver_data
10-
from feast import FeatureStore
9+
from feast.driver_test_data import (
10+
create_driver_hourly_stats_df,
11+
create_global_daily_stats_df,
12+
)
13+
from feast.feature_store import FeatureStore
1114
from tests.utils.cli_utils import CliRunner, get_example_repo
1215

1316

@@ -65,6 +68,37 @@ def _assert_online_features(
6568
assert "global_daily_stats__avg_ride_length" in result
6669

6770

71+
def _test_materialize_and_online_retrieval(
72+
runner: CliRunner,
73+
store: FeatureStore,
74+
start_date: datetime,
75+
end_date: datetime,
76+
driver_df: pd.DataFrame,
77+
):
78+
assert store.repo_path is not None
79+
80+
# Test `feast materialize` and online retrieval.
81+
r = runner.run(
82+
[
83+
"materialize",
84+
start_date.isoformat(),
85+
(end_date - timedelta(days=7)).isoformat(),
86+
],
87+
cwd=Path(store.repo_path),
88+
)
89+
90+
assert r.returncode == 0
91+
_assert_online_features(store, driver_df, end_date - timedelta(days=7))
92+
93+
# Test `feast materialize-incremental` and online retrieval.
94+
r = runner.run(
95+
["materialize-incremental", end_date.isoformat()], cwd=Path(store.repo_path),
96+
)
97+
98+
assert r.returncode == 0
99+
_assert_online_features(store, driver_df, end_date)
100+
101+
68102
def test_e2e_local() -> None:
69103
"""
70104
A more comprehensive than "basic" test, using local provider.
@@ -74,71 +108,50 @@ def test_e2e_local() -> None:
74108
3. Ingest some data to online store from parquet
75109
4. Read from the online store to make sure it made it there.
76110
"""
77-
78111
runner = CliRunner()
79112
with tempfile.TemporaryDirectory() as data_dir:
80-
81-
# Generate some test data in parquet format.
113+
# Generate test data.
82114
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
83115
start_date = end_date - timedelta(days=15)
84116

85117
driver_entities = [1001, 1002, 1003, 1004, 1005]
86-
driver_df = driver_data.create_driver_hourly_stats_df(
87-
driver_entities, start_date, end_date
88-
)
118+
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
89119
driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
90120
driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)
91121

92-
global_df = driver_data.create_global_daily_stats_df(start_date, end_date)
122+
global_df = create_global_daily_stats_df(start_date, end_date)
93123
global_stats_path = os.path.join(data_dir, "global_stats.parquet")
94124
global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True)
95125

96-
# Note that runner takes care of running apply/teardown for us here.
97-
# We patch python code in example_feature_repo_2.py to set the path to Parquet files.
98126
with runner.local_repo(
99127
get_example_repo("example_feature_repo_2.py")
100128
.replace("%PARQUET_PATH%", driver_stats_path)
101129
.replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
102130
"file",
103131
) as store:
104-
105-
assert store.repo_path is not None
106-
107-
# feast materialize
108-
r = runner.run(
109-
[
110-
"materialize",
111-
start_date.isoformat(),
112-
(end_date - timedelta(days=7)).isoformat(),
113-
],
114-
cwd=Path(store.repo_path),
132+
_test_materialize_and_online_retrieval(
133+
runner, store, start_date, end_date, driver_df
115134
)
116135

117-
assert r.returncode == 0
118-
119-
_assert_online_features(store, driver_df, end_date - timedelta(days=7))
120-
121-
# feast materialize-incremental
122-
r = runner.run(
123-
["materialize-incremental", end_date.isoformat()],
124-
cwd=Path(store.repo_path),
136+
with runner.local_repo(
137+
get_example_repo("example_feature_repo_version_0_19.py")
138+
.replace("%PARQUET_PATH%", driver_stats_path)
139+
.replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
140+
"file",
141+
) as store:
142+
_test_materialize_and_online_retrieval(
143+
runner, store, start_date, end_date, driver_df
125144
)
126145

127-
assert r.returncode == 0
128-
129-
_assert_online_features(store, driver_df, end_date)
130-
131146
# Test a failure case when the parquet file doesn't include a join key
132147
with runner.local_repo(
133148
get_example_repo("example_feature_repo_with_entity_join_key.py").replace(
134149
"%PARQUET_PATH%", driver_stats_path
135150
),
136151
"file",
137152
) as store:
138-
139153
assert store.repo_path is not None
140154

141-
# feast materialize
142155
returncode, output = runner.run_with_output(
143156
[
144157
"materialize",

0 commit comments

Comments
 (0)