Skip to content

Commit 32a4cdb

Browse files
authored
Refactor tag methods to infer created, deleted, and kept repo objects (feast-dev#2142)
* Refactor tag methods to infer objects that are created, deleted, and kept Signed-off-by: Achal Shah <achals@gmail.com> * Fixes Signed-off-by: Achal Shah <achals@gmail.com> * Fixes Signed-off-by: Achal Shah <achals@gmail.com> * True Fixes Signed-off-by: Achal Shah <achals@gmail.com> * Use the same tag method Signed-off-by: Achal Shah <achals@gmail.com> * CR updates Signed-off-by: Achal Shah <achals@gmail.com> * CR updates Signed-off-by: Achal Shah <achals@gmail.com>
1 parent be4b466 commit 32a4cdb

File tree

6 files changed

+232
-118
lines changed

6 files changed

+232
-118
lines changed

sdk/python/feast/diff/FcoDiff.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from dataclasses import dataclass, field
2+
from enum import Enum
3+
from typing import Any, Iterable, List, Set, Tuple, TypeVar
4+
5+
from feast.base_feature_view import BaseFeatureView
6+
from feast.entity import Entity
7+
from feast.feature_service import FeatureService
8+
from feast.feature_table import FeatureTable
9+
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
10+
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
11+
12+
13+
@dataclass
class PropertyDiff:
    """One property whose value differs between two versions of an object.

    Both values are carried as strings.
    """

    # Name of the property being compared.
    property_name: str
    # Value of the property on the existing object.
    val_existing: str
    # Value of the property on the declared object.
    val_declared: str
18+
19+
20+
class TransitionType(Enum):
    """Kind of change that a single diff entry represents."""

    # Placeholder for a transition that has not been classified.
    UNKNOWN = 0
    # The object is being created.
    CREATE = 1
    # The object is being deleted.
    DELETE = 2
    # The object is being updated.
    UPDATE = 3
    # The object is unchanged.
    UNCHANGED = 4
26+
27+
28+
@dataclass
class FcoDiff:
    """A single diff entry: a before/after pair of objects plus its transition.

    ``Registry.diff_between`` builds CREATE entries with ``current_fco=None``
    and DELETE entries with ``new_fco=None``.
    """

    # Object as it currently exists, or None when there is no current version.
    current_fco: Any
    # Object as it should exist afterwards, or None when it is being removed.
    new_fco: Any
    # Per-property differences between the two versions (may be empty).
    fco_property_diffs: List[PropertyDiff]
    # What kind of change this entry describes.
    transition_type: TransitionType
34+
35+
36+
@dataclass
class RegistryDiff:
    """An ordered collection of ``FcoDiff`` entries.

    ``RegistryDiff()`` starts with an empty list; an initial list may also be
    supplied via ``fco_diffs``.
    """

    # default_factory gives every instance its own list. A hand-written
    # zero-argument __init__ would suppress the dataclass-generated one and
    # make this declared field impossible to pass at construction time.
    fco_diffs: List[FcoDiff] = field(default_factory=list)

    def add_fco_diff(self, fco_diff: FcoDiff):
        """Append ``fco_diff`` to the end of ``fco_diffs``."""
        self.fco_diffs.append(fco_diff)
45+
46+
47+
T = TypeVar("T", Entity, BaseFeatureView, FeatureService, FeatureTable)
48+
49+
50+
def tag_objects_for_keep_delete_add(
    existing_objs: Iterable[T], desired_objs: Iterable[T]
) -> Tuple[Set[T], Set[T], Set[T]]:
    """Partition objects by name into those to keep, delete, and add.

    Objects are matched solely on their ``name`` attribute and must be
    hashable, since the results are sets.

    Args:
        existing_objs: Objects that currently exist.
        desired_objs: Objects that should exist afterwards.

    Returns:
        A ``(objs_to_keep, objs_to_delete, objs_to_add)`` tuple of sets:
        - keep: desired objects whose name also appears among the existing
          ones (the *desired* instance is returned, not the existing one);
        - delete: existing objects whose name is absent from the desired ones;
        - add: desired objects whose name is absent from the existing ones.
    """
    # Each input is traversed twice (once for names, once to partition).
    # Materialize up front so one-shot iterators/generators are not exhausted
    # by the first pass, which would silently yield empty results.
    existing = list(existing_objs)
    desired = list(desired_objs)

    existing_obj_names = {e.name for e in existing}
    desired_obj_names = {e.name for e in desired}

    objs_to_add = {e for e in desired if e.name not in existing_obj_names}
    objs_to_keep = {e for e in desired if e.name in existing_obj_names}
    objs_to_delete = {e for e in existing if e.name not in desired_obj_names}

    return objs_to_keep, objs_to_delete, objs_to_add
61+
62+
63+
U = TypeVar("U", EntityProto, FeatureViewProto)
64+
65+
66+
def tag_proto_objects_for_keep_delete_add(
    existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
    """Partition proto objects by ``spec.name`` into keep, delete, and add.

    Same contract as the non-proto variant, except that names are read from
    ``obj.spec.name`` and results are lists that preserve input order.

    Args:
        existing_objs: Proto objects that currently exist.
        desired_objs: Proto objects that should exist afterwards.

    Returns:
        A ``(objs_to_keep, objs_to_delete, objs_to_add)`` tuple of lists:
        - keep: desired objects whose name also appears among the existing
          ones (the *desired* instance is returned);
        - delete: existing objects whose name is absent from the desired ones;
        - add: desired objects whose name is absent from the existing ones.
    """
    # Each input is traversed twice (once for names, once to partition).
    # Materialize up front so one-shot iterators/generators are not exhausted
    # by the first pass, which would silently yield empty results.
    existing = list(existing_objs)
    desired = list(desired_objs)

    existing_obj_names = {e.spec.name for e in existing}
    desired_obj_names = {e.spec.name for e in desired}

    objs_to_add = [e for e in desired if e.spec.name not in existing_obj_names]
    objs_to_keep = [e for e in desired if e.spec.name in existing_obj_names]
    objs_to_delete = [e for e in existing if e.spec.name not in desired_obj_names]

    return objs_to_keep, objs_to_delete, objs_to_add

sdk/python/feast/diff/__init__.py

Whitespace-only changes.

sdk/python/feast/feature_store.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from feast.feature_service import FeatureService
3939
from feast.feature_table import FeatureTable
4040
from feast.feature_view import (
41+
DUMMY_ENTITY,
4142
DUMMY_ENTITY_ID,
4243
DUMMY_ENTITY_NAME,
4344
DUMMY_ENTITY_VAL,
@@ -61,7 +62,6 @@
6162
from feast.request_feature_view import RequestFeatureView
6263
from feast.type_map import python_value_to_proto_value
6364
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
64-
from feast.value_type import ValueType
6565
from feast.version import get_version
6666

6767
warnings.simplefilter("once", DeprecationWarning)
@@ -379,16 +379,18 @@ def apply(
379379
]
380380
],
381381
],
382-
objects_to_delete: List[
383-
Union[
384-
FeatureView,
385-
OnDemandFeatureView,
386-
RequestFeatureView,
387-
Entity,
388-
FeatureService,
389-
FeatureTable,
382+
objects_to_delete: Optional[
383+
List[
384+
Union[
385+
FeatureView,
386+
OnDemandFeatureView,
387+
RequestFeatureView,
388+
Entity,
389+
FeatureService,
390+
FeatureTable,
391+
]
390392
]
391-
] = [],
393+
] = None,
392394
partial: bool = True,
393395
):
394396
"""Register objects to metadata store and update related infrastructure.
@@ -435,6 +437,9 @@ def apply(
435437

436438
assert isinstance(objects, list)
437439

440+
if not objects_to_delete:
441+
objects_to_delete = []
442+
438443
# Separate all objects into entities, feature services, and different feature view types.
439444
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
440445
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
@@ -484,11 +489,6 @@ def apply(
484489
odfv.infer_features()
485490

486491
# Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
487-
DUMMY_ENTITY = Entity(
488-
name=DUMMY_ENTITY_NAME,
489-
join_key=DUMMY_ENTITY_ID,
490-
value_type=ValueType.INT32,
491-
)
492492
entities_to_update.append(DUMMY_ENTITY)
493493

494494
# Add all objects to the registry and update the provider's infrastructure.
@@ -1560,7 +1560,9 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
15601560
case_insensitive_fv_name = fv.name.lower()
15611561
if case_insensitive_fv_name in fv_names:
15621562
raise ValueError(
1563-
f"More than one feature view with name {case_insensitive_fv_name} found. Please ensure that all feature view names are case-insensitively unique. It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
1563+
f"More than one feature view with name {case_insensitive_fv_name} found. "
1564+
f"Please ensure that all feature view names are case-insensitively unique. "
1565+
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
15641566
)
15651567
else:
15661568
fv_names.add(case_insensitive_fv_name)

sdk/python/feast/feature_view.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from feast import utils
2222
from feast.base_feature_view import BaseFeatureView
2323
from feast.data_source import DataSource
24+
from feast.entity import Entity
2425
from feast.feature import Feature
2526
from feast.feature_view_projection import FeatureViewProjection
2627
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
@@ -42,6 +43,9 @@
4243
DUMMY_ENTITY_ID = "__dummy_id"
4344
DUMMY_ENTITY_NAME = "__dummy"
4445
DUMMY_ENTITY_VAL = ""
46+
DUMMY_ENTITY = Entity(
47+
name=DUMMY_ENTITY_NAME, join_key=DUMMY_ENTITY_ID, value_type=ValueType.INT32,
48+
)
4549

4650

4751
class FeatureView(BaseFeatureView):

sdk/python/feast/registry.py

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import logging
1515
from collections import defaultdict
1616
from datetime import datetime, timedelta
1717
from pathlib import Path
@@ -24,6 +24,12 @@
2424

2525
from feast import importer
2626
from feast.base_feature_view import BaseFeatureView
27+
from feast.diff.FcoDiff import (
28+
FcoDiff,
29+
RegistryDiff,
30+
TransitionType,
31+
tag_proto_objects_for_keep_delete_add,
32+
)
2733
from feast.entity import Entity
2834
from feast.errors import (
2935
ConflictingFeatureViewNames,
@@ -57,6 +63,8 @@
5763
"": "LocalRegistryStore",
5864
}
5965

66+
logger = logging.getLogger(__name__)
67+
6068

6169
def get_registry_store_class_from_type(registry_store_type: str):
6270
if not registry_store_type.endswith("RegistryStore"):
@@ -95,7 +103,9 @@ class Registry:
95103
cached_registry_proto_ttl: timedelta
96104
cache_being_updated: bool = False
97105

98-
def __init__(self, registry_config: RegistryConfig, repo_path: Path):
106+
def __init__(
107+
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
108+
):
99109
"""
100110
Create the Registry object.
101111
@@ -104,20 +114,50 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
104114
repo_path: Path to the base of the Feast repository
105115
or where it will be created if it does not exist yet.
106116
"""
107-
registry_store_type = registry_config.registry_store_type
108-
registry_path = registry_config.path
109-
if registry_store_type is None:
110-
cls = get_registry_store_class_from_scheme(registry_path)
111-
else:
112-
cls = get_registry_store_class_from_type(str(registry_store_type))
113117

114-
self._registry_store = cls(registry_config, repo_path)
115-
self.cached_registry_proto_ttl = timedelta(
116-
seconds=registry_config.cache_ttl_seconds
117-
if registry_config.cache_ttl_seconds is not None
118-
else 0
118+
if registry_config:
119+
registry_store_type = registry_config.registry_store_type
120+
registry_path = registry_config.path
121+
if registry_store_type is None:
122+
cls = get_registry_store_class_from_scheme(registry_path)
123+
else:
124+
cls = get_registry_store_class_from_type(str(registry_store_type))
125+
126+
self._registry_store = cls(registry_config, repo_path)
127+
self.cached_registry_proto_ttl = timedelta(
128+
seconds=registry_config.cache_ttl_seconds
129+
if registry_config.cache_ttl_seconds is not None
130+
else 0
131+
)
132+
133+
# TODO(achals): This method needs to be filled out and used in the feast plan/apply methods.
134+
@staticmethod
def diff_between(
    current_registry: RegistryProto, new_registry: RegistryProto
) -> RegistryDiff:
    """Compute the difference between two registry protos.

    Only entities are compared so far, and only by name: an entity present
    solely in ``new_registry`` yields a CREATE entry, one present solely in
    ``current_registry`` yields a DELETE entry. Entities present in both are
    not yet compared, so no UPDATE/UNCHANGED entries are emitted.

    Args:
        current_registry: The registry as it exists now.
        new_registry: The registry as it should exist afterwards.

    Returns:
        A RegistryDiff holding one FcoDiff per created or deleted entity.
    """
    diff = RegistryDiff()

    # Handle Entities
    (
        entities_to_keep,  # not inspected yet; kept entities are not diffed
        entities_to_delete,
        entities_to_add,
    ) = tag_proto_objects_for_keep_delete_add(
        current_registry.entities, new_registry.entities,
    )

    for e in entities_to_add:
        diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE))
    for e in entities_to_delete:
        diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE))

    # Handle Feature Views
    # Handle On Demand Feature Views
    # Handle Request Feature Views
    # Handle Feature Services
    # Lazy %-style arguments: the diff is only rendered to text when the
    # INFO level is actually enabled for this logger.
    logger.info("Diff: %s", diff)
    return diff
160+
121161
def _initialize_registry(self):
122162
"""Explicitly initializes the registry with an empty proto if it doesn't exist."""
123163
try:
@@ -752,6 +792,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto:
752792
> (self.cached_registry_proto_created + self.cached_registry_proto_ttl)
753793
)
754794
)
795+
755796
if allow_cache and (not expired or self.cache_being_updated):
756797
assert isinstance(self.cached_registry_proto, RegistryProto)
757798
return self.cached_registry_proto

0 commit comments

Comments
 (0)