Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixing when source is used
Signed-off-by: Nick Quinn <nicholas_quinn@apple.com>
  • Loading branch information
nickquinn408 authored and ntkathole committed Mar 10, 2026
commit 33d580192df47398fa22fe5e13649aba51398cad
8 changes: 6 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,11 @@ def _make_inferences(
)

update_data_sources_with_inferred_event_timestamp_col(
[view.batch_source for view in views_to_update], self.config
[view.batch_source for view in views_to_update if view.batch_source is not None], self.config
)

update_data_sources_with_inferred_event_timestamp_col(
[view.batch_source for view in sfvs_to_update], self.config
[view.batch_source for view in sfvs_to_update if view.batch_source is not None], self.config
)

# New feature views may reference previously applied entities.
Expand Down Expand Up @@ -2416,6 +2416,10 @@ def write_to_offline_store(

provider = self._get_provider()
# Get columns of the batch source and the input dataframe.
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source."
)
column_names_and_types = (
provider.get_table_column_names_and_types_from_data_source(
self.config, feature_view.batch_source
Expand Down
11 changes: 6 additions & 5 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,20 @@ def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection":
@staticmethod
def from_feature_view_definition(feature_view: "FeatureView"):
# TODO need to implement this for StreamFeatureViews
if getattr(feature_view, "batch_source", None):
batch_source = getattr(feature_view, "batch_source", None)
if batch_source:
return FeatureViewProjection(
name=feature_view.name,
name_alias=None,
features=feature_view.features,
desired_features=[],
timestamp_field=feature_view.batch_source.created_timestamp_column
timestamp_field=batch_source.created_timestamp_column
or None,
created_timestamp_column=feature_view.batch_source.created_timestamp_column
created_timestamp_column=batch_source.created_timestamp_column
or None,
date_partition_column=feature_view.batch_source.date_partition_column
date_partition_column=batch_source.date_partition_column
or None,
batch_source=feature_view.batch_source or None,
batch_source=batch_source or None,
)
else:
return FeatureViewProjection(
Expand Down
18 changes: 16 additions & 2 deletions sdk/python/feast/feature_view_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def resolve_feature_view_source(

if not is_derived_view:
# Regular feature view - use its batch_source directly
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source."
)
return FeatureViewSourceInfo(
data_source=feature_view.batch_source,
source_type="batch_source",
Expand Down Expand Up @@ -178,8 +182,13 @@ def resolve_feature_view_source(
if hasattr(parent_view, "source_views") and parent_view.source_views:
# Parent is also a derived view - recursively find original source
original_source_view = find_original_source_view(parent_view)
original_batch_source = original_source_view.batch_source
if original_batch_source is None:
raise ValueError(
f"Original source view '{original_source_view.name}' has no batch_source."
)
return FeatureViewSourceInfo(
data_source=original_source_view.batch_source,
data_source=original_batch_source,
source_type="original_source",
has_transformation=view_has_transformation,
transformation_func=transformation_func,
Expand Down Expand Up @@ -229,8 +238,13 @@ def resolve_feature_view_source_with_fallback(
elif hasattr(feature_view, "source_views") and feature_view.source_views:
# Try the original source view as last resort
original_view = find_original_source_view(feature_view)
original_view_batch_source = original_view.batch_source
if original_view_batch_source is None:
raise ValueError(
f"Original source view '{original_view.name}' has no batch_source."
)
return FeatureViewSourceInfo(
data_source=original_view.batch_source,
data_source=original_view_batch_source,
source_type="fallback_original_source",
has_transformation=has_transformation(feature_view),
transformation_func=get_transformation_function(feature_view),
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ def _infer_features_and_entities(
fv, join_keys, run_inference_for_features, config
)

if fv.batch_source is None:
return

entity_columns: List[Field] = fv.entity_columns if fv.entity_columns else []
columns_to_exclude = {
fv.batch_source.timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _materialize_one(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _materialize_one(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/compute_engines/ray/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _materialize_from_offline_store(
# Pull data from offline store
retrieval_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,14 @@ def _materialize_one(
timestamp_field,
created_timestamp_column,
) = _get_column_names(feature_view, entities)
assert feature_view.batch_source is not None # guaranteed by _get_column_names

job_id = f"{feature_view.name}-{start_date}-{end_date}"

try:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down Expand Up @@ -341,6 +342,7 @@ def generate_snowflake_materialization_query(
feature_batch: list,
project: str,
) -> str:
assert feature_view.batch_source is not None
if feature_view.batch_source.created_timestamp_column:
fv_created_str = f',"{feature_view.batch_source.created_timestamp_column}"'
else:
Expand Down Expand Up @@ -406,6 +408,7 @@ def materialize_to_snowflake_online_store(
project: str,
) -> None:
assert_snowflake_feature_names(feature_view)
assert feature_view.batch_source is not None

feature_names_str = '", "'.join(
[feature.name for feature in feature_view.features]
Expand Down Expand Up @@ -467,6 +470,7 @@ def materialize_to_external_online_store(
feature_view: Union[StreamFeatureView, FeatureView],
pbar: tqdm,
) -> None:
assert feature_view.batch_source is not None
feature_names = [feature.name for feature in feature_view.features]

with GetSnowflakeConnection(repo_config.batch_engine) as conn:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/compute_engines/spark/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def _materialize_from_offline_store(
SparkRetrievalJob,
self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2168,7 +2168,7 @@ def get_historical_features(

# Build reverse field mapping to get actual source column names
reverse_field_mapping = {}
if fv.batch_source.field_mapping:
if fv.batch_source is not None and fv.batch_source.field_mapping:
reverse_field_mapping = {
v: k for k, v in fv.batch_source.field_mapping.items()
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def _field_mapping(
full_feature_names: bool,
) -> Tuple[dd.DataFrame, str]:
# Rename columns by the field mapping dictionary if it exists
if feature_view.batch_source.field_mapping:
if feature_view.batch_source is not None and feature_view.batch_source.field_mapping:
df_to_join = _run_dask_field_mapping(
df_to_join, feature_view.batch_source.field_mapping
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def _get_offline_store_for_feature_view(
self, feature_view: FeatureView, config: RepoConfig
) -> OfflineStore:
self._initialize_offline_stores(config)
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source."
)
source_type = feature_view.batch_source.source_type()
store_key = self.get_source_key_from_type(source_type)
if store_key is None:
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/infra/offline_stores/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ def get_historical_features_ibis(
def read_fv(
feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
) -> Tuple:
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source and cannot be queried."
)
fv_table: Table = data_source_reader(
feature_view.batch_source, str(config.repo_path)
)
Expand Down Expand Up @@ -335,6 +339,10 @@ def offline_write_batch_ibis(
progress: Optional[Callable[[int], Any]],
data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
):
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source."
)
pa_schema, column_names = get_pyarrow_schema_from_batch_source(
config, feature_view.batch_source
)
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ def get_feature_view_query_context(

query_context = []
for feature_view, features in feature_views_to_feature_map.items():
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source and cannot be queried."
)
reverse_field_mapping = {
v: k for k, v in feature_view.batch_source.field_mapping.items()
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/passthrough_provider.py
Comment thread
nquinn408 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ async def ingest_df_async(
)

def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
if feature_view.batch_source.field_mapping is not None:
if feature_view.batch_source is not None and feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)
Expand Down
8 changes: 3 additions & 5 deletions sdk/python/feast/repo_operations.py
Comment thread
nquinn408 marked this conversation as resolved.
Comment thread
nquinn408 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,8 @@ def parse_repo(repo_root: Path) -> RepoContents:

# Handle batch sources defined with feature views.
batch_source = obj.batch_source
if not any((batch_source is ds) for ds in res.data_sources):
if batch_source is not None and not any((batch_source is ds) for ds in res.data_sources):
res.data_sources.append(batch_source)

# Handle stream sources defined with feature views.
assert obj.stream_source
stream_source = obj.stream_source
if not any((stream_source is ds) for ds in res.data_sources):
Expand All @@ -195,7 +193,7 @@ def parse_repo(repo_root: Path) -> RepoContents:

# Handle batch sources defined with feature views.
batch_source = obj.batch_source
if not any((batch_source is ds) for ds in res.data_sources):
if batch_source is not None and not any((batch_source is ds) for ds in res.data_sources):
res.data_sources.append(batch_source)
elif isinstance(obj, Entity) and not any(
(obj is entity) for entity in res.entities
Expand Down Expand Up @@ -345,7 +343,7 @@ def apply_total_with_repo_instance(
):
if not skip_source_validation:
provider = store._get_provider()
data_sources = [t.batch_source for t in repo.feature_views]
data_sources = [t.batch_source for t in repo.feature_views if t.batch_source is not None]
# Make sure the data source used by this feature view is supported by Feast
for data_source in data_sources:
provider.validate_data_source(store.config, data_source)
Expand Down
10 changes: 10 additions & 0 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ def _get_column_names(
and reverse-mapped created timestamp column that will be passed into
the query to the offline store.
"""
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source and cannot be used for offline retrieval."
)

# if we have mapped fields, use the original field names in the call to the offline store
timestamp_field = feature_view.batch_source.timestamp_field

Expand Down Expand Up @@ -342,6 +347,11 @@ def _convert_arrow_fv_to_proto(
if isinstance(table, pyarrow.Table):
table = table.to_batches()[0]

if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source and cannot be converted to proto."
)

# TODO: This will break if the feature view has aggregations or transformations
columns = [
(field.name, field.dtype.to_value_type()) for field in feature_view.features
Expand Down