33
44import pandas as pd
55
6- from feast import Entity , Feature , FeatureStore , FeatureView , ValueType
6+ from feast import Entity , FeatureStore , FeatureView , Field
77from feast .infra .offline_stores .contrib .athena_offline_store .athena_source import (
88 AthenaSource ,
99)
10+ from feast .types import Float64 , Int64
1011
1112
1213def test_end_to_end ():
1314
1415 try :
15- fs = FeatureStore ("." )
16+
17+ # Before running this test method
18+ # 1. Upload the driver_stats.parquet file to your S3 bucket.
19+ # (https://github.com/feast-dev/feast-custom-offline-store-demo/tree/main/feature_repo/data)
20+ # 2. Using AWS Glue Crawler, create a table in the data catalog. The generated table can be queried through Athena.
21+ # 3. Specify the S3 bucket name, data source(AwsDataCatalog), database name, Athena's workgroup, etc. in feature_store.yaml
22+
23+ fs = FeatureStore ("./feature_repo" )
24+
25+ # Partition pruning has a significant impact on Athena's query performance and cost.
26+ # If offline feature dataset is large, it is highly recommended to create partitions using date columns such as ('created','event_timestamp')
27+ # The date_partition_column must be in form of YYYY-MM-DD(string) as in the beginning of the date column.
1628
1729 driver_hourly_stats = AthenaSource (
1830 timestamp_field = "event_timestamp" ,
@@ -21,31 +33,29 @@ def test_end_to_end():
2133 database = "sampledb" ,
2234 data_source = "AwsDataCatalog" ,
2335 created_timestamp_column = "created" ,
24- # date_partition_column="std_date"
36+ # date_partition_column="std_date" #YYYY-MM-DD
2537 )
2638
2739 driver = Entity (
2840 name = "driver_id" ,
29- value_type = ValueType .INT64 ,
3041 description = "driver id" ,
3142 )
3243
3344 driver_hourly_stats_view = FeatureView (
3445 name = "driver_hourly_stats" ,
35- entities = ["driver_id" ],
36- ttl = timedelta (days = 365 ),
37- features = [
38- Feature (name = "conv_rate" , dtype = ValueType . FLOAT ),
39- Feature (name = "acc_rate" , dtype = ValueType . FLOAT ),
40- Feature (name = "avg_daily_trips" , dtype = ValueType . INT64 ),
46+ entities = [driver ],
47+ ttl = timedelta (days = 500 ),
48+ schema = [
49+ Field (name = "conv_rate" , dtype = Float64 ),
50+ Field (name = "acc_rate" , dtype = Float64 ),
51+ Field (name = "avg_daily_trips" , dtype = Int64 ),
4152 ],
4253 online = True ,
43- batch_source = driver_hourly_stats ,
54+ source = driver_hourly_stats ,
4455 )
4556
4657 # apply repository
4758 fs .apply ([driver_hourly_stats , driver , driver_hourly_stats_view ])
48-
4959 print (fs .list_data_sources ())
5060 print (fs .list_feature_views ())
5161
@@ -54,7 +64,6 @@ def test_end_to_end():
5464 )
5565
5666 # Read features from offline store
57-
5867 feature_vector = (
5968 fs .get_historical_features (
6069 features = ["driver_hourly_stats:conv_rate" ], entity_df = entity_df