1313import requests
1414from botocore .exceptions import BotoCoreError
1515
16+ from feast import FeatureStore
1617from feast .entity import Entity
1718from feast .errors import FeatureNameCollisionError
1819from feast .feature_service import FeatureService
@@ -401,19 +402,13 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s
401402 )
402403
403404
404- @pytest .mark .integration
405- @pytest .mark .universal_online_stores
406- @pytest .mark .parametrize ("full_feature_names" , [True , False ], ids = lambda v : str (v ))
407- def test_online_retrieval_with_event_timestamps (
408- environment , universal_data_sources , full_feature_names
409- ):
410- fs = environment .feature_store
405+ def setup_feature_store_universal_feature_views (environment , universal_data_sources ) -> FeatureStore :
406+ fs : FeatureStore = environment .feature_store
411407 entities , datasets , data_sources = universal_data_sources
412408 feature_views = construct_universal_feature_views (data_sources )
413409
414410 fs .apply ([driver (), feature_views .driver , feature_views .global_fv ])
415411
416- # fake data to ingest into Online Store
417412 data = {
418413 "driver_id" : [1 , 2 ],
419414 "conv_rate" : [0.5 , 0.3 ],
@@ -430,18 +425,11 @@ def test_online_retrieval_with_event_timestamps(
430425 }
431426 df_ingest = pd .DataFrame (data )
432427
433- # directly ingest data into the Online Store
434428 fs .write_to_online_store ("driver_stats" , df_ingest )
429+ return fs
435430
436- response = fs .get_online_features (
437- features = [
438- "driver_stats:avg_daily_trips" ,
439- "driver_stats:acc_rate" ,
440- "driver_stats:conv_rate" ,
441- ],
442- entity_rows = [{"driver_id" : 1 }, {"driver_id" : 2 }],
443- )
444- df = response .to_df (True )
431+
432+ def assert_feature_store_universal_feature_views_response (df : pd .DataFrame ):
445433 assertpy .assert_that (len (df )).is_equal_to (2 )
446434 assertpy .assert_that (df ["driver_id" ].iloc [0 ]).is_equal_to (1 )
447435 assertpy .assert_that (df ["driver_id" ].iloc [1 ]).is_equal_to (2 )
@@ -466,33 +454,32 @@ def test_online_retrieval_with_event_timestamps(
466454
467455
468456@pytest .mark .integration
469- @pytest .mark .universal_online_stores (only = ["redis" ])
470- def test_async_online_retrieval_with_event_timestamps (
471- environment , universal_data_sources
457+ @pytest .mark .universal_online_stores
458+ @pytest .mark .parametrize ("full_feature_names" , [True , False ], ids = lambda v : str (v ))
459+ def test_online_retrieval_with_event_timestamps (
460+ environment , universal_data_sources , full_feature_names
472461):
473- fs = environment .feature_store
474- entities , datasets , data_sources = universal_data_sources
475- feature_views = construct_universal_feature_views (data_sources )
476-
477- fs .apply ([driver (), feature_views .driver , feature_views .global_fv ])
462+ fs = setup_feature_store_universal_feature_views (fs , universal_data_sources )
478463
479- data = {
480- "driver_id" : [1 , 2 ],
481- "conv_rate" : [0.5 , 0.3 ],
482- "acc_rate" : [0.6 , 0.4 ],
483- "avg_daily_trips" : [4 , 5 ],
484- "event_timestamp" : [
485- pd .to_datetime (1646263500 , utc = True , unit = "s" ),
486- pd .to_datetime (1646263600 , utc = True , unit = "s" ),
487- ],
488- "created" : [
489- pd .to_datetime (1646263500 , unit = "s" ),
490- pd .to_datetime (1646263600 , unit = "s" ),
464+ response = fs .get_online_features (
465+ features = [
466+ "driver_stats:avg_daily_trips" ,
467+ "driver_stats:acc_rate" ,
468+ "driver_stats:conv_rate" ,
491469 ],
492- }
493- df_ingest = pd .DataFrame (data )
470+ entity_rows = [{"driver_id" : 1 }, {"driver_id" : 2 }],
471+ )
472+ df = response .to_df (True )
494473
495- fs .write_to_online_store ("driver_stats" , df_ingest )
474+ assert_feature_store_universal_feature_views_response (df )
475+
476+
477+ @pytest .mark .integration
478+ @pytest .mark .universal_online_stores (only = ["redis" ])
479+ def test_async_online_retrieval_with_event_timestamps (
480+ environment , universal_data_sources
481+ ):
482+ fs = setup_feature_store_universal_feature_views (fs , universal_data_sources )
496483
497484 response = asyncio .run (
498485 fs .get_online_features_async (
@@ -506,27 +493,7 @@ def test_async_online_retrieval_with_event_timestamps(
506493 )
507494 df = response .to_df (True )
508495
509- assertpy .assert_that (len (df )).is_equal_to (2 )
510- assertpy .assert_that (df ["driver_id" ].iloc [0 ]).is_equal_to (1 )
511- assertpy .assert_that (df ["driver_id" ].iloc [1 ]).is_equal_to (2 )
512- assertpy .assert_that (df ["avg_daily_trips" + TIMESTAMP_POSTFIX ].iloc [0 ]).is_equal_to (
513- 1646263500
514- )
515- assertpy .assert_that (df ["avg_daily_trips" + TIMESTAMP_POSTFIX ].iloc [1 ]).is_equal_to (
516- 1646263600
517- )
518- assertpy .assert_that (df ["acc_rate" + TIMESTAMP_POSTFIX ].iloc [0 ]).is_equal_to (
519- 1646263500
520- )
521- assertpy .assert_that (df ["acc_rate" + TIMESTAMP_POSTFIX ].iloc [1 ]).is_equal_to (
522- 1646263600
523- )
524- assertpy .assert_that (df ["conv_rate" + TIMESTAMP_POSTFIX ].iloc [0 ]).is_equal_to (
525- 1646263500
526- )
527- assertpy .assert_that (df ["conv_rate" + TIMESTAMP_POSTFIX ].iloc [1 ]).is_equal_to (
528- 1646263600
529- )
496+ assert_feature_store_universal_feature_views_response (df )
530497
531498
532499@pytest .mark .integration
0 commit comments