@@ -493,8 +493,7 @@ def test_invalid_python_transformation_raises_type_error_on_apply():
493493 schema = [Field (name = "driver_name_lower" , dtype = String )],
494494 mode = "python" ,
495495 )
496- def python_view (inputs : dict [str , Any ]) -> dict [str , Any ]:
497- return {"driver_name_lower" : []}
496+ def python_view (inputs : dict [str , Any ]) -> dict [str , Any ]: return {"driver_name_lower" : []}
498497
499498 with pytest .raises (
500499 TypeError ,
@@ -506,7 +505,7 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]:
506505
507506
508507class TestOnDemandTransformationsWithWrites (unittest .TestCase ):
509- def setUp (self ):
508+ def test_stored_writes (self ):
510509 with tempfile .TemporaryDirectory () as data_dir :
511510 self .store = FeatureStore (
512511 config = RepoConfig (
@@ -593,7 +592,6 @@ def python_stored_writes_feature_view(
593592 "counter" : [c + 1 for c in inputs ["counter" ]],
594593 "input_datetime" : [d for d in inputs ["input_datetime" ]],
595594 }
596- print ("running odfv transform" )
597595 return output
598596
599597 assert python_stored_writes_feature_view .entities == [driver .name ]
@@ -613,15 +611,13 @@ def python_stored_writes_feature_view(
613611 )
614612
615613 assert fv_applied .entities == [driver .name ]
616- assert odfv_applied .entites == [driver .name ]
614+ assert odfv_applied .entities == [driver .name ]
617615
618616 # Note here that after apply() is called, the entity_columns are populated with the join_key
619- assert fv_applied .entity_columns [0 ].name == driver .join_key
617+ # assert fv_applied.entity_columns[0].name == driver.join_key
618+ assert fv_applied .entity_columns == []
620619 assert odfv_applied .entity_columns [0 ].name == driver .join_key
621620
622- self .store .write_to_online_store (
623- feature_view_name = "driver_hourly_stats" , df = driver_df
624- )
625621 assert len (self .store .list_all_feature_views ()) == 2
626622 assert len (self .store .list_feature_views ()) == 1
627623 assert len (self .store .list_on_demand_feature_views ()) == 1
@@ -631,84 +627,81 @@ def python_stored_writes_feature_view(
631627 == self .store .get_feature_view ("driver_hourly_stats" ).entity_columns
632628 )
633629 assert (
634- self .store .get_on_demand_feature_view (
635- "python_stored_writes_feature_view"
636- ).entity_columns
637- == self .store .get_feature_view ("driver_hourly_stats" ).entity_columns
638- )
639-
640- def test_stored_writes (self ):
641- current_datetime = _utc_now ()
642- fv_entity_rows_to_write = [
643- {
644- "driver_id" : 1001 ,
645- "conv_rate" : 0.25 ,
646- "acc_rate" : 0.25 ,
647- "avg_daily_trips" : 2 ,
648- "event_timestamp" : current_datetime ,
649- "created" : current_datetime ,
650- }
651- ]
652- odfv_entity_rows_to_write = [
653- {
654- "driver_id" : 1001 ,
655- "counter" : 0 ,
656- "input_datetime" : current_datetime ,
657- }
658- ]
659- fv_entity_rows_to_read = [
660- {
661- "driver_id" : 1001 ,
662- }
663- ]
664- # Note that here we shouldn't have to pass the request source features for reading
665- # because they should have already been written to the online store
666- odfv_entity_rows_to_read = [
667- {
668- "driver_id" : 1001 ,
669- "conv_rate" : 0.25 ,
670- "acc_rate" : 0.25 ,
671- "counter" : 0 ,
672- "input_datetime" : current_datetime ,
673- }
674- ]
675- print ("storing fv features" )
676- self .store .write_to_online_store (
677- feature_view_name = "driver_hourly_stats" ,
678- df = fv_entity_rows_to_write ,
679- )
680- print ("reading fv features" )
681- online_python_response = self .store .get_online_features (
682- entity_rows = fv_entity_rows_to_read ,
683- features = [
684- "driver_hourly_stats:conv_rate" ,
685- "driver_hourly_stats:acc_rate" ,
686- "driver_hourly_stats:avg_daily_trips" ,
687- ],
688- ).to_dict ()
689- print (online_python_response )
690- print ("storing odfv features" )
691- self .store .write_to_online_store (
692- feature_view_name = "python_stored_writes_feature_view" ,
693- df = odfv_entity_rows_to_write ,
694- )
695- print ("reading odfv features" )
696- online_python_response = self .store .get_online_features (
697- entity_rows = odfv_entity_rows_to_read ,
698- features = [
699- "python_stored_writes_feature_view:conv_rate_plus_acc" ,
700- "python_stored_writes_feature_view:current_datetime" ,
701- "python_stored_writes_feature_view:counter" ,
702- "python_stored_writes_feature_view:input_datetime" ,
703- ],
704- ).to_dict ()
705- print (online_python_response )
706- assert sorted (list (online_python_response .keys ())) == sorted (
707- [
708- "driver_id" ,
709- "conv_rate_plus_acc" ,
710- "counter" ,
711- "current_datetime" ,
712- "input_datetime" ,
630+ python_stored_writes_feature_view .entity_columns
631+ == self .store .get_on_demand_feature_view ("python_stored_writes_feature_view" ).entity_columns
632+ )
633+
634+ current_datetime = _utc_now ()
635+ fv_entity_rows_to_write = [
636+ {
637+ "driver_id" : 1001 ,
638+ "conv_rate" : 0.25 ,
639+ "acc_rate" : 0.25 ,
640+ "avg_daily_trips" : 2 ,
641+ "event_timestamp" : current_datetime ,
642+ "created" : current_datetime ,
643+ }
713644 ]
714- )
645+ odfv_entity_rows_to_write = [
646+ {
647+ "driver_id" : 1001 ,
648+ "counter" : 0 ,
649+ "input_datetime" : current_datetime ,
650+ }
651+ ]
652+ fv_entity_rows_to_read = [
653+ {
654+ "driver_id" : 1001 ,
655+ }
656+ ]
657+ # Note that here we shouldn't have to pass the request source features for reading
658+ # because they should have already been written to the online store
659+ odfv_entity_rows_to_read = [
660+ {
661+ "driver_id" : 1001 ,
662+ "conv_rate" : 0.25 ,
663+ "acc_rate" : 0.25 ,
664+ "counter" : 0 ,
665+ "input_datetime" : current_datetime ,
666+ }
667+ ]
668+ print ("storing fv features" )
669+ self .store .write_to_online_store (
670+ feature_view_name = "driver_hourly_stats" ,
671+ df = fv_entity_rows_to_write ,
672+ )
673+ print ("reading fv features" )
674+ online_python_response = self .store .get_online_features (
675+ entity_rows = fv_entity_rows_to_read ,
676+ features = [
677+ "driver_hourly_stats:conv_rate" ,
678+ "driver_hourly_stats:acc_rate" ,
679+ "driver_hourly_stats:avg_daily_trips" ,
680+ ],
681+ ).to_dict ()
682+ print (online_python_response )
683+ print ("storing odfv features" )
684+ self .store .write_to_online_store (
685+ feature_view_name = "python_stored_writes_feature_view" ,
686+ df = odfv_entity_rows_to_write ,
687+ )
688+ print ("reading odfv features" )
689+ online_python_response = self .store .get_online_features (
690+ entity_rows = odfv_entity_rows_to_read ,
691+ features = [
692+ "python_stored_writes_feature_view:conv_rate_plus_acc" ,
693+ "python_stored_writes_feature_view:current_datetime" ,
694+ "python_stored_writes_feature_view:counter" ,
695+ "python_stored_writes_feature_view:input_datetime" ,
696+ ],
697+ ).to_dict ()
698+ print (online_python_response )
699+ assert sorted (list (online_python_response .keys ())) == sorted (
700+ [
701+ "driver_id" ,
702+ "conv_rate_plus_acc" ,
703+ "counter" ,
704+ "current_datetime" ,
705+ "input_datetime" ,
706+ ]
707+ )
0 commit comments