44import warnings
55from datetime import datetime
66from types import FunctionType
7- from typing import Any , Dict , List , Optional , Type , Union
7+ from typing import Any , Optional , Union
88
99import dill
1010import pandas as pd
@@ -62,24 +62,24 @@ class OnDemandFeatureView(BaseFeatureView):
6262 """
6363
6464 name : str
65- features : List [Field ]
66- source_feature_view_projections : Dict [str , FeatureViewProjection ]
67- source_request_sources : Dict [str , RequestSource ]
65+ features : list [Field ]
66+ source_feature_view_projections : dict [str , FeatureViewProjection ]
67+ source_request_sources : dict [str , RequestSource ]
6868 feature_transformation : Union [
6969 PandasTransformation , PythonTransformation , SubstraitTransformation
7070 ]
7171 mode : str
7272 description : str
73- tags : Dict [str , str ]
73+ tags : dict [str , str ]
7474 owner : str
7575
7676 @log_exceptions # noqa: C901
7777 def __init__ ( # noqa: C901
7878 self ,
7979 * ,
8080 name : str ,
81- schema : List [Field ],
82- sources : List [
81+ schema : list [Field ],
82+ sources : list [
8383 Union [
8484 FeatureView ,
8585 RequestSource ,
@@ -93,7 +93,7 @@ def __init__( # noqa: C901
9393 ],
9494 mode : str = "pandas" ,
9595 description : str = "" ,
96- tags : Optional [Dict [str , str ]] = None ,
96+ tags : Optional [dict [str , str ]] = None ,
9797 owner : str = "" ,
9898 ):
9999 """
@@ -124,32 +124,31 @@ def __init__( # noqa: C901
124124 owner = owner ,
125125 )
126126
127- if mode not in {"python" , "pandas" , "substrait" }:
128- raise Exception (
129- f"Unknown mode { mode } . OnDemandFeatureView only supports python or pandas UDFs and substrait."
127+ self .mode = mode .lower ()
128+
129+ if self .mode not in {"python" , "pandas" , "substrait" }:
130+ raise ValueError (
131+ f"Unknown mode { self .mode } . OnDemandFeatureView only supports python or pandas UDFs and substrait."
130132 )
131- else :
132- self .mode = mode
133+
133134 if not feature_transformation :
134135 if udf :
135136 warnings .warn (
136137 "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead." ,
137138 DeprecationWarning ,
138139 )
139140 # Note inspecting the return signature won't work with isinstance so this is the best alternative
140- if mode == "pandas" :
141+ if self . mode == "pandas" :
141142 feature_transformation = PandasTransformation (udf , udf_string )
142- elif mode == "python" :
143+ elif self . mode == "python" :
143144 feature_transformation = PythonTransformation (udf , udf_string )
144- else :
145- pass
146145 else :
147- raise Exception (
146+ raise ValueError (
148147 "OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
149148 )
150149
151- self .source_feature_view_projections : Dict [str , FeatureViewProjection ] = {}
152- self .source_request_sources : Dict [str , RequestSource ] = {}
150+ self .source_feature_view_projections : dict [str , FeatureViewProjection ] = {}
151+ self .source_request_sources : dict [str , RequestSource ] = {}
153152 for odfv_source in sources :
154153 if isinstance (odfv_source , RequestSource ):
155154 self .source_request_sources [odfv_source .name ] = odfv_source
@@ -163,7 +162,7 @@ def __init__( # noqa: C901
163162 self .feature_transformation = feature_transformation
164163
165164 @property
166- def proto_class (self ) -> Type [OnDemandFeatureViewProto ]:
165+ def proto_class (self ) -> type [OnDemandFeatureViewProto ]:
167166 return OnDemandFeatureViewProto
168167
169168 def __copy__ (self ):
@@ -336,7 +335,7 @@ def from_proto(
336335 user_defined_function_proto = backwards_compatible_udf ,
337336 )
338337 else :
339- raise Exception ("At least one transformation type needs to be provided" )
338+ raise ValueError ("At least one transformation type needs to be provided" )
340339
341340 on_demand_feature_view_obj = cls (
342341 name = on_demand_feature_view_proto .spec .name ,
@@ -372,18 +371,18 @@ def from_proto(
372371
373372 return on_demand_feature_view_obj
374373
375- def get_request_data_schema (self ) -> Dict [str , ValueType ]:
376- schema : Dict [str , ValueType ] = {}
374+ def get_request_data_schema (self ) -> dict [str , ValueType ]:
375+ schema : dict [str , ValueType ] = {}
377376 for request_source in self .source_request_sources .values ():
378- if isinstance (request_source .schema , List ):
377+ if isinstance (request_source .schema , list ):
379378 new_schema = {}
380379 for field in request_source .schema :
381380 new_schema [field .name ] = field .dtype .to_value_type ()
382381 schema .update (new_schema )
383- elif isinstance (request_source .schema , Dict ):
382+ elif isinstance (request_source .schema , dict ):
384383 schema .update (request_source .schema )
385384 else :
386- raise Exception (
385+ raise TypeError (
387386 f"Request source schema is not correct type: ${ str (type (request_source .schema ))} "
388387 )
389388 return schema
@@ -401,7 +400,10 @@ def transform_ibis(
401400 if not isinstance (ibis_table , Table ):
402401 raise TypeError ("transform_ibis only accepts ibis.expr.types.Table" )
403402
404- assert type (self .feature_transformation ) == SubstraitTransformation
403+ if not isinstance (self .feature_transformation , SubstraitTransformation ):
404+ raise TypeError (
405+ "The feature_transformation is not SubstraitTransformation type while calling transform_ibis()."
406+ )
405407
406408 columns_to_cleanup = []
407409 for source_fv_projection in self .source_feature_view_projections .values ():
@@ -423,7 +425,7 @@ def transform_ibis(
423425
424426 transformed_table = transformed_table .drop (* columns_to_cleanup )
425427
426- rename_columns : Dict [str , str ] = {}
428+ rename_columns : dict [str , str ] = {}
427429 for feature in self .features :
428430 short_name = feature .name
429431 long_name = self ._get_projected_feature_name (feature .name )
@@ -454,11 +456,9 @@ def transform_arrow(
454456 pa_table = pa_table .append_column (
455457 feature .name , pa_table [full_feature_ref ]
456458 )
457- # pa_table[feature.name] = pa_table[full_feature_ref]
458459 columns_to_cleanup .append (feature .name )
459460 elif feature .name in pa_table .column_names :
460461 # Make sure the full feature name is always present
461- # pa_table[full_feature_ref] = pa_table[feature.name]
462462 pa_table = pa_table .append_column (
463463 full_feature_ref , pa_table [feature .name ]
464464 )
@@ -469,7 +469,7 @@ def transform_arrow(
469469 )
470470
471471 # Work out whether the correct columns names are used.
472- rename_columns : Dict [str , str ] = {}
472+ rename_columns : dict [str , str ] = {}
473473 for feature in self .features :
474474 short_name = feature .name
475475 long_name = self ._get_projected_feature_name (feature .name )
@@ -494,12 +494,12 @@ def transform_arrow(
494494
495495 def transform_dict (
496496 self ,
497- feature_dict : Dict [str , Any ], # type: ignore
498- ) -> Dict [str , Any ]:
497+ feature_dict : dict [str , Any ], # type: ignore
498+ ) -> dict [str , Any ]:
499499 # we need a mapping from full feature name to short and back to do a renaming
500500 # The simplest thing to do is to make the full reference, copy the columns with the short reference
501501 # and rerun
502- columns_to_cleanup : List [str ] = []
502+ columns_to_cleanup : list [str ] = []
503503 for source_fv_projection in self .source_feature_view_projections .values ():
504504 for feature in source_fv_projection .features :
505505 full_feature_ref = f"{ source_fv_projection .name } __{ feature .name } "
@@ -512,7 +512,7 @@ def transform_dict(
512512 feature_dict [full_feature_ref ] = feature_dict [feature .name ]
513513 columns_to_cleanup .append (str (full_feature_ref ))
514514
515- output_dict : Dict [str , Any ] = self .feature_transformation .transform (
515+ output_dict : dict [str , Any ] = self .feature_transformation .transform (
516516 feature_dict
517517 )
518518 for feature_name in columns_to_cleanup :
@@ -542,8 +542,8 @@ def infer_features(self) -> None:
542542 f"Could not infer Features for the feature view '{ self .name } '." ,
543543 )
544544
545- def _construct_random_input (self ) -> Dict [str , List [Any ]]:
546- rand_dict_value : Dict [ValueType , List [Any ]] = {
545+ def _construct_random_input (self ) -> dict [str , list [Any ]]:
546+ rand_dict_value : dict [ValueType , list [Any ]] = {
547547 ValueType .BYTES : [str .encode ("hello world" )],
548548 ValueType .STRING : ["hello world" ],
549549 ValueType .INT32 : [1 ],
@@ -582,11 +582,11 @@ def _construct_random_input(self) -> Dict[str, List[Any]]:
582582 @staticmethod
583583 def get_requested_odfvs (
584584 feature_refs , project , registry
585- ) -> List ["OnDemandFeatureView" ]:
585+ ) -> list ["OnDemandFeatureView" ]:
586586 all_on_demand_feature_views = registry .list_on_demand_feature_views (
587587 project , allow_cache = True
588588 )
589- requested_on_demand_feature_views : List [OnDemandFeatureView ] = []
589+ requested_on_demand_feature_views : list [OnDemandFeatureView ] = []
590590 for odfv in all_on_demand_feature_views :
591591 for feature in odfv .features :
592592 if f"{ odfv .name } :{ feature .name } " in feature_refs :
@@ -597,8 +597,8 @@ def get_requested_odfvs(
597597
598598def on_demand_feature_view (
599599 * ,
600- schema : List [Field ],
601- sources : List [
600+ schema : list [Field ],
601+ sources : list [
602602 Union [
603603 FeatureView ,
604604 RequestSource ,
@@ -607,7 +607,7 @@ def on_demand_feature_view(
607607 ],
608608 mode : str = "pandas" ,
609609 description : str = "" ,
610- tags : Optional [Dict [str , str ]] = None ,
610+ tags : Optional [dict [str , str ]] = None ,
611611 owner : str = "" ,
612612):
613613 """
@@ -643,9 +643,9 @@ def decorator(user_function):
643643 )
644644 transformation = PandasTransformation (user_function , udf_string )
645645 elif mode == "python" :
646- if return_annotation not in (inspect ._empty , Dict [str , Any ]):
646+ if return_annotation not in (inspect ._empty , dict [str , Any ]):
647647 raise TypeError (
648- f"return signature for { user_function } is { return_annotation } but should be Dict [str, Any]"
648+ f"return signature for { user_function } is { return_annotation } but should be dict [str, Any]"
649649 )
650650 transformation = PythonTransformation (user_function , udf_string )
651651 elif mode == "substrait" :
0 commit comments