1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414import os
15- import sys
16- from collections import OrderedDict , defaultdict
15+ from collections import Counter , OrderedDict , defaultdict
1716from datetime import datetime , timedelta
1817from pathlib import Path
1918from typing import Any , Dict , List , Optional , Tuple , Union
2423
2524from feast import utils
2625from feast .entity import Entity
27- from feast .errors import FeastProviderLoginError , FeatureViewNotFoundException
2826from feast .feature_service import FeatureService
27+ from feast .errors import FeatureNameCollisionError , FeatureViewNotFoundException
2928from feast .feature_view import FeatureView
3029from feast .inference import (
3130 update_data_sources_with_inferred_event_timestamp_col ,
@@ -270,9 +269,11 @@ def apply(
270269 update_entities_with_inferred_types_from_feature_views (
271270 entities_to_update , views_to_update , self .config
272271 )
272+
273273 update_data_sources_with_inferred_event_timestamp_col (
274274 [view .input for view in views_to_update ], self .config
275275 )
276+
276277 for view in views_to_update :
277278 view .infer_features_from_input_source (self .config )
278279
@@ -303,6 +304,7 @@ def get_historical_features(
303304 entity_df : Union [pd .DataFrame , str ],
304305 features : Union [List [str ], FeatureService ],
305306 feature_refs : Optional [List [str ]] = None ,
307+ full_feature_names : bool = False ,
306308 ) -> RetrievalJob :
307309 """Enrich an entity dataframe with historical feature values for either training or batch scoring.
308310
@@ -324,6 +326,9 @@ def get_historical_features(
324326 SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery)
325327 features: A list of features that should be retrieved from the offline store. Feature references are of
326328 the format "feature_view:feature", e.g., "customer_fv:daily_transactions".
329+ full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
330+ changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
331+ "customer_fv__daily_transactions"). By default, this value is set to False.
327332
328333 Returns:
329334 RetrievalJob which can be used to materialize the results.
@@ -341,12 +346,12 @@ def get_historical_features(
341346 >>> feature_data = retrieval_job.to_df()
342347 >>> model.fit(feature_data) # insert your modeling framework here.
343348 """
344-
345349 all_feature_views = self ._registry .list_feature_views (project = self .project )
346- try :
347- feature_views = _get_requested_feature_views (features , all_feature_views )
348- except FeatureViewNotFoundException as e :
349- sys .exit (e )
350+
351+ _validate_feature_refs (feature_refs , full_feature_names )
352+ feature_views = list (
353+ view for view , _ in _group_feature_refs (feature_refs , all_feature_views )
354+ )
350355
351356 _features = features or feature_refs
352357 if not _features :
@@ -362,17 +367,16 @@ def get_historical_features(
362367 _feature_refs = _features
363368
364369 provider = self ._get_provider ()
365- try :
366- job = provider .get_historical_features (
367- self .config ,
368- feature_views ,
369- _feature_refs ,
370- entity_df ,
371- self ._registry ,
372- self .project ,
373- )
374- except FeastProviderLoginError as e :
375- sys .exit (e )
370+
371+ job = provider .get_historical_features (
372+ self .config ,
373+ feature_views ,
374+ feature_refs ,
375+ entity_df ,
376+ self ._registry ,
377+ self .project ,
378+ full_feature_names ,
379+ )
376380
377381 return job
378382
@@ -542,6 +546,7 @@ def get_online_features(
542546 features : Union [List [str ], FeatureService ],
543547 entity_rows : List [Dict [str , Any ]],
544548 feature_refs : Optional [List [str ]] = None ,
549+ full_feature_names : bool = False ,
545550 ) -> OnlineResponse :
546551 """
547552 Retrieves the latest online feature data.
@@ -617,7 +622,8 @@ def get_online_features(
617622 project = self .project , allow_cache = True
618623 )
619624
620- grouped_refs = _group_refs (_features , all_feature_views )
625+ _validate_feature_refs (feature_refs , full_feature_names )
626+ grouped_refs = _group_feature_refs (feature_refs , all_feature_views )
621627 for table , requested_features in grouped_refs :
622628 entity_keys = _get_table_entity_keys (
623629 table , union_of_entity_keys , entity_name_to_join_key_map
@@ -634,13 +640,21 @@ def get_online_features(
634640
635641 if feature_data is None :
636642 for feature_name in requested_features :
637- feature_ref = f"{ table .name } __{ feature_name } "
643+ feature_ref = (
644+ f"{ table .name } __{ feature_name } "
645+ if full_feature_names
646+ else feature_name
647+ )
638648 result_row .statuses [
639649 feature_ref
640650 ] = GetOnlineFeaturesResponse .FieldStatus .NOT_FOUND
641651 else :
642652 for feature_name in feature_data :
643- feature_ref = f"{ table .name } __{ feature_name } "
653+ feature_ref = (
654+ f"{ table .name } __{ feature_name } "
655+ if full_feature_names
656+ else feature_name
657+ )
644658 if feature_name in requested_features :
645659 result_row .fields [feature_ref ].CopyFrom (
646660 feature_data [feature_name ]
@@ -668,8 +682,32 @@ def _entity_row_to_field_values(
668682 return result
669683
670684
671- def _group_refs (
672- features : Union [List [str ], FeatureService ], all_feature_views : List [FeatureView ],
def _validate_feature_refs(
    feature_refs: Optional[Union[List[str], "FeatureService"]],
    full_feature_names: bool = False,
):
    """Raise if the requested feature references would yield colliding output names.

    Args:
        feature_refs: Feature references of the form "feature_view:feature".
            ``None`` or an empty list is accepted and treated as "nothing to
            validate", because callers forward an optional argument here.
            NOTE(review): the annotation also admits a FeatureService, but the
            body iterates the value as a list of strings -- confirm callers
            resolve a FeatureService into references before calling.
        full_feature_names: When True, output names keep the feature view
            prefix, so only an exactly repeated reference collides. When False,
            output names are the bare feature names, so two references collide
            whenever the text after the last ":" is the same.

    Raises:
        FeatureNameCollisionError: If at least one collision is found. It is
            constructed with the list of colliding references and the
            ``full_feature_names`` flag.
    """
    # Nothing requested means nothing can collide; this also avoids iterating None.
    if not feature_refs:
        return

    if full_feature_names:
        collided_feature_refs = [
            ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
        ]
    else:
        # Group references by bare feature name in one pass. rpartition keeps a
        # reference without ":" as its own name instead of failing with IndexError.
        refs_by_feature_name: Dict[str, List[str]] = {}
        for ref in feature_refs:
            refs_by_feature_name.setdefault(ref.rpartition(":")[2], []).append(ref)

        collided_feature_refs = [
            ref
            for refs in refs_by_feature_name.values()
            if len(refs) > 1
            for ref in refs
        ]

    if collided_feature_refs:
        raise FeatureNameCollisionError(collided_feature_refs, full_feature_names)
707+
708+
709+ def _group_feature_refs (
710+ features : Union [List [str ], FeatureService ], all_feature_views : List [FeatureView ]
673711) -> List [Tuple [FeatureView , List [str ]]]:
674712 """ Get list of feature views and corresponding feature names based on feature references"""
675713
@@ -703,14 +741,6 @@ def _get_features_refs_from_feature_services(
703741 return [f"{ feature_service .name } :{ f .name } " for f in feature_service .features ]
704742
705743
706- def _get_requested_feature_views (
707- features : Union [List [str ], FeatureService ], all_feature_views : List [FeatureView ],
708- ) -> List [FeatureView ]:
709- """Get list of feature views based on feature references"""
710- # TODO: Get rid of this function. We only need _group_refs
711- return list (view for view , _ in _group_refs (features , all_feature_views ))
712-
713-
714744def _get_table_entity_keys (
715745 table : FeatureView , entity_keys : List [EntityKeyProto ], join_key_map : Dict [str , str ],
716746) -> List [EntityKeyProto ]:
0 commit comments