forked from feast-dev/feast
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinference.py
More file actions
117 lines (104 loc) · 4.75 KB
/
inference.py
File metadata and controls
117 lines (104 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import re
from typing import List, Union
from feast import Entity
from feast.data_source import BigQuerySource, FileSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.value_type import ValueType
def infer_entity_value_type_from_feature_views(
    entities: List[Entity], feature_views: List[FeatureView]
) -> List[Entity]:
    """
    Infer entity value type by examining schema of feature view input sources
    """
    # Entities whose value_type is still UNKNOWN, keyed by name, so we can
    # fill them in as we discover matching columns in view input sources.
    unresolved = {
        entity.name: entity
        for entity in entities
        if entity.value_type == ValueType.UNKNOWN
    }
    unresolved_names = unresolved.keys()

    for view in feature_views:
        # Only inspect views that reference at least one unresolved entity.
        if not (unresolved_names & set(view.entities)):
            continue

        columns = view.input.get_table_column_names_and_types()
        for name in view.entities:
            if name not in unresolved:
                continue

            # Columns in the view's input source whose name matches the entity.
            matches = [pair for pair in columns if pair[0] == name]
            if not matches:
                # Doesn't mention inference error because would also be an error without inferencing
                raise ValueError(
                    f"""No column in the input source for the {view.name} feature view matches
                    its entity's name."""
                )

            entity = unresolved[name]
            # Convert the source-native column type to a Feast ValueType.
            inferred = view.input.source_datatype_to_feast_value_type()(
                matches[0][1]
            )
            # Fail when a previous view inferred a different type, or when the
            # source has several columns with the entity's name.
            conflicting = (
                entity.value_type != ValueType.UNKNOWN
                and entity.value_type != inferred
            )
            if conflicting or len(matches) > 1:
                raise RegistryInferenceFailure(
                    "Entity",
                    f"""Entity value_type inference failed for {name} entity.
                    Multiple viable matches.
                    """,
                )
            entity.value_type = inferred

    return entities
def update_data_sources_with_inferred_event_timestamp_col(
    data_sources: List[Union[BigQuerySource, FileSource]],
) -> None:
    """
    For each data source missing an event_timestamp_column, infer it by
    finding the single column whose datatype looks like a timestamp.
    Mutates the sources in place.
    """
    ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"

    for source in data_sources:
        # Skip sources that already declare an event timestamp column
        # (treat None and "" as "not set").
        if not (source.event_timestamp_column is None or source.event_timestamp_column == ""):
            continue

        # Pick the regex that recognizes timestamp-typed columns for this
        # kind of source.
        if isinstance(source, FileSource):
            pattern = r"^timestamp"
        elif isinstance(source, BigQuerySource):
            pattern = "TIMESTAMP|DATETIME"
        else:
            raise RegistryInferenceFailure(
                "DataSource",
                """
                DataSource inferencing of event_timestamp_column is currently only supported
                for FileSource and BigQuerySource.
                """,
            )

        # for informing the type checker
        assert isinstance(source, (FileSource, BigQuerySource))

        # Scan the source's columns; exactly one must match the pattern.
        candidate = None
        for col_name, col_datatype in source.get_table_column_names_and_types():
            if not re.match(pattern, col_datatype):
                continue
            if candidate is not None:
                raise RegistryInferenceFailure(
                    "DataSource",
                    f"""
                    {ERROR_MSG_PREFIX} due to multiple possible columns satisfying
                    the criteria. {pattern} {col_name}
                    """,
                )
            candidate = col_name

        if candidate is not None:
            source.event_timestamp_column = candidate
        else:
            raise RegistryInferenceFailure(
                "DataSource",
                f"""
                {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
                """,
            )