1+ import asyncio
12import datetime
23import os
34import time
1213import requests
1314from botocore .exceptions import BotoCoreError
1415
16+ from feast import FeatureStore
1517from feast .entity import Entity
1618from feast .errors import FeatureNameCollisionError
1719from feast .feature_service import FeatureService
@@ -400,19 +402,15 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s
400402 )
401403
402404
403- @pytest .mark .integration
404- @pytest .mark .universal_online_stores
405- @pytest .mark .parametrize ("full_feature_names" , [True , False ], ids = lambda v : str (v ))
406- def test_online_retrieval_with_event_timestamps (
407- environment , universal_data_sources , full_feature_names
408- ):
409- fs = environment .feature_store
405+ def setup_feature_store_universal_feature_views (
406+ environment , universal_data_sources
407+ ) -> FeatureStore :
408+ fs : FeatureStore = environment .feature_store
410409 entities , datasets , data_sources = universal_data_sources
411410 feature_views = construct_universal_feature_views (data_sources )
412411
413412 fs .apply ([driver (), feature_views .driver , feature_views .global_fv ])
414413
415- # fake data to ingest into Online Store
416414 data = {
417415 "driver_id" : [1 , 2 ],
418416 "conv_rate" : [0.5 , 0.3 ],
@@ -429,18 +427,11 @@ def test_online_retrieval_with_event_timestamps(
429427 }
430428 df_ingest = pd .DataFrame (data )
431429
432- # directly ingest data into the Online Store
433430 fs .write_to_online_store ("driver_stats" , df_ingest )
431+ return fs
434432
435- response = fs .get_online_features (
436- features = [
437- "driver_stats:avg_daily_trips" ,
438- "driver_stats:acc_rate" ,
439- "driver_stats:conv_rate" ,
440- ],
441- entity_rows = [{"driver_id" : 1 }, {"driver_id" : 2 }],
442- )
443- df = response .to_df (True )
433+
434+ def assert_feature_store_universal_feature_views_response (df : pd .DataFrame ):
444435 assertpy .assert_that (len (df )).is_equal_to (2 )
445436 assertpy .assert_that (df ["driver_id" ].iloc [0 ]).is_equal_to (1 )
446437 assertpy .assert_that (df ["driver_id" ].iloc [1 ]).is_equal_to (2 )
@@ -464,6 +455,50 @@ def test_online_retrieval_with_event_timestamps(
464455 )
465456
466457
458+ @pytest .mark .integration
459+ @pytest .mark .universal_online_stores
460+ def test_online_retrieval_with_event_timestamps (environment , universal_data_sources ):
461+ fs = setup_feature_store_universal_feature_views (
462+ environment , universal_data_sources
463+ )
464+
465+ response = fs .get_online_features (
466+ features = [
467+ "driver_stats:avg_daily_trips" ,
468+ "driver_stats:acc_rate" ,
469+ "driver_stats:conv_rate" ,
470+ ],
471+ entity_rows = [{"driver_id" : 1 }, {"driver_id" : 2 }],
472+ )
473+ df = response .to_df (True )
474+
475+ assert_feature_store_universal_feature_views_response (df )
476+
477+
478+ @pytest .mark .integration
479+ @pytest .mark .universal_online_stores (only = ["redis" ])
480+ def test_async_online_retrieval_with_event_timestamps (
481+ environment , universal_data_sources
482+ ):
483+ fs = setup_feature_store_universal_feature_views (
484+ environment , universal_data_sources
485+ )
486+
487+ response = asyncio .run (
488+ fs .get_online_features_async (
489+ features = [
490+ "driver_stats:avg_daily_trips" ,
491+ "driver_stats:acc_rate" ,
492+ "driver_stats:conv_rate" ,
493+ ],
494+ entity_rows = [{"driver_id" : 1 }, {"driver_id" : 2 }],
495+ )
496+ )
497+ df = response .to_df (True )
498+
499+ assert_feature_store_universal_feature_views_response (df )
500+
501+
467502@pytest .mark .integration
468503@pytest .mark .universal_online_stores (only = ["redis" ])
469504def test_online_store_cleanup (environment , universal_data_sources ):
0 commit comments