From 57574eddbd292c946d1a0b895845e61fd714aec8 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 27 Jun 2022 21:10:17 -0700 Subject: [PATCH 01/26] Update README.md --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4c673a11c..98aee2d2f 100644 --- a/README.md +++ b/README.md @@ -19,10 +19,12 @@ Feathr automatically computes your feature values and joins them to your trainin ## 🌟 Feathr Highlights -- **Scalable with built-in optimizations.** For example, based on some internal use case, Feathr can process billions of rows and PB scale data with built-in optimizations such as bloom filters and salted joins. +- **Battle tested in production for more than 6 years:** LinkedIn has been using Feathr in production for over 6 years and have a dedicated team improving it. +- **Scalable with built-in optimizations:** For example, based on some internal use case, Feathr can process billions of rows and PB scale data with built-in optimizations such as bloom filters and salted joins. - **Rich support for point-in-time joins and aggregations:** Feathr has high performant built-in operators designed for Feature Store, including time-based aggregation, sliding window joins, look-up features, all with point-in-time correctness. - **Highly customizable user-defined functions (UDFs)** with native PySpark and Spark SQL support to lower the learning curve for data scientists. - **Pythonic APIs** to access everything with low learning curve; Integrated with model building so data scientists can be productive from day one. +- **Derived Features** which is a unique capability across all the feature store solutions. This encourage feature consumers to build features on existing features and encouraging feature reuse. - **Rich type system** including support for embeddings for advanced machine learning/deep learning scenarios. 
One of the common use cases is to build embeddings for customer profiles, and those embeddings can be reused across an organization in all the machine learning applications. - **Native cloud integration** with simplified and scalable architecture, which is illustrated in the next section. - **Feature sharing and reuse made easy:** Feathr has built-in feature registry so that features can be easily shared across different teams and boost team productivity. From 91533d33b644a160409b80e93f6645ced0e1feb6 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Fri, 15 Jul 2022 10:03:46 -0700 Subject: [PATCH 02/26] update docs per feedback --- .../concepts/feathr-concepts-for-beginners.md | 4 +++ docs/quickstart_databricks.md | 25 ++++++++++++++++++- .../feathr/definition/transformation.py | 2 +- feathr_project/test/test_fixture.py | 4 ++- 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/docs/concepts/feathr-concepts-for-beginners.md b/docs/concepts/feathr-concepts-for-beginners.md index 825fb3e6d..d50e0305a 100644 --- a/docs/concepts/feathr-concepts-for-beginners.md +++ b/docs/concepts/feathr-concepts-for-beginners.md @@ -51,6 +51,10 @@ request_anchor = FeatureAnchor(name="request_features", features=features) ``` +### A bit more on `Observation Data` + +The "Observation Data" is a concept that is a bit confusing for some beginners, and simply think it as an immutable dataset, but this dataset could be enhanced by other dataset. For example, you usually cannot drop a column for your "observation data", but you can add additional columns to it. + ## Motivation on `Derived Feature` That sounds all good, but what if we want to share a feature, and others want to build additional features on top of that feature? Thats's why there is a concept in Feathr called `derived feature`, which allows you to calculate features based on other features, with certain transformation support. 
diff --git a/docs/quickstart_databricks.md b/docs/quickstart_databricks.md index fee93738a..d559b97dd 100644 --- a/docs/quickstart_databricks.md +++ b/docs/quickstart_databricks.md @@ -11,9 +11,32 @@ For Databricks, you can simply upload [this notebook](./samples/databricks/datab ![Import Notebooks](./images/databricks_quickstart1.png) - 2. Paste the [link to Databricks getting started notebook](./samples/databricks/databricks_quickstart_nyc_taxi_driver.ipynb): ![Import Notebooks](./images/databricks_quickstart2.png) 3. Run the whole notebook. It will automatically install Feathr in your cluster and run the feature ingestion jobs. + +# Authoring Feathr jobs in local environment and submit to remote Databricks cluster + +Not everyone wants to use databricks notebook as the main development environment, and the above part is more for quick start purpose. For a more serious development, we usually recommend using Visual Studio Code, where [it has native support for Python and Jupyter Notebooks](https://code.visualstudio.com/docs/datascience/jupyter-notebooks) with many great features such as syntax highlight and IntelliSense. + +In [this notebook](./samples/databricks/databricks_quickstart_nyc_taxi_driver.ipynb), there are a few lines of code like this: + +```python +# Get current databricks notebook context +ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext() +host_name = ctx.tags().get("browserHostName").get() +host_token = ctx.apiToken().get() +cluster_id = ctx.tags().get("clusterId").get() +``` + +This is the only part you need to change to author the Feathr job in local environment and submit to a remote Databricks cluster. When running those code in Databricks, Feathr will automatically read the current cluster's host name and authentication token using the above code, but this is not true if authoring the job locally. 
In that case, you will need to change the above lines to below: + +```python +# Authoring Feathr jobs in local environment and submit to remote Databricks cluster +host_name = 'https://adb-6885802458123232.12.azuredatabricks.net/' +host_token = 'dapi11111111111111111111' +``` + +And that's it! Feathr will automatically submit the job to the cluster you specified. diff --git a/feathr_project/feathr/definition/transformation.py b/feathr_project/feathr/definition/transformation.py index ca4dfdcf1..762cd4927 100644 --- a/feathr_project/feathr/definition/transformation.py +++ b/feathr_project/feathr/definition/transformation.py @@ -43,7 +43,7 @@ class WindowAggTransformation(Transformation): agg_func: aggregation function. Available values: `SUM`, `COUNT`, `MAX`, `MIN`, `AVG`, `MAX_POOLING`, `MIN_POOLING`, `AVG_POOLING`, `LATEST` window: Time window length to apply the aggregation. support 4 type of units: d(day), h(hour), m(minute), s(second). The example value are "7d' or "5h" or "3m" or "1s" group_by: Feathr expressions applied after the `agg_expr` transformation as groupby field, before aggregation, same as 'group by' in SQL - filter: Feathr expression applied to each row as a filter before aggregation + filter: Feathr expression applied to each row as a filter before aggregation. This should be a string and should be a valid Spark SQL Expression. For example: filter = 'age > 3'. 
This is similar to PySpark filter operation and more details can be learned here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html """ def __init__(self, agg_expr: str, agg_func: str, window: str, group_by: Optional[str] = None, filter: Optional[str] = None, limit: Optional[int] = None) -> None: super().__init__() diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index f5c30ed31..a0a616a05 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -68,7 +68,9 @@ def basic_test_setup(config_path: str): feature_type=FLOAT, transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", agg_func="AVG", - window="90d")), + window="90d", + filter="fare_amount > 0", + )), Feature(name="f_location_max_fare", key=location_id, feature_type=FLOAT, From e1c16c669d4e554ddcdb7024b6aabea790c1f7c6 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Fri, 15 Jul 2022 10:27:34 -0700 Subject: [PATCH 03/26] Update feathr-concepts-for-beginners.md --- docs/concepts/feathr-concepts-for-beginners.md | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/concepts/feathr-concepts-for-beginners.md b/docs/concepts/feathr-concepts-for-beginners.md index d50e0305a..a8330b769 100644 --- a/docs/concepts/feathr-concepts-for-beginners.md +++ b/docs/concepts/feathr-concepts-for-beginners.md @@ -51,10 +51,6 @@ request_anchor = FeatureAnchor(name="request_features", features=features) ``` -### A bit more on `Observation Data` - -The "Observation Data" is a concept that is a bit confusing for some beginners, and simply think it as an immutable dataset, but this dataset could be enhanced by other dataset. For example, you usually cannot drop a column for your "observation data", but you can add additional columns to it. 
- ## Motivation on `Derived Feature` That sounds all good, but what if we want to share a feature, and others want to build additional features on top of that feature? Thats's why there is a concept in Feathr called `derived feature`, which allows you to calculate features based on other features, with certain transformation support. @@ -132,7 +128,16 @@ client.get_online_features(feature_table = "agg_features", An illustration of the concepts and process that we talked about is like this: ![Feature Join Process](../images/observation_data.jpg) -## Point in time joins and aggregations +## Miscellaneous topics + +### A bit more on `Observation Data` + +The "Observation Data" is a concept that is a bit confusing for some beginners, and simply think it as an immutable dataset, but this dataset could be enhanced by other dataset. For example, you usually cannot drop a column for your "observation data", but you can add additional columns to it. + +### What's the relationship between `Source` and `Anchor`? +Usually an Anchor can only have one source, but one source can be consumed by different anchors. + +### Point in time joins and aggregations - why we need them? Assuming users are already familiar with the "regular" joins, for example inner join or outer join, and in many of the use cases, we care about time. 
From 1fa08a77113b00796ea805bb43f5b2c34cbefe0e Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Fri, 15 Jul 2022 10:28:58 -0700 Subject: [PATCH 04/26] Update feathr-concepts-for-beginners.md --- docs/concepts/feathr-concepts-for-beginners.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts/feathr-concepts-for-beginners.md b/docs/concepts/feathr-concepts-for-beginners.md index a8330b769..797a8e469 100644 --- a/docs/concepts/feathr-concepts-for-beginners.md +++ b/docs/concepts/feathr-concepts-for-beginners.md @@ -135,7 +135,7 @@ An illustration of the concepts and process that we talked about is like this: The "Observation Data" is a concept that is a bit confusing for some beginners, and simply think it as an immutable dataset, but this dataset could be enhanced by other dataset. For example, you usually cannot drop a column for your "observation data", but you can add additional columns to it. ### What's the relationship between `Source` and `Anchor`? -Usually an Anchor can only have one source, but one source can be consumed by different anchors. +Usually an Anchor can only have one source, but one source can be consumed by different anchors. From `Source` to `Anchor`, there might be an intermediate step, which is the "preprocessing" function and allows you to customize the input a bit. ### Point in time joins and aggregations - why we need them? 
From e1b0aecb5009045ba10158fb756d9acd7b2dadf6 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 06:27:33 -0700 Subject: [PATCH 05/26] update materialization setting doc --- docs/concepts/feature-join.md | 94 ------------------- docs/concepts/get-offline-features.md | 85 +++++++++++++++++ ...eneration.md => materializing-features.md} | 6 +- 3 files changed, 88 insertions(+), 97 deletions(-) delete mode 100644 docs/concepts/feature-join.md create mode 100644 docs/concepts/get-offline-features.md rename docs/concepts/{feature-generation.md => materializing-features.md} (94%) diff --git a/docs/concepts/feature-join.md b/docs/concepts/feature-join.md deleted file mode 100644 index c1bb63d07..000000000 --- a/docs/concepts/feature-join.md +++ /dev/null @@ -1,94 +0,0 @@ ---- -layout: default -title: Getting Historical Features -parent: Feathr Concepts ---- - -# Feature Join - -## Intuitions of Frame Join - -Observation dataset has 2 records as below, and we want to use it as the 'spine' dataset, joining two -features onto it: - -1. Feature 'page_view_count' from dataset 'page_view_data' - -2. 
Feature 'like_count' from dataset 'like_count_data' - -2) Feature `like_count` from dataset `like_count_data` - -| id | observe_time | Label | -| --- | ------------ | ----- | -| 1 | 2022-01-01 | Yes | -| 1 | 2022-01-02 | Yes | -| 2 | 2022-01-02 | No | - -Dataset `page_view_data` contains `page_view_count` of each user at a given time: - -| UserId | log_time | page_view_count | -| ------ | ---------- | --------------- | -| 1 | 2022-01-01 | 101 | -| 1 | 2022-01-02 | 102 | -| 1 | 2022-01-03 | 103 | -| 2 | 2022-01-02 | 200 | -| 3 | 2022-01-02 | 300 | - -Dataset 'like_count_data' contains "like_count" of each user at a given time: - -| UserId | updated_time | 'like_count' | -| ------ | ------------ | ------------ | -| 1 | 2022-01-01 | 11 | -| 1 | 2022-01-02 | 12 | -| 1 | 2022-01-03 | 13 | -| 2 | 2022-01-02 | 20 | -| 3 | 2022-01-02 | 30 | - -The expected joined output, a.k.a. training dataset would be assuming feature: - -| id | observe_time | Label | f_page_view_count | f_like_count | -| --- | ------------ | ----- | ----------------- | ------------ | -| 1 | 2022-01-01 | Yes | 101 | 11 | -| 1 | 2022-01-02 | Yes | 102 | 12 | -| 2 | 2022-01-02 | No | 200 | 20 | - -Note: In the above example, feature `f_page_view_count` and `f_like_count` are defined as simply a reference of field `page_view_count` and `like_count` respectively. Timestamp in these 3 datasets are considered automatically. 
- -## Feature join config - -An example is like below: - -```python -feature_query = FeatureQuery( - feature_list=["f_location_avg_fare"], key=location_id) -settings = ObservationSettings( - observation_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/green_tripdata_2020-04.csv", - event_timestamp_column="lpep_dropoff_datetime", - timestamp_format="yyyy-MM-dd HH:mm:ss") -client.get_offline_features(observation_settings=settings, - feature_query=feature_query, - output_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/output.avro") - -``` - -([ObservationSettings API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.ObservationSettings), -[client.get_offline_feature API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.FeathrClient.get_offline_features)) - -After you have defined the features (as described in the [Feature Definition](feature-definition.md)) part, you can define how you want to join them. - -### Observation data - -The path of a dataset as the 'spine' for the to-be-created training dataset. We call this input 'spine' dataset the 'observation' dataset. Typically, each row of the observation data contains: - -1. Column(s) representing entity id(s), which will be used as the join key to look up(join) feature value. - -2. A column representing the event time of the row. By default, Feathr will make sure the feature values joined have a timestamp earlier than it, ensuring no data leakage in the resulting training dataset. - -3. Other columns will be simply pass through onto the output training dataset. - The key fields from the observation data, which are used to joined with the feature data. - List of feature names to be joined with the observation data. They must be pre-defined in the Python APIs. - -The time information of the observation data used to compare with the feature's timestamp during the join. 
- -## FeatureQuery - -After you have defined all the features, you probably don't want to use all of them in this particular program. In this case, instead of putting every features in this `FeatureQuery` part, you can just put a selected list of features. Note that they have to be of the same key. diff --git a/docs/concepts/get-offline-features.md b/docs/concepts/get-offline-features.md new file mode 100644 index 000000000..d1fd689d7 --- /dev/null +++ b/docs/concepts/get-offline-features.md @@ -0,0 +1,85 @@ +--- +layout: default +title: Getting Offline Features using Feature Query +parent: Feathr Concepts +--- + +# Getting Offline Features using Feature Query + +## Intuitions + +After the feature producers have defined the features (as described in the [Feature Definition](./feature-definition.md) part), the feature consumers may want to consume those features. + +For example, the dataset is like below, where there are 3 tables that feature producers want to extract features from: `user_profile_mock_data`, `user_purchase_history_mock_data`, and `product_detail_mock_data`. + +For feature consumers, they will usually use a central dataset ("observation data", `user_observation_mock_data` in this case) which contains a couple of IDs (`user_id` and `product_id` in this case), timestamps, and other columns. Feature consumers will use this "observation data" to query from different feature tables (using `Feature Query` below). + +![Feature Flow](https://github.com/linkedin/feathr/blob/main/docs/images/product_recommendation_advanced.jpg?raw=true) + +As we can see, the use case for getting offline features using Feathr is straightforward. Feature consumers want to get a few features - for a particular user, what's the gift card balance? What's the total purchase in the last 90 days; Feature consumers can also get a few features for other entities in the same `Feature Query`. 
For example, in the meanwhile, feature consumers can also query the product feature such as product quantity and price. + +In this case, Feathr users can simply specify the feature name that they want to query, and specify for which entity/key that they want to query on, like below. Note that for feature consumers, they don't have to query all the features; instead they can just query a subset of the features that the feature producers have defined. + +```python +user_feature_query = FeatureQuery( + feature_list=["feature_user_age", + "feature_user_tax_rate", + "feature_user_gift_card_balance", + "feature_user_has_valid_credit_card", + "feature_user_total_purchase_in_90days", + "feature_user_purchasing_power" + ], + key=user_id) + +product_feature_query = FeatureQuery( + feature_list=[ + "feature_product_quantity", + "feature_product_price" + ], + key=product_id) +``` + +And specify the location for the observation data: + +```python +settings = ObservationSettings( + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/product_recommendation_sample/user_observation_mock_data.csv", + event_timestamp_column="event_timestamp", + timestamp_format="yyyy-MM-dd") +``` + +And finally, specify the feature query and finally trigger the computation: + +```python +client.get_offline_features(observation_settings=settings, + feature_query=[user_feature_query, product_feature_query], + output_path=output_path) + +``` + +More details for the above APIs can be read from: + +- [ObservationSettings API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.ObservationSettings) +- [client.get_offline_feature API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.FeathrClient.get_offline_features) + +## More on `Observation data` + +The path of a dataset as the 'spine' for the to-be-created training dataset. We call this input 'spine' dataset the 'observation' dataset. Typically, each row of the observation data contains: + +1. 
**Entity ID Column:** Column(s) representing entity id(s), which will be used as the join key to query feature value. + +2. **Timestamp Column:** A column representing the event time of the row. By default, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. This tim + +3. **Other columns** will be simply pass through to the output training dataset, which can be treated as immutable columns. + +## More on `Feature Query`ß + +After you have defined all the features, you probably don't want to use all of them in this particular program. In this case, instead of putting every features in this `FeatureQuery` part, you can just put a selected list of features. Note that they have to be of the same key. + +## Difference between materializing features and get offline features + +It is sometimes confusing between "getting offline features" in this document and the "[getting materialized features](./materializing-features.md)" part, given they both seem to "get features and put it somewhere". However there are some differences and you should know when to use which: + +1. For `get_offline_features` API, feature consumers usually need to have a central `observation data` so they can use `Feature Query` to query different features for different entities from different tables. For `materialize_features` API, feature consumers don't have the `observation data`, because they don't need to query from existing feature definitions. In this case, feature consumers only need to specify for a specific entity (say `user_id`), which features they want to materialize to offline or online store. Note that for a feature table in the materialization settings, feature consumers can only materialize features for the same key for the same table. + +2. 
For \ No newline at end of file diff --git a/docs/concepts/feature-generation.md b/docs/concepts/materializing-features.md similarity index 94% rename from docs/concepts/feature-generation.md rename to docs/concepts/materializing-features.md index b163a235e..596d6cbdd 100644 --- a/docs/concepts/feature-generation.md +++ b/docs/concepts/materializing-features.md @@ -6,11 +6,11 @@ parent: Feathr Concepts # Feature Generation and Materialization -Feature generation (also known as feature materialization) is the process to create features from raw source data into a certain persisted storage in either offline store (for further reuse), or online store (for online inference). +Feature materialization (also known as feature generation) is the process to create features for a certain entity from raw source data into a certain persisted storage in either offline store (for further reuse), or online store (for online inference). -User can utilize feature generation to pre-compute and materialize pre-defined features to online and/or offline storage. This is desirable when the feature transformation is computation intensive or when the features can be reused (usually in offline setting). Feature generation is also useful in generating embedding features, where those embeddings distill information from large data and is usually more compact. +User can utilize feature generation to pre-compute and materialize pre-defined features to online and/or offline storage. This is desirable when the feature transformation is computation intensive or when the features can be reused (usually in offline setting). Feature generation is also useful in generating embedding features, where those embeddings distill information from large data and is usually more compact. Also, please note that you can only materialize features for a specific entity/key in the same `materialize_features` call. 
-## Generating Features to Online Store +## Materializing Features to Online Store When the models are served in an online environment, we also need to serve the corresponding features in the same online environment as well. Feathr provides APIs to generate features to online storage for future consumption. For example: From 1f26445fb037abf1b2fadc2536967726ca085040 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 06:47:30 -0700 Subject: [PATCH 06/26] Update get-offline-features.md --- docs/concepts/get-offline-features.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/concepts/get-offline-features.md b/docs/concepts/get-offline-features.md index d1fd689d7..0393a220f 100644 --- a/docs/concepts/get-offline-features.md +++ b/docs/concepts/get-offline-features.md @@ -68,11 +68,11 @@ The path of a dataset as the 'spine' for the to-be-created training dataset. We 1. **Entity ID Column:** Column(s) representing entity id(s), which will be used as the join key to query feature value. -2. **Timestamp Column:** A column representing the event time of the row. By default, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. This tim +2. **Timestamp Column:** A column representing the event time of the row. By default, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. Refer to [Point in time Joins](./point-in-time-join.md) for more details. 3. **Other columns** will be simply pass through to the output training dataset, which can be treated as immutable columns. -## More on `Feature Query`ß +## More on `Feature Query` After you have defined all the features, you probably don't want to use all of them in this particular program. 
In this case, instead of putting every features in this `FeatureQuery` part, you can just put a selected list of features. Note that they have to be of the same key. @@ -82,4 +82,4 @@ It is sometimes confusing between "getting offline features" in this document an 1. For `get_offline_features` API, feature consumers usually need to have a central `observation data` so they can use `Feature Query` to query different features for different entities from different tables. For `materialize_features` API, feature consumers don't have the `observation data`, because they don't need to query from existing feature definitions. In this case, feature consumers only need to specify for a specific entity (say `user_id`), which features they want to materialize to offline or online store. Note that for a feature table in the materialization settings, feature consumers can only materialize features for the same key for the same table. -2. For \ No newline at end of file +2. For the timestamps, `get_offline_features` API, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. For `materialize_features` API, Feathr will always materialize the latest feature available in the dataset. \ No newline at end of file From c752e016b64906feb6ae580e675cdc7a738449fe Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 06:50:25 -0700 Subject: [PATCH 07/26] Update get-offline-features.md --- docs/concepts/get-offline-features.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts/get-offline-features.md b/docs/concepts/get-offline-features.md index 0393a220f..f1a68924e 100644 --- a/docs/concepts/get-offline-features.md +++ b/docs/concepts/get-offline-features.md @@ -76,7 +76,7 @@ The path of a dataset as the 'spine' for the to-be-created training dataset. 
We After you have defined all the features, you probably don't want to use all of them in this particular program. In this case, instead of putting every features in this `FeatureQuery` part, you can just put a selected list of features. Note that they have to be of the same key. -## Difference between materializing features and get offline features +## Difference between `materialize_features` and `get_offline_features` API It is sometimes confusing between "getting offline features" in this document and the "[getting materialized features](./materializing-features.md)" part, given they both seem to "get features and put it somewhere". However there are some differences and you should know when to use which: From 132e1bdd65203c88c17de78e73cfa71f984a411c Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 07:48:12 -0700 Subject: [PATCH 08/26] Update feathr-concepts-for-beginners.md --- docs/concepts/feathr-concepts-for-beginners.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts/feathr-concepts-for-beginners.md b/docs/concepts/feathr-concepts-for-beginners.md index 797a8e469..4a85d89c7 100644 --- a/docs/concepts/feathr-concepts-for-beginners.md +++ b/docs/concepts/feathr-concepts-for-beginners.md @@ -126,7 +126,7 @@ client.get_online_features(feature_table = "agg_features", ## Illustration An illustration of the concepts and process that we talked about is like this: -![Feature Join Process](../images/observation_data.jpg) +![Observation Data and Feature Query Process](../images/observation_data.jpg) ## Miscellaneous topics From 85a1c4421c3dbe39acc115104c9765753e2ac940 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 22:03:02 -0700 Subject: [PATCH 09/26] resolve comments --- docs/concepts/materializing-features.md | 6 +++--- feathr_project/feathr/definition/transformation.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/concepts/materializing-features.md 
b/docs/concepts/materializing-features.md index 596d6cbdd..20cfe0a97 100644 --- a/docs/concepts/materializing-features.md +++ b/docs/concepts/materializing-features.md @@ -1,10 +1,10 @@ --- layout: default -title: Feature Generation and Materialization +title: Feature Materialization (also known as feature generation) parent: Feathr Concepts --- -# Feature Generation and Materialization +# Feature Materialization (also known as feature generation) Feature materialization (also known as feature generation) is the process to create features for a certain entity from raw source data into a certain persisted storage in either offline store (for further reuse), or online store (for online inference). @@ -119,7 +119,7 @@ client.materialize_features(settings, execution_configurations={ "spark.feathr.o For reading those materialized features, Feathr has a convenient helper function called `get_result_df` to help you view the data. For example, you can use the sample code below to read from the materialized result in offline store: ```python - +from feathr import get_result_df path = "abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/df0/daily/2020/05/20/" res = get_result_df(client=client, format="parquet", res_url=path) ``` diff --git a/feathr_project/feathr/definition/transformation.py b/feathr_project/feathr/definition/transformation.py index 762cd4927..1aa6864be 100644 --- a/feathr_project/feathr/definition/transformation.py +++ b/feathr_project/feathr/definition/transformation.py @@ -43,7 +43,7 @@ class WindowAggTransformation(Transformation): agg_func: aggregation function. Available values: `SUM`, `COUNT`, `MAX`, `MIN`, `AVG`, `MAX_POOLING`, `MIN_POOLING`, `AVG_POOLING`, `LATEST` window: Time window length to apply the aggregation. support 4 type of units: d(day), h(hour), m(minute), s(second). 
The example value are "7d' or "5h" or "3m" or "1s" group_by: Feathr expressions applied after the `agg_expr` transformation as groupby field, before aggregation, same as 'group by' in SQL - filter: Feathr expression applied to each row as a filter before aggregation. This should be a string and should be a valid Spark SQL Expression. For example: filter = 'age > 3'. This is similar to PySpark filter operation and more details can be learned here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html + filter: Feathr expression applied to each row as a filter before aggregation. This should be a string and a valid Spark SQL Expression. For example: filter = 'age > 3'. This is similar to PySpark filter operation and more details can be learned here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html """ def __init__(self, agg_expr: str, agg_func: str, window: str, group_by: Optional[str] = None, filter: Optional[str] = None, limit: Optional[int] = None) -> None: super().__init__() From 3f26272684be5c5b50332f06b1a98e78a4e0f832 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 22:33:22 -0700 Subject: [PATCH 10/26] Update job_utils.py --- feathr_project/feathr/utils/job_utils.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 32e0b8e5a..190138deb 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -5,6 +5,8 @@ from loguru import logger import pandas as pd import tempfile +from pandas.errors import EmptyDataError + def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, local_folder: str = None) -> pd.DataFrame: @@ -38,16 +40,27 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, delta = DeltaTable(local_dir_path) 
if not client.spark_runtime == 'azure_synapse': # don't detect for synapse result with Delta as there's a problem with underlying system - # Issues are trached here: https://github.com/delta-io/delta-rs/issues/582 + # Issues are tracked here: https://github.com/delta-io/delta-rs/issues/582 result_df = delta.to_pyarrow_table().to_pandas() else: - logger.info("Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse. Emtpy DataFrame is returned.") + logger.info("Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse. Empty DataFrame is returned.") result_df = pd.DataFrame() elif format.casefold()=="avro": import pandavro as pdx for file in glob.glob(os.path.join(local_dir_path, '*.avro')): dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) + elif format.casefold()=="csv": + try: + df = pd.read_csv(file, index_col=None, header=None) + except EmptyDataError: + # in case there are empty files + df = pd.DataFrame() + dataframe_list.append(df) + result_df = pd.concat(dataframe_list, axis=0) + else: + raise RuntimeError(f"{format} is currently not supported in get_result_df. 
Please consider writing a customized function to read the result.") + else: # by default use avro import pandavro as pdx From bea2a176393ffcb1bfc3c516e69cd2a8b39c5cdc Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Tue, 19 Jul 2022 22:38:12 -0700 Subject: [PATCH 11/26] fix typos --- docs/quickstart_synapse.md | 2 +- .../feathrcli/data/feathr_user_workspace/feathr_config.yaml | 2 +- .../feathr_user_workspace/product_recommendation_demo.ipynb | 2 +- .../test/test_user_workspace/feathr_config_maven.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/quickstart_synapse.md b/docs/quickstart_synapse.md index 0d0a536bf..1df49efa9 100644 --- a/docs/quickstart_synapse.md +++ b/docs/quickstart_synapse.md @@ -54,7 +54,7 @@ We've provided a self-contained [sample notebook](https://github.com/linkedin/fe ## Step 4: Update Feathr config -In the sample notebook, you will see some settings like below. You should update those settings based on your environment, for exmaple the spark runtime, synapse/databricks endpoint, etc. +In the sample notebook, you will see some settings like below. You should update those settings based on your environment, for example the spark runtime, synapse/databricks endpoint, etc. ```yaml # DO NOT MOVE OR DELETE THIS FILE diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml b/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml index fd0bccbbc..9f5ff2dca 100644 --- a/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml +++ b/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml @@ -113,7 +113,7 @@ feature_registry: # configure the name of the purview endpoint purview_name: "feathrazuretest3-purview1" # delimiter indicates that how the project/workspace name, feature names etc. are delimited. By default it will be '__' - # this is for global reference (mainly for feature sharing). 
For exmaple, when we setup a project called foo, and we have an anchor called 'taxi_driver' and the feature name is called 'f_daily_trips' + # this is for global reference (mainly for feature sharing). For example, when we setup a project called foo, and we have an anchor called 'taxi_driver' and the feature name is called 'f_daily_trips' # the feature will have a globally unique name called 'foo__taxi_driver__f_daily_trips' delimiter: "__" # controls whether the type system will be initialized or not. Usually this is only required to be executed once. diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb b/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb index 5922897b0..9f818adc3 100644 --- a/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb +++ b/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb @@ -290,7 +290,7 @@ "source": [ "## View the data\n", "\n", - "In this tutorial, we use Feathr Feature Store to create a model that predicts users product rating. To make it simple, let's just predict users' rating for one product. (We will expand the exmaple to predict ratings for arbitrary product in future tutorials.)\n", + "In this tutorial, we use Feathr Feature Store to create a model that predicts users product rating. To make it simple, let's just predict users' rating for one product. (We will expand the example to predict ratings for arbitrary product in future tutorials.)\n", "\n", "We have 3 datasets to work with: one observation dataset(a.k.a. label dataset) and two raw datasets to generate features." 
] diff --git a/feathr_project/test/test_user_workspace/feathr_config_maven.yaml b/feathr_project/test/test_user_workspace/feathr_config_maven.yaml index ed3af5826..29dc0370e 100644 --- a/feathr_project/test/test_user_workspace/feathr_config_maven.yaml +++ b/feathr_project/test/test_user_workspace/feathr_config_maven.yaml @@ -113,6 +113,6 @@ feature_registry: # configure the name of the purview endpoint purview_name: 'feathrazuretest3-purview1' # delimiter indicates that how the project/workspace name, feature names etc. are delimited. By default it will be '__' - # this is for global reference (mainly for feature sharing). For exmaple, when we setup a project called foo, and we have an anchor called 'taxi_driver' and the feature name is called 'f_daily_trips' + # this is for global reference (mainly for feature sharing). For example, when we setup a project called foo, and we have an anchor called 'taxi_driver' and the feature name is called 'f_daily_trips' # the feature will have a globally unique name called 'foo__taxi_driver__f_daily_trips' delimiter: '__' \ No newline at end of file From 16b5c6d911057f405720a5859e229199c5aae7fc Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Wed, 20 Jul 2022 03:07:34 -0700 Subject: [PATCH 12/26] Update job_utils.py --- feathr_project/feathr/utils/job_utils.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 190138deb..62cad0a26 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -51,13 +51,16 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) elif format.casefold()=="csv": - try: - df = pd.read_csv(file, index_col=None, header=None) - except EmptyDataError: - # in case there are empty files - df = pd.DataFrame() - 
dataframe_list.append(df) + for file in glob.glob(os.path.join(local_dir_path, '*.csv')): + try: + df = pd.read_csv(file, index_col=None, header=None) + except EmptyDataError: + # in case there are empty files + df = pd.DataFrame() + dataframe_list.append(df) result_df = pd.concat(dataframe_list, axis=0) + # Reset index to avoid duplicated indices + result_df.reset_index(drop=True) else: raise RuntimeError(f"{format} is currently not supported in get_result_df. Please consider writing a customized function to read the result.") From f2e06e013f522caeb736ca43cc4065a4225604a4 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Thu, 21 Jul 2022 02:17:15 -0700 Subject: [PATCH 13/26] Update client.py --- feathr_project/feathr/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 17512945e..dc86c13fb 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -562,7 +562,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf raise RuntimeError("Please call FeathrClient.build_features() first in order to materialize the features") udf_files = _PreprocessingPyudfManager.prepare_pyspark_udf_files(settings.feature_names, self.local_workspace_dir) - # CLI will directly call this so the experiene won't be broken + # CLI will directly call this so the experience won't be broken self._materialize_features_with_config(config_file_path, execution_configurations, udf_files) if os.path.exists(config_file_path): os.remove(config_file_path) From 7d9d488538f3233ea6239de7125067f518940d66 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sat, 30 Jul 2022 07:35:16 -0700 Subject: [PATCH 14/26] format doc --- .../dev_guide/feathr_overall_release_guide.md | 22 ++++++++++++++----- docs/dev_guide/publish_to_maven.md | 20 ++++++++++++----- .../WriteToHDFSOutputProcessor.scala | 2 +- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git 
a/docs/dev_guide/feathr_overall_release_guide.md b/docs/dev_guide/feathr_overall_release_guide.md index f9cccd37d..04b3d3ca3 100644 --- a/docs/dev_guide/feathr_overall_release_guide.md +++ b/docs/dev_guide/feathr_overall_release_guide.md @@ -4,26 +4,36 @@ title: Developer Guide for Feathr Overall Release Guide parent: Developer Guides --- -# When to Release -- For each major and minor version release, please follow these steps. +# Feathr Overall Release Guide + +This document describes all the release process for the development team. + +## When to Release + +- For each major and minor version release, please follow these steps. - For patch versions, there should be no releases. -# Writing Release Note +## Writing Release Note + Write a release note following past examples [here](https://github.com/linkedin/feathr/releases). Read through the [commit log](https://github.com/linkedin/feathr/commits/main) to identify the commits after last release to include in the release note. Here are the major things to include + - highlights of the release - improvements and changes of this release - new contributors of this release +## Release Maven -# Release Maven See [Developer Guide for publishing to maven](publish_to_maven.md) ## Upload Feathr Jar + Run the command to generate the Java jar. After the jar is generated, please upload to [Azure storage](https://ms.portal.azure.com/#view/Microsoft_Azure_Storage/ContainerMenuBlade/~/overview/storageAccountId/%2Fsubscriptions%2Fa6c2a7cc-d67e-4a1a-b765-983f08c0423a%2FresourceGroups%2Fazurefeathrintegration%2Fproviders%2FMicrosoft.Storage%2FstorageAccounts%2Fazurefeathrstorage/path/public/etag/%220x8D9E6F64D62D599%22/defaultEncryptionScope/%24account-encryption-key/denyEncryptionScopeOverride//defaultId//publicAccessVal/Container) for faster access. 
-# Release PyPi +## Release PyPi + See [Python Package Release Note](python_package_release.md) -# Announcement +## Announcement + Please announce the release in our #general Slack channel. diff --git a/docs/dev_guide/publish_to_maven.md b/docs/dev_guide/publish_to_maven.md index 2987ff1f6..a64e858b7 100644 --- a/docs/dev_guide/publish_to_maven.md +++ b/docs/dev_guide/publish_to_maven.md @@ -3,13 +3,15 @@ layout: default title: Developer Guide for publishing to maven parent: Developer Guides --- + # Developer Guide for publishing to maven ## Manual Publishing 1. Get account details to login to https://oss.sonatype.org/ 2. Install GPG, setup keys, and export to a key server -``` + +```bash $ gpg --gen-key ... Real name: Central Repo Test @@ -32,37 +34,46 @@ $ gpg --keyserver keyserver.ubuntu.com --recv-keys CA925CD6C9E8D064FF05B4728190C if failing to programmatically export to key server, you can export it manually and upload to http://keyserver.ubuntu.com/ via `submit key` run the following command to generated the ASCII-armored public key needed by the key server + ``` gpg --armor --export user-id > pubkey.asc ``` + https://www.linuxbabe.com/security/a-practical-guide-to-gpg-part-1-generate-your-keypair 3. Setup your credentials locally at `$HOME/.sbt/0.13/sonatype.sbt` + ``` credentials += Credentials("Sonatype Nexus Repository Manager", "oss.sonatype.org", "(Sonatype user name)", "(Sonatype password)") ``` + (ref, https://github.com/xerial/sbt-sonatype) 4. Publish to maven via sbt -In your feathr directory, clear your cache to prevent stale errors + In your feathr directory, clear your cache to prevent stale errors + ``` rm -rf target/sonatype-staging/ ``` + Start sbt console by running + ``` sbt -java-home /Library/Java/JavaVirtualMachines/jdk1.8.0_282-msft.jdk/Contents/Home ``` + Execute command in sbt console to publish to maven + ``` reload ; publishSigned; sonatypeBundleRelease ``` 5. 
"Upon release, your component will be published to Central: this typically occurs within 30 minutes, though updates to search can take up to four hours." -https://central.sonatype.org/publish/publish-guide/#releasing-to-central + https://central.sonatype.org/publish/publish-guide/#releasing-to-central 6. After new version is released via Maven, use the released version to run a test to ensure it actually works. You can do this by running a codebase that imports Feathr scala code. @@ -72,9 +83,6 @@ https://central.sonatype.org/publish/publish-guide/#releasing-to-central ### References - - https://central.sonatype.org/publish/publish-guide/#deployment https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html - diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala index 7ac29a779..4ef44f701 100644 --- a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala +++ b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala @@ -74,7 +74,7 @@ private[offline] class WriteToHDFSOutputProcessor(val config: OutputProcessorCon val allFeatureList = featureData.keySet.map(_.getFeatureName).toSeq val featureListToJoin = config.getParams().getStringListOpt(FeatureGenerationPathName.FEATURES) val storeName = config.getParams().getStringOpt(FeatureGenerationPathName.STORE_NAME) - + print("val featureListToJoin", featureListToJoin) if (featureListToJoin.isDefined) { val selectedFeatureNames = featureListToJoin.getOrElse(allFeatureList) // filter unwanted feature From f7bdc21b67147d41ce9ccfd0c92b04b2c1bd65c4 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sat, 30 Jul 2022 08:37:45 -0700 Subject: [PATCH 15/26] Address comments --- docs/concepts/get-offline-features.md | 4 +++- .../outputProcessor/WriteToHDFSOutputProcessor.scala | 1 
- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/concepts/get-offline-features.md b/docs/concepts/get-offline-features.md index f1a68924e..0171a1358 100644 --- a/docs/concepts/get-offline-features.md +++ b/docs/concepts/get-offline-features.md @@ -82,4 +82,6 @@ It is sometimes confusing between "getting offline features" in this document an 1. For `get_offline_features` API, feature consumers usually need to have a central `observation data` so they can use `Feature Query` to query different features for different entities from different tables. For `materialize_features` API, feature consumers don't have the `observation data`, because they don't need to query from existing feature definitions. In this case, feature consumers only need to specify for a specific entity (say `user_id`), which features they want to materialize to offline or online store. Note that for a feature table in the materialization settings, feature consumers can only materialize features for the same key for the same table. -2. For the timestamps, `get_offline_features` API, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. For `materialize_features` API, Feathr will always materialize the latest feature available in the dataset. \ No newline at end of file +2. For the timestamps, in `get_offline_features` API, Feathr will make sure the feature values queried have a timestamp earlier than the timestamp in observation data, ensuring no data leakage in the resulting training dataset. For `materialize_features` API, Feathr will always materialize the latest feature available in the dataset. + +3. Those two APIs are used in two different stage of feature engineering pipeline, and serves different purpose. 
For `get_offline_features`, it is usually to get data for model training and usually is focused on getting historical data from an offline storage; while for `materialize_features`, it is usually to pre-compute features for model inference via online store. diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala index 4ef44f701..3d722ab23 100644 --- a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala +++ b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala @@ -74,7 +74,6 @@ private[offline] class WriteToHDFSOutputProcessor(val config: OutputProcessorCon val allFeatureList = featureData.keySet.map(_.getFeatureName).toSeq val featureListToJoin = config.getParams().getStringListOpt(FeatureGenerationPathName.FEATURES) val storeName = config.getParams().getStringOpt(FeatureGenerationPathName.STORE_NAME) - print("val featureListToJoin", featureListToJoin) if (featureListToJoin.isDefined) { val selectedFeatureNames = featureListToJoin.getOrElse(allFeatureList) // filter unwanted feature From 1e12b97582fa01a9032c16668bee76ed27964241 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sat, 30 Jul 2022 08:38:55 -0700 Subject: [PATCH 16/26] Update WriteToHDFSOutputProcessor.scala --- .../generation/outputProcessor/WriteToHDFSOutputProcessor.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala index 3d722ab23..dc883c820 100644 --- a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala +++ 
b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala @@ -74,6 +74,7 @@ private[offline] class WriteToHDFSOutputProcessor(val config: OutputProcessorCon val allFeatureList = featureData.keySet.map(_.getFeatureName).toSeq val featureListToJoin = config.getParams().getStringListOpt(FeatureGenerationPathName.FEATURES) val storeName = config.getParams().getStringOpt(FeatureGenerationPathName.STORE_NAME) + if (featureListToJoin.isDefined) { val selectedFeatureNames = featureListToJoin.getOrElse(allFeatureList) // filter unwanted feature From 2bb736908f92757caa01fde5f52cab3a7734aa34 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sat, 30 Jul 2022 08:39:20 -0700 Subject: [PATCH 17/26] Update WriteToHDFSOutputProcessor.scala --- .../generation/outputProcessor/WriteToHDFSOutputProcessor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala index dc883c820..7ac29a779 100644 --- a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala +++ b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/WriteToHDFSOutputProcessor.scala @@ -74,7 +74,7 @@ private[offline] class WriteToHDFSOutputProcessor(val config: OutputProcessorCon val allFeatureList = featureData.keySet.map(_.getFeatureName).toSeq val featureListToJoin = config.getParams().getStringListOpt(FeatureGenerationPathName.FEATURES) val storeName = config.getParams().getStringOpt(FeatureGenerationPathName.STORE_NAME) - + if (featureListToJoin.isDefined) { val selectedFeatureNames = featureListToJoin.getOrElse(allFeatureList) // filter unwanted feature From f38a9037cbe92accb16b7e60f2ef09cd07d30369 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sun, 31 Jul 2022 22:03:25 -0700 Subject: 
[PATCH 18/26] resolve comments --- docs/concepts/feathr-concepts-for-beginners.md | 5 +++-- feathr_project/feathr/utils/job_utils.py | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/concepts/feathr-concepts-for-beginners.md b/docs/concepts/feathr-concepts-for-beginners.md index 4a85d89c7..fe8d2a603 100644 --- a/docs/concepts/feathr-concepts-for-beginners.md +++ b/docs/concepts/feathr-concepts-for-beginners.md @@ -128,14 +128,15 @@ client.get_online_features(feature_table = "agg_features", An illustration of the concepts and process that we talked about is like this: ![Observation Data and Feature Query Process](../images/observation_data.jpg) -## Miscellaneous topics +## FAQs on the Concepts ### A bit more on `Observation Data` The "Observation Data" is a concept that is a bit confusing for some beginners, and simply think it as an immutable dataset, but this dataset could be enhanced by other dataset. For example, you usually cannot drop a column for your "observation data", but you can add additional columns to it. ### What's the relationship between `Source` and `Anchor`? -Usually an Anchor can only have one source, but one source can be consumed by different anchors. From `Source` to `Anchor`, there might be an intermediate step, which is the "preprocessing" function and allows you to customize the input a bit. + +Usually an Anchor can only have one source, but one source can be consumed by different anchors. From `Source` to `Anchor`, there might be an intermediate step, which is the "preprocessing" function and allows you to customize the input a bit. ### Point in time joins and aggregations - why we need them? 
diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 62cad0a26..4446e0092 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -70,6 +70,7 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, for file in glob.glob(os.path.join(local_dir_path, '*.avro')): dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) + if local_folder is None: tmp_dir.cleanup() return result_df \ No newline at end of file From d17c5fae867a3d8bbbec31676c508fb875ddbd3b Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Sun, 31 Jul 2022 23:32:43 -0700 Subject: [PATCH 19/26] Resolve comments --- docs/quickstart_databricks.md | 8 +-- feathr_project/feathr/utils/job_utils.py | 77 ++++++++++++------------ 2 files changed, 42 insertions(+), 43 deletions(-) diff --git a/docs/quickstart_databricks.md b/docs/quickstart_databricks.md index d559b97dd..ffd6e42a9 100644 --- a/docs/quickstart_databricks.md +++ b/docs/quickstart_databricks.md @@ -19,7 +19,7 @@ For Databricks, you can simply upload [this notebook](./samples/databricks/datab # Authoring Feathr jobs in local environment and submit to remote Databricks cluster -Not everyone wants to use databricks notebook as the main development environment, and the above part is more for quick start purpose. For a more serious development, we usually recommend using Visual Studio Code, where [it has native support for Python and Jupyter Notebooks](https://code.visualstudio.com/docs/datascience/jupyter-notebooks) with many great features such as syntax highlight and IntelliSense. 
+Although Databricks Notebooks are great tools, there are also large developer communities that prefer the usage of Visual Studio Code, where [it has native support for Python and Jupyter Notebooks](https://code.visualstudio.com/docs/datascience/jupyter-notebooks) with many great features such as syntax highlight and IntelliSense. In [this notebook](./samples/databricks/databricks_quickstart_nyc_taxi_driver.ipynb), there are a few lines of code like this: @@ -31,12 +31,12 @@ host_token = ctx.apiToken().get() cluster_id = ctx.tags().get("clusterId").get() ``` -This is the only part you need to change to author the Feathr job in local environment and submit to a remote Databricks cluster. When running those code in Databricks, Feathr will automatically read the current cluster's host name and authentication token using the above code, but this is not true if authoring the job locally. In that case, you will need to change the above lines to below: +This is the only part you need to change to author the Feathr job in local environment (such as VS Code) and submit to a remote Databricks cluster. When running those code in Databricks, Feathr will automatically read the current cluster's host name and authentication token using the above code, but this is not true if authoring the job locally. In that case, you will need to change the above lines to below: ```python # Authoring Feathr jobs in local environment and submit to remote Databricks cluster -host_name = 'https://adb-6885802458123232.12.azuredatabricks.net/' -host_token = 'dapi11111111111111111111' +host_name = 'https://.azuredatabricks.net/' +host_token = '' ``` And that's it! Feathr will automatically submit the job to the cluster you specified. 
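As a minimal sketch of the local-authoring setup described in the quickstart change above, the workspace URL and access token can be read from environment variables instead of being written into the notebook. The variable names `DATABRICKS_HOST` and `DATABRICKS_TOKEN` and the helper `load_databricks_credentials` are assumptions made for illustration; they are not part of Feathr or of this patch.

```python
import os
from typing import Mapping, Tuple


def load_databricks_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (host_name, host_token) read from the given environment mapping.

    Hypothetical helper: the variable names below are chosen for this sketch only.
    """
    host_name = env.get("DATABRICKS_HOST", "").strip()
    host_token = env.get("DATABRICKS_TOKEN", "").strip()

    # Fail early with a clear message rather than submitting a job with bad settings.
    if not host_name.startswith("https://"):
        raise ValueError("DATABRICKS_HOST must be set to an https:// workspace URL")
    if not host_token:
        raise ValueError("DATABRICKS_TOKEN must be set to an access token")

    # Normalize to exactly one trailing slash, matching the URL shape in the quickstart.
    return host_name.rstrip("/") + "/", host_token
```

Calling `host_name, host_token = load_databricks_credentials()` would then stand in for the two hardcoded assignments, which keeps the token out of version control.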
diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 4446e0092..b08932a2d 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -10,14 +10,21 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, local_folder: str = None) -> pd.DataFrame: - """Download the job result dataset from cloud as a Pandas dataframe. + """Download the job result dataset from cloud as a Pandas dataframe to make it easier for the client to read. - format: format override, could be "parquet", "delta", etc. + format: format to read the downloaded files. Currently support `paruqet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. res_url: output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. local_folder: optional parameter to specify the absolute download path. if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. """ + # use a result url if it's provided by the user, otherwise use the one provided by the job res_url: str = res_url or client.get_job_result_uri(block=True, timeout_sec=1200) + if res_url is None: + raise RuntimeError("res_url is None. 
Please make sure either you provide a res_url or make sure the job finished in FeathrClient has a valid result URI.") + + # use a format if it's provided by the user, otherwise use the one provided by the job format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") + format = "avro" if format is None else format + # if local_folder params is not provided then create a temporary folder if local_folder is not None: local_dir_path = local_folder @@ -28,48 +35,40 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=local_dir_path) dataframe_list = [] # by default the result are in avro format - if format: - # helper function for only parquet and avro - if format.casefold()=="parquet": - files = glob.glob(os.path.join(local_dir_path, '*.parquet')) - from pyarrow.parquet import ParquetDataset - ds = ParquetDataset(files) - result_df = ds.read().to_pandas() - elif format.casefold()=="delta": - from deltalake import DeltaTable - delta = DeltaTable(local_dir_path) - if not client.spark_runtime == 'azure_synapse': - # don't detect for synapse result with Delta as there's a problem with underlying system - # Issues are tracked here: https://github.com/delta-io/delta-rs/issues/582 - result_df = delta.to_pyarrow_table().to_pandas() - else: - logger.info("Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse. 
Empty DataFrame is returned.") - result_df = pd.DataFrame() - elif format.casefold()=="avro": - import pandavro as pdx - for file in glob.glob(os.path.join(local_dir_path, '*.avro')): - dataframe_list.append(pdx.read_avro(file)) - result_df = pd.concat(dataframe_list, axis=0) - elif format.casefold()=="csv": - for file in glob.glob(os.path.join(local_dir_path, '*.csv')): - try: - df = pd.read_csv(file, index_col=None, header=None) - except EmptyDataError: - # in case there are empty files - df = pd.DataFrame() - dataframe_list.append(df) - result_df = pd.concat(dataframe_list, axis=0) - # Reset index to avoid duplicated indices - result_df.reset_index(drop=True) + if format.casefold()=="parquet": + files = glob.glob(os.path.join(local_dir_path, '*.parquet')) + from pyarrow.parquet import ParquetDataset + ds = ParquetDataset(files) + result_df = ds.read().to_pandas() + elif format.casefold()=="delta": + from deltalake import DeltaTable + delta = DeltaTable(local_dir_path) + if not client.spark_runtime == 'azure_synapse': + # don't detect for synapse result with Delta as there's a problem with underlying system + # Issues are tracked here: https://github.com/delta-io/delta-rs/issues/582 + result_df = delta.to_pyarrow_table().to_pandas() else: - raise RuntimeError(f"{format} is currently not supported in get_result_df. Please consider writing a customized function to read the result.") - - else: - # by default use avro + logger.info("Please use Azure Synapse to read the result in the Azure Synapse cluster. Reading local results is not supported for Azure Synapse. 
Empty DataFrame is returned.") + result_df = pd.DataFrame() + elif format.casefold()=="avro": import pandavro as pdx for file in glob.glob(os.path.join(local_dir_path, '*.avro')): dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) + elif format.casefold()=="csv": + for file in glob.glob(os.path.join(local_dir_path, '*.csv')): + try: + df = pd.read_csv(file, index_col=None, header=None) + except EmptyDataError: + # in case there are empty files + df = pd.DataFrame() + dataframe_list.append(df) + result_df = pd.concat(dataframe_list, axis=0) + # Reset index to avoid duplicated indices + result_df.reset_index(drop=True) + else: + raise RuntimeError(f"{format} is currently not supported in get_result_df. Please consider writing a customized function to read the result.") + if local_folder is None: tmp_dir.cleanup() From 64760cebe310338a3305498c70c44c2e47318d60 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 00:08:28 -0700 Subject: [PATCH 20/26] fix test failures and typos --- .../feathr/spark_provider/_databricks_submission.py | 2 +- feathr_project/feathr/udf/_preprocessing_pyudf_manager.py | 2 +- feathr_project/feathr/utils/_envvariableutil.py | 8 ++++---- feathr_project/feathr/utils/job_utils.py | 8 +++++--- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/feathr_project/feathr/spark_provider/_databricks_submission.py b/feathr_project/feathr/spark_provider/_databricks_submission.py index 88196148f..8cb135e26 100644 --- a/feathr_project/feathr/spark_provider/_databricks_submission.py +++ b/feathr_project/feathr/spark_provider/_databricks_submission.py @@ -36,7 +36,7 @@ class _FeathrDatabricksJobLauncher(SparkJobLauncher): 5. will override the name of this job Args: - workspace_instance_url (str): the workinstance url. 
Document to get workspakce_instance_url: https://docs.microsoft.com/en-us/azure/databricks/workspace/workspace-details#workspace-url + workspace_instance_url (str): the workinstance url. Document to get workspace_instance_url: https://docs.microsoft.com/en-us/azure/databricks/workspace/workspace-details#workspace-url token_value (str): see here on how to get tokens: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/authentication config_template (str): config template for databricks cluster. See https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--runs-submit for more details. databricks_work_dir (_type_, optional): databricks_work_dir must start with dbfs:/. Defaults to 'dbfs:/feathr_jobs'. diff --git a/feathr_project/feathr/udf/_preprocessing_pyudf_manager.py b/feathr_project/feathr/udf/_preprocessing_pyudf_manager.py index dfa3e4200..142ae8032 100644 --- a/feathr_project/feathr/udf/_preprocessing_pyudf_manager.py +++ b/feathr_project/feathr/udf/_preprocessing_pyudf_manager.py @@ -90,7 +90,7 @@ def persist_pyspark_udf_to_file(user_func, local_workspace_dir): """persist the pyspark UDF to a file in `local_workspace_dir` for later usage. The user_func could be either a string that represents a function body, or a callable object. The reason being - if we are defining a regular Python function, it will be a callable object; - however if we reterive features from registry, the current implementation is to use plain strings to store the function body. In that case, the user_fuc will be string. + however if we retrieve features from registry, the current implementation is to use plain strings to store the function body. In that case, the user_fuc will be string. 
""" if isinstance(user_func, str): udf_source_code = [user_func] diff --git a/feathr_project/feathr/utils/_envvariableutil.py b/feathr_project/feathr/utils/_envvariableutil.py index 96559b1cd..8fcc85842 100644 --- a/feathr_project/feathr/utils/_envvariableutil.py +++ b/feathr_project/feathr/utils/_envvariableutil.py @@ -20,7 +20,7 @@ def get_environment_variable_with_default(self, *args): A environment variable for the variable key. It will retrieve the value of the environment variables in the following order: If the key is set in the environment variable, Feathr will use the value of that environment variable If it's not set in the environment, then a default is retrieved from the feathr_config.yaml file with the same config key. - If it's not available in the feathr_config.yaml file, Feathr will try to reterive the value from key vault + If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault If not found, an empty string will be returned with a warning error message. """ @@ -52,7 +52,7 @@ def get_environment_variable_with_default(self, *args): except yaml.YAMLError as exc: logger.warning(exc) - # If it's not available in the feathr_config.yaml file, Feathr will try to reterive the value from key vault + # If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault if self.akv_name: try: return self.akv_client.get_feathr_akv_secret(env_keyword) @@ -70,7 +70,7 @@ def get_environment_variable(self, variable_key): Return: A environment variable for the variable key. 
It will retrieve the value of the environment variables in the following order: If the key is set in the environment variable, Feathr will use the value of that environment variable - If it's not available in the environment variable file, Feathr will try to reterive the value from key vault + If it's not available in the environment variable file, Feathr will try to retrieve the value from key vault If not found, an empty string will be returned with a warning error message. """ env_var_value = os.environ.get(variable_key) @@ -78,7 +78,7 @@ def get_environment_variable(self, variable_key): if env_var_value: return env_var_value - # If it's not available in the environment variable file, Feathr will try to reterive the value from key vault + # If it's not available in the environment variable file, Feathr will try to retrieve the value from key vault logger.info(variable_key + ' is not set in the environment variables.') if self.akv_name: diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index b08932a2d..c864bdb4e 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -12,7 +12,7 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, local_folder: str = None) -> pd.DataFrame: """Download the job result dataset from cloud as a Pandas dataframe to make it easier for the client to read. - format: format to read the downloaded files. Currently support `paruqet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. + format: format to read the downloaded files. Currently support `parquet`, `delta`, `avro`, and `csv`. Default to `avro` if not specified. res_url: output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. local_folder: optional parameter to specify the absolute download path. 
if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. """ @@ -21,9 +21,11 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, if res_url is None: raise RuntimeError("res_url is None. Please make sure either you provide a res_url or make sure the job finished in FeathrClient has a valid result URI.") - # use a format if it's provided by the user, otherwise use the one provided by the job + # use user provided format, if there isn't one, then otherwise use the one provided by the job; + # if none of them is available, "avro" is the default format. format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") - format = "avro" if format is None else format + if format is None or format is "": + format = "avro" # if local_folder params is not provided then create a temporary folder if local_folder is not None: From 8916be4b4737dee61663b0fdfb550b87d0f80368 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 00:12:24 -0700 Subject: [PATCH 21/26] Update job_utils.py --- feathr_project/feathr/utils/job_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index c864bdb4e..5c815afe7 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -69,7 +69,7 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, # Reset index to avoid duplicated indices result_df.reset_index(drop=True) else: - raise RuntimeError(f"{format} is currently not supported in get_result_df. Please consider writing a customized function to read the result.") + raise RuntimeError(f"{format} is currently not supported in get_result_df. 
Currently only parquet, delta, avro, and csv are supported, please consider writing a customized function to read the result.") if local_folder is None: From 9a213baf70f3b3d5474bfb0af25406d9e5b0fed0 Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 00:18:18 -0700 Subject: [PATCH 22/26] fix comments and formats/typos --- docs/how-to-guides/azure-deployment-arm.md | 99 ++++++++++--------- docs/how-to-guides/azure-deployment-cli.md | 17 ++-- .../deploy-feathr-api-as-webapp.md | 6 +- docs/how-to-guides/feathr-udfs.md | 6 +- docs/quickstart_databricks.md | 4 +- docs/quickstart_synapse.md | 6 +- 6 files changed, 70 insertions(+), 68 deletions(-) diff --git a/docs/how-to-guides/azure-deployment-arm.md b/docs/how-to-guides/azure-deployment-arm.md index b594caea0..077cda299 100644 --- a/docs/how-to-guides/azure-deployment-arm.md +++ b/docs/how-to-guides/azure-deployment-arm.md @@ -4,11 +4,11 @@ title: Azure Resource Provisioning through Azure Resource Manager parent: How-to Guides --- - # Azure Resource Provisioning The provided Azure Resource Manager (ARM) template deploys the following resources, please make sure you have enough quota in the subscription and region you are deploying this in. You can view your quota and make request on Azure [portal](https://ms.portal.azure.com/#view/Microsoft_Azure_Capacity/QuotaMenuBlade/~/overview) -1. Azure Storage account + +1. Azure Storage account 2. Azure Purview (metadata store if you selected Azure-Purview as registry backend) 3. Azure SQL Server and Database (for RBAC and metadata store if you selected Azure-SQL as registry backend) 4. Azure Synapse workspace and Spark Pool @@ -17,9 +17,10 @@ The provided Azure Resource Manager (ARM) template deploys the following resourc 7. Azure Event Hub 8. Azure Redis -Please note, you need to have __owner access__ in the resource group you are deploying this in. 
Owner access is required to assign role to managed identity within ARM template so it can access key vault and store secrets. +Please note, you need to have **owner access** in the resource group you are deploying this in. Owner access is required to assign role to managed identity within ARM template so it can access key vault and store secrets. ## Architecture + The architecture diagram demonstrates how different Azure components interact with each other within Feathr. ![architecture](../images/architecture.png) @@ -29,80 +30,80 @@ Feathr has native cloud integration and getting started with Feathr is very stra 1. The very first step is to create an Azure Active Directory (AAD) application to enable authentication on the Feathr UI (which gets created as part of the deployment script). Currently it is not possible to create one through ARM template but you can easily create one by running the following CLI commands in the [Cloud Shell](https://shell.azure.com/bash). - ```bash - # This is the prefix you want to name your resources with, make a note of it, you will need it during deployment. - # Note: please keep the `resourcePrefix` short (less than 15 chars), since some of the Azure resources need the full name to be less than 24 characters. Only lowercase alphanumeric characters are allowed for resource prefix. - prefix="userprefix1" + ```bash + # This is the prefix you want to name your resources with, make a note of it, you will need it during deployment. + # Note: please keep the `resourcePrefix` short (less than 15 chars), since some of the Azure resources need the full name to be less than 24 characters. Only lowercase alphanumeric characters are allowed for resource prefix. + prefix="userprefix1" + + # Please don't change this name, a corresponding webapp with same name gets created in subsequent steps. + sitename="${prefix}webapp" + + # Use the following configuration command to enable dynamic install of az extensions without a prompt. 
This is required for the az account command group used in the following steps. + az config set extension.use_dynamic_install=yes_without_prompt - # Please don't change this name, a corresponding webapp with same name gets created in subsequent steps. - sitename="${prefix}webapp" + # This will create the Azure AD application, note that we need to create an AAD app of platform type Single Page Application(SPA). By default passing the redirect-uris with create command creates an app of type web. Setting Sign in audience to AzureADMyOrg limits the application access to just your tenant. + az ad app create --display-name $sitename --sign-in-audience AzureADMyOrg --web-home-page-url "https://$sitename.azurewebsites.net" --enable-id-token-issuance true + ``` - # Use the following configuration command to enable dynamic install of az extensions without a prompt. This is required for the az account command group used in the following steps. - az config set extension.use_dynamic_install=yes_without_prompt - - # This will create the Azure AD application, note that we need to create an AAD app of platform type Single Page Application(SPA). By default passing the redirect-uris with create command creates an app of type web. Setting Sign in audience to AzureADMyOrg limits the application access to just your tenant. - az ad app create --display-name $sitename --sign-in-audience AzureADMyOrg --web-home-page-url "https://$sitename.azurewebsites.net" --enable-id-token-issuance true - ``` - After the above step, an AAD application will be created. Note that it will take a few minutes to complete, so make sure the `aad_clientId`, `aad_objectId`, and `aad_tenantId` below are not empty. If they are empty, re-run the three commands to refresh the values for `aad_clientId`, `aad_objectId`, and `aad_tenantId`, as they will be required later. + After the above step, an AAD application will be created. 
Note that it will take a few minutes to complete, so make sure the `aad_clientId`, `aad_objectId`, and `aad_tenantId` below are not empty. If they are empty, re-run the three commands to refresh the values for `aad_clientId`, `aad_objectId`, and `aad_tenantId`, as they will be required later. - ```bash - # Fetch the ClientId, TenantId and ObjectId for the created app - aad_clientId=$(az ad app list --display-name $sitename --query [].appId -o tsv) + ```bash + # Fetch the ClientId, TenantId and ObjectId for the created app + aad_clientId=$(az ad app list --display-name $sitename --query [].appId -o tsv) - # We just use the homeTenantId since a user could have access to multiple tenants - aad_tenantId=$(az account show --query "[homeTenantId]" -o tsv) + # We just use the homeTenantId since a user could have access to multiple tenants + aad_tenantId=$(az account show --query "[homeTenantId]" -o tsv) - #Fetch the objectId of AAD app to patch it and add redirect URI in next step. - aad_objectId=$(az ad app list --display-name $sitename --query [].id -o tsv) + #Fetch the objectId of AAD app to patch it and add redirect URI in next step. + aad_objectId=$(az ad app list --display-name $sitename --query [].id -o tsv) - # Make sure the above command ran successfully and the values are not empty. If they are empty, re-run the above commands as the app creation could take some time. - # MAKE NOTE OF THE CLIENT_ID & TENANT_ID FOR STEP #2 - echo "AZURE_AAD_OBJECT_ID: $aad_objectId" - echo "AAD_CLIENT_ID: $aad_clientId" - echo "AZURE_TENANT_ID: $aad_tenantId" + # Make sure the above command ran successfully and the values are not empty. If they are empty, re-run the above commands as the app creation could take some time. 
+ # MAKE NOTE OF THE CLIENT_ID & TENANT_ID FOR STEP #2 + echo "AZURE_AAD_OBJECT_ID: $aad_objectId" + echo "AAD_CLIENT_ID: $aad_clientId" + echo "AZURE_TENANT_ID: $aad_tenantId" - # Updating the SPA app created above, currently there is no CLI support to add redirectUris to a SPA, so we have to patch manually via az rest - az rest --method PATCH --uri "https://graph.microsoft.com/v1.0/applications/$aad_objectId" --headers "Content-Type=application/json" --body "{spa:{redirectUris:['https://$sitename.azurewebsites.net']}}" - ``` + # Updating the SPA app created above, currently there is no CLI support to add redirectUris to a SPA, so we have to patch manually via az rest + az rest --method PATCH --uri "https://graph.microsoft.com/v1.0/applications/$aad_objectId" --headers "Content-Type=application/json" --body "{spa:{redirectUris:['https://$sitename.azurewebsites.net']}}" + ``` 2. Click the button below to deploy a minimal set of Feathr resources. This is not for production use as we choose a minimal set of resources, but treat it as a template that you can modify for further use. Note that you should have "Owner" access in your subscription to perform some of the actions. - [![Deploy to Azure](https://aka.ms/deploytoazurebutton)](https://portal.azure.com/#create/Microsoft.Template/uri/https%3A%2F%2Fraw.githubusercontent.com%2Flinkedin%2Ffeathr%2Fmain%2Fdocs%2Fhow-to-guides%2Fazure_resource_provision.json) + [![Deploy to Azure](https://aka.ms/deploytoazurebutton)](https://portal.azure.com/#create/Microsoft.Template/uri/https%3A%2F%2Fraw.githubusercontent.com%2Flinkedin%2Ffeathr%2Fmain%2Fdocs%2Fhow-to-guides%2Fazure_resource_provision.json) -3. If you are using Purview registry there is an additional step required for the deployment to work. Registry Server authenticates with Azure Purview using Managed Identity that was created by ARM template. The Managed Identity needs to be added to Azure Purview Collections as a __Data Curator__. 
For more details, please refer to [Access control in the Microsoft Purview governance portal](https://docs.microsoft.com/en-us/azure/purview/catalog-permissions) - ![purview data curator role add](../images/purview_permission_setting.png) +3. If you are using Purview registry there is an additional step required for the deployment to work. Registry Server authenticates with Azure Purview using Managed Identity that was created by ARM template. The Managed Identity needs to be added to Azure Purview Collections as a **Data Curator**. For more details, please refer to [Access control in the Microsoft Purview governance portal](https://docs.microsoft.com/en-us/azure/purview/catalog-permissions) + ![purview data curator role add](../images/purview_permission_setting.png) - Only collection admins can perform the above operation, the user who created this Purview account is already one. If you want to add additional admins, you can do so by clicking on _Root collection permission_ option on Azure Purview page. + Only collection admins can perform the above operation, the user who created this Purview account is already one. If you want to add additional admins, you can do so by clicking on _Root collection permission_ option on Azure Purview page. + +Congratulations, you have successfully deployed Feathr on Azure. You can access your resources by going to the resource group that you created for the deployment. A good first test would be to access Feathr UI, you can access it by clicking on App Service URL. The URL would have the following format: -Congratulations, you have successfully deployed Feathr on Azure. You can access your resources by going to the resource group that you created for the deployment. A good first test would be to access Feathr UI, you can access it by clicking on App Service URL. 
The URL would have the following format: ```bash https://{prefix}webapp.azurewebsites.net ``` ![app service url](../images/app-service-url.png) - ![feathr ui landing page](../images/feathr-ui-landingpage.png) ## Next Steps -Follow the quick start guide [here](https://linkedin.github.io/feathr/quickstart_synapse.html) to try out a notebook example. +Follow the quick start guide [here](https://linkedin.github.io/feathr/quickstart_synapse.html) to try out a notebook example. ## Known Issues/Workaround -1. For SQL Registry backend and RBAC, we create the database using a backup file and it might sometimes time out, [as documented here](https://docs.microsoft.com/en-us/azure/azure-sql/database/database-import-export-hang?view=azuresql). Suggested workaround is to manually run the sql queries to create the table schema for SQL Registry backend and/or RBAC. - - - In Azure Portal, you can directly go to the database that was created as part of the template and click on __Query Editor__. - This will allow you to run queries directly on the database. - ![sql-query-editor](../images/sqldb-query-editor.png) - - For credentials, put in the SQL username and password that you passed to the template. You might have to whitelist your IP and add it to the firewall, the screen will prompt you for this if required. Select OK - ![sql-query-editor-auth](../images/sql-query-editor-auth.png) +1. For SQL Registry backend and RBAC, we create the database using a backup file and it might sometimes time out, [as documented here](https://docs.microsoft.com/en-us/azure/azure-sql/database/database-import-export-hang?view=azuresql). Suggested workaround is to manually run the sql queries to create the table schema for SQL Registry backend and/or RBAC. - - Once the login is successful, you will see the query editor screen. Run the below queries in the editor and create the rquired schema. 
- ![sql-query-editor-auth](../images/sql-query-editor-open.png) + - In Azure Portal, you can directly go to the database that was created as part of the template and click on **Query Editor**. + This will allow you to run queries directly on the database. + ![sql-query-editor](../images/sqldb-query-editor.png) - - [SQL Registry DB Schema](https://github.com/linkedin/feathr/blob/main/registry/sql-registry/scripts/schema.sql) + - For credentials, put in the SQL username and password that you passed to the template. You might have to whitelist your IP and add it to the firewall; the screen will prompt you for this if required. Select OK. + ![sql-query-editor-auth](../images/sql-query-editor-auth.png) - - [RBAC DB Schema](https://github.com/linkedin/feathr/blob/main/registry/access_control/scripts/schema.sql) + - Once the login is successful, you will see the query editor screen. Run the queries below in the editor to create the required schema. + ![sql-query-editor-auth](../images/sql-query-editor-open.png) + - [SQL Registry DB Schema](https://github.com/linkedin/feathr/blob/main/registry/sql-registry/scripts/schema.sql) + - [RBAC DB Schema](https://github.com/linkedin/feathr/blob/main/registry/access_control/scripts/schema.sql) diff --git a/docs/how-to-guides/azure-deployment-cli.md b/docs/how-to-guides/azure-deployment-cli.md index 42aa65de5..659446e2f 100644 --- a/docs/how-to-guides/azure-deployment-cli.md +++ b/docs/how-to-guides/azure-deployment-cli.md @@ -4,13 +4,11 @@ title: Azure Resource Provisioning through Azure CLI parent: How-to Guides --- - # Azure Resource Provisioning Due to the complexity of the possible cloud environment, it is almost impossible to create a script that works for all the cloud setup use cases. We have a quick start guide below that will facilitate this process. - -## Provision Azure Resources using CLI: +## Provision Azure Resources using CLI 1.
Use the content below as a detailed explanation of [Azure resource provisioning script](./azure_resource_provision.sh). **DO NOT** run that script directly given the complexity of cloud environment setup. Instead, follow the steps in this documentation so you can always go back and check your step in case of some failures. 2. We provide an [Azure resource provisioning script](./azure_resource_provision.sh) which can be used to automate the process @@ -83,7 +81,7 @@ else fi ``` -# Setup all the resource names +## Setup all the resource names Setup all the resource names which will be used later. @@ -103,7 +101,7 @@ And then set the current subscription ID: az account set -s $subscription_id ``` -# Create a Service Principal which you can automate later +## Create a Service Principal which you can automate later You can simply think an Azure Service Principal is an account that can be used for automation (so you don't have to use your own account to perform all the actions), though it can do much more than that. We will use a service principal for the steps below, but you can also use your own Azure account for all those actions as long as you have sufficient permission. @@ -119,7 +117,8 @@ echo "AZURE_TENANT_ID: $sp_tenantid" echo "AZURE_CLIENT_SECRET: $sp_password" This will give three variables: AZURE_CLIENT_ID, AZURE_TENANT_ID and AZURE_CLIENT_SECRET. You will need them later. 
``` -__You should save AZURE_CLIENT_SECRET because you will only see it once here__ + +Note: **You should save AZURE_CLIENT_SECRET because you will only see it once here** ## Create a storage account @@ -137,7 +136,7 @@ az storage fs create -n $storage_file_system_name --account-name $storage_accoun az role assignment create --role "Storage Blob Data Contributor" --assignee "$sp_objectid" --scope "/subscriptions/$subscription_id/resourceGroups/$resource_group_name/providers/Microsoft.Storage/storageAccounts/$storage_account_name" ``` -## Create a Synapse cluster: +## Create a Synapse cluster Next, we need to create an Azure Synapse Workspace. You can learn more about Azure Synapse in the [documentation page](https://azure.microsoft.com/en-us/services/synapse-analytics/). In a nutshell, in a Synapse Workspace, you can get a managed Apache Spark Pool which you can stop at anytime, as well as an enterprise datawarehouse. @@ -145,7 +144,7 @@ The script below helps you to: - Create a Synapse Workspace - Create a Synapse Spark Pool -- Assign the right permssion to this Spark Pool +- Assign the right permission to this Spark Pool ```bash # Create Synapse Cluster @@ -197,7 +196,7 @@ This config is only for demo purpose. For production scenario, you should consid - using a Premium SKU with larger VM - enable multiple zones - use more replicas - Please refer to the article [Enable zone redundancy for Azure Cache for Redis](https://docs.microsoft.com/en-us/azure/azure-cache-for-redis/cache-how-to-zone-redundancy) for more information on enabling zone redundency. + Please refer to the article [Enable zone redundancy for Azure Cache for Redis](https://docs.microsoft.com/en-us/azure/azure-cache-for-redis/cache-how-to-zone-redundancy) for more information on enabling zone redundancy. You should also record the password which will be used later. 
diff --git a/docs/how-to-guides/deploy-feathr-api-as-webapp.md b/docs/how-to-guides/deploy-feathr-api-as-webapp.md index 0607fbe98..e0e1fda85 100644 --- a/docs/how-to-guides/deploy-feathr-api-as-webapp.md +++ b/docs/how-to-guides/deploy-feathr-api-as-webapp.md @@ -1,10 +1,10 @@ --- layout: default -title: Feathr API Deployment +title: Feathr REST API Deployment parent: How-to Guides --- -# Feathr API +# Feathr REST API The API currently supports following functionality @@ -21,7 +21,7 @@ The API currently supports following functionality You can install dependencies through the requirements file ```bash -$ pip install -r requirements.txt +pip install -r requirements.txt ``` ### Run diff --git a/docs/how-to-guides/feathr-udfs.md b/docs/how-to-guides/feathr-udfs.md index 036ebd94f..a28ae5aac 100644 --- a/docs/how-to-guides/feathr-udfs.md +++ b/docs/how-to-guides/feathr-udfs.md @@ -60,7 +60,7 @@ def add_new_dropoff_and_fare_amount_column(df: DataFrame): return df ``` -4. Currently, "chained" functions are not supported. I.e. the example below is not supported: +5. Currently, "chained" functions are not supported. I.e. the example below is not supported: ```python def multiply_100(input_val): @@ -126,7 +126,7 @@ def feathr_udf_filter_location_id(df: DataFrame) -> DataFrame: return sqlDF ``` -### UDF with Pandas Example: +### UDF with Pandas Example Feathr also supports using pandas to deal with the data. Behind the scene it's using pandas-on-spark so some limitation applies here. Please refer to [Pandas-on-Spark's Best Practice](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/best_practices.html#best-practices) for more details. 
@@ -174,7 +174,7 @@ derived_feature = DerivedFeature(name="f_trip_time_distance", feature = Feature(name="f_day_of_week", feature_type=INT32, - transform="dayofweek(lpep_dropoff_datetime)") + transform="dayofweek(lpep_dropoff_datetime)") ``` ## Illustration on How Feathr UDF works diff --git a/docs/quickstart_databricks.md b/docs/quickstart_databricks.md index ffd6e42a9..dff5b5f0f 100644 --- a/docs/quickstart_databricks.md +++ b/docs/quickstart_databricks.md @@ -17,7 +17,7 @@ For Databricks, you can simply upload [this notebook](./samples/databricks/datab 3. Run the whole notebook. It will automatically install Feathr in your cluster and run the feature ingestion jobs. -# Authoring Feathr jobs in local environment and submit to remote Databricks cluster +## Authoring Feathr jobs in local environment and submit to remote Databricks cluster Although Databricks Notebooks are great tools, there are also large developer communities that prefer the usage of Visual Studio Code, where [it has native support for Python and Jupyter Notebooks](https://code.visualstudio.com/docs/datascience/jupyter-notebooks) with many great features such as syntax highlight and IntelliSense. @@ -36,7 +36,7 @@ This is the only part you need to change to author the Feathr job in local envir ```python # Authoring Feathr jobs in local environment and submit to remote Databricks cluster host_name = 'https://.azuredatabricks.net/' -host_token = '' +host_token = '' ``` And that's it! Feathr will automatically submit the job to the cluster you specified. diff --git a/docs/quickstart_synapse.md b/docs/quickstart_synapse.md index 2ea3cfcbc..604e699fe 100644 --- a/docs/quickstart_synapse.md +++ b/docs/quickstart_synapse.md @@ -22,7 +22,7 @@ First step is to provision required cloud resources if you want to use Feathr. F Feathr has native cloud integration. Here are the steps to use Feathr on Azure: -1. 
Follow the [Feathr ARM deployment guide ](https://linkedin.github.io/feathr/how-to-guides/azure-deployment-arm.html) to run Feathr on Azure. This allows you to quickly get started with automated deployment using Azure Resource Manager template. Alternatively, if you want to set up everything manually, you can checkout the [Feathr CLI deployment guide](https://linkedin.github.io/feathr/how-to-guides/azure-deployment-cli.html) to run Feathr on Azure. This allows you to understand what is going on and set up one resource at a time. +1. Follow the [Feathr ARM deployment guide](https://linkedin.github.io/feathr/how-to-guides/azure-deployment-arm.html) to run Feathr on Azure. This allows you to quickly get started with automated deployment using Azure Resource Manager template. Alternatively, if you want to set up everything manually, you can checkout the [Feathr CLI deployment guide](https://linkedin.github.io/feathr/how-to-guides/azure-deployment-cli.html) to run Feathr on Azure. This allows you to understand what is going on and set up one resource at a time. 2. Once the deployment is complete,run the Feathr Jupyter Notebook by clicking the button below. You only need to change the specified `Resource Prefix`. @@ -43,6 +43,7 @@ pip install git+https://github.com/linkedin/feathr.git#subdirectory=feathr_proje ``` ## Step 3: Run the sample notebook + We've provided a self-contained [sample notebook](./samples/product_recommendation_demo.ipynb) to act as the main content of this getting started guide. This documentation should be used more like highlights and further explanations of that demo notebook. ## Step 4: Update Feathr config @@ -89,7 +90,7 @@ Or set this in python: os.environ['ONLINE_STORE__REDIS__HOST'] = 'feathrazure.redis.cache.windows.net' ``` -## Step 5: Setup environment variables. 
+## Step 5: Setup environment variables In the self-contained [sample notebook](./samples/product_recommendation_demo.ipynb), you also have to setup a few environment variables like below in order to access those cloud resources. You should be able to get those values from the first step. @@ -102,6 +103,7 @@ os.environ['AZURE_CLIENT_ID'] = '' os.environ['AZURE_TENANT_ID'] = '' os.environ['AZURE_CLIENT_SECRET'] = '' ``` + Please refer to [A note on using azure key vault to store credentials](https://github.com/linkedin/feathr/blob/41e7496b38c43af6d7f8f1de842f657b27840f6d/docs/how-to-guides/feathr-configuration-and-env.md#a-note-on-using-azure-key-vault-to-store-credentials) for more details. ## Step 6: Create features with Python APIs From 8edb70664565f836408d54b119c18e92a76b330b Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 01:39:12 -0700 Subject: [PATCH 23/26] fix typos and test failures --- feathr_project/feathr/utils/job_utils.py | 2 +- feathr_project/test/test_azure_spark_e2e.py | 2 +- feathr_project/test/test_azure_spark_maven_e2e.py | 11 ++++++----- feathr_project/test/test_fixture.py | 6 ++++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 5c815afe7..6a6bd63c0 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -24,7 +24,7 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, # use user provided format, if there isn't one, then otherwise use the one provided by the job; # if none of them is available, "avro" is the default format. 
format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") - if format is None or format is "": + if format is None or format == "": format = "avro" # if local_folder params is not provided then create a temporary folder diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py index aaeaafa5a..9252a8e29 100644 --- a/feathr_project/test/test_azure_spark_e2e.py +++ b/feathr_project/test/test_azure_spark_e2e.py @@ -59,7 +59,7 @@ def test_feathr_online_store_agg_features(): Test FeathrClient() get_online_features and batch_get can get data correctly. """ - online_test_table = get_online_test_table_name("nycTaxiCITable") + online_test_table = get_online_test_table_name("nycTaxiCITableSparkE2E") test_workspace_dir = Path( __file__).parent.resolve() / "test_user_workspace" # os.chdir(test_workspace_dir) diff --git a/feathr_project/test/test_azure_spark_maven_e2e.py b/feathr_project/test/test_azure_spark_maven_e2e.py index d37e8ba04..4a4fb7cf1 100644 --- a/feathr_project/test/test_azure_spark_maven_e2e.py +++ b/feathr_project/test/test_azure_spark_maven_e2e.py @@ -4,6 +4,7 @@ from feathr import (BackfillTime, MaterializationSettings) from feathr import RedisSink +from feathr.client import FeathrClient from test_fixture import (basic_test_setup, get_online_test_table_name) from test_utils.constants import Constants @@ -12,14 +13,14 @@ def test_feathr_online_store_agg_features(): Test FeathrClient() get_online_features and batch_get can get data correctly. 
""" - online_test_table = get_online_test_table_name("nycTaxiCITable") + online_test_table = get_online_test_table_name("nycTaxiCITableMaven") test_workspace_dir = Path( __file__).parent.resolve() / "test_user_workspace" # os.chdir(test_workspace_dir) # The `feathr_runtime_location` was commented out in this config file, so feathr should use # Maven package as the dependency and `noop.jar` as the main file - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_maven.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config_maven.yaml")) backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) @@ -36,11 +37,11 @@ def test_feathr_online_store_agg_features(): res = client.get_online_features(online_test_table, '265', [ 'f_location_avg_fare', 'f_location_max_fare']) - # just assme there are values. We don't hard code the values for now for testing - # the correctness of the feature generation should be garunteed by feathr runtime. + # just assume there are values. We don't hard code the values for now for testing + # the correctness of the feature generation should be guaranteed by feathr runtime. 
# ID 239 and 265 are available in the `DOLocationID` column in this file: # https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-04.csv - # View more detials on this dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page + # View more details on this dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page assert len(res) == 2 assert res[0] != None assert res[1] != None diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index a0a616a05..6fd76800e 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -196,6 +196,7 @@ def registry_test_setup_append(config_path: str): def generate_entities(): def add_new_dropoff_and_fare_amount_column(df: DataFrame): + from pyspark.sql.functions import col df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) return df @@ -269,7 +270,7 @@ def add_new_dropoff_and_fare_amount_column(df: DataFrame): derived_feature_list = [ f_trip_time_distance, f_trip_time_rounded, f_trip_time_rounded_plus] - # shuffule the order to make sure they can be parsed correctly + # shuffle the order to make sure they can be parsed correctly # Those input derived features can be in arbitrary order, but in order to parse the right dependencies, we need to reorder them internally in a certain order. # This shuffle is to make sure that each time we have random shuffle for the input and make sure the internal sorting algorithm works (we are using topological sort). 
random.shuffle(derived_feature_list) @@ -285,6 +286,7 @@ def registry_test_setup_append(config_path: str): client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"}) def add_new_dropoff_and_fare_amount_column(df: DataFrame): + from pyspark.sql.functions import col df = df.withColumn("new_lpep_dropoff_datetime", col("lpep_dropoff_datetime")) df = df.withColumn("new_fare_amount", col("fare_amount") + 1000000) return df @@ -364,7 +366,7 @@ def add_new_dropoff_and_fare_amount_column(df: DataFrame): derived_feature_list = [ f_trip_time_distance, f_trip_time_rounded, f_trip_time_rounded_plus] - # shuffule the order to make sure they can be parsed correctly + # shuffle the order to make sure they can be parsed correctly # Those input derived features can be in arbitrary order, but in order to parse the right dependencies, we need to reorder them internally in a certain order. # This shuffle is to make sure that each time we have random shuffle for the input and make sure the internal sorting algorithm works (we are using topological sort). 
random.shuffle(derived_feature_list) From 5a97bcdd16ef547f815dff8eb2ca760f4c077c3d Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 07:03:07 -0700 Subject: [PATCH 24/26] update test names --- feathr_project/test/test_azure_feature_monitoring_e2e.py | 4 ++-- feathr_project/test/test_azure_spark_e2e.py | 4 +--- feathr_project/test/test_azure_spark_maven_e2e.py | 2 +- feathr_project/test/test_feature_materialization.py | 9 +++++---- feathr_project/test/test_pyduf_preprocessing_e2e.py | 7 ++++--- feathr_project/test/test_sql_source.py | 1 - 6 files changed, 13 insertions(+), 14 deletions(-) diff --git a/feathr_project/test/test_azure_feature_monitoring_e2e.py b/feathr_project/test/test_azure_feature_monitoring_e2e.py index ebbc40cf1..08a75ae32 100644 --- a/feathr_project/test/test_azure_feature_monitoring_e2e.py +++ b/feathr_project/test/test_azure_feature_monitoring_e2e.py @@ -8,13 +8,13 @@ def test_feature_monitoring(): - online_test_table = get_online_test_table_name("nycTaxiCITable") + monitor_sink_table = get_online_test_table_name("nycTaxiCITableMonitoring") test_workspace_dir = Path( __file__).parent.resolve() / "test_user_workspace" client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) - monitor_sink = MonitoringSqlSink(table_name=online_test_table) + monitor_sink = MonitoringSqlSink(table_name=monitor_sink_table) settings = MonitoringSettings("monitoringSetting", sinks=[monitor_sink], feature_names=[ diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py index 9252a8e29..3914a0bf5 100644 --- a/feathr_project/test/test_azure_spark_e2e.py +++ b/feathr_project/test/test_azure_spark_e2e.py @@ -23,8 +23,6 @@ def test_feathr_materialize_to_offline(): """ Test FeathrClient() HdfsSink. 
""" - - online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path( __file__).parent.resolve() / "test_user_workspace" # os.chdir(test_workspace_dir) @@ -106,7 +104,7 @@ def test_feathr_online_store_non_agg_features(): __file__).parent.resolve() / "test_user_workspace" client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) - online_test_table = get_online_test_table_name('nycTaxiCITable') + online_test_table = get_online_test_table_name('nycTaxiCITableNonAggFeature') backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) diff --git a/feathr_project/test/test_azure_spark_maven_e2e.py b/feathr_project/test/test_azure_spark_maven_e2e.py index 4a4fb7cf1..b8e7cefb0 100644 --- a/feathr_project/test/test_azure_spark_maven_e2e.py +++ b/feathr_project/test/test_azure_spark_maven_e2e.py @@ -25,7 +25,7 @@ def test_feathr_online_store_agg_features(): backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) - settings = MaterializationSettings("nycTaxiTable", + settings = MaterializationSettings("TestJobName", sinks=[redisSink], feature_names=[ "f_location_avg_fare", "f_location_max_fare"], diff --git a/feathr_project/test/test_feature_materialization.py b/feathr_project/test/test_feature_materialization.py index 9a325b81d..c0545d1cd 100644 --- a/feathr_project/test/test_feature_materialization.py +++ b/feathr_project/test/test_feature_materialization.py @@ -1,6 +1,7 @@ import os from datetime import datetime, timedelta from pathlib import Path +from feathr.client import FeathrClient from pyspark.sql import DataFrame from pyspark.sql.functions import col @@ -161,10 +162,10 @@ def test_get_offline_features_verbose(): ) def test_materialize_features_verbose(): - online_test_table = 
get_online_test_table_name("nycTaxiCITable") + online_test_table = get_online_test_table_name("nycTaxiCITableMaterializeVerbose") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -186,7 +187,7 @@ def test_delete_feature_from_redis(): test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) batch_source = HdfsSource(name="nycTaxiBatchSource_add_new_fare_amount", path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", @@ -217,7 +218,7 @@ def test_delete_feature_from_redis(): client.build_features(anchor_list=[regular_anchor]) - online_test_table = get_online_test_table_name('nycTaxiCITable') + online_test_table = get_online_test_table_name('nycTaxiCITableDeletion') backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) diff --git a/feathr_project/test/test_pyduf_preprocessing_e2e.py b/feathr_project/test/test_pyduf_preprocessing_e2e.py index da79a00d8..9ac9c1917 100644 --- a/feathr_project/test/test_pyduf_preprocessing_e2e.py +++ b/feathr_project/test/test_pyduf_preprocessing_e2e.py @@ -1,6 +1,7 @@ import os from datetime import datetime, timedelta from pathlib import Path +from feathr.client import FeathrClient from pyspark.sql import DataFrame from pyspark.sql.functions import col @@ -59,7 +60,7 @@ def 
test_non_swa_feature_gen_with_offline_preprocessing(): """ test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client:FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) batch_source = HdfsSource(name="nycTaxiBatchSource_add_new_fare_amount", path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", @@ -90,7 +91,7 @@ def test_non_swa_feature_gen_with_offline_preprocessing(): client.build_features(anchor_list=[regular_anchor]) - online_test_table = get_online_test_table_name('nycTaxiCITable') + online_test_table = get_online_test_table_name('nycTaxiCITableOfflineProcessing') backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) @@ -151,7 +152,7 @@ def test_feature_swa_feature_gen_with_preprocessing(): client.build_features(anchor_list=[agg_anchor]) - online_test_table = get_online_test_table_name('nycTaxiCITable') + online_test_table = get_online_test_table_name('nycTaxiCITableSWAFeatureMaterialization') backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) diff --git a/feathr_project/test/test_sql_source.py b/feathr_project/test/test_sql_source.py index 97098fa3f..b584f2667 100644 --- a/feathr_project/test/test_sql_source.py +++ b/feathr_project/test/test_sql_source.py @@ -100,7 +100,6 @@ def test_feathr_get_offline_features(): These 2 variables will be passed to the Spark job in `--system-properties` parameter so Spark can access the database """ - online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path( __file__).parent.resolve() / "test_user_workspace" # os.chdir(test_workspace_dir) From 2f0066710940dfadee801cdcc634da13e1421d5d Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 08:02:16 -0700 Subject: [PATCH 25/26] 
Update test_fixture.py --- feathr_project/test/test_fixture.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index 6fd76800e..d7eb738f4 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -376,4 +376,6 @@ def add_new_dropoff_and_fare_amount_column(df: DataFrame): def get_online_test_table_name(table_name: str): # use different time for testing to avoid write conflicts now = datetime.now() - return '_'.join([table_name, str(now.minute), str(now.second)]) \ No newline at end of file + res_table = '_'.join([table_name, str(now.minute), str(now.second)]) + print("The online Redis table is", res_table) + return res_table \ No newline at end of file From e0c7427546a025cf86b4fcf77a8edf295d7ecbec Mon Sep 17 00:00:00 2001 From: Xiaoyong Zhu Date: Mon, 1 Aug 2022 08:57:29 -0700 Subject: [PATCH 26/26] Update test_fixture.py --- feathr_project/test/test_fixture.py | 1 - 1 file changed, 1 deletion(-) diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index d7eb738f4..af088b65d 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -69,7 +69,6 @@ def basic_test_setup(config_path: str): transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", agg_func="AVG", window="90d", - filter="fare_amount > 0", )), Feature(name="f_location_max_fare", key=location_id,