Skip to content

Commit 1808e06

Browse files
Revert "feat: Enable stream feature view materialization (feast-dev#2798)" (feast-dev#2806)
1 parent a06700d commit 1808e06

12 files changed

Lines changed: 256 additions & 428 deletions

File tree

go/internal/feast/featurestore.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,6 @@ func (fs *FeatureStore) listAllViews() (map[string]*model.FeatureView, map[strin
224224
fvs[featureView.Base.Name] = featureView
225225
}
226226

227-
streamFeatureViews, err := fs.ListStreamFeatureViews()
228-
if err != nil {
229-
return nil, nil, err
230-
}
231-
for _, streamFeatureView := range streamFeatureViews {
232-
fvs[streamFeatureView.Base.Name] = streamFeatureView
233-
}
234-
235227
onDemandFeatureViews, err := fs.registry.ListOnDemandFeatureViews(fs.config.Project)
236228
if err != nil {
237229
return nil, nil, err
@@ -250,14 +242,6 @@ func (fs *FeatureStore) ListFeatureViews() ([]*model.FeatureView, error) {
250242
return featureViews, nil
251243
}
252244

253-
func (fs *FeatureStore) ListStreamFeatureViews() ([]*model.FeatureView, error) {
254-
streamFeatureViews, err := fs.registry.ListStreamFeatureViews(fs.config.Project)
255-
if err != nil {
256-
return streamFeatureViews, err
257-
}
258-
return streamFeatureViews, nil
259-
}
260-
261245
func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*model.Entity, error) {
262246

263247
allEntities, err := fs.registry.ListEntities(fs.config.Project)

go/internal/feast/model/featureview.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,7 @@ type FeatureView struct {
2424

2525
func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView {
2626
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
27-
Ttl: proto.Spec.Ttl,
28-
}
29-
if len(proto.Spec.Entities) == 0 {
30-
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}
31-
} else {
32-
featureView.EntityNames = proto.Spec.Entities
33-
}
34-
entityColumns := make([]*Field, len(proto.Spec.EntityColumns))
35-
for i, entityColumn := range proto.Spec.EntityColumns {
36-
entityColumns[i] = NewFieldFromProto(entityColumn)
37-
}
38-
featureView.EntityColumns = entityColumns
39-
return featureView
40-
}
41-
42-
func NewFeatureViewFromStreamFeatureViewProto(proto *core.StreamFeatureView) *FeatureView {
43-
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
44-
Ttl: proto.Spec.Ttl,
27+
Ttl: &(*proto.Spec.Ttl),
4528
}
4629
if len(proto.Spec.Entities) == 0 {
4730
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}

go/internal/feast/registry/registry.go

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type Registry struct {
3030
cachedFeatureServices map[string]map[string]*core.FeatureService
3131
cachedEntities map[string]map[string]*core.Entity
3232
cachedFeatureViews map[string]map[string]*core.FeatureView
33-
cachedStreamFeatureViews map[string]map[string]*core.StreamFeatureView
3433
cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView
3534
cachedRegistry *core.Registry
3635
cachedRegistryProtoLastUpdated time.Time
@@ -107,12 +106,10 @@ func (r *Registry) load(registry *core.Registry) {
107106
r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService)
108107
r.cachedEntities = make(map[string]map[string]*core.Entity)
109108
r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView)
110-
r.cachedStreamFeatureViews = make(map[string]map[string]*core.StreamFeatureView)
111109
r.cachedOnDemandFeatureViews = make(map[string]map[string]*core.OnDemandFeatureView)
112110
r.loadEntities(registry)
113111
r.loadFeatureServices(registry)
114112
r.loadFeatureViews(registry)
115-
r.loadStreamFeatureViews(registry)
116113
r.loadOnDemandFeatureViews(registry)
117114
r.cachedRegistryProtoLastUpdated = time.Now()
118115
}
@@ -147,16 +144,6 @@ func (r *Registry) loadFeatureViews(registry *core.Registry) {
147144
}
148145
}
149146

150-
func (r *Registry) loadStreamFeatureViews(registry *core.Registry) {
151-
streamFeatureViews := registry.StreamFeatureViews
152-
for _, streamFeatureView := range streamFeatureViews {
153-
if _, ok := r.cachedStreamFeatureViews[streamFeatureView.Spec.Project]; !ok {
154-
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project] = make(map[string]*core.StreamFeatureView)
155-
}
156-
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project][streamFeatureView.Spec.Name] = streamFeatureView
157-
}
158-
}
159-
160147
func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
161148
onDemandFeatureViews := registry.OnDemandFeatureViews
162149
for _, onDemandFeatureView := range onDemandFeatureViews {
@@ -206,26 +193,7 @@ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error
206193
}
207194

208195
/*
209-
Look up Stream Feature Views inside project
210-
Returns empty list if project not found
211-
*/
212-
213-
func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, error) {
214-
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
215-
return []*model.FeatureView{}, nil
216-
} else {
217-
streamFeatureViews := make([]*model.FeatureView, len(cachedStreamFeatureViews))
218-
index := 0
219-
for _, streamFeatureViewProto := range cachedStreamFeatureViews {
220-
streamFeatureViews[index] = model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto)
221-
index += 1
222-
}
223-
return streamFeatureViews, nil
224-
}
225-
}
226-
227-
/*
228-
Look up Feature Services inside project
196+
Look up Feature Views inside project
229197
Returns empty list if project not found
230198
*/
231199

@@ -286,18 +254,6 @@ func (r *Registry) GetFeatureView(project, featureViewName string) (*model.Featu
286254
}
287255
}
288256

289-
func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (*model.FeatureView, error) {
290-
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
291-
return nil, fmt.Errorf("no cached stream feature views found for project %s", project)
292-
} else {
293-
if streamFeatureViewProto, ok := cachedStreamFeatureViews[streamFeatureViewName]; !ok {
294-
return nil, fmt.Errorf("no cached stream feature view %s found for project %s", streamFeatureViewName, project)
295-
} else {
296-
return model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto), nil
297-
}
298-
}
299-
}
300-
301257
func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) {
302258
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
303259
return nil, fmt.Errorf("no cached feature services found for project %s", project)

sdk/python/feast/feature_store.py

Lines changed: 39 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -265,19 +265,6 @@ def _list_feature_views(
265265
feature_views.append(fv)
266266
return feature_views
267267

268-
def _list_stream_feature_views(
269-
self, allow_cache: bool = False, hide_dummy_entity: bool = True,
270-
) -> List[StreamFeatureView]:
271-
stream_feature_views = []
272-
for sfv in self._registry.list_stream_feature_views(
273-
self.project, allow_cache=allow_cache
274-
):
275-
if hide_dummy_entity and sfv.entities[0] == DUMMY_ENTITY_NAME:
276-
sfv.entities = []
277-
sfv.entity_columns = []
278-
stream_feature_views.append(sfv)
279-
return stream_feature_views
280-
281268
@log_exceptions_and_usage
282269
def list_on_demand_feature_views(
283270
self, allow_cache: bool = False
@@ -302,7 +289,9 @@ def list_stream_feature_views(
302289
Returns:
303290
A list of stream feature views.
304291
"""
305-
return self._list_stream_feature_views(allow_cache)
292+
return self._registry.list_stream_feature_views(
293+
self.project, allow_cache=allow_cache
294+
)
306295

307296
@log_exceptions_and_usage
308297
def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
@@ -569,9 +558,6 @@ def _make_inferences(
569558
update_feature_views_with_inferred_features_and_entities(
570559
views_to_update, entities + entities_to_update, self.config
571560
)
572-
update_feature_views_with_inferred_features_and_entities(
573-
sfvs_to_update, entities + entities_to_update, self.config
574-
)
575561
# TODO(kevjumba): Update schema inferrence
576562
for sfv in sfvs_to_update:
577563
if not sfv.schema:
@@ -588,53 +574,6 @@ def _make_inferences(
588574
for feature_service in feature_services_to_update:
589575
feature_service.infer_features(fvs_to_update=fvs_to_update_map)
590576

591-
def _get_feature_views_to_materialize(
592-
self, feature_views: Optional[List[str]],
593-
) -> List[FeatureView]:
594-
"""
595-
Returns the list of feature views that should be materialized.
596-
597-
If no feature views are specified, all feature views will be returned.
598-
599-
Args:
600-
feature_views: List of names of feature views to materialize.
601-
602-
Raises:
603-
FeatureViewNotFoundException: One of the specified feature views could not be found.
604-
ValueError: One of the specified feature views is not configured for materialization.
605-
"""
606-
feature_views_to_materialize: List[FeatureView] = []
607-
608-
if feature_views is None:
609-
feature_views_to_materialize = self._list_feature_views(
610-
hide_dummy_entity=False
611-
)
612-
feature_views_to_materialize = [
613-
fv for fv in feature_views_to_materialize if fv.online
614-
]
615-
stream_feature_views_to_materialize = self._list_stream_feature_views(
616-
hide_dummy_entity=False
617-
)
618-
feature_views_to_materialize += [
619-
sfv for sfv in stream_feature_views_to_materialize if sfv.online
620-
]
621-
else:
622-
for name in feature_views:
623-
try:
624-
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
625-
except FeatureViewNotFoundException:
626-
feature_view = self._get_stream_feature_view(
627-
name, hide_dummy_entity=False
628-
)
629-
630-
if not feature_view.online:
631-
raise ValueError(
632-
f"FeatureView {feature_view.name} is not configured to be served online."
633-
)
634-
feature_views_to_materialize.append(feature_view)
635-
636-
return feature_views_to_materialize
637-
638577
@log_exceptions_and_usage
639578
def _plan(
640579
self, desired_repo_contents: RepoContents
@@ -934,8 +873,8 @@ def apply(
934873

935874
self._get_provider().update_infra(
936875
project=self.project,
937-
tables_to_delete=views_to_delete + sfvs_to_delete if not partial else [],
938-
tables_to_keep=views_to_update + sfvs_to_update,
876+
tables_to_delete=views_to_delete if not partial else [],
877+
tables_to_keep=views_to_update,
939878
entities_to_delete=entities_to_delete if not partial else [],
940879
entities_to_keep=entities_to_update,
941880
partial=partial,
@@ -1212,9 +1151,23 @@ def materialize_incremental(
12121151
<BLANKLINE>
12131152
...
12141153
"""
1215-
feature_views_to_materialize = self._get_feature_views_to_materialize(
1216-
feature_views
1217-
)
1154+
feature_views_to_materialize: List[FeatureView] = []
1155+
if feature_views is None:
1156+
feature_views_to_materialize = self._list_feature_views(
1157+
hide_dummy_entity=False
1158+
)
1159+
feature_views_to_materialize = [
1160+
fv for fv in feature_views_to_materialize if fv.online
1161+
]
1162+
else:
1163+
for name in feature_views:
1164+
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
1165+
if not feature_view.online:
1166+
raise ValueError(
1167+
f"FeatureView {feature_view.name} is not configured to be served online."
1168+
)
1169+
feature_views_to_materialize.append(feature_view)
1170+
12181171
_print_materialization_log(
12191172
None,
12201173
end_date,
@@ -1305,9 +1258,23 @@ def materialize(
13051258
f"The given start_date {start_date} is greater than the given end_date {end_date}."
13061259
)
13071260

1308-
feature_views_to_materialize = self._get_feature_views_to_materialize(
1309-
feature_views
1310-
)
1261+
feature_views_to_materialize: List[FeatureView] = []
1262+
if feature_views is None:
1263+
feature_views_to_materialize = self._list_feature_views(
1264+
hide_dummy_entity=False
1265+
)
1266+
feature_views_to_materialize = [
1267+
fv for fv in feature_views_to_materialize if fv.online
1268+
]
1269+
else:
1270+
for name in feature_views:
1271+
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
1272+
if not feature_view.online:
1273+
raise ValueError(
1274+
f"FeatureView {feature_view.name} is not configured to be served online."
1275+
)
1276+
feature_views_to_materialize.append(feature_view)
1277+
13111278
_print_materialization_log(
13121279
start_date,
13131280
end_date,
@@ -1360,7 +1327,6 @@ def push(
13601327
from feast.data_source import PushSource
13611328

13621329
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
1363-
all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache)
13641330

13651331
fvs_with_push_sources = {
13661332
fv

sdk/python/feast/inference.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@ def update_feature_views_with_inferred_features_and_entities(
9999
other columns except designated timestamp columns are considered to be feature columns. If
100100
the feature view already has features, feature inference is skipped.
101101
102-
Note that this inference logic currently does not take any transformations (either a UDF or
103-
aggregations) into account. For example, even if a stream feature view has a transformation,
104-
this method assumes that the batch source contains transformed data with the correct final schema.
105-
106102
Args:
107103
fvs: The feature views to be updated.
108104
entities: A list containing entities associated with the feature views.

sdk/python/feast/registry.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,30 +1267,6 @@ def apply_materialization(
12671267
self.commit()
12681268
return
12691269

1270-
for idx, existing_stream_feature_view_proto in enumerate(
1271-
self.cached_registry_proto.stream_feature_views
1272-
):
1273-
if (
1274-
existing_stream_feature_view_proto.spec.name == feature_view.name
1275-
and existing_stream_feature_view_proto.spec.project == project
1276-
):
1277-
existing_stream_feature_view = StreamFeatureView.from_proto(
1278-
existing_stream_feature_view_proto
1279-
)
1280-
existing_stream_feature_view.materialization_intervals.append(
1281-
(start_date, end_date)
1282-
)
1283-
existing_stream_feature_view.last_updated_timestamp = datetime.utcnow()
1284-
stream_feature_view_proto = existing_stream_feature_view.to_proto()
1285-
stream_feature_view_proto.spec.project = project
1286-
del self.cached_registry_proto.stream_feature_views[idx]
1287-
self.cached_registry_proto.stream_feature_views.append(
1288-
stream_feature_view_proto
1289-
)
1290-
if commit:
1291-
self.commit()
1292-
return
1293-
12941270
raise FeatureViewNotFoundException(feature_view.name, project)
12951271

12961272
def list_feature_views(

0 commit comments

Comments
 (0)