1+ import datetime
2+
13import pandas as pd
4+ import pyarrow as pa
25import pytest
36from great_expectations .core import ExpectationSuite
47from great_expectations .dataset import PandasDataset
58
9+ from feast import FeatureService
610from feast .dqm .errors import ValidationFailed
711from feast .dqm .profilers .ge_profiler import ge_profiler
12+ from feast .feature_logging import (
13+ LOG_TIMESTAMP_FIELD ,
14+ FeatureServiceLoggingSource ,
15+ LoggingConfig ,
16+ )
17+ from feast .protos .feast .serving .ServingService_pb2 import FieldStatus
18+ from feast .wait import wait_retry_backoff
819from tests .integration .feature_repos .repo_configuration import (
920 construct_universal_feature_views ,
1021)
1324 driver ,
1425 location ,
1526)
27+ from tests .utils .logged_features import prepare_logs
1628
1729_features = [
1830 "customer_profile:current_balance" ,
@@ -32,6 +44,39 @@ def configurable_profiler(dataset: PandasDataset) -> ExpectationSuite:
3244
3345 return UserConfigurableProfiler (
3446 profile_dataset = dataset ,
47+ ignored_columns = ["event_timestamp" ],
48+ excluded_expectations = [
49+ "expect_table_columns_to_match_ordered_list" ,
50+ "expect_table_row_count_to_be_between" ,
51+ ],
52+ value_set_threshold = "few" ,
53+ ).build_suite ()
54+
55+
56+ @ge_profiler (with_feature_metadata = True )
57+ def profiler_with_feature_metadata (dataset : PandasDataset ) -> ExpectationSuite :
58+ from great_expectations .profile .user_configurable_profiler import (
59+ UserConfigurableProfiler ,
60+ )
61+
62+ # always present
63+ dataset .expect_column_values_to_be_in_set (
64+ "global_stats__avg_ride_length__status" , {FieldStatus .PRESENT }
65+ )
66+
67+ # present at least in 70% of rows
68+ dataset .expect_column_values_to_be_in_set (
69+ "customer_profile__current_balance__status" , {FieldStatus .PRESENT }, mostly = 0.7
70+ )
71+
72+ return UserConfigurableProfiler (
73+ profile_dataset = dataset ,
74+ ignored_columns = ["event_timestamp" ]
75+ + [
76+ c
77+ for c in dataset .columns
78+ if c .endswith ("__timestamp" ) or c .endswith ("__status" )
79+ ],
3580 excluded_expectations = [
3681 "expect_table_columns_to_match_ordered_list" ,
3782 "expect_table_row_count_to_be_between" ,
@@ -127,3 +172,88 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_so
127172
128173 assert failed_expectations [1 ].check_name == "expect_column_values_to_be_in_set"
129174 assert failed_expectations [1 ].column_name == "avg_passenger_count"
175+
176+
@pytest.mark.integration
def test_logged_features_validation(environment, universal_data_sources):
    """Validate logged features against a profiled reference dataset.

    Flow:
      1. Apply a feature service configured with a logging destination.
      2. Build a reference saved dataset via historical retrieval.
      3. Generate and write synthetic feature logs, including rows for
         non-existing entities to exercise NOT_FOUND status handling.
      4. Poll ``validate_logged_features`` with retry/backoff, since the
         log table may take a while to materialize in the offline store.
    """
    store = environment.feature_store

    (_, datasets, data_sources) = universal_data_sources
    feature_views = construct_universal_feature_views(data_sources)
    feature_service = FeatureService(
        name="test_service",
        features=[
            feature_views.customer[
                ["current_balance", "avg_passenger_count", "lifetime_trip_count"]
            ],
            feature_views.order[["order_is_success"]],
            feature_views.global_fv[["num_rides", "avg_ride_length"]],
        ],
        logging_config=LoggingConfig(
            destination=environment.data_source_creator.create_logged_features_destination()
        ),
    )

    store.apply(
        [driver(), customer(), location(), feature_service, *feature_views.values()]
    )

    entity_df = datasets.entity_df.drop(
        columns=["order_id", "origin_id", "destination_id"]
    )

    # Add some non-existing entities to check NotFound feature handling.
    # DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, and
    # appending row-by-row copies the frame each iteration — build the extra
    # rows once and concatenate in a single call instead.
    unknown_entities = pd.DataFrame(
        [
            {
                "customer_id": 2000 + i,
                "driver_id": 6000 + i,
                "event_timestamp": datetime.datetime.now(),
            }
            for i in range(5)
        ]
    )
    entity_df = pd.concat([entity_df, unknown_entities], ignore_index=True)

    reference_dataset = store.create_saved_dataset(
        from_=store.get_historical_features(
            entity_df=entity_df, features=feature_service, full_feature_names=True
        ),
        name="reference_for_validating_logged_features",
        storage=environment.data_source_creator.create_saved_dataset_destination(),
    )

    log_source_df = store.get_historical_features(
        entity_df=entity_df, features=feature_service, full_feature_names=False
    ).to_df()
    logs_df = prepare_logs(log_source_df, feature_service, store)

    schema = FeatureServiceLoggingSource(
        feature_service=feature_service, project=store.project
    ).get_schema(store._registry)
    store.write_logged_features(
        pa.Table.from_pandas(logs_df, schema=schema), source=feature_service
    )

    def validate():
        """
        Return Tuple[succeed, completed]
        Succeed will be True if no ValidationFailed exception was raised
        """
        try:
            store.validate_logged_features(
                feature_service,
                start=logs_df[LOG_TIMESTAMP_FIELD].min(),
                # end is exclusive-ish in practice; pad by one second so the
                # newest logged rows are included in the validation window.
                end=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1),
                reference=reference_dataset.as_reference(
                    profiler=profiler_with_feature_metadata
                ),
            )
        except ValidationFailed:
            # Validation ran to completion but expectations failed.
            return False, True
        except Exception:
            # log table is still being created
            return False, False

        return True, True

    success = wait_retry_backoff(validate, timeout_secs=30)
    assert success, "Validation failed (unexpectedly)"