@@ -204,122 +204,131 @@ def _infer_features_and_entities(
204204 run_inference_for_features: Whether to run inference for features.
205205 config: The config for the current feature store.
206206 """
207- entity_columns : list [Field ] = []
208207 if isinstance (fv , OnDemandFeatureView ):
209- columns_to_exclude = set ()
210- for (
211- source_feature_view_name ,
212- source_feature_view ,
213- ) in fv .source_feature_view_projections .items ():
214- columns_to_exclude .add (source_feature_view .timestamp_field )
215- columns_to_exclude .add (source_feature_view .created_timestamp_column )
208+ return _infer_on_demand_features_and_entities (
209+ fv , join_keys , run_inference_for_features , config
210+ )
216211
217- batch_source = getattr (source_feature_view , "batch_source" )
218- batch_field_mapping = getattr (batch_source or None , "field_mapping" )
219- if batch_field_mapping :
220- for (
221- original_col ,
222- mapped_col ,
223- ) in batch_field_mapping .items ():
224- if mapped_col in columns_to_exclude :
225- columns_to_exclude .remove (mapped_col )
226- columns_to_exclude .add (original_col )
212+ columns_to_exclude = {
213+ fv .batch_source .timestamp_field ,
214+ fv .batch_source .created_timestamp_column ,
215+ }
216+ for original_col , mapped_col in fv .batch_source .field_mapping .items ():
217+ if mapped_col in columns_to_exclude :
218+ columns_to_exclude .remove (mapped_col )
219+ columns_to_exclude .add (original_col )
227220
228- table_column_names_and_types = (
229- batch_source . get_table_column_names_and_types ( config )
230- )
221+ table_column_names_and_types = fv . batch_source . get_table_column_names_and_types (
222+ config
223+ )
231224
232- for col_name , col_datatype in table_column_names_and_types :
233- if col_name in columns_to_exclude :
234- continue
235- elif col_name in join_keys :
236- field = Field (
237- name = col_name ,
238- dtype = from_value_type (
239- batch_source .source_datatype_to_feast_value_type ()(
240- col_datatype
241- )
242- ),
243- )
244- if field .name not in [
245- entity_column .name
246- for entity_column in entity_columns
247- if hasattr (entity_column , "name" )
248- ]:
249- entity_columns .append (field )
250- elif not re .match (
251- "^__|__$" , col_name
252- ): # double underscores often signal an internal-use column
253- if run_inference_for_features :
254- feature_name = (
255- batch_field_mapping [col_name ]
256- if col_name in batch_field_mapping
257- else col_name
258- )
259- field = Field (
260- name = feature_name ,
261- dtype = from_value_type (
262- batch_source .source_datatype_to_feast_value_type ()(
263- col_datatype
264- )
265- ),
225+ for col_name , col_datatype in table_column_names_and_types :
226+ if col_name in columns_to_exclude :
227+ continue
228+ elif col_name in join_keys :
229+ field = Field (
230+ name = col_name ,
231+ dtype = from_value_type (
232+ fv .batch_source .source_datatype_to_feast_value_type ()(col_datatype )
233+ ),
234+ )
235+ if field .name not in [
236+ entity_column .name for entity_column in fv .entity_columns
237+ ]:
238+ fv .entity_columns .append (field )
239+ elif not re .match (
240+ "^__|__$" , col_name
241+ ): # double underscores often signal an internal-use column
242+ if run_inference_for_features :
243+ feature_name = (
244+ fv .batch_source .field_mapping [col_name ]
245+ if col_name in fv .batch_source .field_mapping
246+ else col_name
247+ )
248+ field = Field (
249+ name = feature_name ,
250+ dtype = from_value_type (
251+ fv .batch_source .source_datatype_to_feast_value_type ()(
252+ col_datatype
266253 )
267- if field . name not in [
268- feature . name for feature in source_feature_view . features
269- ]:
270- source_feature_view .features .append (field )
254+ ),
255+ )
256+ if field . name not in [ feature . name for feature in fv . features ]:
257+ fv .features .append (field )
271258
272- else :
273- columns_to_exclude = {
274- fv .batch_source .timestamp_field ,
275- fv .batch_source .created_timestamp_column ,
276- }
277- for original_col , mapped_col in fv .batch_source .field_mapping .items ():
278- if mapped_col in columns_to_exclude :
279- columns_to_exclude .remove (mapped_col )
280- columns_to_exclude .add (original_col )
281259
282- table_column_names_and_types = fv .batch_source .get_table_column_names_and_types (
283- config
284- )
260+ def _infer_on_demand_features_and_entities (
261+ fv : OnDemandFeatureView ,
262+ join_keys : Set [Optional [str ]],
263+ run_inference_for_features ,
264+ config ,
265+ ) -> None :
266+ """
267+ Updates the specific feature in place with inferred features and entities.
268+ Args:
269+ fv: The feature view on which to run inference.
270+ join_keys: The set of join keys for the feature view's entities.
271+ run_inference_for_features: Whether to run inference for features.
272+ config: The config for the current feature store.
273+ """
274+ entity_columns : list [Field ] = []
275+ columns_to_exclude = set ()
276+ for (
277+ source_feature_view_name ,
278+ source_feature_view ,
279+ ) in fv .source_feature_view_projections .items ():
280+ columns_to_exclude .add (source_feature_view .timestamp_field )
281+ columns_to_exclude .add (source_feature_view .created_timestamp_column )
282+
283+ batch_source = getattr (source_feature_view , "batch_source" )
284+ batch_field_mapping = getattr (batch_source or None , "field_mapping" )
285+ if batch_field_mapping :
286+ for (
287+ original_col ,
288+ mapped_col ,
289+ ) in batch_field_mapping .items ():
290+ if mapped_col in columns_to_exclude :
291+ columns_to_exclude .remove (mapped_col )
292+ columns_to_exclude .add (original_col )
285293
294+ table_column_names_and_types = (
295+ batch_source .get_table_column_names_and_types (config )
296+ )
286297 for col_name , col_datatype in table_column_names_and_types :
287298 if col_name in columns_to_exclude :
288299 continue
289300 elif col_name in join_keys :
290301 field = Field (
291302 name = col_name ,
292303 dtype = from_value_type (
293- fv .batch_source .source_datatype_to_feast_value_type ()(
294- col_datatype
295- )
304+ batch_source .source_datatype_to_feast_value_type ()(col_datatype )
296305 ),
297306 )
298307 if field .name not in [
299308 entity_column .name
300- if not isinstance (entity_column , str )
301- else entity_column
302309 for entity_column in entity_columns
310+ if hasattr (entity_column , "name" )
303311 ]:
304312 entity_columns .append (field )
305313 elif not re .match (
306314 "^__|__$" , col_name
307315 ): # double underscores often signal an internal-use column
308316 if run_inference_for_features :
309317 feature_name = (
310- fv . batch_source . field_mapping [col_name ]
311- if col_name in fv . batch_source . field_mapping
318+ batch_field_mapping [col_name ]
319+ if col_name in batch_field_mapping
312320 else col_name
313321 )
314322 field = Field (
315323 name = feature_name ,
316324 dtype = from_value_type (
317- fv . batch_source .source_datatype_to_feast_value_type ()(
325+ batch_source .source_datatype_to_feast_value_type ()(
318326 col_datatype
319327 )
320328 ),
321329 )
322- if field .name not in [feature .name for feature in fv .features ]:
323- fv .features .append (field )
324-
330+ if field .name not in [
331+ feature .name for feature in source_feature_view .features
332+ ]:
333+ source_feature_view .features .append (field )
325334 fv .entity_columns = entity_columns
0 commit comments