# feat: Contrib azure provider with synapse/mssql offline store and Azure registry store (#3072)
Merged

Changes from 1 commit (51 commits in this PR):

- e1e210d Broken state (kevjumba)
- 011d1e0 working state (kevjumba)
- a6a2fce Fix the lint issues (kevjumba)
- 57b63bb Semi working state (kevjumba)
- ae7ed8a Fix (kevjumba)
- 421645b Fremove print (kevjumba)
- 07fece5 Fix lint (kevjumba)
- 4062031 Run build-sphinx (kevjumba)
- cb39329 Add tutorials (kevjumba)
- 554ca1a Fix (kevjumba)
- 4a969e7 Fix? (kevjumba)
- 116320a Fix lint (kevjumba)
- c0b16ef Fix (kevjumba)
- 44d09d0 Fix lint (kevjumba)
- b6f0a79 Begin configuring tests (adchia)
- 2b2ff40 Fix (kevjumba)
- 4616366 Working version (kevjumba)
- c7d9852 Fix (kevjumba)
- d2e290b Fix (kevjumba)
- a726a9a Fix (kevjumba)
- 32992e3 Fix lint (kevjumba)
- ebb934b Fix lint (kevjumba)
- e456acb Fix (kevjumba)
- 45f479f Fix lint (kevjumba)
- 4b8c4a2 Fix (kevjumba)
- b1bf602 Fix (kevjumba)
- 4586f00 Fix azure (kevjumba)
- 3b88c0b Fix (kevjumba)
- 9ae8ee3 Fix (kevjumba)
- 1b12e4a Fix lint and address issues (kevjumba)
- 0ca5048 Fix integration tests (kevjumba)
- 883f314 Fix (kevjumba)
- ccf8716 Fix lint and address issues (kevjumba)
- f05288e Fix (kevjumba)
- ee30e73 Fix (kevjumba)
- ab17db9 Fix (kevjumba)
- be162f5 Revert (kevjumba)
- f5aa476 Fix (kevjumba)
- 4423dfa Fix (kevjumba)
- 5806507 Fix (kevjumba)
- 7a4d055 Fix lint (kevjumba)
- 78b74b1 Fix (kevjumba)
- a9e8119 Fix lint (kevjumba)
- 1341e3e Fix pyarrow (kevjumba)
- 3d42093 Fix lint (kevjumba)
- 1c591f0 add requirements files (adchia)
- b4da607 fix name of docs (adchia)
- c3a0423 fix offline store readme (adchia)
- 576b57e fix offline store readme (adchia)
- 69940ac fix (adchia)
- 516ff76 fix (adchia)
Add tutorials
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
commit cb393293839b27cbe7ab7a15686246686206486e
@@ -0,0 +1,88 @@
# Getting started with Feast on Azure

The objective of this tutorial is to build a model that predicts whether a driver will complete a trip, based on a number of features ingested into Feast. During this tutorial you will:

1. Deploy the infrastructure for a feature store (using an ARM template)
1. Register features into a central feature registry hosted on Blob Storage
1. Consume features from the feature store for training and inference

## Prerequisites

For this tutorial you will require:

1. An Azure subscription.
1. Working knowledge of Python and ML concepts.
1. A basic understanding of Azure Machine Learning (using notebooks, etc.).
## 1. Deploy Infrastructure

We have created an ARM template that deploys and configures all the infrastructure required to run Feast in Azure. This makes the set-up very simple: select the **Deploy to Azure** button below.

The only two required parameters during the set-up are:

- **Admin Password** for the Dedicated SQL Pool being deployed.
- **Principal ID**, used to set the storage permissions for the Feast registry store. You can find this value by opening **Cloud Shell** and running the following command:

```bash
# If you are using Azure portal CLI or Azure CLI 2.37.0 or above
az ad signed-in-user show --query id -o tsv

# If you are using Azure CLI below 2.37.0
az ad signed-in-user show --query objectId -o tsv
```

> You may want to first make sure your subscription has registered the `Microsoft.Synapse`, `Microsoft.SQL`, `Microsoft.Network` and `Microsoft.Compute` providers before running the template below, as some of them may require explicit registration. If you are on a Free Subscription, you will not be able to deploy the workspace part of this tutorial.

[**Deploy to Azure**](https://portal.azure.com/#create/Microsoft.Template/uri/https%3A%2F%2Fraw.githubusercontent.com%2Ffeast-dev%2Ffeast%2Fmaster%2Fdocs%2Ftutorials%2Fazure%2Fdeployment%2Ffs_synapse_azuredeploy.json)
|  | ||
|
|
||
| The ARM template will not only deploy the infrastructure but it will also: | ||
|
|
||
| - install feast with the azure provider on the compute instance | ||
| - set the Registry Blob path, Dedicated SQL Pool and Redis cache connection strings in the Azure ML default Keyvault. | ||
|
|
||
| > **☕ It can take up to 20 minutes for the Redis cache to be provisioned.** | ||
|
|
||
## 2. Git clone this repo to your compute instance

In the [Azure Machine Learning Studio](https://ml.azure.com), navigate to the left-hand menu and select **Compute**. You should see your compute instance running; select **Terminal**.

In the terminal, clone this GitHub repo:

```bash
git clone https://github.com/feast-dev/feast
```
## 3. Load feature values into the Feature Store

In the Azure ML Studio, select *Notebooks* from the left-hand menu and then open the [loading feature values into the feature store notebook](./notebooks/part1-load-data.ipynb). Work through this notebook.

> __💁 Ensure the Jupyter kernel is set to Python 3.8 - AzureML__
## 4. Register features in the Feature Store

In the Azure ML Studio, select *Notebooks* from the left-hand menu and then open the [register features into your feature registry notebook](notebooks/part2-register-features.ipynb). Work through this notebook.

> __💁 Ensure the Jupyter kernel is set to Python 3.8 - AzureML__
## 5. Train and Deploy a model using the Feature Store

In the Azure ML Studio, select *Notebooks* from the left-hand menu and then open the [train and deploy a model using feast notebook](notebooks/part3-train-and-deploy-with-feast.ipynb). Work through this notebook.

> __💁 Ensure the Jupyter kernel is set to Python 3.8 - AzureML__
>
> If you encounter problems during the model training stage, create a new cell and re-execute `!pip install scikit-learn==0.22.1`. Upon completion, restart the kernel and start over.
## 6. Running the Feast Azure tutorials locally without an Azure workspace

* If you are on a free-tier instance, you will not be able to run the Azure deployment, because the Azure workspace requires vCPUs and the free trial subscription does not have a quota for them.
* The workaround is to remove the `Microsoft.MachineLearningServices/workspaces/computes` resource from `fs_synapse_azuredeploy.json` and set up the environment locally.
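The workaround above can be scripted. A minimal sketch, assuming only that the template is valid JSON with a top-level `resources` array (the toy template below is illustrative, not the real deployment file):

```python
import json

# Resource type to drop so the template deploys without an AML compute instance
COMPUTE_TYPE = "Microsoft.MachineLearningServices/workspaces/computes"


def strip_compute_resources(template: dict) -> dict:
    """Return a copy of an ARM template without AML compute resources."""
    stripped = dict(template)
    stripped["resources"] = [
        r for r in template.get("resources", []) if r.get("type") != COMPUTE_TYPE
    ]
    return stripped


# Toy template standing in for fs_synapse_azuredeploy.json
template = {
    "resources": [
        {"type": COMPUTE_TYPE, "name": "ci"},
        {"type": "Microsoft.Cache/redis", "name": "cache"},
    ]
}
print(json.dumps(strip_compute_resources(template), indent=2))
```

In practice you would `json.load` the real template, strip it, and `json.dump` it back out before deploying.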
|
> **Collaborator:** typo? assuming should be synapse
>
> **Author:** thanks for the catch! fixed
1. After deployment, find your `Azure SQL Pool` secrets by going to `Subscriptions -> <Your Subscription> -> Resource Group -> Key Vault` and giving your account admin permissions to the Key Vault. Retrieve the `FEAST-REGISTRY-PATH`, `FEAST-OFFLINE-STORE-CONN`, and `FEAST-ONLINE-STORE-CONN` secrets to use in your local environment.
2. In your local environment, install the Azure CLI and log in using `az login`.
3. Once everything is set up, you should be able to work through the first two tutorial notebooks without any errors (the third notebook requires Azure workspace resources).
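Step 1 can also be scripted with the Azure SDK for Python. A minimal sketch, assuming `azure-identity` and `azure-keyvault-secrets` are installed and you are logged in via `az login`; the vault name is a placeholder:

```python
# Names of the secrets the ARM template stores in the workspace Key Vault
FEAST_SECRET_NAMES = [
    "FEAST-REGISTRY-PATH",
    "FEAST-OFFLINE-STORE-CONN",
    "FEAST-ONLINE-STORE-CONN",
]


def vault_url(vault_name: str) -> str:
    """Build a Key Vault endpoint URL from the vault's name."""
    return f"https://{vault_name}.vault.azure.net"


if __name__ == "__main__":
    # Requires network access and an authenticated Azure session
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient

    client = SecretClient(
        vault_url=vault_url("<your-keyvault-name>"),
        credential=DefaultAzureCredential(),
    )
    secrets = {name: client.get_secret(name).value for name in FEAST_SECRET_NAMES}
```

`DefaultAzureCredential` will pick up the CLI login from step 2, so no extra credential configuration should be needed locally.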
@@ -0,0 +1,260 @@
```python
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from datetime import datetime, timedelta
from enum import Enum
from random import randint

import numpy as np
import pandas as pd
from pytz import FixedOffset, timezone, utc
from sqlalchemy import DateTime

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"


class EventTimestampType(Enum):
    TZ_NAIVE = 0
    TZ_AWARE_UTC = 1
    TZ_AWARE_FIXED_OFFSET = 2
    TZ_AWARE_US_PACIFIC = 3


def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampType):
    if t == EventTimestampType.TZ_NAIVE:
        return event_timestamp
    elif t == EventTimestampType.TZ_AWARE_UTC:
        return event_timestamp.replace(tzinfo=utc)
    elif t == EventTimestampType.TZ_AWARE_FIXED_OFFSET:
        return event_timestamp.replace(tzinfo=utc).astimezone(FixedOffset(60))
    elif t == EventTimestampType.TZ_AWARE_US_PACIFIC:
        return event_timestamp.replace(tzinfo=utc).astimezone(timezone("US/Pacific"))


def create_orders_df(
    customers,
    drivers,
    start_date,
    end_date,
    order_count,
    infer_event_timestamp_col=False,
) -> pd.DataFrame:
    """
    Example df generated by this function:
    | order_id | driver_id | customer_id | order_is_success | event_timestamp     |
    +----------+-----------+-------------+------------------+---------------------+
    | 100      | 5004      | 1007        | 0                | 2021-03-10 19:31:15 |
    | 101      | 5003      | 1006        | 0                | 2021-03-11 22:02:50 |
    | 102      | 5010      | 1005        | 0                | 2021-03-13 00:34:24 |
    | 103      | 5010      | 1001        | 1                | 2021-03-14 03:05:59 |
    """
    df = pd.DataFrame()
    df["order_id"] = list(range(100, 100 + order_count))
    df["driver_id"] = np.random.choice(drivers, order_count)
    df["customer_id"] = np.random.choice(customers, order_count)
    df["order_is_success"] = np.random.randint(0, 2, size=order_count).astype(np.int32)

    if infer_event_timestamp_col:
        df["e_ts"] = [
            _convert_event_timestamp(
                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
                EventTimestampType(3),
            )
            for idx, dt in enumerate(
                pd.date_range(start=start_date, end=end_date, periods=order_count)
            )
        ]
        df.sort_values(
            by=["e_ts", "order_id", "driver_id", "customer_id"], inplace=True,
        )
    else:
        df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
            _convert_event_timestamp(
                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
                EventTimestampType(idx % 4),
            )
            for idx, dt in enumerate(
                pd.date_range(start=start_date, end=end_date, periods=order_count)
            )
        ]
        df.sort_values(
            by=[
                DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
                "order_id",
                "driver_id",
                "customer_id",
            ],
            inplace=True,
        )
    return df


def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame:
    """
    Example df generated by this function:
    | datetime         | driver_id | conv_rate | acc_rate | avg_daily_trips | created          |
    |------------------+-----------+-----------+----------+-----------------+------------------|
    | 2021-03-17 19:31 | 5010      | 0.229297  | 0.685843 | 861             | 2021-03-24 19:34 |
    | 2021-03-17 20:31 | 5010      | 0.781655  | 0.861280 | 769             | 2021-03-24 19:34 |
    | 2021-03-17 21:31 | 5010      | 0.150333  | 0.525581 | 778             | 2021-03-24 19:34 |
    | 2021-03-17 22:31 | 5010      | 0.951701  | 0.228883 | 570             | 2021-03-24 19:34 |
    | 2021-03-17 23:31 | 5010      | 0.819598  | 0.262503 | 473             | 2021-03-24 19:34 |
    |                  | ...       | ...       | ...      | ...             |                  |
    | 2021-03-24 16:31 | 5001      | 0.061585  | 0.658140 | 477             | 2021-03-24 19:34 |
    | 2021-03-24 17:31 | 5001      | 0.088949  | 0.303897 | 618             | 2021-03-24 19:34 |
    | 2021-03-24 18:31 | 5001      | 0.096652  | 0.747421 | 480             | 2021-03-24 19:34 |
    | 2021-03-17 19:31 | 5005      | 0.142936  | 0.707596 | 466             | 2021-03-24 19:34 |
    | 2021-03-17 19:31 | 5005      | 0.142936  | 0.707596 | 466             | 2021-03-24 19:34 |
    """
    df_hourly = pd.DataFrame(
        {
            "datetime": [
                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
                for dt in pd.date_range(
                    # `closed="left"` was removed in pandas 2.0; `inclusive` replaces it
                    start=start_date, end=end_date, freq="1H", inclusive="left"
                )
            ]
            # include a fixed timestamp for get_historical_features in the quickstart
            # + [
            #     pd.Timestamp(
            #         year=2021, month=4, day=12, hour=7, minute=0, second=0, tz="UTC"
            #     )
            # ]
        }
    )
    df_all_drivers = pd.DataFrame()
    dates = df_hourly["datetime"].map(pd.Timestamp.date).unique()

    for driver in drivers:
        df_hourly_copy = df_hourly.copy()
        df_hourly_copy["driver_id"] = driver
        for date in dates:
            df_hourly_copy.loc[
                df_hourly_copy["datetime"].map(pd.Timestamp.date) == date,
                "avg_daily_trips",
            ] = randint(10, 30)
        df_all_drivers = pd.concat([df_hourly_copy, df_all_drivers])

    df_all_drivers.reset_index(drop=True, inplace=True)
    rows = df_all_drivers["datetime"].count()

    df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32)
    df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32)

    df_all_drivers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))

    # Create duplicate rows that should be filtered by created timestamp
    # TODO: These duplicate rows are indirectly being filtered out by the point-in-time
    # join already. We need to inject a bad row at a timestamp where we know it will get
    # joined to the entity dataframe, and then test that we are actually filtering it
    # with the created timestamp.
    # DataFrame.append was removed in pandas 2.0; use concat instead.
    late_row = df_all_drivers.iloc[[int(rows / 2)]]
    df_all_drivers = pd.concat([df_all_drivers, late_row, late_row])

    return df_all_drivers


def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.DataFrame:
    """
    Example df generated by this function:
    | datetime         | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created          |
    |------------------+-------------+-----------------+---------------------+---------------------+------------------|
    | 2021-03-17 19:31 | 1010        | 0.889188        | 0.049057            | 412                 | 2021-03-24 19:38 |
    | 2021-03-18 19:31 | 1010        | 0.979273        | 0.212630            | 639                 | 2021-03-24 19:38 |
    | 2021-03-19 19:31 | 1010        | 0.976549        | 0.176881            | 70                  | 2021-03-24 19:38 |
    | 2021-03-20 19:31 | 1010        | 0.273697        | 0.325012            | 68                  | 2021-03-24 19:38 |
    | 2021-03-21 19:31 | 1010        | 0.438262        | 0.313009            | 192                 | 2021-03-24 19:38 |
    |                  | ...         | ...             | ...                 | ...                 |                  |
    | 2021-03-19 19:31 | 1001        | 0.738860        | 0.857422            | 344                 | 2021-03-24 19:38 |
    | 2021-03-20 19:31 | 1001        | 0.848397        | 0.745989            | 106                 | 2021-03-24 19:38 |
    | 2021-03-21 19:31 | 1001        | 0.301552        | 0.185873            | 812                 | 2021-03-24 19:38 |
    | 2021-03-22 19:31 | 1001        | 0.943030        | 0.561219            | 322                 | 2021-03-24 19:38 |
    | 2021-03-23 19:31 | 1001        | 0.354919        | 0.810093            | 273                 | 2021-03-24 19:38 |
    """
    df_daily = pd.DataFrame(
        {
            "datetime": [
                pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
                for dt in pd.date_range(
                    start=start_date, end=end_date, freq="1D", inclusive="left"
                )
            ]
        }
    )
    df_all_customers = pd.DataFrame()

    for customer in customers:
        df_daily_copy = df_daily.copy()
        rows = df_daily_copy["datetime"].count()
        df_daily_copy["customer_id"] = customer
        df_daily_copy["current_balance"] = np.random.uniform(
            low=10.0, high=50.0, size=rows
        ).astype(np.float32)
        df_daily_copy["lifetime_trip_count"] = np.linspace(
            start=randint(10, 20), stop=randint(40, 50), num=rows
        ).astype(np.int32)
        df_daily_copy["avg_passenger_count"] = np.random.uniform(
            low=1, high=3, size=rows
        ).astype(np.float32)
        df_all_customers = pd.concat([df_daily_copy, df_all_customers])

    df_all_customers.reset_index(drop=True, inplace=True)

    # TODO: Remove created timestamp in order to test whether it is really optional
    df_all_customers["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
    return df_all_customers


def generate_entities(date, n_customers, n_drivers, order_count):
    end_date = date
    before_start_date = end_date - timedelta(days=365)
    start_date = end_date - timedelta(days=7)
    after_end_date = end_date + timedelta(days=365)
    customer_entities = [20000 + c_id for c_id in range(n_customers)]
    driver_entities = [50000 + d_id for d_id in range(n_drivers)]
    orders_df = create_orders_df(
        customers=customer_entities,
        drivers=driver_entities,
        start_date=start_date,
        end_date=end_date,
        order_count=order_count,
        infer_event_timestamp_col=False,
    )
    return customer_entities, driver_entities, end_date, orders_df, start_date


def save_df_to_csv(df, table_name, dtype):
    # `dtype` is accepted for signature parity with the SQL upload path,
    # but is not used when writing CSV.
    df.to_csv(table_name + ".csv", index=False)


if __name__ == "__main__":
    start_date = datetime.now().replace(microsecond=0, second=0, minute=0)
    (
        customer_entities,
        driver_entities,
        end_date,
        orders_df,
        start_date,
    ) = generate_entities(start_date, 1000, 1000, 20000)

    customer_df = create_customer_daily_profile_df(
        customer_entities, start_date, end_date
    )
    print(customer_df.head())

    drivers_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
    print(drivers_df.head())

    orders_table = "orders"
    driver_hourly_table = "driver_hourly"
    customer_profile_table = "customer_profile"

    print("saving orders")
    save_df_to_csv(orders_df, orders_table, dtype={"event_timestamp": DateTime()})
    print("saving drivers")
    save_df_to_csv(drivers_df, driver_hourly_table, dtype={"datetime": DateTime()})
    print("saving customers")
    save_df_to_csv(customer_df, customer_profile_table, dtype={"datetime": DateTime()})
```
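As a quick, standalone sanity check of the conventions the script above uses (sequential entity ids, UTC timestamps, CSV output), a minimal sketch assuming only pandas and numpy; it mirrors the generator rather than importing it:

```python
import io

import numpy as np
import pandas as pd


def make_small_orders(order_count: int = 10, seed: int = 0) -> pd.DataFrame:
    # Sequential order ids and random driver ids, as in create_orders_df
    rng = np.random.default_rng(seed)
    ts = pd.date_range(
        end=pd.Timestamp.now(tz="UTC").round("ms"), periods=order_count, freq="h"
    )
    return pd.DataFrame(
        {
            "order_id": range(100, 100 + order_count),
            "driver_id": rng.choice([50000, 50001, 50002], size=order_count),
            "event_timestamp": ts,
        }
    )


orders = make_small_orders()
buf = io.StringIO()
orders.to_csv(buf, index=False)
buf.seek(0)
# parse_dates is needed to recover the timestamp dtype lost in the CSV round trip
orders_back = pd.read_csv(buf, parse_dates=["event_timestamp"])
```

The round trip shows why the notebooks can read these CSVs back as entity dataframes: the timestamp column survives as text and only needs `parse_dates` on load.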
> **Reviewer:** these numbers seem wrong
>
> **Author:** fixed