Skip to content

Commit b1ccf8d

Browse files
mavysavydavCody Lin
andauthored
Hide FeatureViewProjections from user interface & have FeatureViews carry FVProjections that carries the modified info of the FeatureView (#1899)
* Revised FeatureService class and proto Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * wip transitioning feast off of FVProjections Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * corrected a lot of errors and tests failures Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * fix unit test failure and most lint issues Signed-off-by: Cody Lin <codyl@twitter.com> * remove debugging print statement Signed-off-by: Cody Lin <codyl@twitter.com> * simplified _group_feature_refs Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * deleted FeatureViewProjection files Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * code review fix and test fix Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Correction in cli.py Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Made fixes to still get objects from registry but potentially modify those objects after Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * fixed lint oopsie & added to docstring on feature reference string convention Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Brought back FVProjections without exposing it ever to user-facing API Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * Updated OnDemandFV to this new paradigm. Other misc updates Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * corrections based on integration test failures Signed-off-by: David Y Liu <davidyliuliu@gmail.com> * corrections based on CR comments Signed-off-by: David Y Liu <davidyliuliu@gmail.com> Co-authored-by: Cody Lin <codyl@twitter.com>
1 parent 41535d0 commit b1ccf8d

File tree

9 files changed

+179
-84
lines changed

9 files changed

+179
-84
lines changed

protos/feast/core/FeatureService.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ message FeatureServiceSpec {
2323
// Name of Feast project that this Feature Service belongs to.
2424
string project = 2;
2525

26-
// List of features that this feature service encapsulates.
27-
// Stored as a list of references to other features views and the features from those views.
26+
// Represents a projection that's to be applied on top of the FeatureView.
27+
// Contains data such as the features to use from a FeatureView.
2828
repeated FeatureViewProjection features = 3;
2929

3030
// User defined metadata

protos/feast/core/FeatureViewProjection.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ option java_package = "feast.proto.core";
88
import "feast/core/Feature.proto";
99

1010

11-
// A reference to features in a feature view
11+
// A projection to be applied on top of a FeatureView.
12+
// Contains the modifications to a FeatureView such as the features subset to use.
1213
message FeatureViewProjection {
1314
// The feature view name
1415
string feature_view_name = 1;

sdk/python/feast/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def feature_service_list(ctx: click.Context):
175175
feature_services = []
176176
for feature_service in store.list_feature_services():
177177
feature_names = []
178-
for projection in feature_service.features:
178+
for projection in feature_service.feature_view_projections:
179179
feature_names.extend(
180180
[f"{projection.name}:{feature.name}" for feature in projection.features]
181181
)

sdk/python/feast/feature_service.py

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class FeatureService:
3131
"""
3232

3333
name: str
34-
features: List[FeatureViewProjection]
34+
feature_view_projections: List[FeatureViewProjection]
3535
tags: Dict[str, str]
3636
description: Optional[str] = None
3737
created_timestamp: Optional[datetime] = None
@@ -41,9 +41,7 @@ class FeatureService:
4141
def __init__(
4242
self,
4343
name: str,
44-
features: List[
45-
Union[FeatureTable, FeatureView, OnDemandFeatureView, FeatureViewProjection]
46-
],
44+
features: List[Union[FeatureTable, FeatureView, OnDemandFeatureView]],
4745
tags: Optional[Dict[str, str]] = None,
4846
description: Optional[str] = None,
4947
):
@@ -54,18 +52,23 @@ def __init__(
5452
ValueError: If one of the specified features is not a valid type.
5553
"""
5654
self.name = name
57-
self.features = []
58-
for feature in features:
59-
if (
60-
isinstance(feature, FeatureTable)
61-
or isinstance(feature, FeatureView)
62-
or isinstance(feature, OnDemandFeatureView)
55+
self.feature_view_projections = []
56+
57+
for feature_grouping in features:
58+
if isinstance(feature_grouping, FeatureTable):
59+
self.feature_view_projections.append(
60+
FeatureViewProjection.from_definition(feature_grouping)
61+
)
62+
elif isinstance(feature_grouping, FeatureView) or isinstance(
63+
feature_grouping, OnDemandFeatureView
6364
):
64-
self.features.append(FeatureViewProjection.from_definition(feature))
65-
elif isinstance(feature, FeatureViewProjection):
66-
self.features.append(feature)
65+
self.feature_view_projections.append(feature_grouping.projection)
6766
else:
68-
raise ValueError(f"Unexpected type: {type(feature)}")
67+
raise ValueError(
68+
"The FeatureService {fs_name} has been provided with an invalid type"
69+
f'{type(feature_grouping)} as part of the "features" argument.)'
70+
)
71+
6972
self.tags = tags or {}
7073
self.description = description
7174
self.created_timestamp = None
@@ -89,7 +92,9 @@ def __eq__(self, other):
8992
if self.tags != other.tags or self.name != other.name:
9093
return False
9194

92-
if sorted(self.features) != sorted(other.features):
95+
if sorted(self.feature_view_projections) != sorted(
96+
other.feature_view_projections
97+
):
9398
return False
9499

95100
return True
@@ -104,17 +109,20 @@ def from_proto(feature_service_proto: FeatureServiceProto):
104109
"""
105110
fs = FeatureService(
106111
name=feature_service_proto.spec.name,
107-
features=[
108-
FeatureViewProjection.from_proto(fp)
109-
for fp in feature_service_proto.spec.features
110-
],
112+
features=[],
111113
tags=dict(feature_service_proto.spec.tags),
112114
description=(
113115
feature_service_proto.spec.description
114116
if feature_service_proto.spec.description != ""
115117
else None
116118
),
117119
)
120+
fs.feature_view_projections.extend(
121+
[
122+
FeatureViewProjection.from_proto(projection)
123+
for projection in feature_service_proto.spec.features
124+
]
125+
)
118126

119127
if feature_service_proto.meta.HasField("created_timestamp"):
120128
fs.created_timestamp = (
@@ -138,19 +146,12 @@ def to_proto(self) -> FeatureServiceProto:
138146
if self.created_timestamp:
139147
meta.created_timestamp.FromDatetime(self.created_timestamp)
140148

141-
spec = FeatureServiceSpec()
142-
spec.name = self.name
143-
for definition in self.features:
144-
if isinstance(definition, FeatureTable) or isinstance(
145-
definition, FeatureView
146-
):
147-
feature_ref = FeatureViewProjection(
148-
definition.name, definition.features
149-
)
150-
else:
151-
feature_ref = definition
152-
153-
spec.features.append(feature_ref.to_proto())
149+
spec = FeatureServiceSpec(
150+
name=self.name,
151+
features=[
152+
projection.to_proto() for projection in self.feature_view_projections
153+
],
154+
)
154155

155156
if self.tags:
156157
spec.tags.update(self.tags)

sdk/python/feast/feature_store.py

Lines changed: 62 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -316,12 +316,19 @@ def _get_features(
316316
if not _features:
317317
raise ValueError("No features specified for retrieval")
318318

319-
_feature_refs: List[str]
319+
_feature_refs = []
320320
if isinstance(_features, FeatureService):
321-
# Get the latest value of the feature service, in case the object passed in has been updated underneath us.
322-
_feature_refs = _get_feature_refs_from_feature_services(
323-
self.get_feature_service(_features.name)
324-
)
321+
feature_service_from_registry = self.get_feature_service(_features.name)
322+
if feature_service_from_registry != _features:
323+
warnings.warn(
324+
"The FeatureService object that has been passed in as an argument is"
325+
"inconsistent with the version from Registry. Potentially a newer version"
326+
"of the FeatureService has been applied to the registry."
327+
)
328+
for projection in feature_service_from_registry.feature_view_projections:
329+
_feature_refs.extend(
330+
[f"{projection.name}:{f.name}" for f in projection.features]
331+
)
325332
else:
326333
assert isinstance(_features, list)
327334
_feature_refs = _features
@@ -542,10 +549,8 @@ def get_historical_features(
542549
)
543550

544551
_feature_refs = self._get_features(features, feature_refs)
545-
546-
all_feature_views = self.list_feature_views()
547-
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
548-
project=self.project
552+
all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use(
553+
features
549554
)
550555

551556
# TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider.
@@ -805,11 +810,8 @@ def get_online_features(
805810
>>> online_response_dict = online_response.to_dict()
806811
"""
807812
_feature_refs = self._get_features(features, feature_refs)
808-
all_feature_views = self._list_feature_views(
809-
allow_cache=True, hide_dummy_entity=False
810-
)
811-
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
812-
project=self.project, allow_cache=True
813+
all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use(
814+
features=features, allow_cache=True, hide_dummy_entity=False
813815
)
814816

815817
_validate_feature_refs(_feature_refs, full_feature_names)
@@ -1018,6 +1020,43 @@ def _augment_response_with_on_demand_transforms(
10181020
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
10191021
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))
10201022

1023+
def _get_feature_views_to_use(
1024+
self,
1025+
features: Optional[Union[List[str], FeatureService]],
1026+
allow_cache=False,
1027+
hide_dummy_entity: bool = True,
1028+
) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]:
1029+
1030+
fvs = {
1031+
fv.name: fv
1032+
for fv in self._list_feature_views(allow_cache, hide_dummy_entity)
1033+
}
1034+
1035+
od_fvs = {
1036+
fv.name: fv
1037+
for fv in self._registry.list_on_demand_feature_views(
1038+
project=self.project, allow_cache=allow_cache
1039+
)
1040+
}
1041+
1042+
if isinstance(features, FeatureService):
1043+
for fv_name, projection in {
1044+
projection.name: projection
1045+
for projection in features.feature_view_projections
1046+
}.items():
1047+
if fv_name in fvs:
1048+
fvs[fv_name].set_projection(projection)
1049+
elif fv_name in od_fvs:
1050+
od_fvs[fv_name].set_projection(projection)
1051+
else:
1052+
raise ValueError(
1053+
f"The provided feature service {features.name} contains a reference to a feature view"
1054+
f"{fv_name} which doesn't exist. Please make sure that you have created the feature view"
1055+
f'{fv_name} and that you have registered it by running "apply".'
1056+
)
1057+
1058+
return [*fvs.values()], [*od_fvs.values()]
1059+
10211060
@log_exceptions_and_usage
10221061
def serve(self, port: int) -> None:
10231062
"""Start the feature consumption server locally on a given port."""
@@ -1070,7 +1109,7 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F
10701109

10711110

10721111
def _group_feature_refs(
1073-
features: Union[List[str], FeatureService],
1112+
features: List[str],
10741113
all_feature_views: List[FeatureView],
10751114
all_on_demand_feature_views: List[OnDemandFeatureView],
10761115
) -> Tuple[
@@ -1090,21 +1129,14 @@ def _group_feature_refs(
10901129
# on demand view name to feature names
10911130
on_demand_view_features = defaultdict(list)
10921131

1093-
if isinstance(features, list) and isinstance(features[0], str):
1094-
for ref in features:
1095-
view_name, feat_name = ref.split(":")
1096-
if view_name in view_index:
1097-
views_features[view_name].append(feat_name)
1098-
elif view_name in on_demand_view_index:
1099-
on_demand_view_features[view_name].append(feat_name)
1100-
else:
1101-
raise FeatureViewNotFoundException(view_name)
1102-
elif isinstance(features, FeatureService):
1103-
for feature_projection in features.features:
1104-
projected_features = feature_projection.features
1105-
views_features[feature_projection.name].extend(
1106-
[f.name for f in projected_features]
1107-
)
1132+
for ref in features:
1133+
view_name, feat_name = ref.split(":")
1134+
if view_name in view_index:
1135+
views_features[view_name].append(feat_name)
1136+
elif view_name in on_demand_view_index:
1137+
on_demand_view_features[view_name].append(feat_name)
1138+
else:
1139+
raise FeatureViewNotFoundException(view_name)
11081140

11091141
fvs_result: List[Tuple[FeatureView, List[str]]] = []
11101142
odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = []
@@ -1116,17 +1148,6 @@ def _group_feature_refs(
11161148
return fvs_result, odfvs_result
11171149

11181150

1119-
def _get_feature_refs_from_feature_services(
1120-
feature_service: FeatureService,
1121-
) -> List[str]:
1122-
feature_refs = []
1123-
for projection in feature_service.features:
1124-
feature_refs.extend(
1125-
[f"{projection.name}:{f.name}" for f in projection.features]
1126-
)
1127-
return feature_refs
1128-
1129-
11301151
def _get_table_entity_keys(
11311152
table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str],
11321153
) -> List[EntityKeyProto]:

sdk/python/feast/feature_view.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class FeatureView:
7878
created_timestamp: Optional[datetime] = None
7979
last_updated_timestamp: Optional[datetime] = None
8080
materialization_intervals: List[Tuple[datetime, datetime]]
81+
projection: FeatureViewProjection
8182

8283
@log_exceptions
8384
def __init__(
@@ -141,6 +142,8 @@ def __init__(
141142
self.created_timestamp: Optional[datetime] = None
142143
self.last_updated_timestamp: Optional[datetime] = None
143144

145+
self.projection = FeatureViewProjection.from_definition(self)
146+
144147
def __repr__(self):
145148
items = (f"{k} = {v}" for k, v in self.__dict__.items())
146149
return f"<{self.__class__.__name__}({', '.join(items)})>"
@@ -151,15 +154,17 @@ def __str__(self):
151154
def __hash__(self):
152155
return hash((id(self), self.name))
153156

154-
def __getitem__(self, item) -> FeatureViewProjection:
157+
def __getitem__(self, item):
155158
assert isinstance(item, list)
156159

157160
referenced_features = []
158161
for feature in self.features:
159162
if feature.name in item:
160163
referenced_features.append(feature)
161164

162-
return FeatureViewProjection(self.name, referenced_features)
165+
self.projection.features = referenced_features
166+
167+
return self
163168

164169
def __eq__(self, other):
165170
if not isinstance(other, FeatureView):
@@ -283,6 +288,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
283288
stream_source=stream_source,
284289
)
285290

291+
# FeatureViewProjections are not saved in the FeatureView proto.
292+
# Create the default projection.
293+
feature_view.projection = FeatureViewProjection.from_definition(feature_view)
294+
286295
if feature_view_proto.meta.HasField("created_timestamp"):
287296
feature_view.created_timestamp = (
288297
feature_view_proto.meta.created_timestamp.ToDatetime()
@@ -379,3 +388,31 @@ def infer_features_from_batch_source(self, config: RepoConfig):
379388
"FeatureView",
380389
f"Could not infer Features for the FeatureView named {self.name}.",
381390
)
391+
392+
def set_projection(self, feature_view_projection: FeatureViewProjection) -> None:
393+
"""
394+
Setter for the projection object held by this FeatureView. A projection is an
395+
object that stores the modifications to a FeatureView that is applied to the FeatureView
396+
when the FeatureView is used such as during feature_store.get_historical_features.
397+
This method also performs checks to ensure the projection is consistent with this
398+
FeatureView before doing the set.
399+
400+
Args:
401+
feature_view_projection: The FeatureViewProjection object to set this FeatureView's
402+
'projection' field to.
403+
"""
404+
if feature_view_projection.name != self.name:
405+
raise ValueError(
406+
f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. "
407+
f"The projection is named {feature_view_projection.name} and the name indicates which "
408+
"FeatureView the projection is for."
409+
)
410+
411+
for feature in feature_view_projection.features:
412+
if feature not in self.features:
413+
raise ValueError(
414+
f"The projection for {self.name} cannot be applied because it contains {feature.name} which the "
415+
"FeatureView doesn't have."
416+
)
417+
418+
self.projection = feature_view_projection

sdk/python/feast/feature_view_projection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def from_proto(proto: FeatureViewProjectionProto):
3131
return ref
3232

3333
@staticmethod
34-
def from_definition(feature_definition):
34+
def from_definition(feature_grouping):
3535
return FeatureViewProjection(
36-
name=feature_definition.name, features=feature_definition.features
36+
name=feature_grouping.name, features=feature_grouping.features
3737
)

0 commit comments

Comments
 (0)