Skip to content

Commit 09fe2a6

Browse files
authored
Entity value_type inference for Feature Repo registration (#1538)
* Added inferencing of Entity values types & corresponding test Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Updated example_feature_repo_with_inference.py Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Fixed lint issue introduced by previous commit Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Optimized infer_entity_value_type_from_feature_views Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Added entity data type inferencing to feature_store.py and organized code Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Made error messages more descriptive and improved its structuring Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Removed s comments by using conftest file Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Renamed variables for clarity and added clarifying comments Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * changed file name from inference_helpers.py to inference.py Signed-off-by: David Y Liu <davidyliuliu@gmail.com>
1 parent f29cb8c commit 09fe2a6

File tree

9 files changed

+174
-58
lines changed

9 files changed

+174
-58
lines changed

sdk/python/feast/entity.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class Entity:
3636
def __init__(
3737
self,
3838
name: str,
39-
value_type: ValueType,
39+
value_type: ValueType = ValueType.UNKNOWN,
4040
description: str = "",
4141
join_key: Optional[str] = None,
4242
labels: Optional[MutableMapping[str, str]] = None,

sdk/python/feast/feature_store.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from feast.entity import Entity
2727
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
2828
from feast.feature_view import FeatureView
29+
from feast.inference import infer_entity_value_type_from_feature_views
2930
from feast.infra.provider import Provider, RetrievalJob, get_provider
3031
from feast.online_response import OnlineResponse, _infer_online_entity_rows
3132
from feast.protos.feast.serving.ServingService_pb2 import (
@@ -219,19 +220,19 @@ def apply(
219220
objects = [objects]
220221
assert isinstance(objects, list)
221222

222-
views_to_update = []
223-
entities_to_update = []
224-
for ob in objects:
225-
if isinstance(ob, FeatureView):
226-
self._registry.apply_feature_view(ob, project=self.project)
227-
views_to_update.append(ob)
228-
elif isinstance(ob, Entity):
229-
self._registry.apply_entity(ob, project=self.project)
230-
entities_to_update.append(ob)
231-
else:
232-
raise ValueError(
233-
f"Unknown object type ({type(ob)}) provided as part of apply() call"
234-
)
223+
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
224+
entities_to_update = infer_entity_value_type_from_feature_views(
225+
[ob for ob in objects if isinstance(ob, Entity)], views_to_update
226+
)
227+
228+
if len(views_to_update) + len(entities_to_update) != len(objects):
229+
raise ValueError("Unknown object type provided as part of apply() call")
230+
231+
for view in views_to_update:
232+
self._registry.apply_feature_view(view, project=self.project)
233+
for ent in entities_to_update:
234+
self._registry.apply_entity(ent, project=self.project)
235+
235236
self._get_provider().update_infra(
236237
project=self.project,
237238
tables_to_delete=[],

sdk/python/feast/inference.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from typing import List
2+
3+
from feast import Entity
4+
from feast.feature_view import FeatureView
5+
from feast.value_type import ValueType
6+
7+
8+
def infer_entity_value_type_from_feature_views(
9+
entities: List[Entity], feature_views: List[FeatureView]
10+
) -> List[Entity]:
11+
"""
12+
Infer entity value type by examining schema of feature view input sources
13+
"""
14+
incomplete_entities = {
15+
entity.name: entity
16+
for entity in entities
17+
if entity.value_type == ValueType.UNKNOWN
18+
}
19+
incomplete_entities_keys = incomplete_entities.keys()
20+
21+
for view in feature_views:
22+
if not (incomplete_entities_keys & set(view.entities)):
23+
continue # skip if view doesn't contain any entities that need inference
24+
25+
col_names_and_types = view.input.get_table_column_names_and_types()
26+
for entity_name in view.entities:
27+
if entity_name in incomplete_entities:
28+
# get entity information from information extracted from the view input source
29+
extracted_entity_name_type_pairs = list(
30+
filter(lambda tup: tup[0] == entity_name, col_names_and_types)
31+
)
32+
if len(extracted_entity_name_type_pairs) == 0:
33+
# Doesn't mention inference error because would also be an error without inferencing
34+
raise ValueError(
35+
f"""No column in the input source for the {view.name} feature view matches
36+
its entity's name."""
37+
)
38+
39+
entity = incomplete_entities[entity_name]
40+
inferred_value_type = view.input.source_datatype_to_feast_value_type()(
41+
extracted_entity_name_type_pairs[0][1]
42+
)
43+
44+
if (
45+
entity.value_type != ValueType.UNKNOWN
46+
and entity.value_type != inferred_value_type
47+
) or (len(extracted_entity_name_type_pairs) > 1):
48+
raise ValueError(
49+
f"""Entity value_type inference failed for {entity_name} entity.
50+
Multiple viable matches. Please explicitly specify the entity value_type
51+
for this entity."""
52+
)
53+
54+
entity.value_type = inferred_value_type
55+
56+
return entities

sdk/python/feast/repo_operations.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from feast import Entity, FeatureTable
1515
from feast.feature_view import FeatureView
16+
from feast.inference import infer_entity_value_type_from_feature_views
1617
from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source
1718
from feast.infra.provider import get_provider
1819
from feast.names import adjectives, animals
@@ -128,6 +129,13 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
128129
registry._initialize_registry()
129130
sys.dont_write_bytecode = True
130131
repo = parse_repo(repo_path)
132+
repo = ParsedRepo(
133+
feature_tables=repo.feature_tables,
134+
entities=infer_entity_value_type_from_feature_views(
135+
repo.entities, repo.feature_views
136+
),
137+
feature_views=repo.feature_views,
138+
)
131139
sys.dont_write_bytecode = False
132140
for entity in repo.entities:
133141
registry.apply_entity(entity, project=project)

sdk/python/tests/conftest.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import multiprocessing
15+
from datetime import datetime, timedelta
1516
from sys import platform
1617

18+
import pandas as pd
1719
import pytest
1820

1921

@@ -45,3 +47,43 @@ def pytest_collection_modifyitems(config, items):
4547
for item in items:
4648
if "integration" in item.keywords:
4749
item.add_marker(skip_integration)
50+
51+
52+
@pytest.fixture
53+
def simple_dataset_1() -> pd.DataFrame:
54+
now = datetime.utcnow()
55+
ts = pd.Timestamp(now).round("ms")
56+
data = {
57+
"id": [1, 2, 1, 3, 3],
58+
"float_col": [0.1, 0.2, 0.3, 4, 5],
59+
"int64_col": [1, 2, 3, 4, 5],
60+
"string_col": ["a", "b", "c", "d", "e"],
61+
"ts_1": [
62+
ts,
63+
ts - timedelta(hours=4),
64+
ts - timedelta(hours=3),
65+
ts - timedelta(hours=2),
66+
ts - timedelta(hours=1),
67+
],
68+
}
69+
return pd.DataFrame.from_dict(data)
70+
71+
72+
@pytest.fixture
73+
def simple_dataset_2() -> pd.DataFrame:
74+
now = datetime.utcnow()
75+
ts = pd.Timestamp(now).round("ms")
76+
data = {
77+
"id": ["a", "b", "c", "d", "e"],
78+
"float_col": [0.1, 0.2, 0.3, 4, 5],
79+
"int64_col": [1, 2, 3, 4, 5],
80+
"string_col": ["a", "b", "c", "d", "e"],
81+
"ts_1": [
82+
ts,
83+
ts - timedelta(hours=4),
84+
ts - timedelta(hours=3),
85+
ts - timedelta(hours=2),
86+
ts - timedelta(hours=1),
87+
],
88+
}
89+
return pd.DataFrame.from_dict(data)

sdk/python/tests/example_feature_repo_with_inference.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from google.protobuf.duration_pb2 import Duration
22

3-
from feast import Entity, FeatureView, ValueType
3+
from feast import Entity, FeatureView
44
from feast.data_source import FileSource
55

66
driver_hourly_stats = FileSource(
77
path="%PARQUET_PATH%", # placeholder to be replaced by the test
88
created_timestamp_column="created",
99
)
1010

11-
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
11+
driver = Entity(name="driver_id", description="driver id",)
1212

1313
# features are inferred from columns of data source
1414
driver_hourly_stats_view = FeatureView(

sdk/python/tests/test_feature_store.py

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
from tempfile import mkstemp
1717

1818
import pytest
19-
from fixtures.data_source_fixtures import simple_dataset_1 # noqa: F401
20-
from fixtures.data_source_fixtures import (
19+
from pytest_lazyfixture import lazy_fixture
20+
from utils.data_source_utils import (
2121
prep_file_source,
2222
simple_bq_source_using_query_arg,
2323
simple_bq_source_using_table_ref_arg,
2424
)
25-
from pytest_lazyfixture import lazy_fixture
2625

2726
from feast.data_format import ParquetFormat
2827
from feast.data_source import FileSource
@@ -315,22 +314,6 @@ def test_apply_feature_view_integration(test_feature_store):
315314
assert len(feature_views) == 0
316315

317316

318-
@pytest.mark.integration
319-
@pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")])
320-
def test_data_source_ts_col_inference_success(dataframe_source):
321-
with prep_file_source(df=dataframe_source) as file_source:
322-
actual_file_source = file_source.event_timestamp_column
323-
actual_bq_1 = simple_bq_source_using_table_ref_arg(
324-
dataframe_source
325-
).event_timestamp_column
326-
actual_bq_2 = simple_bq_source_using_query_arg(
327-
dataframe_source
328-
).event_timestamp_column
329-
expected = "ts_1"
330-
331-
assert expected == actual_file_source == actual_bq_1 == actual_bq_2
332-
333-
334317
@pytest.mark.parametrize(
335318
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
336319
)

sdk/python/tests/test_inference.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import pytest
2+
from utils.data_source_utils import (
3+
prep_file_source,
4+
simple_bq_source_using_query_arg,
5+
simple_bq_source_using_table_ref_arg,
6+
)
7+
8+
from feast import Entity, ValueType
9+
from feast.feature_view import FeatureView
10+
from feast.inference import infer_entity_value_type_from_feature_views
11+
12+
13+
@pytest.mark.integration
14+
def test_data_source_ts_col_inference_success(simple_dataset_1):
15+
with prep_file_source(df=simple_dataset_1) as file_source:
16+
actual_file_source = file_source.event_timestamp_column
17+
actual_bq_1 = simple_bq_source_using_table_ref_arg(
18+
simple_dataset_1
19+
).event_timestamp_column
20+
actual_bq_2 = simple_bq_source_using_query_arg(
21+
simple_dataset_1
22+
).event_timestamp_column
23+
expected = "ts_1"
24+
25+
assert expected == actual_file_source == actual_bq_1 == actual_bq_2
26+
27+
28+
def test_infer_entity_value_type_from_feature_views(simple_dataset_1, simple_dataset_2):
29+
with prep_file_source(
30+
df=simple_dataset_1, event_timestamp_column="ts_1"
31+
) as file_source, prep_file_source(
32+
df=simple_dataset_2, event_timestamp_column="ts_1"
33+
) as file_source_2:
34+
35+
fv1 = FeatureView(name="fv1", entities=["id"], input=file_source, ttl=None,)
36+
fv2 = FeatureView(name="fv2", entities=["id"], input=file_source_2, ttl=None,)
37+
38+
actual_1 = infer_entity_value_type_from_feature_views(
39+
[Entity(name="id")], [fv1]
40+
)
41+
actual_2 = infer_entity_value_type_from_feature_views(
42+
[Entity(name="id")], [fv2]
43+
)
44+
assert actual_1 == [Entity(name="id", value_type=ValueType.INT64)]
45+
assert actual_2 == [Entity(name="id", value_type=ValueType.STRING)]
46+
47+
with pytest.raises(ValueError):
48+
# two viable data types
49+
infer_entity_value_type_from_feature_views([Entity(name="id")], [fv1, fv2])

sdk/python/tests/fixtures/data_source_fixtures.py renamed to sdk/python/tests/utils/data_source_utils.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,12 @@
11
import contextlib
22
import tempfile
3-
from datetime import datetime, timedelta
43

5-
import pandas as pd
6-
import pytest
74
from google.cloud import bigquery
85

96
from feast.data_format import ParquetFormat
107
from feast.data_source import BigQuerySource, FileSource
118

129

13-
@pytest.fixture
14-
def simple_dataset_1() -> pd.DataFrame:
15-
now = datetime.utcnow()
16-
ts = pd.Timestamp(now).round("ms")
17-
data = {
18-
"id": [1, 2, 1, 3, 3],
19-
"float_col": [0.1, 0.2, 0.3, 4, 5],
20-
"int64_col": [1, 2, 3, 4, 5],
21-
"string_col": ["a", "b", "c", "d", "e"],
22-
"ts_1": [
23-
ts,
24-
ts - timedelta(hours=4),
25-
ts - timedelta(hours=3),
26-
ts - timedelta(hours=2),
27-
ts - timedelta(hours=1),
28-
],
29-
}
30-
return pd.DataFrame.from_dict(data)
31-
32-
3310
@contextlib.contextmanager
3411
def prep_file_source(df, event_timestamp_column="") -> FileSource:
3512
with tempfile.NamedTemporaryFile(suffix=".parquet") as f:

0 commit comments

Comments
 (0)