@@ -265,6 +265,19 @@ def _list_feature_views(
265265 feature_views .append (fv )
266266 return feature_views
267267
268+ def _list_stream_feature_views (
269+ self , allow_cache : bool = False , hide_dummy_entity : bool = True ,
270+ ) -> List [StreamFeatureView ]:
271+ stream_feature_views = []
272+ for sfv in self ._registry .list_stream_feature_views (
273+ self .project , allow_cache = allow_cache
274+ ):
275+ if hide_dummy_entity and sfv .entities [0 ] == DUMMY_ENTITY_NAME :
276+ sfv .entities = []
277+ sfv .entity_columns = []
278+ stream_feature_views .append (sfv )
279+ return stream_feature_views
280+
268281 @log_exceptions_and_usage
269282 def list_on_demand_feature_views (
270283 self , allow_cache : bool = False
@@ -289,9 +302,7 @@ def list_stream_feature_views(
289302 Returns:
290303 A list of stream feature views.
291304 """
292- return self ._registry .list_stream_feature_views (
293- self .project , allow_cache = allow_cache
294- )
305+ return self ._list_stream_feature_views (allow_cache )
295306
296307 @log_exceptions_and_usage
297308 def list_data_sources (self , allow_cache : bool = False ) -> List [DataSource ]:
@@ -558,6 +569,9 @@ def _make_inferences(
558569 update_feature_views_with_inferred_features_and_entities (
559570 views_to_update , entities + entities_to_update , self .config
560571 )
572+ update_feature_views_with_inferred_features_and_entities (
573+ sfvs_to_update , entities + entities_to_update , self .config
574+ )
561575 # TODO(kevjumba): Update schema inferrence
562576 for sfv in sfvs_to_update :
563577 if not sfv .schema :
@@ -574,6 +588,53 @@ def _make_inferences(
574588 for feature_service in feature_services_to_update :
575589 feature_service .infer_features (fvs_to_update = fvs_to_update_map )
576590
591+ def _get_feature_views_to_materialize (
592+ self , feature_views : Optional [List [str ]],
593+ ) -> List [FeatureView ]:
594+ """
595+ Returns the list of feature views that should be materialized.
596+
597+ If no feature views are specified, all feature views will be returned.
598+
599+ Args:
600+ feature_views: List of names of feature views to materialize.
601+
602+ Raises:
603+ FeatureViewNotFoundException: One of the specified feature views could not be found.
604+ ValueError: One of the specified feature views is not configured for materialization.
605+ """
606+ feature_views_to_materialize : List [FeatureView ] = []
607+
608+ if feature_views is None :
609+ feature_views_to_materialize = self ._list_feature_views (
610+ hide_dummy_entity = False
611+ )
612+ feature_views_to_materialize = [
613+ fv for fv in feature_views_to_materialize if fv .online
614+ ]
615+ stream_feature_views_to_materialize = self ._list_stream_feature_views (
616+ hide_dummy_entity = False
617+ )
618+ feature_views_to_materialize += [
619+ sfv for sfv in stream_feature_views_to_materialize if sfv .online
620+ ]
621+ else :
622+ for name in feature_views :
623+ try :
624+ feature_view = self ._get_feature_view (name , hide_dummy_entity = False )
625+ except FeatureViewNotFoundException :
626+ feature_view = self ._get_stream_feature_view (
627+ name , hide_dummy_entity = False
628+ )
629+
630+ if not feature_view .online :
631+ raise ValueError (
632+ f"FeatureView { feature_view .name } is not configured to be served online."
633+ )
634+ feature_views_to_materialize .append (feature_view )
635+
636+ return feature_views_to_materialize
637+
577638 @log_exceptions_and_usage
578639 def _plan (
579640 self , desired_repo_contents : RepoContents
@@ -873,8 +934,8 @@ def apply(
873934
874935 self ._get_provider ().update_infra (
875936 project = self .project ,
876- tables_to_delete = views_to_delete if not partial else [],
877- tables_to_keep = views_to_update ,
937+ tables_to_delete = views_to_delete + sfvs_to_delete if not partial else [],
938+ tables_to_keep = views_to_update + sfvs_to_update ,
878939 entities_to_delete = entities_to_delete if not partial else [],
879940 entities_to_keep = entities_to_update ,
880941 partial = partial ,
@@ -1151,23 +1212,9 @@ def materialize_incremental(
11511212 <BLANKLINE>
11521213 ...
11531214 """
1154- feature_views_to_materialize : List [FeatureView ] = []
1155- if feature_views is None :
1156- feature_views_to_materialize = self ._list_feature_views (
1157- hide_dummy_entity = False
1158- )
1159- feature_views_to_materialize = [
1160- fv for fv in feature_views_to_materialize if fv .online
1161- ]
1162- else :
1163- for name in feature_views :
1164- feature_view = self ._get_feature_view (name , hide_dummy_entity = False )
1165- if not feature_view .online :
1166- raise ValueError (
1167- f"FeatureView { feature_view .name } is not configured to be served online."
1168- )
1169- feature_views_to_materialize .append (feature_view )
1170-
1215+ feature_views_to_materialize = self ._get_feature_views_to_materialize (
1216+ feature_views
1217+ )
11711218 _print_materialization_log (
11721219 None ,
11731220 end_date ,
@@ -1258,23 +1305,9 @@ def materialize(
12581305 f"The given start_date { start_date } is greater than the given end_date { end_date } ."
12591306 )
12601307
1261- feature_views_to_materialize : List [FeatureView ] = []
1262- if feature_views is None :
1263- feature_views_to_materialize = self ._list_feature_views (
1264- hide_dummy_entity = False
1265- )
1266- feature_views_to_materialize = [
1267- fv for fv in feature_views_to_materialize if fv .online
1268- ]
1269- else :
1270- for name in feature_views :
1271- feature_view = self ._get_feature_view (name , hide_dummy_entity = False )
1272- if not feature_view .online :
1273- raise ValueError (
1274- f"FeatureView { feature_view .name } is not configured to be served online."
1275- )
1276- feature_views_to_materialize .append (feature_view )
1277-
1308+ feature_views_to_materialize = self ._get_feature_views_to_materialize (
1309+ feature_views
1310+ )
12781311 _print_materialization_log (
12791312 start_date ,
12801313 end_date ,
@@ -1327,6 +1360,7 @@ def push(
13271360 from feast .data_source import PushSource
13281361
13291362 all_fvs = self .list_feature_views (allow_cache = allow_registry_cache )
1363+ all_fvs += self .list_stream_feature_views (allow_cache = allow_registry_cache )
13301364
13311365 fvs_with_push_sources = {
13321366 fv
0 commit comments