Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions feathr_project/feathr/definition/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,14 @@ class HdfsSource(Source):
- `epoch` (seconds since epoch), for example `1647737463`
- `epoch_millis` (milliseconds since epoch), for example `1647737517761`
- Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).

registry_tags: A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {"deprecated": "true"} to indicate this source is deprecated, etc.
time_partition_pattern(Optional[str]): Format of the time partitioned feature data. e.g. yyyy/MM/DD. All formats supported in dateTimeFormatter.
config:
timeSnapshotHdfsSource:
{
location:
{
path: "/data/somePath/daily"
}
timePartitionPattern: "yyyy/MM/dd"
}
Given the above HDFS path: /data/somePath/daily,
then the expectation is that the following sub directorie(s) should exist:
/data/somePath/daily/{yyyy}/{MM}/{dd}

"""

def __init__(self, name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None, time_partition_pattern: Optional[str] = None) -> None:
def __init__(self, name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
super().__init__(name, event_timestamp_column,
timestamp_format, registry_tags=registry_tags)
self.path = path
self.preprocessing = preprocessing
self.time_partition_pattern = time_partition_pattern
if path.startswith("http"):
logger.warning(
"Your input path {} starts with http, which is not supported. Consider using paths starting with wasb[s]/abfs[s]/s3.", path)
Expand All @@ -132,9 +116,6 @@ def to_feature_config(self) -> str:
tm = Template("""
{{source.name}}: {
location: {path: "{{source.path}}"}
{% if source.time_partition_pattern %}
timePartitionPattern: "{{source.time_partition_pattern}}"
{% endif %}
{% if source.event_timestamp_column %}
timeWindowParameters: {
timestampColumn: "{{source.event_timestamp_column}}"
Expand Down
2 changes: 0 additions & 2 deletions feathr_project/feathr/spark_provider/_synapse_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ def download_file(self, target_adls_directory: str, local_dir_cache: str):

# returns the paths to all the files in the target director in ADLS
# get all the paths that are not under a directory
test_paths = self.file_system_client.get_paths(
path=parse_result.path, recursive=False)
result_paths = [basename(file_path.name) for file_path in self.file_system_client.get_paths(
path=parse_result.path, recursive=False) if not file_path.is_directory]

Expand Down
36 changes: 1 addition & 35 deletions feathr_project/test/test_azure_spark_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from feathr import ValueType
from feathr.utils.job_utils import get_result_df
from feathrcli.cli import init
from test_fixture import (basic_test_setup, get_online_test_table_name, time_partition_pattern_test_setup)
from test_fixture import (basic_test_setup, get_online_test_table_name)
from test_utils.constants import Constants

# make sure you have run the upload feature script before running these tests
Expand Down Expand Up @@ -58,40 +58,6 @@ def test_feathr_materialize_to_offline():
res_df = get_result_df(client, "avro", output_path + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0

def test_feathr_materialize_with_time_partition_pattern():
"""
Test FeathrClient() using HdfsSource with 'timePartitionPattern'.
"""
test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"
# os.chdir(test_workspace_dir)

client: FeathrClient = time_partition_pattern_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))

now = datetime.now()
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob_materialize_offline_','_', str(now.minute), '_', str(now.second), ""])
else:
output_path = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/feathrazure_cijob_materialize_offline_','_', str(now.minute), '_', str(now.second), ""])
offline_sink = HdfsSink(output_path=output_path)
settings = MaterializationSettings("nycTaxiTable",
sinks=[offline_sink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.materialize_features(settings)
# assuming the job can successfully run; otherwise it will throw exception
client.wait_job_to_finish(timeout_sec=Constants.SPARK_JOB_TIMEOUT_SECONDS)

# download result and just assert the returned result is not empty
# by default, it will write to a folder appended with date
res_df = get_result_df(client, "avro", output_path + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0


def test_feathr_online_store_agg_features():
"""
Test FeathrClient() get_online_features and batch_get can get data correctly.
Expand Down
58 changes: 1 addition & 57 deletions feathr_project/test/test_feature_anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,60 +167,4 @@ def test_agg_anchor_to_config():
}
}
"""
assert ''.join(agg_anchor.to_feature_config().split()) == ''.join(expected_agg_feature_config.split())

def test_time_partition_to_config():
batch_source = HdfsSource(name="testTimePartitionSource",
path="abfss://public@azurefeathrstorage.blob.core.windows.net/sample_data/time_partition_pattern",
time_partition_pattern="yyyy/MM/dd"
)
key = TypedKey(key_column="key0",
key_column_type=ValueType.INT32)
agg_features = [
Feature(name="f_loc_avg",
key=[key],
feature_type=FLOAT,
transform="f_location_avg_fare"),
Feature(name="f_loc_max",
feature_type=FLOAT,
key=[key],
transform="f_location_max_fare"),
]
agg_anchor = FeatureAnchor(name="testTimePartitionFeaturesSource",
source=batch_source,
features=agg_features)
expected_time_partition_config = """
anchors: {
testTimePartitionFeatures: {
source: testTimePartitionSource
key.sqlExpr: [key0]
features: {
f_loc_avg: {
def.sqlExpr: "f_location_avg_fare"
type: {
type: TENSOR
tensorCategory: DENSE
dimensionType: []
valType: FLOAT
}
}
f_loc_max: {
def.sqlExpr: "f_location_max_fare"
type: {
type: TENSOR
tensorCategory: DENSE
dimensionType: []
valType: FLOAT
}
}
}
}
}
sources: {
testTimePartitionSource: {
location: {path: "abfss://public@azurefeathrstorage.blob.core.windows.net/sample_data/time_partition_pattern"}
timePartitionPattern: "yyyy/MM/dd"
}
}
"""
assert ''.join(agg_anchor.to_feature_config().split()) == ''.join(expected_time_partition_config.split())
assert ''.join(agg_anchor.to_feature_config().split()) == ''.join(expected_agg_feature_config.split())
28 changes: 0 additions & 28 deletions feathr_project/test/test_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,6 @@ def basic_test_setup(config_path: str):

return client

def time_partition_pattern_test_setup(config_path: str):
now = datetime.now()
# set workspace folder by time; make sure we don't have write conflict if there are many CI tests running
os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)])
client = FeathrClient(config_path=config_path)
batch_source = HdfsSource(name="testTimePartitionSource",
path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/time_partition_pattern/daily",
time_partition_pattern="yyyy/MM/dd"
)
key = TypedKey(key_column="key0",
key_column_type=ValueType.INT32)
agg_features = [
Feature(name="f_loc_avg",
key=[key],
feature_type=FLOAT,
transform="f_location_avg_fare"),
Feature(name="f_loc_max",
feature_type=FLOAT,
key=[key],
transform="f_location_max_fare"),
]

agg_anchor = FeatureAnchor(name="testTimePartitionFeatures",
source=batch_source,
features=agg_features)
client.build_features(anchor_list=[agg_anchor])
return client

def snowflake_test_setup(config_path: str):
now = datetime.now()
Expand Down