Skip to content

Commit 3c39261

Browse files
authored
Adding initial type support related tests for BQ (feast-dev#1768)
* Adding initial type support related tests for BQ Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent d40636d commit 3c39261

8 files changed

Lines changed: 324 additions & 22 deletions

File tree

sdk/python/feast/inference.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ def update_entities_with_inferred_types_from_feature_views(
2929
col_names_and_types = view.batch_source.get_table_column_names_and_types(config)
3030
for entity_name in view.entities:
3131
if entity_name in incomplete_entities:
32+
entity = incomplete_entities[entity_name]
33+
3234
# get entity information from information extracted from the view batch source
3335
extracted_entity_name_type_pairs = list(
34-
filter(lambda tup: tup[0] == entity_name, col_names_and_types)
36+
filter(lambda tup: tup[0] == entity.join_key, col_names_and_types,)
3537
)
3638
if len(extracted_entity_name_type_pairs) == 0:
3739
# Doesn't mention inference error because would also be an error without inferencing
@@ -40,7 +42,6 @@ def update_entities_with_inferred_types_from_feature_views(
4042
its entity's name."""
4143
)
4244

43-
entity = incomplete_entities[entity_name]
4445
inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()(
4546
extracted_entity_name_type_pairs[0][1]
4647
)

sdk/python/tests/data/data_creator.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
from datetime import datetime, timedelta
2+
from typing import List
23

34
import pandas as pd
45
from pytz import timezone, utc
56

7+
from feast.value_type import ValueType
68

7-
def create_dataset() -> pd.DataFrame:
8-
now = datetime.utcnow()
9+
10+
def create_dataset(
11+
entity_type: ValueType = ValueType.INT32,
12+
feature_dtype: str = None,
13+
feature_is_list: bool = False,
14+
) -> pd.DataFrame:
15+
now = datetime.now().replace(microsecond=0, second=0, minute=0)
916
ts = pd.Timestamp(now).round("ms")
1017
data = {
11-
"id": [1, 2, 1, 3, 3],
12-
"value": [0.1, None, 0.3, 4, 5],
18+
"driver_id": get_entities_for_value_type(entity_type),
19+
"value": get_feature_values_for_dtype(feature_dtype, feature_is_list),
1320
"ts_1": [
1421
ts - timedelta(hours=4),
1522
ts,
@@ -25,3 +32,33 @@ def create_dataset() -> pd.DataFrame:
2532
"created_ts": [ts, ts, ts, ts, ts],
2633
}
2734
return pd.DataFrame.from_dict(data)
35+
36+
37+
def get_entities_for_value_type(value_type: ValueType) -> List:
    """
    Return the five sample entity ids rendered in the requested value type.

    The ids are always 1, 2, 1, 3, 3 (in that order); only their Python representation changes.

    :param value_type: ValueType.INT32 or ValueType.INT64 (ints), ValueType.FLOAT (floats) or
        ValueType.STRING (decimal strings)
    :return: a list of five entity ids
    :raises KeyError: if value_type is not one of the four types listed above
    """
    base_ids = [1, 2, 1, 3, 3]
    ids_by_type = {
        ValueType.INT32: list(base_ids),
        ValueType.INT64: list(base_ids),
        ValueType.FLOAT: [float(i) for i in base_ids],
        ValueType.STRING: [str(i) for i in base_ids],
    }
    return ids_by_type[value_type]
45+
46+
47+
def get_feature_values_for_dtype(dtype: str, is_list: bool) -> List:
    """
    Return five sample feature values for the requested dtype.

    :param dtype: one of "int32", "int64", "float", "string" or "bool"; None selects the default mixed
        numeric column [0.1, None, 0.3, 4, 5]
    :param is_list: when true, every non-null value n becomes [n, n] while nulls stay None; this flag is
        ignored when dtype is None
    :return: a list of five values (or five two-element lists / None when is_list is true)
    :raises KeyError: if dtype is neither None nor one of the supported dtype names
    """
    if dtype is None:
        return [0.1, None, 0.3, 4, 5]
    # TODO(adchia): for int columns, consider having a better error when dealing with None values (pandas int dfs can't
    #  have na)
    dtype_map = {
        "int32": [1, 2, 3, 4, 5],
        "int64": [1, 2, 3, 4, 5],
        "float": [1.0, None, 3.0, 4.0, 5.0],
        "string": ["1", None, "3", "4", "5"],
        "bool": [True, None, False, True, False],
    }
    try:
        non_list_val = dtype_map[dtype]
    except KeyError:
        # Still a KeyError for callers, but one that says what went wrong and what is accepted.
        raise KeyError(
            f"Unsupported feature dtype {dtype!r}; expected one of {sorted(dtype_map)}"
        ) from None
    # Duplicate the value once if this is a list
    if is_list:
        return [None if n is None else [n, n] for n in non_list_val]
    return non_list_val

sdk/python/tests/integration/e2e/test_universal_e2e.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def check_offline_and_online_features(
7878
def run_offline_online_store_consistency_test(
7979
fs: FeatureStore, fv: FeatureView
8080
) -> None:
81-
now = datetime.utcnow()
81+
now = datetime.now()
8282

8383
full_feature_names = True
8484
check_offline_store: bool = True

sdk/python/tests/integration/feature_repos/test_repo_configuration.py

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data, importer
1212
from feast.data_source import DataSource
13+
from feast.value_type import ValueType
1314
from tests.data.data_creator import create_dataset
1415
from tests.integration.feature_repos.universal.data_source_creator import (
1516
DataSourceCreator,
@@ -70,7 +71,6 @@ def ds_creator_path(cls: str):
7071
),
7172
]
7273

73-
7474
OFFLINE_STORES: List[str] = []
7575
ONLINE_STORES: List[str] = []
7676
PROVIDERS: List[str] = []
@@ -83,6 +83,9 @@ class Environment:
8383
feature_store: FeatureStore
8484
data_source: DataSource
8585
data_source_creator: DataSourceCreator
86+
entity_type: ValueType
87+
feature_dtype: str
88+
feature_is_list: bool
8689

8790
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
8891
start_date = end_date - timedelta(days=7)
@@ -199,6 +202,9 @@ def construct_test_environment(
199202
test_repo_config: TestRepoConfig,
200203
create_and_apply: bool = False,
201204
materialize: bool = False,
205+
entity_type: ValueType = ValueType.INT32,
206+
feature_dtype: str = None,
207+
feature_is_list: bool = False,
202208
) -> Environment:
203209
"""
204210
This method should take in the parameters from the test repo config and create a feature repo, apply it,
@@ -208,9 +214,14 @@ def construct_test_environment(
208214
The user is *not* expected to perform any clean up actions.
209215
210216
:param test_repo_config: configuration
217+
:param create_and_apply: whether to create and apply the repo config
218+
:param materialize: whether to materialize features to online store
219+
:param entity_type: the data type for the entity column (i.e. driver_id)
220+
:param feature_dtype: the data type for the feature column (i.e. value)
221+
:param feature_is_list: whether the feature column (i.e. value) should be a list feature
211222
:return: A feature store built using the supplied configuration.
212223
"""
213-
df = create_dataset()
224+
df = create_dataset(entity_type, feature_dtype, feature_is_list)
214225

215226
project = f"test_correctness_{str(uuid.uuid4()).replace('-', '')[:8]}"
216227

@@ -221,9 +232,7 @@ def construct_test_environment(
221232
offline_creator: DataSourceCreator = importer.get_class_from_type(
222233
module_name, config_class_name, "DataSourceCreator"
223234
)(project)
224-
ds = offline_creator.create_data_source(
225-
project, df, field_mapping={"ts_1": "ts", "id": "driver_id"}
226-
)
235+
ds = offline_creator.create_data_source(project, df, field_mapping={"ts_1": "ts"})
227236
offline_store = offline_creator.create_offline_store_config()
228237
online_store = test_repo_config.online_store
229238

@@ -243,6 +252,9 @@ def construct_test_environment(
243252
feature_store=fs,
244253
data_source=ds,
245254
data_source_creator=offline_creator,
255+
entity_type=entity_type,
256+
feature_dtype=feature_dtype,
257+
feature_is_list=feature_is_list,
246258
)
247259

248260
fvs = []
@@ -341,3 +353,80 @@ def inner_test(config):
341353
online_test(environment)
342354

343355
return inner_test
356+
357+
358+
def parametrize_types_no_materialize_test(types_test):
    """
    Decorator for tests that should run once per entity type / feature dtype combination, for both list and
    non-list features, with create-apply-materialize turned off.
    """
    return _parametrize_types_test_internal(
        types_test=types_test, create_apply_materialize=False
    )
364+
365+
366+
def parametrize_types_materialize_test(types_test):
    """
    Decorator for tests that should run once per entity type / feature dtype combination, for both list and
    non-list features, with create-apply-materialize turned on.
    """
    return _parametrize_types_test_internal(
        types_test=types_test, create_apply_materialize=True
    )
372+
373+
374+
def parametrize_types_no_materialize_test_no_list(types_test):
    """
    Decorator for tests that should run once per entity type / feature dtype combination with
    create-apply-materialize turned off and list-valued features excluded.
    """
    return _parametrize_types_test_internal(
        types_test=types_test,
        create_apply_materialize=False,
        vary_feature_is_list=False,
    )
382+
383+
384+
def _parametrize_types_test_internal(
    types_test, create_apply_materialize: bool, vary_feature_is_list: bool = True
):
    """
    Wrap ``types_test`` in a pytest integration test parametrized over entity type, feature dtype and whether
    the feature is a list.

    :param types_test: callable invoked with the constructed test environment
    :param create_apply_materialize: forwarded as both ``create_and_apply`` and ``materialize`` to
        ``construct_test_environment``
    :param vary_feature_is_list: when true the test runs for list and non-list features, otherwise only for
        non-list features
    :return: the parametrized test function
    """

    def entity_feature_types_ids(entity_type: ValueType, feature_dtype: str):
        return f"entity_type:{str(entity_type)}-feature_dtype:{feature_dtype}"

    def list_flag_id(flag):
        return f"feature_is_list:{str(flag)}"

    # TODO(adchia): consider adding timestamp / bytes for feature_dtypes
    # TODO(adchia): test materializing float entity types and ensure we throw an error before querying BQ
    type_pairs = [
        (ValueType.INT32, "int32"),
        (ValueType.INT64, "int64"),
        (ValueType.STRING, "float"),
        (ValueType.STRING, "bool"),
    ]
    pair_ids = [entity_feature_types_ids(*pair) for pair in type_pairs]

    # TODO(adchia): fix conversion to allow for lists in materialization
    list_flags = [False]
    if vary_feature_is_list:
        list_flags = [True, False]

    @pytest.mark.integration
    @pytest.mark.parametrize("entity_type,feature_dtype", type_pairs, ids=pair_ids)
    @pytest.mark.parametrize("feature_is_list", list_flags, ids=list_flag_id)
    def inner_test(entity_type: ValueType, feature_dtype: str, feature_is_list: bool):
        # TODO: parametrize config
        repo_config = TestRepoConfig(
            provider="gcp",
            offline_store_creator=ds_creator_path(
                "bigquery.BigQueryDataSourceCreator"
            ),
            online_store="datastore",
        )
        with construct_test_environment(
            repo_config,
            create_and_apply=create_apply_materialize,
            materialize=create_apply_materialize,
            entity_type=entity_type,
            feature_dtype=feature_dtype,
            feature_is_list=feature_is_list,
        ) as environment:
            types_test(environment)

    return inner_test

sdk/python/tests/integration/feature_repos/universal/entities.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from feast import Entity, ValueType
22

33

4-
def driver():
4+
def driver(value_type: ValueType = ValueType.INT64):
55
return Entity(
66
name="driver", # The name is derived from this argument, not object name.
7-
value_type=ValueType.INT64,
7+
value_type=value_type,
88
description="driver id",
99
join_key="driver_id",
1010
)

sdk/python/tests/integration/feature_repos/universal/feature_views.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66

77
def driver_feature_view(
8-
data_source: DataSource, name="test_correctness"
8+
data_source: DataSource,
9+
name="test_correctness",
10+
value_type: ValueType = ValueType.FLOAT,
911
) -> FeatureView:
1012
return FeatureView(
1113
name=name,
1214
entities=["driver"],
13-
features=[Feature("value", ValueType.FLOAT)],
15+
features=[Feature("value", value_type)],
1416
ttl=timedelta(days=5),
1517
input=data_source,
1618
)

sdk/python/tests/integration/materialization/test_offline_online_store_consistency.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def prep_bq_fs_and_fv(
5959
event_timestamp_column="ts",
6060
created_timestamp_column="created_ts",
6161
date_partition_column="",
62-
field_mapping={"ts_1": "ts", "id": "driver_id"},
62+
field_mapping={"ts_1": "ts"},
6363
)
6464

6565
fv = driver_feature_view(bigquery_source)
@@ -122,7 +122,7 @@ def prep_redshift_fs_and_fv(
122122
event_timestamp_column="ts",
123123
created_timestamp_column="created_ts",
124124
date_partition_column="",
125-
field_mapping={"ts_1": "ts", "id": "driver_id"},
125+
field_mapping={"ts_1": "ts"},
126126
)
127127

128128
fv = driver_feature_view(redshift_source)
@@ -171,7 +171,7 @@ def prep_local_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
171171
event_timestamp_column="ts",
172172
created_timestamp_column="created_ts",
173173
date_partition_column="",
174-
field_mapping={"ts_1": "ts", "id": "driver_id"},
174+
field_mapping={"ts_1": "ts"},
175175
)
176176
fv = driver_feature_view(file_source)
177177
e = Entity(
@@ -212,7 +212,7 @@ def prep_redis_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
212212
event_timestamp_column="ts",
213213
created_timestamp_column="created_ts",
214214
date_partition_column="",
215-
field_mapping={"ts_1": "ts", "id": "driver_id"},
215+
field_mapping={"ts_1": "ts"},
216216
)
217217
fv = driver_feature_view(file_source)
218218
e = Entity(
@@ -254,7 +254,7 @@ def prep_dynamodb_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]:
254254
event_timestamp_column="ts",
255255
created_timestamp_column="created_ts",
256256
date_partition_column="",
257-
field_mapping={"ts_1": "ts", "id": "driver_id"},
257+
field_mapping={"ts_1": "ts"},
258258
)
259259
fv = driver_feature_view(file_source)
260260
e = Entity(
@@ -332,7 +332,7 @@ def check_offline_and_online_features(
332332
def run_offline_online_store_consistency_test(
333333
fs: FeatureStore, fv: FeatureView, full_feature_names: bool,
334334
) -> None:
335-
now = datetime.utcnow()
335+
now = datetime.now()
336336
# Run materialize()
337337
# use both tz-naive & tz-aware timestamps to test that they're both correctly handled
338338
start_date = (now - timedelta(hours=5)).replace(tzinfo=utc)

0 commit comments

Comments
 (0)