Skip to content

Commit ce243a4

Browse files
authored
Add a feast plan command, and have CLI output differentiates between created, deleted and unchanged objects (#2147)
* Print changes in the repo objects in the new style during feast apply Signed-off-by: Achal Shah <achals@gmail.com> * change color for deleted infra Signed-off-by: Achal Shah <achals@gmail.com> * Add a feast plan command Signed-off-by: Achal Shah <achals@gmail.com> * Add a feast plan command Signed-off-by: Achal Shah <achals@gmail.com> * return from apply() Signed-off-by: Achal Shah <achals@gmail.com> * Fix errors in doctests Signed-off-by: Achal Shah <achals@gmail.com> * Fix deepcopy and use a clone method instead Signed-off-by: Achal Shah <achals@gmail.com> * Fix registry clone Signed-off-by: Achal Shah <achals@gmail.com> * CR updates Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 7eba23c commit ce243a4

File tree

8 files changed

+298
-89
lines changed

8 files changed

+298
-89
lines changed

sdk/python/feast/cli.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
cli_check_repo,
3535
generate_project_name,
3636
init_repo,
37+
plan,
3738
registry_dump,
3839
teardown,
3940
)
@@ -351,6 +352,26 @@ def on_demand_feature_view_list(ctx: click.Context):
351352
print(tabulate(table, headers=["NAME"], tablefmt="plain"))
352353

353354

355+
@cli.command("plan", cls=NoOptionDefaultFormat)
356+
@click.option(
357+
"--skip-source-validation",
358+
is_flag=True,
359+
help="Don't validate the data sources by checking for that the tables exist.",
360+
)
361+
@click.pass_context
362+
def plan_command(ctx: click.Context, skip_source_validation: bool):
363+
"""
364+
Create or update a feature store deployment
365+
"""
366+
repo = ctx.obj["CHDIR"]
367+
cli_check_repo(repo)
368+
repo_config = load_repo_config(repo)
369+
try:
370+
plan(repo_config, repo, skip_source_validation)
371+
except FeastProviderLoginError as e:
372+
print(str(e))
373+
374+
354375
@cli.command("apply", cls=NoOptionDefaultFormat)
355376
@click.option(
356377
"--skip-source-validation",

sdk/python/feast/diff/FcoDiff.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class TransitionType(Enum):
2626

2727
@dataclass
2828
class FcoDiff:
29+
name: str
30+
fco_type: str
2931
current_fco: Any
3032
new_fco: Any
3133
fco_property_diffs: List[PropertyDiff]

sdk/python/feast/feature_store.py

Lines changed: 114 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,26 @@
1818
from collections import Counter, OrderedDict, defaultdict
1919
from datetime import datetime
2020
from pathlib import Path
21-
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
21+
from typing import (
22+
Any,
23+
Dict,
24+
Iterable,
25+
List,
26+
NamedTuple,
27+
Optional,
28+
Set,
29+
Tuple,
30+
Union,
31+
cast,
32+
)
2233

2334
import pandas as pd
2435
from colorama import Fore, Style
2536
from tqdm import tqdm
2637

2738
from feast import feature_server, flags, flags_helper, utils
2839
from feast.base_feature_view import BaseFeatureView
40+
from feast.diff.FcoDiff import RegistryDiff
2941
from feast.entity import Entity
3042
from feast.errors import (
3143
EntityNotFoundException,
@@ -51,6 +63,7 @@
5163
from feast.infra.provider import Provider, RetrievalJob, get_provider
5264
from feast.on_demand_feature_view import OnDemandFeatureView
5365
from feast.online_response import OnlineResponse, _infer_online_entity_rows
66+
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
5467
from feast.protos.feast.serving.ServingService_pb2 import (
5568
GetOnlineFeaturesRequestV2,
5669
GetOnlineFeaturesResponse,
@@ -66,6 +79,31 @@
6679
warnings.simplefilter("once", DeprecationWarning)
6780

6881

82+
class RepoContents(NamedTuple):
83+
feature_views: Set[FeatureView]
84+
on_demand_feature_views: Set[OnDemandFeatureView]
85+
request_feature_views: Set[RequestFeatureView]
86+
entities: Set[Entity]
87+
feature_services: Set[FeatureService]
88+
89+
def to_registry_proto(self) -> RegistryProto:
90+
registry_proto = RegistryProto()
91+
registry_proto.entities.extend([e.to_proto() for e in self.entities])
92+
registry_proto.feature_views.extend(
93+
[fv.to_proto() for fv in self.feature_views]
94+
)
95+
registry_proto.on_demand_feature_views.extend(
96+
[fv.to_proto() for fv in self.on_demand_feature_views]
97+
)
98+
registry_proto.request_feature_views.extend(
99+
[fv.to_proto() for fv in self.request_feature_views]
100+
)
101+
registry_proto.feature_services.extend(
102+
[fs.to_proto() for fs in self.feature_services]
103+
)
104+
return registry_proto
105+
106+
69107
class FeatureStore:
70108
"""
71109
A FeatureStore object is used to define, create, and retrieve features.
@@ -357,6 +395,55 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str
357395
_feature_refs = _features
358396
return _feature_refs
359397

398+
@log_exceptions_and_usage
399+
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
400+
"""Dry-run registering objects to metadata store.
401+
402+
The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
403+
a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan
404+
command are for informational purpose, and are not actually applied to the registry.
405+
406+
Args:
407+
objects: A single object, or a list of objects that are intended to be registered with the Feature Store.
408+
objects_to_delete: A list of objects to be deleted from the registry.
409+
partial: If True, apply will only handle the specified objects; if False, apply will also delete
410+
all the objects in objects_to_delete.
411+
412+
Raises:
413+
ValueError: The 'objects' parameter could not be parsed properly.
414+
415+
Examples:
416+
Generate a plan adding an Entity and a FeatureView.
417+
418+
>>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig
419+
>>> from feast.feature_store import RepoContents
420+
>>> from datetime import timedelta
421+
>>> fs = FeatureStore(repo_path="feature_repo")
422+
>>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
423+
>>> driver_hourly_stats = FileSource(
424+
... path="feature_repo/data/driver_stats.parquet",
425+
... event_timestamp_column="event_timestamp",
426+
... created_timestamp_column="created",
427+
... )
428+
>>> driver_hourly_stats_view = FeatureView(
429+
... name="driver_hourly_stats",
430+
... entities=["driver_id"],
431+
... ttl=timedelta(seconds=86400 * 1),
432+
... batch_source=driver_hourly_stats,
433+
... )
434+
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
435+
"""
436+
437+
current_registry_proto = (
438+
self._registry.cached_registry_proto.__deepcopy__()
439+
if self._registry.cached_registry_proto
440+
else RegistryProto()
441+
)
442+
443+
desired_registry_proto = desired_repo_objects.to_registry_proto()
444+
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
445+
return diffs
446+
360447
@log_exceptions_and_usage
361448
def apply(
362449
self,
@@ -388,7 +475,7 @@ def apply(
388475
]
389476
] = None,
390477
partial: bool = True,
391-
):
478+
) -> RegistryDiff:
392479
"""Register objects to metadata store and update related infrastructure.
393480
394481
The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these
@@ -424,18 +511,22 @@ def apply(
424511
... ttl=timedelta(seconds=86400 * 1),
425512
... batch_source=driver_hourly_stats,
426513
... )
427-
>>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
514+
>>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
428515
"""
429516
# TODO: Add locking
430-
431517
if not isinstance(objects, Iterable):
432518
objects = [objects]
433-
434519
assert isinstance(objects, list)
435520

436521
if not objects_to_delete:
437522
objects_to_delete = []
438523

524+
current_registry_proto = (
525+
self._registry.cached_registry_proto.__deepcopy__()
526+
if self._registry.cached_registry_proto
527+
else RegistryProto()
528+
)
529+
439530
# Separate all objects into entities, feature services, and different feature view types.
440531
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
441532
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
@@ -533,6 +624,22 @@ def apply(
533624
service.name, project=self.project, commit=False
534625
)
535626

627+
new_registry_proto = (
628+
self._registry.cached_registry_proto
629+
if self._registry.cached_registry_proto
630+
else RegistryProto()
631+
)
632+
633+
diffs = Registry.diff_between(current_registry_proto, new_registry_proto)
634+
635+
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
636+
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
637+
638+
entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)]
639+
views_to_delete = [
640+
ob for ob in objects_to_delete if isinstance(ob, FeatureView)
641+
]
642+
536643
self._get_provider().update_infra(
537644
project=self.project,
538645
tables_to_delete=views_to_delete if not partial else [],
@@ -544,6 +651,8 @@ def apply(
544651

545652
self._registry.commit()
546653

654+
return diffs
655+
547656
@log_exceptions_and_usage
548657
def teardown(self):
549658
"""Tears down all local and cloud resources for the feature store."""

sdk/python/feast/registry.py

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from feast.feature_view import FeatureView
4343
from feast.on_demand_feature_view import OnDemandFeatureView
4444
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
45+
from feast.registry_store import NoopRegistryStore
4546
from feast.repo_config import RegistryConfig
4647
from feast.request_feature_view import RequestFeatureView
4748

@@ -128,32 +129,85 @@ def __init__(
128129
else 0
129130
)
130131

132+
def clone(self) -> "Registry":
133+
new_registry = Registry(None, None)
134+
new_registry.cached_registry_proto_ttl = timedelta(seconds=0)
135+
new_registry.cached_registry_proto = (
136+
self.cached_registry_proto.__deepcopy__()
137+
if self.cached_registry_proto
138+
else RegistryProto()
139+
)
140+
new_registry.cached_registry_proto_created = datetime.utcnow()
141+
new_registry._registry_store = NoopRegistryStore()
142+
return new_registry
143+
131144
# TODO(achals): This method needs to be filled out and used in the feast plan/apply methods.
132145
@staticmethod
133146
def diff_between(
134147
current_registry: RegistryProto, new_registry: RegistryProto
135148
) -> RegistryDiff:
136149
diff = RegistryDiff()
137150

138-
# Handle Entities
139-
(
140-
entities_to_keep,
141-
entities_to_delete,
142-
entities_to_add,
143-
) = tag_proto_objects_for_keep_delete_add(
144-
current_registry.entities, new_registry.entities,
145-
)
151+
attribute_to_object_type_str = {
152+
"entities": "entity",
153+
"feature_views": "feature view",
154+
"feature_tables": "feature table",
155+
"on_demand_feature_views": "on demand feature view",
156+
"request_feature_views": "request feature view",
157+
"feature_services": "feature service",
158+
}
146159

147-
for e in entities_to_add:
148-
diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE))
149-
for e in entities_to_delete:
150-
diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE))
160+
for object_type in [
161+
"entities",
162+
"feature_views",
163+
"feature_tables",
164+
"on_demand_feature_views",
165+
"request_feature_views",
166+
"feature_services",
167+
]:
168+
(
169+
objects_to_keep,
170+
objects_to_delete,
171+
objects_to_add,
172+
) = tag_proto_objects_for_keep_delete_add(
173+
getattr(current_registry, object_type),
174+
getattr(new_registry, object_type),
175+
)
176+
177+
for e in objects_to_add:
178+
diff.add_fco_diff(
179+
FcoDiff(
180+
e.spec.name,
181+
attribute_to_object_type_str[object_type],
182+
None,
183+
e,
184+
[],
185+
TransitionType.CREATE,
186+
)
187+
)
188+
for e in objects_to_delete:
189+
diff.add_fco_diff(
190+
FcoDiff(
191+
e.spec.name,
192+
attribute_to_object_type_str[object_type],
193+
e,
194+
None,
195+
[],
196+
TransitionType.DELETE,
197+
)
198+
)
199+
for e in objects_to_keep:
200+
diff.add_fco_diff(
201+
FcoDiff(
202+
e.spec.name,
203+
attribute_to_object_type_str[object_type],
204+
e,
205+
e,
206+
[],
207+
TransitionType.UNCHANGED,
208+
)
209+
)
151210

152-
# Handle Feature Views
153-
# Handle On Demand Feature Views
154-
# Handle Request Feature Views
155-
# Handle Feature Services
156-
logger.info(f"Diff: {diff}")
157211
return diff
158212

159213
def _initialize_registry(self):

sdk/python/feast/registry_store.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,14 @@ def teardown(self):
3636
Tear down the registry.
3737
"""
3838
pass
39+
40+
41+
class NoopRegistryStore(RegistryStore):
42+
def get_registry_proto(self) -> RegistryProto:
43+
pass
44+
45+
def update_registry_proto(self, registry_proto: RegistryProto):
46+
pass
47+
48+
def teardown(self):
49+
pass

0 commit comments

Comments
 (0)