Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Dict, List, Union

from azure.identity import DefaultAzureCredential
from feathr.definition.transformation import WindowAggTransformation
from jinja2 import Template
from pyhocon import ConfigFactory
import redis
Expand Down Expand Up @@ -608,17 +609,31 @@ def _valid_materialize_keys(self, features: List[str], allow_empty_key=False):
self.logger.error(f"Inconsistent feature keys. Current keys are {str(keys)}")
return False
return True

def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False):
def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, allow_materialize_non_agg_feature: bool = False):
"""Materialize feature data

Args:
settings: Feature materialization settings
execution_configurations: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations.
allow_materialize_non_agg_feature: Materializing non-aggregated features (the features without WindowAggTransformation) doesn't output meaningful results so it's by default set to False, but if you really want to materialize non-aggregated features, set this to True.
"""
feature_list = settings.feature_names
if len(feature_list) > 0 and not self._valid_materialize_keys(feature_list):
raise RuntimeError(f"Invalid materialization features: {feature_list}, since they have different keys. Currently Feathr only supports materializing features of the same keys.")

if not allow_materialize_non_agg_feature:
# Check if there are non-aggregation features in the list
for fn in feature_list:
# Check over anchor features
for anchor in self.anchor_list:
for feature in anchor.features:
if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation):
raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.")
# Check over derived features
for feature in self.derived_feature_list:
if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation):
raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.")

# Collect secrets from sinks
secrets = []
Expand Down
2 changes: 1 addition & 1 deletion feathr_project/test/test_azure_snowflake_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_feathr_online_store_agg_features():
feature_names=['f_snowflake_call_center_division_name',
'f_snowflake_call_center_zipcode'],
backfill_time=backfill_time)
client.materialize_features(settings)
client.materialize_features(settings, allow_materialize_non_agg_feature=True)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
Expand Down
2 changes: 1 addition & 1 deletion feathr_project/test/test_azure_spark_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def test_feathr_online_store_non_agg_features():
feature_names=["f_gen_trip_distance", "f_gen_is_long_trip_distance", "f1", "f2", "f3", "f4", "f5", "f6"],
backfill_time=backfill_time)

client.materialize_features(settings)
client.materialize_features(settings, allow_materialize_non_agg_feature=True)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
Expand Down
37 changes: 35 additions & 2 deletions feathr_project/test/test_azure_spark_maven_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
from pathlib import Path

from feathr import (BackfillTime, MaterializationSettings)
from feathr import RedisSink
# from feathr import *
from feathr.client import FeathrClient
from feathr.definition.dtype import ValueType
from feathr.definition.query_feature_list import FeatureQuery
from feathr.definition.settings import ObservationSettings
from feathr.definition.typed_key import TypedKey
from test_fixture import (basic_test_setup, get_online_test_table_name)
from test_utils.constants import Constants

Expand All @@ -22,6 +26,35 @@ def test_feathr_online_store_agg_features():
# Maven package as the dependency and `noop.jar` as the main file
client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_maven.yaml"))



location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")

feature_query = FeatureQuery(
feature_list=["f_location_avg_fare"], key=location_id)
settings = ObservationSettings(
observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")

now = datetime.now()
# set output folder based on different runtime
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"])
else:
output_path = ''.join(['abfss://xchfeathrtest4fs@xchfeathrtest4sto.dfs.core.windows.net/demo_data/output','_', str(now.minute), '_', str(now.second), ".avro"])


client.get_offline_features(observation_settings=settings,
feature_query=feature_query,
output_path=output_path)

# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
return
backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
Expand Down Expand Up @@ -51,4 +84,4 @@ def test_feathr_online_store_agg_features():
assert res['239'][0] != None
assert res['239'][1] != None
assert res['265'][0] != None
assert res['265'][1] != None
assert res['265'][1] != None
2 changes: 1 addition & 1 deletion feathr_project/test/test_feature_materialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def test_delete_feature_from_redis():
"f_day_of_week"
],
backfill_time=backfill_time)
client.materialize_features(settings)
client.materialize_features(settings, allow_materialize_non_agg_feature=True)

client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

Expand Down
2 changes: 1 addition & 1 deletion feathr_project/test/test_pyduf_preprocessing_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing():
"f_day_of_week"
],
backfill_time=backfill_time)
client.materialize_features(settings)
client.materialize_features(settings, allow_materialize_non_agg_feature=True)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)
Expand Down