|
33 | 33 | from tests.utils.data_source_utils import prep_file_source |
34 | 34 |
|
35 | 35 |
|
| 36 | +@pytest.mark.integration |
| 37 | +def test_entity_ttl_online_store(local_redis_environment, universal_data_sources): |
| 38 | + if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": |
| 39 | + return |
| 40 | + fs = local_redis_environment.feature_store |
| 41 | + # setting ttl setting in online store to 1 second |
| 42 | + fs.config.online_store.key_ttl_seconds = 1 |
| 43 | + entities, datasets, data_sources = universal_data_sources |
| 44 | + driver_hourly_stats = create_driver_hourly_stats_feature_view( |
| 45 | + data_sources["driver"] |
| 46 | + ) |
| 47 | + driver_entity = driver() |
| 48 | + |
| 49 | + # Register Feature View and Entity |
| 50 | + fs.apply([driver_hourly_stats, driver_entity]) |
| 51 | + |
| 52 | + # fake data to ingest into Online Store |
| 53 | + data = { |
| 54 | + "driver_id": [1], |
| 55 | + "conv_rate": [0.5], |
| 56 | + "acc_rate": [0.6], |
| 57 | + "avg_daily_trips": [4], |
| 58 | + "event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], |
| 59 | + "created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], |
| 60 | + } |
| 61 | + df_ingest = pd.DataFrame(data) |
| 62 | + |
| 63 | + # directly ingest data into the Online Store |
| 64 | + fs.write_to_online_store("driver_stats", df_ingest) |
| 65 | + |
| 66 | + # assert the right data is in the Online Store |
| 67 | + df = fs.get_online_features( |
| 68 | + features=[ |
| 69 | + "driver_stats:avg_daily_trips", |
| 70 | + "driver_stats:acc_rate", |
| 71 | + "driver_stats:conv_rate", |
| 72 | + ], |
| 73 | + entity_rows=[{"driver": 1}], |
| 74 | + ).to_df() |
| 75 | + assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(4) |
| 76 | + assertpy.assert_that(df["acc_rate"].iloc[0]).is_close_to(0.6, 1e-6) |
| 77 | + assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.5, 1e-6) |
| 78 | + |
| 79 | + # simulate time passing for testing ttl |
| 80 | + time.sleep(1) |
| 81 | + |
| 82 | + # retrieve the same entity again |
| 83 | + df = fs.get_online_features( |
| 84 | + features=[ |
| 85 | + "driver_stats:avg_daily_trips", |
| 86 | + "driver_stats:acc_rate", |
| 87 | + "driver_stats:conv_rate", |
| 88 | + ], |
| 89 | + entity_rows=[{"driver": 1}], |
| 90 | + ).to_df() |
| 91 | + # assert that the entity features expired in the online store |
| 92 | + assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_none() |
| 93 | + assertpy.assert_that(df["acc_rate"].iloc[0]).is_none() |
| 94 | + assertpy.assert_that(df["conv_rate"].iloc[0]).is_none() |
| 95 | + |
| 96 | + |
36 | 97 | # TODO: make this work with all universal (all online store types) |
37 | 98 | @pytest.mark.integration |
38 | 99 | def test_write_to_online_store_event_check(local_redis_environment): |
|
0 commit comments