Skip to content

Commit a85854e

Browse files
think i got everything working now
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent 0d489a9 commit a85854e

File tree

1 file changed

+91
-82
lines changed

1 file changed

+91
-82
lines changed

sdk/python/feast/inference.py

Lines changed: 91 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -204,122 +204,131 @@ def _infer_features_and_entities(
204204
run_inference_for_features: Whether to run inference for features.
205205
config: The config for the current feature store.
206206
"""
207-
entity_columns: list[Field] = []
208207
if isinstance(fv, OnDemandFeatureView):
209-
columns_to_exclude = set()
210-
for (
211-
source_feature_view_name,
212-
source_feature_view,
213-
) in fv.source_feature_view_projections.items():
214-
columns_to_exclude.add(source_feature_view.timestamp_field)
215-
columns_to_exclude.add(source_feature_view.created_timestamp_column)
208+
return _infer_on_demand_features_and_entities(
209+
fv, join_keys, run_inference_for_features, config
210+
)
216211

217-
batch_source = getattr(source_feature_view, "batch_source")
218-
batch_field_mapping = getattr(batch_source or None, "field_mapping")
219-
if batch_field_mapping:
220-
for (
221-
original_col,
222-
mapped_col,
223-
) in batch_field_mapping.items():
224-
if mapped_col in columns_to_exclude:
225-
columns_to_exclude.remove(mapped_col)
226-
columns_to_exclude.add(original_col)
212+
columns_to_exclude = {
213+
fv.batch_source.timestamp_field,
214+
fv.batch_source.created_timestamp_column,
215+
}
216+
for original_col, mapped_col in fv.batch_source.field_mapping.items():
217+
if mapped_col in columns_to_exclude:
218+
columns_to_exclude.remove(mapped_col)
219+
columns_to_exclude.add(original_col)
227220

228-
table_column_names_and_types = (
229-
batch_source.get_table_column_names_and_types(config)
230-
)
221+
table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
222+
config
223+
)
231224

232-
for col_name, col_datatype in table_column_names_and_types:
233-
if col_name in columns_to_exclude:
234-
continue
235-
elif col_name in join_keys:
236-
field = Field(
237-
name=col_name,
238-
dtype=from_value_type(
239-
batch_source.source_datatype_to_feast_value_type()(
240-
col_datatype
241-
)
242-
),
243-
)
244-
if field.name not in [
245-
entity_column.name
246-
for entity_column in entity_columns
247-
if hasattr(entity_column, "name")
248-
]:
249-
entity_columns.append(field)
250-
elif not re.match(
251-
"^__|__$", col_name
252-
): # double underscores often signal an internal-use column
253-
if run_inference_for_features:
254-
feature_name = (
255-
batch_field_mapping[col_name]
256-
if col_name in batch_field_mapping
257-
else col_name
258-
)
259-
field = Field(
260-
name=feature_name,
261-
dtype=from_value_type(
262-
batch_source.source_datatype_to_feast_value_type()(
263-
col_datatype
264-
)
265-
),
225+
for col_name, col_datatype in table_column_names_and_types:
226+
if col_name in columns_to_exclude:
227+
continue
228+
elif col_name in join_keys:
229+
field = Field(
230+
name=col_name,
231+
dtype=from_value_type(
232+
fv.batch_source.source_datatype_to_feast_value_type()(col_datatype)
233+
),
234+
)
235+
if field.name not in [
236+
entity_column.name for entity_column in fv.entity_columns
237+
]:
238+
fv.entity_columns.append(field)
239+
elif not re.match(
240+
"^__|__$", col_name
241+
): # double underscores often signal an internal-use column
242+
if run_inference_for_features:
243+
feature_name = (
244+
fv.batch_source.field_mapping[col_name]
245+
if col_name in fv.batch_source.field_mapping
246+
else col_name
247+
)
248+
field = Field(
249+
name=feature_name,
250+
dtype=from_value_type(
251+
fv.batch_source.source_datatype_to_feast_value_type()(
252+
col_datatype
266253
)
267-
if field.name not in [
268-
feature.name for feature in source_feature_view.features
269-
]:
270-
source_feature_view.features.append(field)
254+
),
255+
)
256+
if field.name not in [feature.name for feature in fv.features]:
257+
fv.features.append(field)
271258

272-
else:
273-
columns_to_exclude = {
274-
fv.batch_source.timestamp_field,
275-
fv.batch_source.created_timestamp_column,
276-
}
277-
for original_col, mapped_col in fv.batch_source.field_mapping.items():
278-
if mapped_col in columns_to_exclude:
279-
columns_to_exclude.remove(mapped_col)
280-
columns_to_exclude.add(original_col)
281259

282-
table_column_names_and_types = fv.batch_source.get_table_column_names_and_types(
283-
config
284-
)
260+
def _infer_on_demand_features_and_entities(
261+
fv: OnDemandFeatureView,
262+
join_keys: Set[Optional[str]],
263+
run_inference_for_features,
264+
config,
265+
) -> None:
266+
"""
267+
Updates the specific feature in place with inferred features and entities.
268+
Args:
269+
fv: The feature view on which to run inference.
270+
join_keys: The set of join keys for the feature view's entities.
271+
run_inference_for_features: Whether to run inference for features.
272+
config: The config for the current feature store.
273+
"""
274+
entity_columns: list[Field] = []
275+
columns_to_exclude = set()
276+
for (
277+
source_feature_view_name,
278+
source_feature_view,
279+
) in fv.source_feature_view_projections.items():
280+
columns_to_exclude.add(source_feature_view.timestamp_field)
281+
columns_to_exclude.add(source_feature_view.created_timestamp_column)
282+
283+
batch_source = getattr(source_feature_view, "batch_source")
284+
batch_field_mapping = getattr(batch_source or None, "field_mapping")
285+
if batch_field_mapping:
286+
for (
287+
original_col,
288+
mapped_col,
289+
) in batch_field_mapping.items():
290+
if mapped_col in columns_to_exclude:
291+
columns_to_exclude.remove(mapped_col)
292+
columns_to_exclude.add(original_col)
285293

294+
table_column_names_and_types = (
295+
batch_source.get_table_column_names_and_types(config)
296+
)
286297
for col_name, col_datatype in table_column_names_and_types:
287298
if col_name in columns_to_exclude:
288299
continue
289300
elif col_name in join_keys:
290301
field = Field(
291302
name=col_name,
292303
dtype=from_value_type(
293-
fv.batch_source.source_datatype_to_feast_value_type()(
294-
col_datatype
295-
)
304+
batch_source.source_datatype_to_feast_value_type()(col_datatype)
296305
),
297306
)
298307
if field.name not in [
299308
entity_column.name
300-
if not isinstance(entity_column, str)
301-
else entity_column
302309
for entity_column in entity_columns
310+
if hasattr(entity_column, "name")
303311
]:
304312
entity_columns.append(field)
305313
elif not re.match(
306314
"^__|__$", col_name
307315
): # double underscores often signal an internal-use column
308316
if run_inference_for_features:
309317
feature_name = (
310-
fv.batch_source.field_mapping[col_name]
311-
if col_name in fv.batch_source.field_mapping
318+
batch_field_mapping[col_name]
319+
if col_name in batch_field_mapping
312320
else col_name
313321
)
314322
field = Field(
315323
name=feature_name,
316324
dtype=from_value_type(
317-
fv.batch_source.source_datatype_to_feast_value_type()(
325+
batch_source.source_datatype_to_feast_value_type()(
318326
col_datatype
319327
)
320328
),
321329
)
322-
if field.name not in [feature.name for feature in fv.features]:
323-
fv.features.append(field)
324-
330+
if field.name not in [
331+
feature.name for feature in source_feature_view.features
332+
]:
333+
source_feature_view.features.append(field)
325334
fv.entity_columns = entity_columns

0 commit comments

Comments
 (0)