Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/compute_engines/feature_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ def get_column_info(
# we need to read ALL source columns, not just the output feature columns.
# This is specifically for transformations that create new columns or need raw data.
mode = getattr(getattr(view, "feature_transformation", None), "mode", None)
if mode == "ray" or getattr(mode, "value", None) == "ray":
if mode in ("ray", "pandas") or getattr(mode, "value", None) in (
"ray",
"pandas",
):
# Signal to read all columns by passing empty list for feature_cols
# The transformation will produce the output columns defined in the schema
feature_cols = []
Expand Down
12 changes: 8 additions & 4 deletions sdk/python/feast/infra/offline_stores/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ def evaluate_func():
df[DUMMY_ENTITY_ID] = DUMMY_ENTITY_VAL
columns_to_extract.add(DUMMY_ENTITY_ID)

return df[list(columns_to_extract)].persist()
if feature_name_columns:
df = df[list(columns_to_extract)]

return df.persist()

# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
return DaskRetrievalJob(
Expand Down Expand Up @@ -483,9 +486,10 @@ def evaluate_func():
columns_to_extract.add(DUMMY_ENTITY_ID)
# TODO: Decides if we want to field mapping for pull_latest_from_table_or_query
# This is default for other offline store.
df = df[list(columns_to_extract)]
df.persist()
return df
if feature_name_columns:
df = df[list(columns_to_extract)]

return df.persist()

# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
return DaskRetrievalJob(
Expand Down
Loading