@@ -390,6 +390,7 @@ def test_online_retrieval_with_event_timestamps(
390390
391391@pytest .mark .integration
392392@pytest .mark .universal
393+ @pytest .mark .goserver
393394@pytest .mark .parametrize ("full_feature_names" , [True , False ], ids = lambda v : str (v ))
394395def test_online_retrieval (environment , universal_data_sources , full_feature_names ):
395396 fs = environment .feature_store
@@ -695,206 +696,6 @@ def eventually_apply() -> Tuple[None, bool]:
695696 assert all (v is None for v in online_features ["value" ])
696697
697698
@pytest.mark.integration
@pytest.mark.goserver
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_online_retrieval_with_go_server(
    go_environment, go_data_sources, full_feature_names
):
    """Check online retrieval through ``go_environment`` against source data.

    The test registers the universal feature views (built without on-demand
    feature views) plus an entity-mapping feature service, materializes them,
    and then verifies that:

    * the response holds exactly the requested features and the two entity
      keys, named according to ``full_feature_names``;
    * every numeric feature is within ``0.0001`` of the latest value computed
      from the source dataframes;
    * entity ids 0/0 yield ``None`` for every feature except ``num_rides``
      and ``avg_ride_length``;
    * the origin/destination join-key mapping of the feature service is
      resolved correctly.
    """
    store = go_environment.feature_store
    entities, datasets, data_sources = go_data_sources
    feature_views = construct_universal_feature_views(data_sources, with_odfv=False)

    # The location view is exposed twice, once per remapped join key.
    origin_view = feature_views.location.with_name("origin").with_join_key_map(
        {"location_id": "origin_id"}
    )
    destination_view = feature_views.location.with_name(
        "destination"
    ).with_join_key_map({"location_id": "destination_id"})
    feature_service_entity_mapping = FeatureService(
        name="entity_mapping",
        features=[origin_view, destination_view],
    )

    registered_objects = [view for view in feature_views.values() if view]
    registered_objects += [
        driver(),
        customer(),
        location(),
        feature_service_entity_mapping,
    ]
    store.apply(registered_objects)

    one_day = timedelta(days=1)
    store.materialize(
        go_environment.start_date - one_day,
        go_environment.end_date + one_day,
    )

    # Draw ten random orders; their ids drive every lookup below.
    sampled_orders = datasets.orders_df.sample(10)[
        ["customer_id", "driver_id", "order_id", "event_timestamp"]
    ]
    sample_customers = sampled_orders["customer_id"]
    sample_drivers = sampled_orders["driver_id"]

    order_mask = datasets.orders_df["customer_id"].isin(
        sample_customers
    ) & datasets.orders_df["driver_id"].isin(sample_drivers)
    orders_df = datasets.orders_df[order_mask]
    drivers_df = datasets.driver_df[
        datasets.driver_df["driver_id"].isin(sample_drivers)
    ]
    customers_df = datasets.customer_df[
        datasets.customer_df["customer_id"].isin(sample_customers)
    ]

    # Ten random ordered (origin, destination) pairs, transposed into two lists.
    location_pairs = np.array(list(itertools.permutations(entities.location_vals, 2)))
    chosen_rows = np.random.choice(len(location_pairs), 10)
    sample_location_pairs = location_pairs[chosen_rows].T.tolist()
    sample_origins, sample_destinations = (
        sample_location_pairs[0],
        sample_location_pairs[1],
    )
    origins_df = datasets.location_df[
        datasets.location_df["location_id"].isin(sample_origins)
    ]
    destinations_df = datasets.location_df[
        datasets.location_df["location_id"].isin(sample_destinations)
    ]

    global_df = datasets.global_df

    entity_rows = [
        {"driver_id": driver_id, "customer_id": customer_id}
        for driver_id, customer_id in zip(sample_drivers, sample_customers)
    ]

    # Every feature requested here is numeric.
    feature_refs = [
        "driver_stats:conv_rate",
        "driver_stats:avg_daily_trips",
        "customer_profile:current_balance",
        "customer_profile:avg_passenger_count",
        "customer_profile:lifetime_trip_count",
        "order:order_is_success",
        "global_stats:num_rides",
        "global_stats:avg_ride_length",
    ]
    # Bare feature names, i.e. the part after the "view:" prefix.
    unprefixed_feature_refs = [
        ref.rsplit(":", 1)[-1] for ref in feature_refs if ":" in ref
    ]

    online_features_dict = get_online_features_dict(
        environment=go_environment,
        features=feature_refs,
        entity_rows=entity_rows,
        full_feature_names=full_feature_names,
    )

    keys = online_features_dict.keys()
    # Two extra keys are expected: the driver id and the customer id.
    assert len(keys) == len(feature_refs) + 2
    view_names = ("driver_stats", "customer_profile", "order", "global_stats")
    for ref in feature_refs:
        if full_feature_names:
            assert ref.replace(":", "__") in keys
        else:
            assert ref.rsplit(":", 1)[-1] in keys
            # NOTE(review): nesting of this check under `else` is reconstructed
            # from a listing whose indentation was lost -- confirm.
            assert all(view_name not in keys for view_name in view_names)

    tc = unittest.TestCase()
    for row_index, entity_row in enumerate(entity_rows):
        expected = get_latest_feature_values_from_dataframes(
            driver_df=drivers_df,
            customer_df=customers_df,
            orders_df=orders_df,
            global_df=global_df,
            entity_row=entity_row,
        )

        assert expected["customer_id"] == online_features_dict["customer_id"][row_index]
        assert expected["driver_id"] == online_features_dict["driver_id"][row_index]

        # Numeric features are compared with a small absolute tolerance.
        for feature_name in unprefixed_feature_refs:
            response_key = response_feature_name(
                feature_name, feature_refs, full_feature_names
            )
            tc.assertAlmostEqual(
                expected[feature_name],
                online_features_dict[response_key][row_index],
                delta=0.0001,
            )

    # Entity ids that were never sampled should come back as missing values.
    missing_responses_dict = get_online_features_dict(
        environment=go_environment,
        features=feature_refs,
        entity_rows=[{"driver_id": 0, "customer_id": 0}],
        full_feature_names=full_feature_names,
    )
    assert missing_responses_dict is not None
    for feature_name in unprefixed_feature_refs:
        # These two features are exempt from the missing-value check.
        if feature_name in {"num_rides", "avg_ride_length"}:
            continue
        response_key = response_feature_name(
            feature_name, feature_refs, full_feature_names
        )
        tc.assertIsNone(missing_responses_dict[response_key][0])

    # Finally exercise the join-key mapping with origin/destination rows.
    entity_rows = [
        {"origin_id": origin, "destination_id": destination}
        for (_driver, _customer, origin, destination) in zip(
            sample_drivers, sample_customers, *sample_location_pairs
        )
    ]
    assert_feature_service_entity_mapping_correctness(
        go_environment,
        feature_service_entity_mapping,
        entity_rows,
        full_feature_names,
        origins_df,
        destinations_df,
    )
866-
867-
def setup_feature_store(environment, go_data_sources):
    """Prepare a feature store holding two driver feature views.

    Calls ``kill_go_server()`` on the environment's feature store, then
    registers and materializes the universal driver-stats view together with
    a small ad-hoc driver view backed by a freshly generated dataframe.

    Args:
        environment: Test environment providing ``feature_store``,
            ``data_source_creator``, ``start_date`` and ``end_date``.
        go_data_sources: ``(entities, datasets, data_sources)`` triple; only
            ``entities.driver_vals`` and ``data_sources`` are used here.

    Returns:
        Tuple ``(driver_entities, fs, simple_driver_fv, driver_stats_fv, df)``.
    """
    store = environment.feature_store
    store.kill_go_server()

    entities, _datasets, data_sources = go_data_sources
    driver_stats_fv = construct_universal_feature_views(
        data_sources, with_odfv=False
    ).driver

    driver_ids = entities.driver_vals
    row_count = len(driver_ids)
    # One row per driver, every row stamped with the environment's end date.
    frame = pd.DataFrame(
        {
            "ts_1": [environment.end_date] * row_count,
            "created_ts": [environment.end_date] * row_count,
            "driver_id": driver_ids,
            "value": np.random.random(size=row_count),
        }
    )
    source = environment.data_source_creator.create_data_source(
        frame, destination_name="simple_driver_dataset"
    )
    simple_driver_fv = driver_feature_view(
        data_source=source, name="test_universal_online_simple_driver"
    )

    store.apply([driver(), simple_driver_fv, driver_stats_fv])

    # Materialize with a one-day margin on both sides of the environment range.
    one_day = timedelta(days=1)
    store.materialize(
        environment.start_date - one_day,
        environment.end_date + one_day,
    )
    return driver_ids, store, simple_driver_fv, driver_stats_fv, frame
896-
897-
898699def response_feature_name (
899700 feature : str , feature_refs : List [str ], full_feature_names : bool
900701) -> str :
0 commit comments