Skip to content

Commit c50a36e

Browse files
authored
Schema Inferencing should happen at apply time (#1646)
* wip1 Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * just need to do clean up Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * linted Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * improve test coverage Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * changed placement of inference methods in repo_operation apply_total Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * updated inference method name + changed to void return since it updates in place Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * fixed integration test and added comments Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Made DataSource event_timestamp_column optional Signed-off-by: David Y Liu <davidyliuliu@gmail.com>
1 parent a708915 commit c50a36e

File tree

8 files changed

+167
-103
lines changed

8 files changed

+167
-103
lines changed

sdk/python/feast/data_source.py

Lines changed: 10 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515

1616
import enum
17-
import re
1817
from typing import Callable, Dict, Iterable, Optional, Tuple
1918

2019
from pyarrow.parquet import ParquetFile
@@ -371,7 +370,7 @@ class DataSource:
371370

372371
def __init__(
373372
self,
374-
event_timestamp_column: str,
373+
event_timestamp_column: Optional[str] = "",
375374
created_timestamp_column: Optional[str] = "",
376375
field_mapping: Optional[Dict[str, str]] = None,
377376
date_partition_column: Optional[str] = "",
@@ -520,45 +519,11 @@ def to_proto(self) -> DataSourceProto:
520519
"""
521520
raise NotImplementedError
522521

523-
def _infer_event_timestamp_column(self, ts_column_type_regex_pattern):
524-
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
525-
USER_GUIDANCE = "Please specify event_timestamp_column explicitly."
526-
527-
if isinstance(self, FileSource) or isinstance(self, BigQuerySource):
528-
event_timestamp_column, matched_flag = None, False
529-
for col_name, col_datatype in self.get_table_column_names_and_types():
530-
if re.match(ts_column_type_regex_pattern, col_datatype):
531-
if matched_flag:
532-
raise TypeError(
533-
f"""
534-
{ERROR_MSG_PREFIX} due to multiple possible columns satisfying
535-
the criteria. {USER_GUIDANCE}
536-
"""
537-
)
538-
matched_flag = True
539-
event_timestamp_column = col_name
540-
if matched_flag:
541-
return event_timestamp_column
542-
else:
543-
raise TypeError(
544-
f"""
545-
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
546-
{USER_GUIDANCE}
547-
"""
548-
)
549-
else:
550-
raise TypeError(
551-
f"""
552-
{ERROR_MSG_PREFIX} because this DataSource currently does not support this inference.
553-
{USER_GUIDANCE}
554-
"""
555-
)
556-
557522

558523
class FileSource(DataSource):
559524
def __init__(
560525
self,
561-
event_timestamp_column: Optional[str] = None,
526+
event_timestamp_column: Optional[str] = "",
562527
file_url: Optional[str] = None,
563528
path: Optional[str] = None,
564529
file_format: FileFormat = None,
@@ -598,7 +563,7 @@ def __init__(
598563
self._file_options = FileOptions(file_format=file_format, file_url=file_url)
599564

600565
super().__init__(
601-
event_timestamp_column or self._infer_event_timestamp_column(r"^timestamp"),
566+
event_timestamp_column,
602567
created_timestamp_column,
603568
field_mapping,
604569
date_partition_column,
@@ -662,7 +627,7 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
662627
class BigQuerySource(DataSource):
663628
def __init__(
664629
self,
665-
event_timestamp_column: Optional[str] = None,
630+
event_timestamp_column: Optional[str] = "",
666631
table_ref: Optional[str] = None,
667632
created_timestamp_column: Optional[str] = "",
668633
field_mapping: Optional[Dict[str, str]] = None,
@@ -672,8 +637,7 @@ def __init__(
672637
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)
673638

674639
super().__init__(
675-
event_timestamp_column
676-
or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"),
640+
event_timestamp_column,
677641
created_timestamp_column,
678642
field_mapping,
679643
date_partition_column,
@@ -743,20 +707,12 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
743707
from google.cloud import bigquery
744708

745709
client = bigquery.Client()
746-
name_type_pairs = []
747710
if self.table_ref is not None:
748-
project_id, dataset_id, table_id = self.table_ref.split(".")
749-
bq_columns_query = f"""
750-
SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS
751-
WHERE TABLE_NAME = '{table_id}'
752-
"""
753-
table_schema = (
754-
client.query(bq_columns_query).result().to_dataframe_iterable()
755-
)
756-
for df in table_schema:
757-
name_type_pairs.extend(
758-
list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list()))
759-
)
711+
table_schema = client.get_table(self.table_ref).schema
712+
if not isinstance(table_schema[0], bigquery.schema.SchemaField):
713+
raise TypeError("Could not parse BigQuery table schema.")
714+
715+
name_type_pairs = [(field.name, field.field_type) for field in table_schema]
760716
else:
761717
bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
762718
queryRes = client.query(bq_columns_query).result()

sdk/python/feast/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,11 @@ def __init__(
9494
f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, "
9595
f"but these were missing: {join_key_columns - source_columns} "
9696
)
97+
98+
99+
class RegistryInferenceFailure(Exception):
100+
def __init__(self, repo_obj_type: str, specific_issue: str):
101+
super().__init__(
102+
f"Inference to fill in missing information for {repo_obj_type} failed. {specific_issue}. "
103+
"Try filling the information explicitly."
104+
)

sdk/python/feast/feature_store.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
from feast.entity import Entity
2727
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
2828
from feast.feature_view import FeatureView
29-
from feast.inference import infer_entity_value_type_from_feature_views
29+
from feast.inference import (
30+
infer_entity_value_type_from_feature_views,
31+
update_data_sources_with_inferred_event_timestamp_col,
32+
)
3033
from feast.infra.provider import Provider, RetrievalJob, get_provider
3134
from feast.online_response import OnlineResponse, _infer_online_entity_rows
3235
from feast.protos.feast.serving.ServingService_pb2 import (
@@ -224,6 +227,11 @@ def apply(
224227
entities_to_update = infer_entity_value_type_from_feature_views(
225228
[ob for ob in objects if isinstance(ob, Entity)], views_to_update
226229
)
230+
update_data_sources_with_inferred_event_timestamp_col(
231+
[view.input for view in views_to_update]
232+
)
233+
for view in views_to_update:
234+
view.infer_features_from_input_source()
227235

228236
if len(views_to_update) + len(entities_to_update) != len(objects):
229237
raise ValueError("Unknown object type provided as part of apply() call")

sdk/python/feast/feature_view.py

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from feast import utils
2323
from feast.data_source import BigQuerySource, DataSource, FileSource
24+
from feast.errors import RegistryInferenceFailure
2425
from feast.feature import Feature
2526
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
2627
from feast.protos.feast.core.FeatureView_pb2 import (
@@ -64,29 +65,6 @@ def __init__(
6465
tags: Optional[Dict[str, str]] = None,
6566
online: bool = True,
6667
):
67-
if not features:
68-
features = [] # to handle python's mutable default arguments
69-
columns_to_exclude = {
70-
input.event_timestamp_column,
71-
input.created_timestamp_column,
72-
} | set(entities)
73-
74-
for col_name, col_datatype in input.get_table_column_names_and_types():
75-
if col_name not in columns_to_exclude and not re.match(
76-
"^__|__$", col_name
77-
):
78-
features.append(
79-
Feature(
80-
col_name,
81-
input.source_datatype_to_feast_value_type()(col_datatype),
82-
)
83-
)
84-
85-
if not features:
86-
raise ValueError(
87-
f"Could not infer Features for the FeatureView named {name}. Please specify Features explicitly for this FeatureView."
88-
)
89-
9068
cols = [entity for entity in entities] + [feat.name for feat in features]
9169
for col in cols:
9270
if input.field_mapping is not None and col in input.field_mapping.keys():
@@ -241,3 +219,35 @@ def most_recent_end_time(self) -> Optional[datetime]:
241219
if len(self.materialization_intervals) == 0:
242220
return None
243221
return max([interval[1] for interval in self.materialization_intervals])
222+
223+
def infer_features_from_input_source(self):
224+
if not self.features:
225+
columns_to_exclude = {
226+
self.input.event_timestamp_column,
227+
self.input.created_timestamp_column,
228+
} | set(self.entities)
229+
230+
for col_name, col_datatype in self.input.get_table_column_names_and_types():
231+
if col_name not in columns_to_exclude and not re.match(
232+
"^__|__$",
233+
col_name, # double underscores often signal an internal-use column
234+
):
235+
feature_name = (
236+
self.input.field_mapping[col_name]
237+
if col_name in self.input.field_mapping.keys()
238+
else col_name
239+
)
240+
self.features.append(
241+
Feature(
242+
feature_name,
243+
self.input.source_datatype_to_feast_value_type()(
244+
col_datatype
245+
),
246+
)
247+
)
248+
249+
if not self.features:
250+
raise RegistryInferenceFailure(
251+
"FeatureView",
252+
f"Could not infer Features for the FeatureView named {self.name}.",
253+
)

sdk/python/feast/inference.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
from typing import List
1+
import re
2+
from typing import List, Union
23

34
from feast import Entity
5+
from feast.data_source import BigQuerySource, FileSource
6+
from feast.errors import RegistryInferenceFailure
47
from feast.feature_view import FeatureView
58
from feast.value_type import ValueType
69

@@ -45,12 +48,70 @@ def infer_entity_value_type_from_feature_views(
4548
entity.value_type != ValueType.UNKNOWN
4649
and entity.value_type != inferred_value_type
4750
) or (len(extracted_entity_name_type_pairs) > 1):
48-
raise ValueError(
51+
raise RegistryInferenceFailure(
52+
"Entity",
4953
f"""Entity value_type inference failed for {entity_name} entity.
50-
Multiple viable matches. Please explicitly specify the entity value_type
51-
for this entity."""
54+
Multiple viable matches.
55+
""",
5256
)
5357

5458
entity.value_type = inferred_value_type
5559

5660
return entities
61+
62+
63+
def update_data_sources_with_inferred_event_timestamp_col(
64+
data_sources: List[Union[BigQuerySource, FileSource]],
65+
) -> None:
66+
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
67+
68+
for data_source in data_sources:
69+
if (
70+
data_source.event_timestamp_column is None
71+
or data_source.event_timestamp_column == ""
72+
):
73+
# prepare right match pattern for data source
74+
ts_column_type_regex_pattern = ""
75+
if isinstance(data_source, FileSource):
76+
ts_column_type_regex_pattern = r"^timestamp"
77+
elif isinstance(data_source, BigQuerySource):
78+
ts_column_type_regex_pattern = "TIMESTAMP|DATETIME"
79+
else:
80+
raise RegistryInferenceFailure(
81+
"DataSource",
82+
"""
83+
DataSource inferencing of event_timestamp_column is currently only supported
84+
for FileSource and BigQuerySource.
85+
""",
86+
)
87+
# for informing the type checker
88+
assert isinstance(data_source, FileSource) or isinstance(
89+
data_source, BigQuerySource
90+
)
91+
92+
# loop through table columns to find singular match
93+
event_timestamp_column, matched_flag = None, False
94+
for (
95+
col_name,
96+
col_datatype,
97+
) in data_source.get_table_column_names_and_types():
98+
if re.match(ts_column_type_regex_pattern, col_datatype):
99+
if matched_flag:
100+
raise RegistryInferenceFailure(
101+
"DataSource",
102+
f"""
103+
{ERROR_MSG_PREFIX} due to multiple possible columns satisfying
104+
the criteria. {ts_column_type_regex_pattern} {col_name}
105+
""",
106+
)
107+
matched_flag = True
108+
event_timestamp_column = col_name
109+
if matched_flag:
110+
data_source.event_timestamp_column = event_timestamp_column
111+
else:
112+
raise RegistryInferenceFailure(
113+
"DataSource",
114+
f"""
115+
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
116+
""",
117+
)

sdk/python/feast/repo_operations.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313

1414
from feast import Entity, FeatureTable
1515
from feast.feature_view import FeatureView
16-
from feast.inference import infer_entity_value_type_from_feature_views
16+
from feast.inference import (
17+
infer_entity_value_type_from_feature_views,
18+
update_data_sources_with_inferred_event_timestamp_col,
19+
)
1720
from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source
1821
from feast.infra.provider import get_provider
1922
from feast.names import adjectives, animals
@@ -136,6 +139,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
136139
),
137140
feature_views=repo.feature_views,
138141
)
142+
139143
sys.dont_write_bytecode = False
140144
for entity in repo.entities:
141145
registry.apply_entity(entity, project=project)
@@ -156,6 +160,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
156160
repo_config.offline_store, data_source
157161
)
158162

163+
update_data_sources_with_inferred_event_timestamp_col(data_sources)
164+
for view in repo.feature_views:
165+
view.infer_features_from_input_source()
166+
159167
tables_to_delete = []
160168
for registry_table in registry.list_feature_tables(project=project):
161169
if registry_table.name not in repo_table_names:

0 commit comments

Comments
 (0)