|
| 1 | +import datetime |
1 | 2 | import itertools |
2 | 3 | import unittest |
3 | 4 | from datetime import timedelta |
|
19 | 20 | driver, |
20 | 21 | location, |
21 | 22 | ) |
| 23 | +from tests.integration.feature_repos.universal.feature_views import ( |
| 24 | + create_driver_hourly_stats_feature_view, |
| 25 | +) |
| 26 | + |
| 27 | + |
| 28 | +@pytest.mark.integration |
| 29 | +def test_write_to_online_store(environment, universal_data_sources): |
| 30 | + fs = environment.feature_store |
| 31 | + entities, datasets, data_sources = universal_data_sources |
| 32 | + driver_hourly_stats = create_driver_hourly_stats_feature_view( |
| 33 | + data_sources["driver"] |
| 34 | + ) |
| 35 | + driver_entity = driver() |
| 36 | + |
| 37 | + # Register Feature View and Entity |
| 38 | + fs.apply([driver_hourly_stats, driver_entity]) |
| 39 | + |
| 40 | + # fake data to ingest into Online Store |
| 41 | + data = { |
| 42 | + "driver_id": [123], |
| 43 | + "conv_rate": [0.85], |
| 44 | + "acc_rate": [0.91], |
| 45 | + "avg_daily_trips": [14], |
| 46 | + "event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], |
| 47 | + "created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], |
| 48 | + } |
| 49 | + df_data = pd.DataFrame(data) |
| 50 | + |
| 51 | + # directly ingest data into the Online Store |
| 52 | + fs.write_to_online_store("driver_stats", df_data) |
| 53 | + |
| 54 | + # assert the right data is in the Online Store |
| 55 | + df = fs.get_online_features( |
| 56 | + features=[ |
| 57 | + "driver_stats:avg_daily_trips", |
| 58 | + "driver_stats:acc_rate", |
| 59 | + "driver_stats:conv_rate", |
| 60 | + ], |
| 61 | + entity_rows=[{"driver": 123}], |
| 62 | + ).to_df() |
| 63 | + assert df["avg_daily_trips"].iloc[0] == 14 |
| 64 | + assert df["acc_rate"].iloc[0] == 0.91 |
| 65 | + assert df["conv_rate"].iloc[0] == 0.85 |
22 | 66 |
|
23 | 67 |
|
24 | 68 | @pytest.mark.integration |
|
0 commit comments