@@ -372,6 +372,40 @@ def _get_feature_view(
372372 feature_view .entities = []
373373 return feature_view
374374
375+ @log_exceptions_and_usage
376+ def get_stream_feature_view (
377+ self , name : str , allow_registry_cache : bool = False
378+ ) -> StreamFeatureView :
379+ """
380+ Retrieves a stream feature view.
381+
382+ Args:
383+ name: Name of stream feature view.
384+ allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry
385+
386+ Returns:
387+ The specified stream feature view.
388+
389+ Raises:
390+ FeatureViewNotFoundException: The feature view could not be found.
391+ """
392+ return self ._get_stream_feature_view (
393+ name , allow_registry_cache = allow_registry_cache
394+ )
395+
396+ def _get_stream_feature_view (
397+ self ,
398+ name : str ,
399+ hide_dummy_entity : bool = True ,
400+ allow_registry_cache : bool = False ,
401+ ) -> StreamFeatureView :
402+ stream_feature_view = self ._registry .get_stream_feature_view (
403+ name , self .project , allow_cache = allow_registry_cache
404+ )
405+ if hide_dummy_entity and stream_feature_view .entities [0 ] == DUMMY_ENTITY_NAME :
406+ stream_feature_view .entities = []
407+ return stream_feature_view
408+
375409 @log_exceptions_and_usage
376410 def get_on_demand_feature_view (self , name : str ) -> OnDemandFeatureView :
377411 """
@@ -935,7 +969,6 @@ def get_historical_features(
935969 all_feature_views ,
936970 all_request_feature_views ,
937971 all_on_demand_feature_views ,
938- all_stream_feature_views ,
939972 ) = self ._get_feature_views_to_use (features )
940973
941974 if all_request_feature_views :
@@ -1321,9 +1354,14 @@ def write_to_online_store(
13211354 ingests data directly into the Online store
13221355 """
13231356 # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
1324- feature_view = self .get_feature_view (
1325- feature_view_name , allow_registry_cache = allow_registry_cache
1326- )
1357+ try :
1358+ feature_view = self .get_stream_feature_view (
1359+ feature_view_name , allow_registry_cache = allow_registry_cache
1360+ )
1361+ except FeatureViewNotFoundException :
1362+ feature_view = self .get_feature_view (
1363+ feature_view_name , allow_registry_cache = allow_registry_cache
1364+ )
13271365 entities = []
13281366 for entity_name in feature_view .entities :
13291367 entities .append (
@@ -1456,7 +1494,6 @@ def _get_online_features(
14561494 requested_feature_views ,
14571495 requested_request_feature_views ,
14581496 requested_on_demand_feature_views ,
1459- request_stream_feature_views ,
14601497 ) = self ._get_feature_views_to_use (
14611498 features = features , allow_cache = True , hide_dummy_entity = False
14621499 )
@@ -1994,15 +2031,17 @@ def _get_feature_views_to_use(
19942031 allow_cache = False ,
19952032 hide_dummy_entity : bool = True ,
19962033 ) -> Tuple [
1997- List [FeatureView ],
1998- List [RequestFeatureView ],
1999- List [OnDemandFeatureView ],
2000- List [StreamFeatureView ],
2034+ List [FeatureView ], List [RequestFeatureView ], List [OnDemandFeatureView ],
20012035 ]:
20022036
20032037 fvs = {
20042038 fv .name : fv
2005- for fv in self ._list_feature_views (allow_cache , hide_dummy_entity )
2039+ for fv in [
2040+ * self ._list_feature_views (allow_cache , hide_dummy_entity ),
2041+ * self ._registry .list_stream_feature_views (
2042+ project = self .project , allow_cache = allow_cache
2043+ ),
2044+ ]
20062045 }
20072046
20082047 request_fvs = {
@@ -2019,15 +2058,8 @@ def _get_feature_views_to_use(
20192058 )
20202059 }
20212060
2022- sfvs = {
2023- fv .name : fv
2024- for fv in self ._registry .list_stream_feature_views (
2025- project = self .project , allow_cache = allow_cache
2026- )
2027- }
2028-
20292061 if isinstance (features , FeatureService ):
2030- fvs_to_use , request_fvs_to_use , od_fvs_to_use , sfvs_to_use = [], [], [], []
2062+ fvs_to_use , request_fvs_to_use , od_fvs_to_use = [], [], []
20312063 for fv_name , projection in [
20322064 (projection .name , projection )
20332065 for projection in features .feature_view_projections
@@ -2048,23 +2080,18 @@ def _get_feature_views_to_use(
20482080 fv = fvs [projection .name ].with_projection (copy .copy (projection ))
20492081 if fv not in fvs_to_use :
20502082 fvs_to_use .append (fv )
2051- elif fv_name in sfvs :
2052- sfvs_to_use .append (
2053- sfvs [fv_name ].with_projection (copy .copy (projection ))
2054- )
20552083 else :
20562084 raise ValueError (
20572085 f"The provided feature service { features .name } contains a reference to a feature view"
20582086 f"{ fv_name } which doesn't exist. Please make sure that you have created the feature view"
20592087 f'{ fv_name } and that you have registered it by running "apply".'
20602088 )
2061- views_to_use = (fvs_to_use , request_fvs_to_use , od_fvs_to_use , sfvs_to_use )
2089+ views_to_use = (fvs_to_use , request_fvs_to_use , od_fvs_to_use )
20622090 else :
20632091 views_to_use = (
20642092 [* fvs .values ()],
20652093 [* request_fvs .values ()],
20662094 [* od_fvs .values ()],
2067- [* sfvs .values ()],
20682095 )
20692096
20702097 return views_to_use
0 commit comments