@@ -282,20 +282,23 @@ def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
282282 return self ._registry .list_data_sources (self .project , allow_cache = allow_cache )
283283
284284 @log_exceptions_and_usage
285- def get_entity (self , name : str ) -> Entity :
285+ def get_entity (self , name : str , allow_registry_cache : bool = False ) -> Entity :
286286 """
287287 Retrieves an entity.
288288
289289 Args:
290290 name: Name of entity.
291+ allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry
291292
292293 Returns:
293294 The specified entity.
294295
295296 Raises:
296297 EntityNotFoundException: The entity could not be found.
297298 """
298- return self ._registry .get_entity (name , self .project )
299+ return self ._registry .get_entity (
300+ name , self .project , allow_cache = allow_registry_cache
301+ )
299302
300303 @log_exceptions_and_usage
301304 def get_feature_service (
@@ -317,25 +320,33 @@ def get_feature_service(
317320 return self ._registry .get_feature_service (name , self .project , allow_cache )
318321
319322 @log_exceptions_and_usage
320- def get_feature_view (self , name : str ) -> FeatureView :
323+ def get_feature_view (
324+ self , name : str , allow_registry_cache : bool = False
325+ ) -> FeatureView :
321326 """
322327 Retrieves a feature view.
323328
324329 Args:
325330 name: Name of feature view.
331+ allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry
326332
327333 Returns:
328334 The specified feature view.
329335
330336 Raises:
331337 FeatureViewNotFoundException: The feature view could not be found.
332338 """
333- return self ._get_feature_view (name )
339+ return self ._get_feature_view (name , allow_registry_cache = allow_registry_cache )
334340
335341 def _get_feature_view (
336- self , name : str , hide_dummy_entity : bool = True
342+ self ,
343+ name : str ,
344+ hide_dummy_entity : bool = True ,
345+ allow_registry_cache : bool = False ,
337346 ) -> FeatureView :
338- feature_view = self ._registry .get_feature_view (name , self .project )
347+ feature_view = self ._registry .get_feature_view (
348+ name , self .project , allow_cache = allow_registry_cache
349+ )
339350 if hide_dummy_entity and feature_view .entities [0 ] == DUMMY_ENTITY_NAME :
340351 feature_view .entities = []
341352 return feature_view
@@ -1144,6 +1155,31 @@ def tqdm_builder(length):
11441155 feature_view , self .project , start_date , end_date ,
11451156 )
11461157
1158+ @log_exceptions_and_usage
1159+ def push (self , push_source_name : str , df : pd .DataFrame ):
1160+ """
1161+ Push features to a push source. This updates all the feature views that have the push source as stream source.
1162+ Args:
1163+ push_source_name: The name of the push source we want to push data to.
1164+ df: the data being pushed.
1165+ """
1166+ from feast .data_source import PushSource
1167+
1168+ all_fvs = self .list_feature_views (allow_cache = True )
1169+
1170+ fvs_with_push_sources = {
1171+ fv
1172+ for fv in all_fvs
1173+ if (
1174+ fv .stream_source is not None
1175+ and isinstance (fv .stream_source , PushSource )
1176+ and fv .stream_source .name == push_source_name
1177+ )
1178+ }
1179+
1180+ for fv in fvs_with_push_sources :
1181+ self .write_to_online_store (fv .name , df , allow_registry_cache = True )
1182+
11471183 @log_exceptions_and_usage
11481184 def write_to_online_store (
11491185 self ,
@@ -1155,12 +1191,14 @@ def write_to_online_store(
11551191 ingests data directly into the Online store
11561192 """
11571193 # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
1158- feature_view = self ._registry . get_feature_view (
1159- feature_view_name , self . project , allow_cache = allow_registry_cache
1194+ feature_view = self .get_feature_view (
1195+ feature_view_name , allow_registry_cache = allow_registry_cache
11601196 )
11611197 entities = []
11621198 for entity_name in feature_view .entities :
1163- entities .append (self ._registry .get_entity (entity_name , self .project ))
1199+ entities .append (
1200+ self .get_entity (entity_name , allow_registry_cache = allow_registry_cache )
1201+ )
11641202 provider = self ._get_provider ()
11651203 provider .ingest_df (feature_view , entities , df )
11661204
0 commit comments