1313from feast .infra .offline_stores .file_source import FileSource
1414from feast .infra .offline_stores .redshift_source import RedshiftSource
1515from feast .infra .offline_stores .snowflake_source import SnowflakeSource
16+ from feast .on_demand_feature_view import OnDemandFeatureView
1617from feast .repo_config import RepoConfig
1718from feast .stream_feature_view import StreamFeatureView
1819from feast .types import String
@@ -94,7 +95,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
9495
9596
9697def update_feature_views_with_inferred_features_and_entities (
97- fvs : Union [List [FeatureView ], List [StreamFeatureView ]],
98+ fvs : Union [List [FeatureView ], List [StreamFeatureView ], List [ OnDemandFeatureView ] ],
9899 entities : List [Entity ],
99100 config : RepoConfig ,
100101) -> None :
@@ -127,13 +128,14 @@ def update_feature_views_with_inferred_features_and_entities(
127128
128129 # Fields whose names match a join key are considered to be entity columns; all
129130 # other fields are considered to be feature columns.
131+ entity_columns = fv .entity_columns if fv .entity_columns else []
130132 for field in fv .schema :
131133 if field .name in join_keys :
132134 # Do not override a preexisting field with the same name.
133135 if field .name not in [
134- entity_column .name for entity_column in fv . entity_columns
136+ entity_column .name for entity_column in entity_columns
135137 ]:
136- fv . entity_columns .append (field )
138+ entity_columns .append (field )
137139 else :
138140 if field .name not in [feature .name for feature in fv .features ]:
139141 fv .features .append (field )
@@ -146,10 +148,10 @@ def update_feature_views_with_inferred_features_and_entities(
146148 continue
147149 if (
148150 entity .join_key
149- not in [entity_column .name for entity_column in fv . entity_columns ]
151+ not in [entity_column .name for entity_column in entity_columns ]
150152 and entity .value_type != ValueType .UNKNOWN
151153 ):
152- fv . entity_columns .append (
154+ entity_columns .append (
153155 Field (
154156 name = entity .join_key ,
155157 dtype = from_value_type (entity .value_type ),
@@ -160,10 +162,11 @@ def update_feature_views_with_inferred_features_and_entities(
160162 if (
161163 len (fv .entities ) == 1
162164 and fv .entities [0 ] == DUMMY_ENTITY_NAME
163- and not fv . entity_columns
165+ and not entity_columns
164166 ):
165- fv . entity_columns .append (Field (name = DUMMY_ENTITY_ID , dtype = String ))
167+ entity_columns .append (Field (name = DUMMY_ENTITY_ID , dtype = String ))
166168
169+ fv .entity_columns = entity_columns
167170 # Run inference for entity columns if there are fewer entity fields than expected.
168171 run_inference_for_entities = len (fv .entity_columns ) < len (join_keys )
169172
@@ -200,49 +203,116 @@ def _infer_features_and_entities(
200203 run_inference_for_features: Whether to run inference for features.
201204 config: The config for the current feature store.
202205 """
203- columns_to_exclude = {
204- fv . batch_source . timestamp_field ,
205- fv . batch_source . created_timestamp_column ,
206- }
207- for original_col , mapped_col in fv . batch_source . field_mapping . items ():
208- if mapped_col in columns_to_exclude :
209- columns_to_exclude . remove ( mapped_col )
210- columns_to_exclude .add (original_col )
211-
212- table_column_names_and_types = fv . batch_source . get_table_column_names_and_types (
213- config
214- )
215-
216- for col_name , col_datatype in table_column_names_and_types :
217- if col_name in columns_to_exclude :
218- continue
219- elif col_name in join_keys :
220- field = Field (
221- name = col_name ,
222- dtype = from_value_type (
223- fv . batch_source . source_datatype_to_feast_value_type ()( col_datatype )
224- ),
206+ entity_columns = []
207+ if isinstance ( fv , OnDemandFeatureView ):
208+ columns_to_exclude = set ()
209+ for (
210+ source_feature_view_name ,
211+ source_feature_view ,
212+ ) in fv . source_feature_view_projections . items ():
213+ columns_to_exclude .add (source_feature_view . timestamp_field )
214+ columns_to_exclude . add ( source_feature_view . created_timestamp_column )
215+
216+ for (
217+ original_col ,
218+ mapped_col ,
219+ ) in source_feature_view . batch_source . field_mapping . items () :
220+ if mapped_col in columns_to_exclude :
221+ columns_to_exclude . remove ( mapped_col )
222+ columns_to_exclude . add ( original_col )
223+
224+ table_column_names_and_types = (
225+ source_feature_view . batch_source . get_table_column_names_and_types (
226+ config
227+ )
225228 )
226- if field . name not in [
227- entity_column . name for entity_column in fv . entity_columns
228- ] :
229- fv . entity_columns . append ( field )
230- elif not re . match (
231- "^__|__$" , col_name
232- ): # double underscores often signal an internal-use column
233- if run_inference_for_features :
234- feature_name = (
235- fv . batch_source . field_mapping [ col_name ]
236- if col_name in fv . batch_source . field_mapping
237- else col_name
229+
230+ for col_name , col_datatype in table_column_names_and_types :
231+ if col_name in columns_to_exclude :
232+ continue
233+ elif col_name in join_keys :
234+ field = Field (
235+ name = col_name ,
236+ dtype = from_value_type (
237+ source_feature_view . batch_source . source_datatype_to_feast_value_type () (
238+ col_datatype
239+ )
240+ ),
238241 )
242+ if field .name not in [
243+ entity_column .name for entity_column in entity_columns
244+ ]:
245+ entity_columns .append (field )
246+ elif not re .match (
247+ "^__|__$" , col_name
248+ ): # double underscores often signal an internal-use column
249+ if run_inference_for_features :
250+ feature_name = (
251+ source_feature_view .batch_source .field_mapping [col_name ]
252+ if col_name in source_feature_view .batch_source .field_mapping
253+ else col_name
254+ )
255+ field = Field (
256+ name = feature_name ,
257+ dtype = from_value_type (
258+ source_feature_view .batch_source .source_datatype_to_feast_value_type ()(
259+ col_datatype
260+ )
261+ ),
262+ )
263+ if field .name not in [
264+ feature .name for feature in source_feature_view .features
265+ ]:
266+ source_feature_view .features .append (field )
267+
268+ else :
269+ columns_to_exclude = {
270+ fv .batch_source .timestamp_field ,
271+ fv .batch_source .created_timestamp_column ,
272+ }
273+ for original_col , mapped_col in fv .batch_source .field_mapping .items ():
274+ if mapped_col in columns_to_exclude :
275+ columns_to_exclude .remove (mapped_col )
276+ columns_to_exclude .add (original_col )
277+
278+ table_column_names_and_types = fv .batch_source .get_table_column_names_and_types (
279+ config
280+ )
281+
282+ for col_name , col_datatype in table_column_names_and_types :
283+ if col_name in columns_to_exclude :
284+ continue
285+ elif col_name in join_keys :
239286 field = Field (
240- name = feature_name ,
287+ name = col_name ,
241288 dtype = from_value_type (
242289 fv .batch_source .source_datatype_to_feast_value_type ()(
243290 col_datatype
244291 )
245292 ),
246293 )
247- if field .name not in [feature .name for feature in fv .features ]:
248- fv .features .append (field )
294+ if field .name not in [
295+ entity_column .name for entity_column in entity_columns
296+ ]:
297+ entity_columns .append (field )
298+ elif not re .match (
299+ "^__|__$" , col_name
300+ ): # double underscores often signal an internal-use column
301+ if run_inference_for_features :
302+ feature_name = (
303+ fv .batch_source .field_mapping [col_name ]
304+ if col_name in fv .batch_source .field_mapping
305+ else col_name
306+ )
307+ field = Field (
308+ name = feature_name ,
309+ dtype = from_value_type (
310+ fv .batch_source .source_datatype_to_feast_value_type ()(
311+ col_datatype
312+ )
313+ ),
314+ )
315+ if field .name not in [feature .name for feature in fv .features ]:
316+ fv .features .append (field )
317+
318+ fv .entity_columns = entity_columns
0 commit comments