Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,20 @@ def to_string(self):
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
}
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type == TransitionType.UNCHANGED:
continue
action, color = message_action_map[infra_object_diff.transition_type]
log_string += f"{action} {infra_object_diff.infra_object_type} {Style.BRIGHT + color}{infra_object_diff.name}{Style.RESET_ALL}\n"
if infra_object_diff.transition_type == TransitionType.UPDATE:
for _p in infra_object_diff.infra_object_property_diffs:
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"

log_string = (
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to infrastructure"
if not log_string
else log_string
)

return log_string


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
from feast.repo_contents import RepoContents

Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService)
FeastObject = TypeVar("FeastObject", Entity, BaseFeatureView, FeatureService)


@dataclass
class FcoDiff(Generic[Fco]):
class FeastObjectDiff(Generic[FeastObject]):
name: str
fco_type: FeastObjectType
current_fco: Fco
new_fco: Fco
fco_property_diffs: List[PropertyDiff]
feast_object_type: FeastObjectType
current_feast_object: FeastObject
new_feast_object: FeastObject
feast_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType


@dataclass
class RegistryDiff:
fco_diffs: List[FcoDiff]
feast_object_diffs: List[FeastObjectDiff]

def __init__(self):
self.fco_diffs = []
self.feast_object_diffs = []

def add_fco_diff(self, fco_diff: FcoDiff):
self.fco_diffs.append(fco_diff)
def add_feast_object_diff(self, feast_object_diff: FeastObjectDiff):
self.feast_object_diffs.append(feast_object_diff)

def to_string(self):
from colorama import Fore, Style
Expand All @@ -54,21 +54,29 @@ def to_string(self):
TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX),
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
}
for fco_diff in self.fco_diffs:
if fco_diff.name == DUMMY_ENTITY_NAME:
for feast_object_diff in self.feast_object_diffs:
if feast_object_diff.name == DUMMY_ENTITY_NAME:
continue
action, color = message_action_map[fco_diff.transition_type]
log_string += f"{action} {fco_diff.fco_type.value} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}\n"
if fco_diff.transition_type == TransitionType.UPDATE:
for _p in fco_diff.fco_property_diffs:
if feast_object_diff.transition_type == TransitionType.UNCHANGED:
continue
action, color = message_action_map[feast_object_diff.transition_type]
log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n"
if feast_object_diff.transition_type == TransitionType.UPDATE:
for _p in feast_object_diff.feast_object_property_diffs:
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"

log_string = (
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to registry"
if not log_string
else log_string
)

return log_string


def tag_objects_for_keep_delete_update_add(
existing_objs: Iterable[Fco], desired_objs: Iterable[Fco]
) -> Tuple[Set[Fco], Set[Fco], Set[Fco], Set[Fco]]:
existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject]
) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

Expand All @@ -80,8 +88,8 @@ def tag_objects_for_keep_delete_update_add(
return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add


FcoProto = TypeVar(
"FcoProto",
FeastObjectProto = TypeVar(
"FeastObjectProto",
EntityProto,
FeatureViewProto,
FeatureServiceProto,
Expand All @@ -90,25 +98,12 @@ def tag_objects_for_keep_delete_update_add(
)


def tag_proto_objects_for_keep_delete_add(
    existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto]
) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]:
    """Partition proto objects into keep / delete / add buckets by ``spec.name``.

    Objects are matched purely on ``obj.spec.name``; no other field is compared.

    Args:
        existing_objs: Objects currently present. May be any iterable,
            including a one-shot iterator or generator.
        desired_objs: Objects that should be present. May be any iterable,
            including a one-shot iterator or generator.

    Returns:
        A ``(objs_to_keep, objs_to_delete, objs_to_add)`` tuple of lists:
        - ``objs_to_keep``: desired objects whose name already exists.
        - ``objs_to_delete``: existing objects whose name is not desired.
        - ``objs_to_add``: desired objects whose name does not exist yet.
        Each list preserves the order of the input it was drawn from.
    """
    # Materialize each input once: both are traversed several times below, and
    # a one-shot iterable would be exhausted after the first pass, silently
    # producing empty result lists.
    existing = list(existing_objs)
    desired = list(desired_objs)

    existing_obj_names = {e.spec.name for e in existing}
    desired_obj_names = {e.spec.name for e in desired}

    objs_to_add = [e for e in desired if e.spec.name not in existing_obj_names]
    objs_to_keep = [e for e in desired if e.spec.name in existing_obj_names]
    objs_to_delete = [e for e in existing if e.spec.name not in desired_obj_names]

    return objs_to_keep, objs_to_delete, objs_to_add


FIELDS_TO_IGNORE = {"project"}


def diff_registry_objects(
current: Fco, new: Fco, object_type: FeastObjectType
) -> FcoDiff:
current: FeastObject, new: FeastObject, object_type: FeastObjectType
) -> FeastObjectDiff:
current_proto = current.to_proto()
new_proto = new.to_proto()
assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name
Expand All @@ -129,23 +124,23 @@ def diff_registry_objects(
getattr(new_proto.spec, _field.name),
)
)
return FcoDiff(
return FeastObjectDiff(
name=new_proto.spec.name,
fco_type=object_type,
current_fco=current,
new_fco=new,
fco_property_diffs=property_diffs,
feast_object_type=object_type,
current_feast_object=current,
new_feast_object=new,
feast_object_property_diffs=property_diffs,
transition_type=transition,
)


def extract_objects_for_keep_delete_update_add(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
) -> Tuple[
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
]:
"""
Returns the objects in the registry that must be modified to achieve the desired repo state.
Expand Down Expand Up @@ -215,30 +210,32 @@ def diff_between(
objects_to_add = objs_to_add[object_type]

for e in objects_to_add:
diff.add_fco_diff(
FcoDiff(
diff.add_feast_object_diff(
FeastObjectDiff(
name=e.name,
fco_type=object_type,
current_fco=None,
new_fco=e,
fco_property_diffs=[],
feast_object_type=object_type,
current_feast_object=None,
new_feast_object=e,
feast_object_property_diffs=[],
transition_type=TransitionType.CREATE,
)
)
for e in objects_to_delete:
diff.add_fco_diff(
FcoDiff(
diff.add_feast_object_diff(
FeastObjectDiff(
name=e.name,
fco_type=object_type,
current_fco=e,
new_fco=None,
fco_property_diffs=[],
feast_object_type=object_type,
current_feast_object=e,
new_feast_object=None,
feast_object_property_diffs=[],
transition_type=TransitionType.DELETE,
)
)
for e in objects_to_update:
current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0]
diff.add_fco_diff(diff_registry_objects(current_obj, e, object_type))
diff.add_feast_object_diff(
diff_registry_objects(current_obj, e, object_type)
)

return diff

Expand All @@ -255,39 +252,47 @@ def apply_diff_to_registry(
project: Feast project to be updated.
commit: Whether the change should be persisted immediately
"""
for fco_diff in registry_diff.fco_diffs:
# There is no need to delete the FCO on an update, since applying the new FCO
# will automatically delete the existing FCO.
if fco_diff.transition_type == TransitionType.DELETE:
if fco_diff.fco_type == FeastObjectType.ENTITY:
registry.delete_entity(fco_diff.current_fco.name, project, commit=False)
elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE:
for feast_object_diff in registry_diff.feast_object_diffs:
# There is no need to delete the object on an update, since applying the new object
# will automatically delete the existing object.
if feast_object_diff.transition_type == TransitionType.DELETE:
if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
registry.delete_entity(
feast_object_diff.current_feast_object.name, project, commit=False
)
elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE:
registry.delete_feature_service(
fco_diff.current_fco.name, project, commit=False
feast_object_diff.current_feast_object.name, project, commit=False
)
elif fco_diff.fco_type in [
elif feast_object_diff.feast_object_type in [
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
]:
registry.delete_feature_view(
fco_diff.current_fco.name, project, commit=False,
feast_object_diff.current_feast_object.name, project, commit=False,
)

if fco_diff.transition_type in [
if feast_object_diff.transition_type in [
TransitionType.CREATE,
TransitionType.UPDATE,
]:
if fco_diff.fco_type == FeastObjectType.ENTITY:
registry.apply_entity(fco_diff.new_fco, project, commit=False)
elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE:
registry.apply_feature_service(fco_diff.new_fco, project, commit=False)
elif fco_diff.fco_type in [
if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
registry.apply_entity(
feast_object_diff.new_feast_object, project, commit=False
)
elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE:
registry.apply_feature_service(
feast_object_diff.new_feast_object, project, commit=False
)
elif feast_object_diff.feast_object_type in [
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
]:
registry.apply_feature_view(fco_diff.new_fco, project, commit=False)
registry.apply_feature_view(
feast_object_diff.new_feast_object, project, commit=False
)

if commit:
registry.commit()
63 changes: 22 additions & 41 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@

from feast import feature_server, flags, flags_helper, utils
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.property_diff import TransitionType
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand Down Expand Up @@ -75,7 +74,7 @@
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
from feast.registry import FeastObjectType, Registry
from feast.registry import Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
Expand Down Expand Up @@ -126,6 +125,7 @@ def __init__(

registry_config = self.config.get_registry_config()
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry._initialize_registry()
self._provider = get_provider(self.config, self.repo_path)

@log_exceptions
Expand Down Expand Up @@ -429,8 +429,10 @@ def _make_inferences(
[view.batch_source for view in views_to_update], self.config
)

# New feature views may reference previously applied entities.
entities = self._list_entities()
update_feature_views_with_inferred_features(
views_to_update, entities_to_update, self.config
views_to_update, entities + entities_to_update, self.config
)

for odfv in odfvs_to_update:
Expand Down Expand Up @@ -476,10 +478,26 @@ def _plan(
... )
>>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""
# Validate and run inference on all the objects to be registered.
self._validate_all_feature_views(
list(desired_repo_contents.feature_views),
list(desired_repo_contents.on_demand_feature_views),
list(desired_repo_contents.request_feature_views),
)
self._make_inferences(
list(desired_repo_contents.entities),
list(desired_repo_contents.feature_views),
list(desired_repo_contents.on_demand_feature_views),
)

# Compute the desired difference between the current objects in the registry and
# the desired repo state.
registry_diff = diff_between(
self._registry, self.project, desired_repo_contents
)

# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh()
current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
Expand All @@ -504,43 +522,6 @@ def _apply_diffs(
infra_diff: The diff between the current infra and the desired infra.
new_infra: The desired infra.
"""
entities_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.ENTITY
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
views_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
odfvs_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.ON_DEMAND_FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
request_views_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.REQUEST_FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]

# TODO(felixwang9817): move validation logic into _plan.
# Validate all feature views and make inferences.
self._validate_all_feature_views(
views_to_update, odfvs_to_update, request_views_to_update
)
self._make_inferences(entities_to_update, views_to_update, odfvs_to_update)

# Apply infra and registry changes.
infra_diff.update()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking, but another cleanup would be to rename InfraObject#update to InfraObject#create (or something similar), since right now it only creates the object.

apply_diff_to_registry(
self._registry, registry_diff, self.project, commit=False
Expand Down
Loading