3737 update_entities_with_inferred_types_from_feature_views ,
3838)
3939from feast .infra .provider import Provider , RetrievalJob , get_provider
40+ from feast .on_demand_feature_view import OnDemandFeatureView
4041from feast .online_response import OnlineResponse , _infer_online_entity_rows
4142from feast .protos .feast .serving .ServingService_pb2 import (
4243 GetOnlineFeaturesRequestV2 ,
4546from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
4647from feast .registry import Registry
4748from feast .repo_config import RepoConfig , load_repo_config
49+ from feast .type_map import python_value_to_proto_value
4850from feast .usage import log_exceptions , log_exceptions_and_usage
4951from feast .version import get_version
5052
@@ -267,8 +269,9 @@ def apply(
267269 objects : Union [
268270 Entity ,
269271 FeatureView ,
272+ OnDemandFeatureView ,
270273 FeatureService ,
271- List [Union [FeatureView , Entity , FeatureService ]],
274+ List [Union [FeatureView , OnDemandFeatureView , Entity , FeatureService ]],
272275 ],
273276 commit : bool = True ,
274277 ):
@@ -314,6 +317,7 @@ def apply(
314317 assert isinstance (objects , list )
315318
316319 views_to_update = [ob for ob in objects if isinstance (ob , FeatureView )]
320+ odfvs_to_update = [ob for ob in objects if isinstance (ob , OnDemandFeatureView )]
317321 _validate_feature_views (views_to_update )
318322 entities_to_update = [ob for ob in objects if isinstance (ob , Entity )]
319323 services_to_update = [ob for ob in objects if isinstance (ob , FeatureService )]
@@ -332,11 +336,15 @@ def apply(
332336
333337 if len (views_to_update ) + len (entities_to_update ) + len (
334338 services_to_update
335- ) != len (objects ):
339+ ) + len ( odfvs_to_update ) != len (objects ):
336340 raise ValueError ("Unknown object type provided as part of apply() call" )
337341
338342 for view in views_to_update :
339343 self ._registry .apply_feature_view (view , project = self .project , commit = False )
344+ for odfv in odfvs_to_update :
345+ self ._registry .apply_on_demand_feature_view (
346+ odfv , project = self .project , commit = False
347+ )
340348 for ent in entities_to_update :
341349 self ._registry .apply_entity (ent , project = self .project , commit = False )
342350 for feature_service in services_to_update :
@@ -717,7 +725,6 @@ def get_online_features(
717725 all_feature_views = self ._registry .list_feature_views (
718726 project = self .project , allow_cache = True
719727 )
720-
721728 _validate_feature_refs (_feature_refs , full_feature_names )
722729 grouped_refs = _group_feature_refs (_feature_refs , all_feature_views )
723730 for table , requested_features in grouped_refs :
@@ -759,6 +766,47 @@ def get_online_features(
759766 feature_ref
760767 ] = GetOnlineFeaturesResponse .FieldStatus .PRESENT
761768
769+ initial_response = OnlineResponse (
770+ GetOnlineFeaturesResponse (field_values = result_rows )
771+ )
772+ return self ._augment_response_with_on_demand_transforms (
773+ _feature_refs , full_feature_names , initial_response , result_rows
774+ )
775+
776+ def _augment_response_with_on_demand_transforms (
777+ self ,
778+ feature_refs : List [str ],
779+ full_feature_names : bool ,
780+ initial_response : OnlineResponse ,
781+ result_rows : List [GetOnlineFeaturesResponse .FieldValues ],
782+ ) -> OnlineResponse :
783+ all_on_demand_feature_views = self ._registry .list_on_demand_feature_views (
784+ project = self .project , allow_cache = True
785+ )
786+ if len (all_on_demand_feature_views ) == 0 :
787+ return initial_response
788+ initial_response_df = initial_response .to_df ()
789+ # Apply on demand transformations
790+ for odfv in all_on_demand_feature_views :
791+ feature_ref = odfv .name
792+ if feature_ref in feature_refs :
793+ transformed_features_df = odfv .get_transformed_features_df (
794+ full_feature_names , initial_response_df
795+ )
796+ for row_idx in range (len (result_rows )):
797+ result_row = result_rows [row_idx ]
798+ # TODO(adchia): support multiple output features in an ODFV, which requires different naming
799+ # conventions
800+ result_row .fields [odfv .name ].CopyFrom (
801+ python_value_to_proto_value (
802+ transformed_features_df [odfv .features [0 ].name ].values [
803+ row_idx
804+ ]
805+ )
806+ )
807+ result_row .statuses [
808+ feature_ref
809+ ] = GetOnlineFeaturesResponse .FieldStatus .PRESENT
762810 return OnlineResponse (GetOnlineFeaturesResponse (field_values = result_rows ))
763811
764812 @log_exceptions_and_usage
@@ -791,7 +839,9 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F
791839 ref for ref , occurrences in Counter (feature_refs ).items () if occurrences > 1
792840 ]
793841 else :
794- feature_names = [ref .split (":" )[1 ] for ref in feature_refs ]
842+ feature_names = [
843+ ref .split (":" )[1 ] if ":" in ref else ref for ref in feature_refs
844+ ]
795845 collided_feature_names = [
796846 ref
797847 for ref , occurrences in Counter (feature_names ).items ()
@@ -820,6 +870,9 @@ def _group_feature_refs(
820870
821871 if isinstance (features , list ) and isinstance (features [0 ], str ):
822872 for ref in features :
873+ if ":" not in ref :
874+ # This is an on demand feature view ref
875+ continue
823876 view_name , feat_name = ref .split (":" )
824877 if view_name not in view_index :
825878 raise FeatureViewNotFoundException (view_name )
0 commit comments