1818from collections import Counter , OrderedDict , defaultdict
1919from datetime import datetime
2020from pathlib import Path
21- from typing import Any , Dict , Iterable , List , Optional , Set , Tuple , Union , cast
21+ from typing import (
22+ Any ,
23+ Dict ,
24+ Iterable ,
25+ List ,
26+ NamedTuple ,
27+ Optional ,
28+ Set ,
29+ Tuple ,
30+ Union ,
31+ cast ,
32+ )
2233
2334import pandas as pd
2435from colorama import Fore , Style
2536from tqdm import tqdm
2637
2738from feast import feature_server , flags , flags_helper , utils
2839from feast .base_feature_view import BaseFeatureView
40+ from feast .diff .FcoDiff import RegistryDiff
2941from feast .entity import Entity
3042from feast .errors import (
3143 EntityNotFoundException ,
5163from feast .infra .provider import Provider , RetrievalJob , get_provider
5264from feast .on_demand_feature_view import OnDemandFeatureView
5365from feast .online_response import OnlineResponse , _infer_online_entity_rows
66+ from feast .protos .feast .core .Registry_pb2 import Registry as RegistryProto
5467from feast .protos .feast .serving .ServingService_pb2 import (
5568 GetOnlineFeaturesRequestV2 ,
5669 GetOnlineFeaturesResponse ,
6679warnings .simplefilter ("once" , DeprecationWarning )
6780
6881
class RepoContents(NamedTuple):
    """Immutable bundle of the object definitions that make up a feature repo.

    Each field holds the set of definitions of one kind; together they can be
    serialized into a single registry proto via ``to_registry_proto``.
    """

    feature_views: Set[FeatureView]
    on_demand_feature_views: Set[OnDemandFeatureView]
    request_feature_views: Set[RequestFeatureView]
    entities: Set[Entity]
    feature_services: Set[FeatureService]

    def to_registry_proto(self) -> RegistryProto:
        """Build a ``RegistryProto`` holding the proto form of every object.

        Returns:
            A new ``RegistryProto`` whose repeated fields are filled from the
            ``to_proto()`` result of each definition in this tuple.
        """
        proto = RegistryProto()

        # (destination repeated field, source definitions), listed in the
        # order in which the fields are populated.
        sections = (
            (proto.entities, self.entities),
            (proto.feature_views, self.feature_views),
            (proto.on_demand_feature_views, self.on_demand_feature_views),
            (proto.request_feature_views, self.request_feature_views),
            (proto.feature_services, self.feature_services),
        )
        for destination, definitions in sections:
            destination.extend(
                [definition.to_proto() for definition in definitions]
            )

        return proto
105+
106+
69107class FeatureStore :
70108 """
71109 A FeatureStore object is used to define, create, and retrieve features.
@@ -357,6 +395,55 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str
357395 _feature_refs = _features
358396 return _feature_refs
359397
@log_exceptions_and_usage
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
    """Dry-run registering objects to metadata store.

    The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
    a list of all the changes that would be introduced in the feature repo. The changes computed by the plan
    command are for informational purposes, and are not actually applied to the registry.

    Args:
        desired_repo_objects: The feature views, on demand feature views, request feature views, entities
            and feature services that the feature repo is intended to contain.

    Returns:
        The RegistryDiff produced by Registry.diff_between when comparing a copy of the currently cached
        registry proto (or an empty registry proto if nothing is cached) against the desired objects.

    Examples:
        Generate a plan adding an Entity and a FeatureView.

        >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig
        >>> from feast.feature_store import RepoContents
        >>> from datetime import timedelta
        >>> fs = FeatureStore(repo_path="feature_repo")
        >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
        >>> driver_hourly_stats = FileSource(
        ...     path="feature_repo/data/driver_stats.parquet",
        ...     event_timestamp_column="event_timestamp",
        ...     created_timestamp_column="created",
        ... )
        >>> driver_hourly_stats_view = FeatureView(
        ...     name="driver_hourly_stats",
        ...     entities=["driver_id"],
        ...     ttl=timedelta(seconds=86400 * 1),
        ...     batch_source=driver_hourly_stats,
        ... )
        >>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # dry-run registering entity and feature view
    """

    # Compare against a private copy so the cached registry proto is never
    # handed to the diff directly; fall back to an empty registry when
    # nothing has been cached yet.
    current_registry_proto = RegistryProto()
    cached_registry_proto = self._registry.cached_registry_proto
    if cached_registry_proto:
        current_registry_proto.CopyFrom(cached_registry_proto)

    desired_registry_proto = desired_repo_objects.to_registry_proto()
    return Registry.diff_between(current_registry_proto, desired_registry_proto)
446+
360447 @log_exceptions_and_usage
361448 def apply (
362449 self ,
@@ -388,7 +475,7 @@ def apply(
388475 ]
389476 ] = None ,
390477 partial : bool = True ,
391- ):
478+ ) -> RegistryDiff :
392479 """Register objects to metadata store and update related infrastructure.
393480
394481 The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these
@@ -424,18 +511,22 @@ def apply(
424511 ... ttl=timedelta(seconds=86400 * 1),
425512 ... batch_source=driver_hourly_stats,
426513 ... )
427- >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
514+ >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view
428515 """
429516 # TODO: Add locking
430-
431517 if not isinstance (objects , Iterable ):
432518 objects = [objects ]
433-
434519 assert isinstance (objects , list )
435520
436521 if not objects_to_delete :
437522 objects_to_delete = []
438523
524+ current_registry_proto = (
525+ self ._registry .cached_registry_proto .__deepcopy__ ()
526+ if self ._registry .cached_registry_proto
527+ else RegistryProto ()
528+ )
529+
439530 # Separate all objects into entities, feature services, and different feature view types.
440531 entities_to_update = [ob for ob in objects if isinstance (ob , Entity )]
441532 views_to_update = [ob for ob in objects if isinstance (ob , FeatureView )]
@@ -533,6 +624,22 @@ def apply(
533624 service .name , project = self .project , commit = False
534625 )
535626
627+ new_registry_proto = (
628+ self ._registry .cached_registry_proto
629+ if self ._registry .cached_registry_proto
630+ else RegistryProto ()
631+ )
632+
633+ diffs = Registry .diff_between (current_registry_proto , new_registry_proto )
634+
635+ entities_to_update = [ob for ob in objects if isinstance (ob , Entity )]
636+ views_to_update = [ob for ob in objects if isinstance (ob , FeatureView )]
637+
638+ entities_to_delete = [ob for ob in objects_to_delete if isinstance (ob , Entity )]
639+ views_to_delete = [
640+ ob for ob in objects_to_delete if isinstance (ob , FeatureView )
641+ ]
642+
536643 self ._get_provider ().update_infra (
537644 project = self .project ,
538645 tables_to_delete = views_to_delete if not partial else [],
@@ -544,6 +651,8 @@ def apply(
544651
545652 self ._registry .commit ()
546653
654+ return diffs
655+
547656 @log_exceptions_and_usage
548657 def teardown (self ):
549658 """Tears down all local and cloud resources for the feature store."""
0 commit comments