88import pandas as pd
99
1010from feast .base_feature_view import BaseFeatureView
11+ from feast .batch_feature_view import BatchFeatureView
1112from feast .data_source import RequestSource
1213from feast .errors import RegistryInferenceFailure , SpecifiedFeaturesNotPresentError
1314from feast .feature import Feature
2526from feast .protos .feast .core .OnDemandFeatureView_pb2 import (
2627 UserDefinedFunction as UserDefinedFunctionProto ,
2728)
29+ from feast .stream_feature_view import StreamFeatureView
2830from feast .type_map import (
2931 feast_value_type_to_pandas_type ,
3032 python_type_to_feast_value_type ,
@@ -66,14 +68,21 @@ class OnDemandFeatureView(BaseFeatureView):
6668 tags : Dict [str , str ]
6769 owner : str
6870
69- @log_exceptions
70- def __init__ (
71+ @log_exceptions # noqa: C901
72+ def __init__ ( # noqa: C901
7173 self ,
7274 * args ,
7375 name : Optional [str ] = None ,
7476 features : Optional [List [Feature ]] = None ,
7577 sources : Optional [
76- Dict [str , Union [FeatureView , FeatureViewProjection , RequestSource ]]
78+ List [
79+ Union [
80+ BatchFeatureView ,
81+ StreamFeatureView ,
82+ RequestSource ,
83+ FeatureViewProjection ,
84+ ]
85+ ]
7786 ] = None ,
7887 udf : Optional [MethodType ] = None ,
7988 inputs : Optional [
@@ -92,11 +101,11 @@ def __init__(
92101 features (deprecated): The list of features in the output of the on demand
93102 feature view, after the transformation has been applied.
94103 sources (optional): A map from input source names to the actual input sources,
95- which may be feature views, feature view projections, or request data sources.
104+ which may be feature views, or request data sources.
96105 These sources serve as inputs to the udf, which will refer to them by name.
97106 udf (optional): The user defined transformation function, which must take pandas
98107 dataframes as inputs.
99- inputs (optional): A map from input source names to the actual input sources,
108+ inputs (optional): (Deprecated) A map from input source names to the actual input sources,
100109 which may be feature views, feature view projections, or request data sources.
101110 These sources serve as inputs to the udf, which will refer to them by name.
102111 schema (optional): The list of features in the output of the on demand feature
@@ -123,8 +132,7 @@ def __init__(
123132 ),
124133 DeprecationWarning ,
125134 )
126-
127- _sources = sources or inputs
135+ _sources = sources or []
128136 if inputs and sources :
129137 raise ValueError ("At most one of `sources` or `inputs` can be specified." )
130138 elif inputs :
@@ -135,7 +143,17 @@ def __init__(
135143 ),
136144 DeprecationWarning ,
137145 )
138-
146+ for _ , source in inputs .items ():
147+ if isinstance (source , FeatureView ):
148+ _sources .append (feature_view_to_batch_feature_view (source ))
149+ elif isinstance (source , RequestSource ) or isinstance (
150+ source , FeatureViewProjection
151+ ):
152+ _sources .append (source )
153+ else :
154+ raise ValueError (
155+ "input can only accept FeatureView, FeatureViewProjection, or RequestSource"
156+ )
139157 _udf = udf
140158
141159 if args :
@@ -169,7 +187,18 @@ def __init__(
169187 DeprecationWarning ,
170188 )
171189 if len (args ) >= 3 :
172- _sources = args [2 ]
190+ _inputs = args [2 ]
191+ for _ , source in _inputs .items ():
192+ if isinstance (source , FeatureView ):
193+ _sources .append (feature_view_to_batch_feature_view (source ))
194+ elif isinstance (source , RequestSource ) or isinstance (
195+ source , FeatureViewProjection
196+ ):
197+ _sources .append (source )
198+ else :
199+ raise ValueError (
200+ "input can only accept FeatureView, FeatureViewProjection, or RequestSource"
201+ )
173202 warnings .warn (
174203 (
175204 "The `inputs` parameter is being deprecated. Please use `sources` instead. "
@@ -195,18 +224,17 @@ def __init__(
195224 tags = tags ,
196225 owner = owner ,
197226 )
198-
199227 assert _sources is not None
200228 self .source_feature_view_projections : Dict [str , FeatureViewProjection ] = {}
201229 self .source_request_sources : Dict [str , RequestSource ] = {}
202- for source_name , odfv_source in _sources . items () :
230+ for odfv_source in _sources :
203231 if isinstance (odfv_source , RequestSource ):
204- self .source_request_sources [source_name ] = odfv_source
232+ self .source_request_sources [odfv_source . name ] = odfv_source
205233 elif isinstance (odfv_source , FeatureViewProjection ):
206- self .source_feature_view_projections [source_name ] = odfv_source
234+ self .source_feature_view_projections [odfv_source . name ] = odfv_source
207235 else :
208236 self .source_feature_view_projections [
209- source_name
237+ odfv_source . name
210238 ] = odfv_source .projection
211239
212240 if _udf is None :
@@ -219,12 +247,12 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]:
219247 return OnDemandFeatureViewProto
220248
221249 def __copy__ (self ):
250+
222251 fv = OnDemandFeatureView (
223252 name = self .name ,
224253 schema = self .features ,
225- sources = dict (
226- ** self .source_feature_view_projections , ** self .source_request_sources ,
227- ),
254+ sources = list (self .source_feature_view_projections .values ())
255+ + list (self .source_request_sources .values ()),
228256 udf = self .udf ,
229257 description = self .description ,
230258 tags = self .tags ,
@@ -302,22 +330,21 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
302330 Returns:
303331 A OnDemandFeatureView object based on the on-demand feature view protobuf.
304332 """
305- sources = {}
306- for (
307- source_name ,
308- on_demand_source ,
309- ) in on_demand_feature_view_proto .spec .sources .items ():
333+ sources = []
334+ for (_ , on_demand_source ,) in on_demand_feature_view_proto .spec .sources .items ():
310335 if on_demand_source .WhichOneof ("source" ) == "feature_view" :
311- sources [ source_name ] = FeatureView . from_proto (
312- on_demand_source .feature_view
313- ). projection
336+ sources . append (
337+ FeatureView . from_proto ( on_demand_source .feature_view ). projection
338+ )
314339 elif on_demand_source .WhichOneof ("source" ) == "feature_view_projection" :
315- sources [source_name ] = FeatureViewProjection .from_proto (
316- on_demand_source .feature_view_projection
340+ sources .append (
341+ FeatureViewProjection .from_proto (
342+ on_demand_source .feature_view_projection
343+ )
317344 )
318345 else :
319- sources [ source_name ] = RequestSource . from_proto (
320- on_demand_source .request_data_source
346+ sources . append (
347+ RequestSource . from_proto ( on_demand_source .request_data_source )
321348 )
322349 on_demand_feature_view_obj = cls (
323350 name = on_demand_feature_view_proto .spec .name ,
@@ -476,7 +503,16 @@ def get_requested_odfvs(feature_refs, project, registry):
476503def on_demand_feature_view (
477504 * args ,
478505 features : Optional [List [Feature ]] = None ,
479- sources : Optional [Dict [str , Union [FeatureView , RequestSource ]]] = None ,
506+ sources : Optional [
507+ List [
508+ Union [
509+ BatchFeatureView ,
510+ StreamFeatureView ,
511+ RequestSource ,
512+ FeatureViewProjection ,
513+ ]
514+ ]
515+ ] = None ,
480516 inputs : Optional [Dict [str , Union [FeatureView , RequestSource ]]] = None ,
481517 schema : Optional [List [Field ]] = None ,
482518 description : str = "" ,
@@ -490,7 +526,7 @@ def on_demand_feature_view(
490526 features (deprecated): The list of features in the output of the on demand
491527 feature view, after the transformation has been applied.
492528 sources (optional): A map from input source names to the actual input sources,
493- which may be feature views, feature view projections, or request data sources.
529+ which may be feature views, or request data sources.
494530 These sources serve as inputs to the udf, which will refer to them by name.
495531 inputs (optional): A map from input source names to the actual input sources,
496532 which may be feature views, feature view projections, or request data sources.
@@ -517,8 +553,7 @@ def on_demand_feature_view(
517553 ),
518554 DeprecationWarning ,
519555 )
520-
521- _sources = sources or inputs
556+ _sources = sources or []
522557 if inputs and sources :
523558 raise ValueError ("At most one of `sources` or `inputs` can be specified." )
524559 elif inputs :
@@ -529,6 +564,17 @@ def on_demand_feature_view(
529564 ),
530565 DeprecationWarning ,
531566 )
567+ for _ , source in inputs .items ():
568+ if isinstance (source , FeatureView ):
569+ _sources .append (feature_view_to_batch_feature_view (source ))
570+ elif isinstance (source , RequestSource ) or isinstance (
571+ source , FeatureViewProjection
572+ ):
573+ _sources .append (source )
574+ else :
575+ raise ValueError (
576+ "input can only accept FeatureView, FeatureViewProjection, or RequestSource"
577+ )
532578
533579 if args :
534580 warnings .warn (
@@ -559,14 +605,25 @@ def on_demand_feature_view(
559605 DeprecationWarning ,
560606 )
561607 if len (args ) >= 2 :
562- _sources = args [1 ]
563- warnings .warn (
564- (
565- "The `inputs` parameter is being deprecated. Please use `sources` instead. "
566- "Feast 0.21 and onwards will not support the `inputs` parameter."
567- ),
568- DeprecationWarning ,
569- )
608+ _inputs = args [1 ]
609+ for _ , source in _inputs .items ():
610+ if isinstance (source , FeatureView ):
611+ _sources .append (feature_view_to_batch_feature_view (source ))
612+ elif isinstance (source , RequestSource ) or isinstance (
613+ source , FeatureViewProjection
614+ ):
615+ _sources .append (source )
616+ else :
617+ raise ValueError (
618+ "input can only accept FeatureView, FeatureViewProjection, or RequestSource"
619+ )
620+ warnings .warn (
621+ (
622+ "The `inputs` parameter is being deprecated. Please use `sources` instead. "
623+ "Feast 0.21 and onwards will not support the `inputs` parameter."
624+ ),
625+ DeprecationWarning ,
626+ )
570627
571628 if not _sources :
572629 raise ValueError ("The `sources` parameter must be specified." )
@@ -587,3 +644,16 @@ def decorator(user_function):
587644 return on_demand_feature_view_obj
588645
589646 return decorator
647+
648+
649+ def feature_view_to_batch_feature_view (fv : FeatureView ) -> BatchFeatureView :
650+ return BatchFeatureView (
651+ name = fv .name ,
652+ entities = fv .entities ,
653+ ttl = fv .ttl ,
654+ tags = fv .tags ,
655+ online = fv .online ,
656+ owner = fv .owner ,
657+ schema = fv .schema ,
658+ source = fv .source ,
659+ )
0 commit comments