From a41fbcedfab3544d2c9e7ee51a91051bdf84fda4 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Thu, 29 Sep 2022 19:35:31 +0800 Subject: [PATCH 1/3] Add flag to enable generation non-agg features --- feathr_project/feathr/client.py | 17 +++++++- .../test/test_azure_snowflake_e2e.py | 2 +- feathr_project/test/test_azure_spark_e2e.py | 2 +- .../test/test_azure_spark_maven_e2e.py | 41 +++++++++++++++++-- .../test/test_feature_materialization.py | 2 +- .../test/test_pyduf_preprocessing_e2e.py | 2 +- 6 files changed, 58 insertions(+), 8 deletions(-) diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index f21d37d23..2a79be53b 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -8,6 +8,7 @@ import redis from azure.identity import DefaultAzureCredential +from feathr.definition.transformation import WindowAggTransformation from jinja2 import Template from pyhocon import ConfigFactory from feathr.definition.sink import Sink @@ -612,17 +613,31 @@ def _valid_materialize_keys(self, features: List[str], allow_empty_key=False): return False return True - def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False): + def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, allow_materialize_non_agg_feature: bool = False): """Materialize feature data Args: settings: Feature materialization settings execution_configurations: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. + allow_materialize_non_agg_feature: Normally materializing non-aggregated features doesn't output meaningful results so it's forbidden, but if you really want to do this, set this to True. """ feature_list = settings.feature_names if len(feature_list) > 0 and not self._valid_materialize_keys(feature_list): raise RuntimeError(f"Invalid materialization features: {feature_list}, since they have different keys. Currently Feathr only supports materializing features of the same keys.") + if not allow_materialize_non_agg_feature: + # Check if there are non-aggregation features in the list + for fn in feature_list: + # Check over anchor features + for anchor in self.anchor_list: + for feature in anchor: + if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation): + raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features.") + # Check over derived features + for feature in self.derived_feature_list: + if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation): + raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features.") + # Collect secrets from sinks secrets = [] for sink in settings.sinks: diff --git a/feathr_project/test/test_azure_snowflake_e2e.py b/feathr_project/test/test_azure_snowflake_e2e.py index c84aa9153..17474ab1b 100644 --- a/feathr_project/test/test_azure_snowflake_e2e.py +++ b/feathr_project/test/test_azure_snowflake_e2e.py @@ -30,7 +30,7 @@ def test_feathr_online_store_agg_features(): feature_names=['f_snowflake_call_center_division_name', 'f_snowflake_call_center_zipcode'], backfill_time=backfill_time) - client.materialize_features(settings) + client.materialize_features(settings, allow_materialize_non_agg_feature=True) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py index d2aa0b032..9c4ab8c5a 100644 --- a/feathr_project/test/test_azure_spark_e2e.py +++ b/feathr_project/test/test_azure_spark_e2e.py @@ -119,7 +119,7 @@ def test_feathr_online_store_non_agg_features(): feature_names=["f_gen_trip_distance", "f_gen_is_long_trip_distance", "f1", "f2", "f3", "f4", "f5", "f6"], backfill_time=backfill_time) - client.materialize_features(settings) + client.materialize_features(settings, allow_materialize_non_agg_feature=True) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) diff --git a/feathr_project/test/test_azure_spark_maven_e2e.py b/feathr_project/test/test_azure_spark_maven_e2e.py index b8e7cefb0..bfdf6d020 100644 --- a/feathr_project/test/test_azure_spark_maven_e2e.py +++ b/feathr_project/test/test_azure_spark_maven_e2e.py @@ -3,8 +3,12 @@ from pathlib import Path from feathr import (BackfillTime, MaterializationSettings) -from feathr import RedisSink +# from feathr import * from feathr.client import FeathrClient +from feathr.definition.dtype import ValueType +from feathr.definition.query_feature_list import FeatureQuery +from feathr.definition.settings import ObservationSettings +from feathr.definition.typed_key import TypedKey from test_fixture import (basic_test_setup, get_online_test_table_name) from test_utils.constants import Constants @@ -20,8 +24,37 @@ def test_feathr_online_store_agg_features(): # The `feathr_runtime_location` was commented out in this config file, so feathr should use # Maven package as the dependency and `noop.jar` as the main file - client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_maven.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_xch.yaml")) + + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + + feature_query = FeatureQuery( + feature_list=["f_location_avg_fare"], key=location_id) + settings = ObservationSettings( + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss") + + now = datetime.now() + # set output folder based on different runtime + if client.spark_runtime == 'databricks': + output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"]) + else: + output_path = ''.join(['abfss://xchfeathrtest4fs@xchfeathrtest4sto.dfs.core.windows.net/demo_data/output','_', str(now.minute), '_', str(now.second), ".avro"]) + + + client.get_offline_features(observation_settings=settings, + feature_query=feature_query, + output_path=output_path) + + # assuming the job can successfully run; otherwise it will throw exception + client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) + return backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) @@ -51,4 +84,6 @@ def test_feathr_online_store_agg_features(): assert res['239'][0] != None assert res['239'][1] != None assert res['265'][0] != None - assert res['265'][1] != None \ No newline at end of file + assert res['265'][1] != None + +test_feathr_online_store_agg_features() \ No newline at end of file diff --git a/feathr_project/test/test_feature_materialization.py b/feathr_project/test/test_feature_materialization.py index 62b84d367..d6914077a 100644 --- a/feathr_project/test/test_feature_materialization.py +++ b/feathr_project/test/test_feature_materialization.py @@ -231,7 +231,7 @@ def test_delete_feature_from_redis(): "f_day_of_week" ], backfill_time=backfill_time) - client.materialize_features(settings) + client.materialize_features(settings, allow_materialize_non_agg_feature=True) client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index 9ac9c1917..83ace12ea 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -103,7 +103,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing(): "f_day_of_week" ], backfill_time=backfill_time) - client.materialize_features(settings) + client.materialize_features(settings, allow_materialize_non_agg_feature=True) # just assume the job is successful without validating the actual result in Redis. Might need to consolidate # this part with the test_feathr_online_store test case client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS) From 20a81f30d0f2f8bc07bfe9de28d3f2d1cf11e04b Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Thu, 29 Sep 2022 20:14:05 +0800 Subject: [PATCH 2/3] Typo --- feathr_project/feathr/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 2a79be53b..cfb02f29c 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -630,7 +630,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf for fn in feature_list: # Check over anchor features for anchor in self.anchor_list: - for feature in anchor: + for feature in anchor.features: if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation): raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features.") # Check over derived features From 2d229789c10eb35c66dff6afc5dc0ce8ec03a771 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Sat, 8 Oct 2022 15:53:23 +0800 Subject: [PATCH 3/3] Resolve comments --- feathr_project/feathr/client.py | 6 +++--- feathr_project/test/test_azure_spark_maven_e2e.py | 4 +--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index cfb02f29c..54d2ff39e 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -619,7 +619,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf Args: settings: Feature materialization settings execution_configurations: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - allow_materialize_non_agg_feature: Normally materializing non-aggregated features doesn't output meaningful results so it's forbidden, but if you really want to do this, set this to True. + allow_materialize_non_agg_feature: Materializing non-aggregated features (the features without WindowAggTransformation) doesn't output meaningful results so it's by default set to False, but if you really want to materialize non-aggregated features, set this to True. """ feature_list = settings.feature_names if len(feature_list) > 0 and not self._valid_materialize_keys(feature_list): @@ -632,11 +632,11 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf for anchor in self.anchor_list: for feature in anchor.features: if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation): - raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features.") + raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.") # Check over derived features for feature in self.derived_feature_list: if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation): - raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features.") + raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.") # Collect secrets from sinks secrets = [] diff --git a/feathr_project/test/test_azure_spark_maven_e2e.py b/feathr_project/test/test_azure_spark_maven_e2e.py index bfdf6d020..6b93bb7a8 100644 --- a/feathr_project/test/test_azure_spark_maven_e2e.py +++ b/feathr_project/test/test_azure_spark_maven_e2e.py @@ -24,7 +24,7 @@ def test_feathr_online_store_agg_features(): # The `feathr_runtime_location` was commented out in this config file, so feathr should use # Maven package as the dependency and `noop.jar` as the main file - client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_xch.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_maven.yaml")) @@ -85,5 +85,3 @@ def test_feathr_online_store_agg_features(): assert res['239'][1] != None assert res['265'][0] != None assert res['265'][1] != None - -test_feathr_online_store_agg_features() \ No newline at end of file