Skip to content

Commit 895589a

Browse files
Feast plan clean up (feast-dev#2256)
* Run validation and inference on views and entities during plan Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Do not log objects that are unchanged Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Rename Fco to FeastObject Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Remove useless method Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Lint Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Always initialize registry during feature store initialization Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Fix usage test Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Remove print statements Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent 08d6881 commit 895589a

7 files changed

Lines changed: 145 additions & 171 deletions

File tree

sdk/python/feast/diff/infra_diff.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,20 @@ def to_string(self):
7171
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
7272
}
7373
for infra_object_diff in self.infra_object_diffs:
74+
if infra_object_diff.transition_type == TransitionType.UNCHANGED:
75+
continue
7476
action, color = message_action_map[infra_object_diff.transition_type]
7577
log_string += f"{action} {infra_object_diff.infra_object_type} {Style.BRIGHT + color}{infra_object_diff.name}{Style.RESET_ALL}\n"
7678
if infra_object_diff.transition_type == TransitionType.UPDATE:
7779
for _p in infra_object_diff.infra_object_property_diffs:
7880
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"
7981

82+
log_string = (
83+
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to infrastructure"
84+
if not log_string
85+
else log_string
86+
)
87+
8088
return log_string
8189

8290

Lines changed: 79 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,28 @@
2020
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
2121
from feast.repo_contents import RepoContents
2222

23-
Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService)
23+
FeastObject = TypeVar("FeastObject", Entity, BaseFeatureView, FeatureService)
2424

2525

2626
@dataclass
27-
class FcoDiff(Generic[Fco]):
27+
class FeastObjectDiff(Generic[FeastObject]):
2828
name: str
29-
fco_type: FeastObjectType
30-
current_fco: Fco
31-
new_fco: Fco
32-
fco_property_diffs: List[PropertyDiff]
29+
feast_object_type: FeastObjectType
30+
current_feast_object: FeastObject
31+
new_feast_object: FeastObject
32+
feast_object_property_diffs: List[PropertyDiff]
3333
transition_type: TransitionType
3434

3535

3636
@dataclass
3737
class RegistryDiff:
38-
fco_diffs: List[FcoDiff]
38+
feast_object_diffs: List[FeastObjectDiff]
3939

4040
def __init__(self):
41-
self.fco_diffs = []
41+
self.feast_object_diffs = []
4242

43-
def add_fco_diff(self, fco_diff: FcoDiff):
44-
self.fco_diffs.append(fco_diff)
43+
def add_feast_object_diff(self, feast_object_diff: FeastObjectDiff):
44+
self.feast_object_diffs.append(feast_object_diff)
4545

4646
def to_string(self):
4747
from colorama import Fore, Style
@@ -54,21 +54,29 @@ def to_string(self):
5454
TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX),
5555
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
5656
}
57-
for fco_diff in self.fco_diffs:
58-
if fco_diff.name == DUMMY_ENTITY_NAME:
57+
for feast_object_diff in self.feast_object_diffs:
58+
if feast_object_diff.name == DUMMY_ENTITY_NAME:
5959
continue
60-
action, color = message_action_map[fco_diff.transition_type]
61-
log_string += f"{action} {fco_diff.fco_type.value} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}\n"
62-
if fco_diff.transition_type == TransitionType.UPDATE:
63-
for _p in fco_diff.fco_property_diffs:
60+
if feast_object_diff.transition_type == TransitionType.UNCHANGED:
61+
continue
62+
action, color = message_action_map[feast_object_diff.transition_type]
63+
log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n"
64+
if feast_object_diff.transition_type == TransitionType.UPDATE:
65+
for _p in feast_object_diff.feast_object_property_diffs:
6466
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"
6567

68+
log_string = (
69+
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to registry"
70+
if not log_string
71+
else log_string
72+
)
73+
6674
return log_string
6775

6876

6977
def tag_objects_for_keep_delete_update_add(
70-
existing_objs: Iterable[Fco], desired_objs: Iterable[Fco]
71-
) -> Tuple[Set[Fco], Set[Fco], Set[Fco], Set[Fco]]:
78+
existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject]
79+
) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]:
7280
existing_obj_names = {e.name for e in existing_objs}
7381
desired_obj_names = {e.name for e in desired_objs}
7482

@@ -80,8 +88,8 @@ def tag_objects_for_keep_delete_update_add(
8088
return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add
8189

8290

83-
FcoProto = TypeVar(
84-
"FcoProto",
91+
FeastObjectProto = TypeVar(
92+
"FeastObjectProto",
8593
EntityProto,
8694
FeatureViewProto,
8795
FeatureServiceProto,
@@ -90,25 +98,12 @@ def tag_objects_for_keep_delete_update_add(
9098
)
9199

92100

93-
def tag_proto_objects_for_keep_delete_add(
94-
existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto]
95-
) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]:
96-
existing_obj_names = {e.spec.name for e in existing_objs}
97-
desired_obj_names = {e.spec.name for e in desired_objs}
98-
99-
objs_to_add = [e for e in desired_objs if e.spec.name not in existing_obj_names]
100-
objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names]
101-
objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names]
102-
103-
return objs_to_keep, objs_to_delete, objs_to_add
104-
105-
106101
FIELDS_TO_IGNORE = {"project"}
107102

108103

109104
def diff_registry_objects(
110-
current: Fco, new: Fco, object_type: FeastObjectType
111-
) -> FcoDiff:
105+
current: FeastObject, new: FeastObject, object_type: FeastObjectType
106+
) -> FeastObjectDiff:
112107
current_proto = current.to_proto()
113108
new_proto = new.to_proto()
114109
assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name
@@ -129,23 +124,23 @@ def diff_registry_objects(
129124
getattr(new_proto.spec, _field.name),
130125
)
131126
)
132-
return FcoDiff(
127+
return FeastObjectDiff(
133128
name=new_proto.spec.name,
134-
fco_type=object_type,
135-
current_fco=current,
136-
new_fco=new,
137-
fco_property_diffs=property_diffs,
129+
feast_object_type=object_type,
130+
current_feast_object=current,
131+
new_feast_object=new,
132+
feast_object_property_diffs=property_diffs,
138133
transition_type=transition,
139134
)
140135

141136

142137
def extract_objects_for_keep_delete_update_add(
143138
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
144139
) -> Tuple[
145-
Dict[FeastObjectType, Set[Fco]],
146-
Dict[FeastObjectType, Set[Fco]],
147-
Dict[FeastObjectType, Set[Fco]],
148-
Dict[FeastObjectType, Set[Fco]],
140+
Dict[FeastObjectType, Set[FeastObject]],
141+
Dict[FeastObjectType, Set[FeastObject]],
142+
Dict[FeastObjectType, Set[FeastObject]],
143+
Dict[FeastObjectType, Set[FeastObject]],
149144
]:
150145
"""
151146
Returns the objects in the registry that must be modified to achieve the desired repo state.
@@ -215,30 +210,32 @@ def diff_between(
215210
objects_to_add = objs_to_add[object_type]
216211

217212
for e in objects_to_add:
218-
diff.add_fco_diff(
219-
FcoDiff(
213+
diff.add_feast_object_diff(
214+
FeastObjectDiff(
220215
name=e.name,
221-
fco_type=object_type,
222-
current_fco=None,
223-
new_fco=e,
224-
fco_property_diffs=[],
216+
feast_object_type=object_type,
217+
current_feast_object=None,
218+
new_feast_object=e,
219+
feast_object_property_diffs=[],
225220
transition_type=TransitionType.CREATE,
226221
)
227222
)
228223
for e in objects_to_delete:
229-
diff.add_fco_diff(
230-
FcoDiff(
224+
diff.add_feast_object_diff(
225+
FeastObjectDiff(
231226
name=e.name,
232-
fco_type=object_type,
233-
current_fco=e,
234-
new_fco=None,
235-
fco_property_diffs=[],
227+
feast_object_type=object_type,
228+
current_feast_object=e,
229+
new_feast_object=None,
230+
feast_object_property_diffs=[],
236231
transition_type=TransitionType.DELETE,
237232
)
238233
)
239234
for e in objects_to_update:
240235
current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0]
241-
diff.add_fco_diff(diff_registry_objects(current_obj, e, object_type))
236+
diff.add_feast_object_diff(
237+
diff_registry_objects(current_obj, e, object_type)
238+
)
242239

243240
return diff
244241

@@ -255,39 +252,47 @@ def apply_diff_to_registry(
255252
project: Feast project to be updated.
256253
commit: Whether the change should be persisted immediately
257254
"""
258-
for fco_diff in registry_diff.fco_diffs:
259-
# There is no need to delete the FCO on an update, since applying the new FCO
260-
# will automatically delete the existing FCO.
261-
if fco_diff.transition_type == TransitionType.DELETE:
262-
if fco_diff.fco_type == FeastObjectType.ENTITY:
263-
registry.delete_entity(fco_diff.current_fco.name, project, commit=False)
264-
elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE:
255+
for feast_object_diff in registry_diff.feast_object_diffs:
256+
# There is no need to delete the object on an update, since applying the new object
257+
# will automatically delete the existing object.
258+
if feast_object_diff.transition_type == TransitionType.DELETE:
259+
if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
260+
registry.delete_entity(
261+
feast_object_diff.current_feast_object.name, project, commit=False
262+
)
263+
elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE:
265264
registry.delete_feature_service(
266-
fco_diff.current_fco.name, project, commit=False
265+
feast_object_diff.current_feast_object.name, project, commit=False
267266
)
268-
elif fco_diff.fco_type in [
267+
elif feast_object_diff.feast_object_type in [
269268
FeastObjectType.FEATURE_VIEW,
270269
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
271270
FeastObjectType.REQUEST_FEATURE_VIEW,
272271
]:
273272
registry.delete_feature_view(
274-
fco_diff.current_fco.name, project, commit=False,
273+
feast_object_diff.current_feast_object.name, project, commit=False,
275274
)
276275

277-
if fco_diff.transition_type in [
276+
if feast_object_diff.transition_type in [
278277
TransitionType.CREATE,
279278
TransitionType.UPDATE,
280279
]:
281-
if fco_diff.fco_type == FeastObjectType.ENTITY:
282-
registry.apply_entity(fco_diff.new_fco, project, commit=False)
283-
elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE:
284-
registry.apply_feature_service(fco_diff.new_fco, project, commit=False)
285-
elif fco_diff.fco_type in [
280+
if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
281+
registry.apply_entity(
282+
feast_object_diff.new_feast_object, project, commit=False
283+
)
284+
elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE:
285+
registry.apply_feature_service(
286+
feast_object_diff.new_feast_object, project, commit=False
287+
)
288+
elif feast_object_diff.feast_object_type in [
286289
FeastObjectType.FEATURE_VIEW,
287290
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
288291
FeastObjectType.REQUEST_FEATURE_VIEW,
289292
]:
290-
registry.apply_feature_view(fco_diff.new_fco, project, commit=False)
293+
registry.apply_feature_view(
294+
feast_object_diff.new_feast_object, project, commit=False
295+
)
291296

292297
if commit:
293298
registry.commit()

sdk/python/feast/feature_store.py

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@
3939

4040
from feast import feature_server, flags, flags_helper, utils
4141
from feast.base_feature_view import BaseFeatureView
42-
from feast.diff.FcoDiff import RegistryDiff, apply_diff_to_registry, diff_between
4342
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
44-
from feast.diff.property_diff import TransitionType
43+
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
4544
from feast.entity import Entity
4645
from feast.errors import (
4746
EntityNotFoundException,
@@ -75,7 +74,7 @@
7574
)
7675
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
7776
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
78-
from feast.registry import FeastObjectType, Registry
77+
from feast.registry import Registry
7978
from feast.repo_config import RepoConfig, load_repo_config
8079
from feast.repo_contents import RepoContents
8180
from feast.request_feature_view import RequestFeatureView
@@ -126,6 +125,7 @@ def __init__(
126125

127126
registry_config = self.config.get_registry_config()
128127
self._registry = Registry(registry_config, repo_path=self.repo_path)
128+
self._registry._initialize_registry()
129129
self._provider = get_provider(self.config, self.repo_path)
130130

131131
@log_exceptions
@@ -429,8 +429,10 @@ def _make_inferences(
429429
[view.batch_source for view in views_to_update], self.config
430430
)
431431

432+
# New feature views may reference previously applied entities.
433+
entities = self._list_entities()
432434
update_feature_views_with_inferred_features(
433-
views_to_update, entities_to_update, self.config
435+
views_to_update, entities + entities_to_update, self.config
434436
)
435437

436438
for odfv in odfvs_to_update:
@@ -476,10 +478,26 @@ def _plan(
476478
... )
477479
>>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
478480
"""
481+
# Validate and run inference on all the objects to be registered.
482+
self._validate_all_feature_views(
483+
list(desired_repo_contents.feature_views),
484+
list(desired_repo_contents.on_demand_feature_views),
485+
list(desired_repo_contents.request_feature_views),
486+
)
487+
self._make_inferences(
488+
list(desired_repo_contents.entities),
489+
list(desired_repo_contents.feature_views),
490+
list(desired_repo_contents.on_demand_feature_views),
491+
)
492+
493+
# Compute the desired difference between the current objects in the registry and
494+
# the desired repo state.
479495
registry_diff = diff_between(
480496
self._registry, self.project, desired_repo_contents
481497
)
482498

499+
# Compute the desired difference between the current infra, as stored in the registry,
500+
# and the desired infra.
483501
self._registry.refresh()
484502
current_infra_proto = (
485503
self._registry.cached_registry_proto.infra.__deepcopy__()
@@ -504,43 +522,6 @@ def _apply_diffs(
504522
infra_diff: The diff between the current infra and the desired infra.
505523
new_infra: The desired infra.
506524
"""
507-
entities_to_update = [
508-
fco_diff.new_fco
509-
for fco_diff in registry_diff.fco_diffs
510-
if fco_diff.fco_type == FeastObjectType.ENTITY
511-
and fco_diff.transition_type
512-
in [TransitionType.CREATE, TransitionType.UPDATE]
513-
]
514-
views_to_update = [
515-
fco_diff.new_fco
516-
for fco_diff in registry_diff.fco_diffs
517-
if fco_diff.fco_type == FeastObjectType.FEATURE_VIEW
518-
and fco_diff.transition_type
519-
in [TransitionType.CREATE, TransitionType.UPDATE]
520-
]
521-
odfvs_to_update = [
522-
fco_diff.new_fco
523-
for fco_diff in registry_diff.fco_diffs
524-
if fco_diff.fco_type == FeastObjectType.ON_DEMAND_FEATURE_VIEW
525-
and fco_diff.transition_type
526-
in [TransitionType.CREATE, TransitionType.UPDATE]
527-
]
528-
request_views_to_update = [
529-
fco_diff.new_fco
530-
for fco_diff in registry_diff.fco_diffs
531-
if fco_diff.fco_type == FeastObjectType.REQUEST_FEATURE_VIEW
532-
and fco_diff.transition_type
533-
in [TransitionType.CREATE, TransitionType.UPDATE]
534-
]
535-
536-
# TODO(felixwang9817): move validation logic into _plan.
537-
# Validate all feature views and make inferences.
538-
self._validate_all_feature_views(
539-
views_to_update, odfvs_to_update, request_views_to_update
540-
)
541-
self._make_inferences(entities_to_update, views_to_update, odfvs_to_update)
542-
543-
# Apply infra and registry changes.
544525
infra_diff.update()
545526
apply_diff_to_registry(
546527
self._registry, registry_diff, self.project, commit=False

0 commit comments

Comments
 (0)