Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Run validation and inference on views and entities during plan
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Jan 29, 2022
commit 817a8da8766d6a94a56e0f545803b1c235ca2b60
57 changes: 19 additions & 38 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,10 @@ def _make_inferences(
[view.batch_source for view in views_to_update], self.config
)

# New feature views may reference previously applied entities.
entities = self._list_entities()
update_feature_views_with_inferred_features(
views_to_update, entities_to_update, self.config
views_to_update, entities + entities_to_update, self.config
)

for odfv in odfvs_to_update:
Expand Down Expand Up @@ -476,10 +478,26 @@ def _plan(
... )
>>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""
# Validate and run inference on all the objects to be registered.
self._validate_all_feature_views(
list(desired_repo_contents.feature_views),
list(desired_repo_contents.on_demand_feature_views),
list(desired_repo_contents.request_feature_views),
)
self._make_inferences(
list(desired_repo_contents.entities),
list(desired_repo_contents.feature_views),
list(desired_repo_contents.on_demand_feature_views),
)

# Compute the desired difference between the current objects in the registry and
# the desired repo state.
registry_diff = diff_between(
self._registry, self.project, desired_repo_contents
)

# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh()
current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
Expand All @@ -504,43 +522,6 @@ def _apply_diffs(
infra_diff: The diff between the current infra and the desired infra.
new_infra: The desired infra.
"""
entities_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.ENTITY
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
views_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
odfvs_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.ON_DEMAND_FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
request_views_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.REQUEST_FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]

# TODO(felixwang9817): move validation logic into _plan.
# Validate all feature views and make inferences.
self._validate_all_feature_views(
views_to_update, odfvs_to_update, request_views_to_update
)
self._make_inferences(entities_to_update, views_to_update, odfvs_to_update)

# Apply infra and registry changes.
infra_diff.update()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking, but another cleanup would be to have InfraObject#update -> InfraObject#create or something since it only right now creates the object

apply_diff_to_registry(
self._registry, registry_diff, self.project, commit=False
Expand Down
12 changes: 11 additions & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ def update_entities_with_inferred_types_from_feature_views(
entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig
) -> None:
"""
Infer entity value type by examining schema of feature view batch sources
Infers the types of the entities by examining the schemas of feature view batch sources.

Args:
entities: The entities to be updated.
feature_views: A list containing feature views associated with the entities.
config: The config for the current feature store.
"""
incomplete_entities = {
entity.name: entity
Expand Down Expand Up @@ -127,6 +132,11 @@ def update_feature_views_with_inferred_features(
Infers the set of features associated to each FeatureView and updates the FeatureView with those features.
Inference occurs through considering each column of the underlying data source as a feature except columns that are
associated with the data source's timestamp columns and the FeatureView's entity columns.

Args:
fvs: The feature views to be updated.
entities: A list containing entities associated with the feature views.
config: The config for the current feature store.
"""
entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities}

Expand Down