Skip to content

Commit 2a95629

Browse files
Modify feature_store.plan to produce an InfraDiff (feast-dev#2211)
* Implement InfraObject.from_proto for easier conversion
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Implement InfraDiff.update
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Modify feature_store.plan to produce an InfraDiff
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Stricter typing for FcoDiff and InfraObjectDiff
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Small fixes
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Fix typevar names
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Add comment
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
* Fix protos
  Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent d5cb044 commit 2a95629

12 files changed

Lines changed: 208 additions & 44 deletions

File tree

sdk/python/feast/diff/FcoDiff.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,38 @@
11
from dataclasses import dataclass
2-
from typing import Any, Iterable, List, Set, Tuple, TypeVar
2+
from typing import Generic, Iterable, List, Set, Tuple, TypeVar
33

44
from feast.base_feature_view import BaseFeatureView
55
from feast.diff.property_diff import PropertyDiff, TransitionType
66
from feast.entity import Entity
77
from feast.feature_service import FeatureService
88
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
9+
from feast.protos.feast.core.FeatureService_pb2 import (
10+
FeatureService as FeatureServiceProto,
11+
)
912
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
13+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
14+
OnDemandFeatureView as OnDemandFeatureViewProto,
15+
)
16+
from feast.protos.feast.core.RequestFeatureView_pb2 import (
17+
RequestFeatureView as RequestFeatureViewProto,
18+
)
19+
20+
FcoProto = TypeVar(
21+
"FcoProto",
22+
EntityProto,
23+
FeatureViewProto,
24+
FeatureServiceProto,
25+
OnDemandFeatureViewProto,
26+
RequestFeatureViewProto,
27+
)
1028

1129

1230
@dataclass
13-
class FcoDiff:
31+
class FcoDiff(Generic[FcoProto]):
1432
name: str
1533
fco_type: str
16-
current_fco: Any
17-
new_fco: Any
34+
current_fco: FcoProto
35+
new_fco: FcoProto
1836
fco_property_diffs: List[PropertyDiff]
1937
transition_type: TransitionType
2038

@@ -30,12 +48,12 @@ def add_fco_diff(self, fco_diff: FcoDiff):
3048
self.fco_diffs.append(fco_diff)
3149

3250

33-
T = TypeVar("T", Entity, BaseFeatureView, FeatureService)
51+
Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService)
3452

3553

3654
def tag_objects_for_keep_delete_add(
37-
existing_objs: Iterable[T], desired_objs: Iterable[T]
38-
) -> Tuple[Set[T], Set[T], Set[T]]:
55+
existing_objs: Iterable[Fco], desired_objs: Iterable[Fco]
56+
) -> Tuple[Set[Fco], Set[Fco], Set[Fco]]:
3957
existing_obj_names = {e.name for e in existing_objs}
4058
desired_obj_names = {e.name for e in desired_objs}
4159

@@ -46,12 +64,9 @@ def tag_objects_for_keep_delete_add(
4664
return objs_to_keep, objs_to_delete, objs_to_add
4765

4866

49-
U = TypeVar("U", EntityProto, FeatureViewProto)
50-
51-
5267
def tag_proto_objects_for_keep_delete_add(
53-
existing_objs: Iterable[U], desired_objs: Iterable[U]
54-
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
68+
existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto]
69+
) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]:
5570
existing_obj_names = {e.spec.name for e in existing_objs}
5671
desired_obj_names = {e.spec.name for e in desired_objs}
5772

@@ -65,7 +80,7 @@ def tag_proto_objects_for_keep_delete_add(
6580
FIELDS_TO_IGNORE = {"project"}
6681

6782

68-
def diff_between(current: U, new: U, object_type: str) -> FcoDiff:
83+
def diff_between(current: FcoProto, new: FcoProto, object_type: str) -> FcoDiff:
6984
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
7085
property_diffs = []
7186
transition: TransitionType = TransitionType.UNCHANGED

sdk/python/feast/diff/infra_diff.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from typing import Any, Iterable, List, Tuple, TypeVar
2+
from typing import Generic, Iterable, List, Tuple, TypeVar
33

44
from feast.diff.property_diff import PropertyDiff, TransitionType
55
from feast.infra.infra_object import (
@@ -17,13 +17,17 @@
1717
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
1818
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
1919

20+
InfraObjectProto = TypeVar(
21+
"InfraObjectProto", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto
22+
)
23+
2024

2125
@dataclass
22-
class InfraObjectDiff:
26+
class InfraObjectDiff(Generic[InfraObjectProto]):
2327
name: str
2428
infra_object_type: str
25-
current_infra_object: Any
26-
new_infra_object: Any
29+
current_infra_object: InfraObjectProto
30+
new_infra_object: InfraObjectProto
2731
infra_object_property_diffs: List[PropertyDiff]
2832
transition_type: TransitionType
2933

@@ -36,18 +40,34 @@ def __init__(self):
3640
self.infra_object_diffs = []
3741

3842
def update(self):
    """Apply the infrastructure changes specified in this object.

    Each diff in ``self.infra_object_diffs`` is handled by transition type:

    * ``DELETE``: the current infra object is rebuilt from its proto and
      ``teardown()`` is called on it.
    * ``CREATE``: the new infra object is rebuilt from its proto and
      ``update()`` is called on it.
    * ``UPDATE``: both of the above, teardown of the current object first,
      then update of the new one.

    Diffs with any other transition type (e.g. ``UNCHANGED``) are skipped.
    """
    for infra_object_diff in self.infra_object_diffs:
        transition_type = infra_object_diff.transition_type

        if transition_type in (TransitionType.DELETE, TransitionType.UPDATE):
            current_infra_object = InfraObject.from_proto(
                infra_object_diff.current_infra_object
            )
            current_infra_object.teardown()

        # Deliberately a second `if` rather than `elif`: an UPDATE transition
        # matches the branch above as well, and with `elif` it would stop
        # after the teardown and never bring up the new infra object.
        if transition_type in (TransitionType.CREATE, TransitionType.UPDATE):
            new_infra_object = InfraObject.from_proto(
                infra_object_diff.new_infra_object
            )
            new_infra_object.update()
4061

4162
def to_string(self):
4263
pass
4364

4465

45-
U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)
46-
47-
4866
def tag_infra_proto_objects_for_keep_delete_add(
49-
existing_objs: Iterable[U], desired_objs: Iterable[U]
50-
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
67+
existing_objs: Iterable[InfraObjectProto], desired_objs: Iterable[InfraObjectProto]
68+
) -> Tuple[
69+
Iterable[InfraObjectProto], Iterable[InfraObjectProto], Iterable[InfraObjectProto]
70+
]:
5171
existing_obj_names = {e.name for e in existing_objs}
5272
desired_obj_names = {e.name for e in desired_objs}
5373

@@ -123,7 +143,7 @@ def diff_infra_protos(
123143

124144
def get_infra_object_protos_by_type(
125145
infra_proto: InfraProto, infra_object_class_type: str
126-
) -> List[U]:
146+
) -> List[InfraObjectProto]:
127147
return [
128148
InfraObject.from_infra_object_proto(infra_object).to_proto()
129149
for infra_object in infra_proto.infra_objects
@@ -134,7 +154,9 @@ def get_infra_object_protos_by_type(
134154
FIELDS_TO_IGNORE = {"project"}
135155

136156

137-
def diff_between(current: U, new: U, infra_object_type: str) -> InfraObjectDiff:
157+
def diff_between(
158+
current: InfraObjectProto, new: InfraObjectProto, infra_object_type: str
159+
) -> InfraObjectDiff:
138160
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
139161
property_diffs = []
140162
transition: TransitionType = TransitionType.UNCHANGED

sdk/python/feast/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,3 +293,8 @@ def __init__(self, actual_class: str, expected_class: str):
293293
super().__init__(
294294
f"The registry store class was expected to be {expected_class}, but was instead {actual_class}."
295295
)
296+
297+
298+
class FeastInvalidInfraObjectType(Exception):
    """Raised when the type of an InfraObject could not be identified."""

    def __init__(self):
        message = "Could not identify the type of the InfraObject."
        super().__init__(message)

sdk/python/feast/feature_store.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from feast import feature_server, flags, flags_helper, utils
3939
from feast.base_feature_view import BaseFeatureView
4040
from feast.diff.FcoDiff import RegistryDiff
41+
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
4142
from feast.entity import Entity
4243
from feast.errors import (
4344
EntityNotFoundException,
@@ -63,6 +64,7 @@
6364
from feast.infra.provider import Provider, RetrievalJob, get_provider
6465
from feast.on_demand_feature_view import OnDemandFeatureView
6566
from feast.online_response import OnlineResponse
67+
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
6668
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
6769
from feast.protos.feast.serving.ServingService_pb2 import (
6870
FieldStatus,
@@ -405,7 +407,9 @@ def _get_features(
405407
return _feature_refs
406408

407409
@log_exceptions_and_usage
408-
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
410+
def plan(
411+
self, desired_repo_objects: RepoContents
412+
) -> Tuple[RegistryDiff, InfraDiff]:
409413
"""Dry-run registering objects to metadata store.
410414
411415
The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
@@ -440,7 +444,7 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
440444
... ttl=timedelta(seconds=86400 * 1),
441445
... batch_source=driver_hourly_stats,
442446
... )
443-
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
447+
>>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
444448
"""
445449

446450
current_registry_proto = (
@@ -450,8 +454,21 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
450454
)
451455

452456
desired_registry_proto = desired_repo_objects.to_registry_proto()
453-
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
454-
return diffs
457+
registry_diff = Registry.diff_between(
458+
current_registry_proto, desired_registry_proto
459+
)
460+
461+
current_infra_proto = (
462+
self._registry.cached_registry_proto.infra.__deepcopy__()
463+
if self._registry.cached_registry_proto
464+
else InfraProto()
465+
)
466+
new_infra_proto = self._provider.plan_infra(
467+
self.config, desired_registry_proto
468+
).to_proto()
469+
infra_diff = diff_infra_protos(current_infra_proto, new_infra_proto)
470+
471+
return (registry_diff, infra_diff)
455472

456473
@log_exceptions_and_usage
457474
def apply(

sdk/python/feast/infra/infra_object.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,21 @@
1515
from dataclasses import dataclass, field
1616
from typing import Any, List
1717

18+
from feast.errors import FeastInvalidInfraObjectType
1819
from feast.importer import import_class
20+
from feast.protos.feast.core.DatastoreTable_pb2 import (
21+
DatastoreTable as DatastoreTableProto,
22+
)
23+
from feast.protos.feast.core.DynamoDBTable_pb2 import (
24+
DynamoDBTable as DynamoDBTableProto,
25+
)
1926
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
2027
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
28+
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
2129

2230
DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
2331
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
24-
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_store.sqlite.SqliteTable"
32+
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable"
2533

2634

2735
class InfraObject(ABC):
@@ -49,15 +57,38 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
4957
infra_object_proto: A protobuf representation of an InfraObject.
5058
5159
Raises:
52-
ValueError: The type of InfraObject could not be identified.
60+
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
5361
"""
5462
if infra_object_proto.infra_object_class_type:
5563
cls = _get_infra_object_class_from_type(
5664
infra_object_proto.infra_object_class_type
5765
)
5866
return cls.from_infra_object_proto(infra_object_proto)
5967

60-
raise ValueError("Could not identify the type of the InfraObject.")
68+
raise FeastInvalidInfraObjectType()
69+
70+
@staticmethod
def from_proto(infra_object_proto: Any) -> Any:
    """
    Builds the InfraObject subclass instance matching a subclass-specific proto.

    Args:
        infra_object_proto: A protobuf representation of an InfraObject subclass
            (a DatastoreTable, DynamoDBTable, or SqliteTable proto).

    Raises:
        FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
    """
    # Checked in order; the first proto class the argument is an instance of wins.
    proto_class_to_class_type = (
        (DatastoreTableProto, DATASTORE_INFRA_OBJECT_CLASS_TYPE),
        (DynamoDBTableProto, DYNAMODB_INFRA_OBJECT_CLASS_TYPE),
        (SqliteTableProto, SQLITE_INFRA_OBJECT_CLASS_TYPE),
    )
    for proto_class, class_type in proto_class_to_class_type:
        if isinstance(infra_object_proto, proto_class):
            concrete_cls = _get_infra_object_class_from_type(class_type)
            return concrete_cls.from_proto(infra_object_proto)

    raise FeastInvalidInfraObjectType()
6192

6293
@abstractmethod
6394
def update(self):
@@ -94,7 +125,7 @@ def to_proto(self) -> InfraProto:
94125
"""
95126
infra_proto = InfraProto()
96127
for infra_object in self.infra_objects:
97-
infra_object_proto = infra_object.to_proto()
128+
infra_object_proto = infra_object.to_infra_object_proto()
98129
infra_proto.infra_objects.append(infra_object_proto)
99130

100131
return infra_proto

sdk/python/feast/infra/local.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import uuid
22
from datetime import datetime
33
from pathlib import Path
4+
from typing import List
45

5-
from feast.feature_view import FeatureView
6+
from feast.infra.infra_object import Infra, InfraObject
67
from feast.infra.passthrough_provider import PassthroughProvider
78
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
89
from feast.registry_store import RegistryStore
9-
from feast.repo_config import RegistryConfig
10+
from feast.repo_config import RegistryConfig, RepoConfig
1011
from feast.usage import log_exceptions_and_usage
1112

1213

@@ -15,11 +16,15 @@ class LocalProvider(PassthroughProvider):
1516
This class only exists for backwards compatibility.
1617
"""
1718

18-
pass
19-
20-
21-
def _table_id(project: str, table: FeatureView) -> str:
22-
return f"{project}_{table.name}"
19+
def plan_infra(
    self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> Infra:
    """Return an Infra holding the infra objects planned by the online store.

    Args:
        config: Forwarded unchanged to ``self.online_store.plan``.
        desired_registry_proto: Forwarded unchanged to ``self.online_store.plan``.

    Returns:
        A newly created Infra whose ``infra_objects`` has been extended with
        the objects returned by ``self.online_store.plan``.
    """
    planned_objects: List[InfraObject] = self.online_store.plan(
        config, desired_registry_proto
    )
    planned_infra = Infra()
    planned_infra.infra_objects += planned_objects
    return planned_infra
2328

2429

2530
class LocalRegistryStore(RegistryStore):

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
376376
name=infra_object_proto.datastore_table.name,
377377
)
378378

379+
# Distinguish between null and empty string, since project_id and namespace are StringValues.
379380
if infra_object_proto.datastore_table.HasField("project_id"):
380381
datastore_table.project_id = (
381382
infra_object_proto.datastore_table.project_id.value
@@ -387,6 +388,20 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
387388

388389
return datastore_table
389390

391+
@staticmethod
def from_proto(datastore_table_proto: DatastoreTableProto) -> Any:
    """Build a DatastoreTable from its DatastoreTableProto representation.

    ``project`` and ``name`` are always copied. ``project_id`` and
    ``namespace`` are copied only when the proto reports them as set.
    """
    table = DatastoreTable(
        project=datastore_table_proto.project,
        name=datastore_table_proto.name,
    )

    # project_id and namespace are StringValues, so HasField is what tells
    # a null value apart from an empty string.
    for optional_field in ("project_id", "namespace"):
        if datastore_table_proto.HasField(optional_field):
            wrapped_value = getattr(datastore_table_proto, optional_field)
            setattr(table, optional_field, wrapped_value.value)

    return table
404+
390405
def update(self):
391406
client = _initialize_client(self.project_id, self.namespace)
392407
key = client.key("Project", self.project, "Table", self.name)

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,12 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
254254
region=infra_object_proto.dynamodb_table.region,
255255
)
256256

257+
@staticmethod
def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
    """Build a DynamoDBTable from the name and region stored in its proto."""
    table_name = dynamodb_table_proto.name
    table_region = dynamodb_table_proto.region
    return DynamoDBTable(name=table_name, region=table_region)
262+
257263
def update(self):
258264
dynamodb_client = _initialize_dynamodb_client(region=self.region)
259265
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)

0 commit comments

Comments
 (0)