<h1>Module 3: Scheduled batch transformations (Snowflake)</h1>

This module closely mirrors module 1. The key difference is that we'll now use a data warehouse (Snowflake) in combination with dbt + Airflow to ensure that batch features are regularly generated.

- **Data sources**: Snowflake
- **Online store**: Redis
- **Use case**: Predicting churn for drivers in real time.
<h2>Table of Contents</h2>

- [Workshop](#workshop)
  - [Step 1: Install Feast](#step-1-install-feast)
  - [Step 2: Inspect the `feature_store.yaml`](#step-2-inspect-the-feature_storeyaml)
  - [Step 4: Spin up services](#step-4-spin-up-services)
    - [Step 4a: Kafka + Redis + Feast SQL Registry + Feast services](#step-4a-kafka--redis--feast-sql-registry--feast-services)
    - [Step 4b: Set up dbt](#step-4b-set-up-dbt)
    - [Step 4c: Setting up Airflow](#step-4c-setting-up-airflow)
    - [Step 4d: Examine the Airflow DAG](#step-4d-examine-the-airflow-dag)
      - [Q: What if different feature views have different freshness requirements?](#q-what-if-different-feature-views-have-different-freshness-requirements)
    - [Step 4e: Enable the Airflow DAG](#step-4e-enable-the-airflow-dag)
    - [Step 4f (optional): Run a backfill](#step-4f-optional-run-a-backfill)
  - [Step 5: Streaming](#step-5-streaming)
- [Conclusion](#conclusion)
- [FAQ](#faq)
  - [How do you synchronize materialized features with pushed features from streaming?](#how-do-you-synchronize-materialized-features-with-pushed-features-from-streaming)
  - [Does Feast allow pushing features to the offline store?](#does-feast-allow-pushing-features-to-the-offline-store)
  - [Can feature / push servers refresh their registry in response to an event? e.g. after a PR merges and `feast apply` is run?](#can-feature--push-servers-refresh-their-registry-in-response-to-an-event-eg-after-a-pr-merges-and-feast-apply-is-run)

# Workshop
## Step 1: Install Feast

First, we install Feast with Snowflake, Postgres, and Redis support:
```bash
pip install "feast[snowflake,postgres,redis]"
```

## Step 2: Inspect the `feature_store.yaml`

```yaml
project: feast_demo_local
provider: local
registry:
  registry_type: sql
  path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
online_store:
  type: redis
  connection_string: localhost:6379
offline_store:
  type: snowflake.offline
  account: ${SNOWFLAKE_DEPLOYMENT_URL}
  user: ${SNOWFLAKE_USER}
  password: ${SNOWFLAKE_PASSWORD}
  role: ${SNOWFLAKE_ROLE}
  warehouse: ${SNOWFLAKE_WAREHOUSE}
  database: TECTON_DEMO_DATA
  schema: FRAUD
entity_key_serialization_version: 2
```

Now we're using a test database in Snowflake.

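The `${...}` values in `feature_store.yaml` are resolved from environment variables, so export your Snowflake credentials before running any Feast commands. A sketch with placeholder values (substitute your own account details):

```shell
# Placeholder values only; replace with your real Snowflake credentials
export SNOWFLAKE_DEPLOYMENT_URL="xy12345.us-east-1"
export SNOWFLAKE_USER="FEAST_USER"
export SNOWFLAKE_PASSWORD="change-me"
export SNOWFLAKE_ROLE="FEAST_ROLE"
export SNOWFLAKE_WAREHOUSE="COMPUTE_WH"
```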
## Step 4: Spin up services

### Step 4a: Kafka + Redis + Feast SQL Registry + Feast services

We use Docker Compose to spin up the services we need.
- This is similar to the previous module: it feeds data from `feature_repo/data/driver_stats.parquet` into a Kafka topic
- It also deploys an instance of Redis, Postgres for a registry, and a Feast feature server + push server.

Start up the Docker daemon and then use Docker Compose to spin up the services as described above:
- You may need to run `sudo docker-compose up` if you run into a Docker permission denied error
```console
$ docker-compose up

Creating network "module_1_default" with the default driver
Creating zookeeper ... done
Creating redis     ... done
Creating broker    ... done
Creating feast_feature_server ... done
Creating feast_push_server    ... done
Creating kafka_events         ... done
Creating registry             ... done
Attaching to zookeeper, redis, broker, feast_push_server, feast_feature_server, kafka_events, registry
...
```

### Step 4b: Set up dbt
> **TODO(adchia):** Generate parquet file to upload for public Snowflake dataset for features

There's already a dbt model that generates batch transformations. You just need to initialize it:

> **Note:** You'll need to install dbt-snowflake as well! `brew tap dbt-labs/dbt` and `brew install dbt-snowflake`

To initialize dbt with your own credentials, run:
```bash
cd dbt/feast_demo; dbt init
```
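`dbt init` prompts for connection details and writes them to `~/.dbt/profiles.yml`. A resulting Snowflake profile might look roughly like this (a sketch; the profile name and the `env_var` wiring are assumptions, and dbt also accepts literal values):

```yaml
feast_demo:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_DEPLOYMENT_URL') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: "{{ env_var('SNOWFLAKE_ROLE') }}"
      warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}"
      database: TECTON_DEMO_DATA
      schema: FRAUD
      threads: 1
```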

### Step 4c: Setting up Airflow

We set up a standalone version of Airflow with a `PythonOperator` (Airflow now prefers the `@task` decorator for this) and a `BashOperator`, which run the incremental dbt models. We use dbt to define batch transformations on Snowflake, and once the incremental model is tested and run, we kick off materialization.

```bash
cd airflow_demo; sh setup_airflow.sh
```

### Step 4d: Examine the Airflow DAG

The example DAG runs on a daily basis and materializes *all* feature views based on the start and end of the schedule interval. Note that there is a 1 hour overlap in the start time to account for potentially late-arriving data in the offline store.

With dbt incremental models, the model in incremental mode selects overlapping windows of data to account for late-arriving data. Feast materialization similarly has a late-arrival threshold.

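As a minimal illustration of that padding, the materialization window extends one hour before the scheduled interval start (a sketch with hypothetical names; the DAG simply inlines the `subtract(hours=1)` call):

```python
from datetime import datetime, timedelta

def materialization_window(data_interval_start: datetime, data_interval_end: datetime):
    """Pad the scheduled interval by 1 hour to catch late-arriving rows."""
    return data_interval_start - timedelta(hours=1), data_interval_end

start, end = materialization_window(datetime(2022, 1, 2), datetime(2022, 1, 3))
# start is one hour before the interval start: 2022-01-01 23:00
```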
```python
import os

from airflow.decorators import task
from airflow.operators.bash import BashOperator
from feast import FeatureStore, RepoConfig
from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig
from feast.infra.online_stores.redis import RedisOnlineStoreConfig
from feast.repo_config import RegistryConfig

# `dag` is defined earlier in the DAG file
dbt_test = BashOperator(
    task_id="dbt_test",  # task_ids cannot contain spaces
    bash_command="""
    dbt test --models "aggregate_transaction_features"
    """,
    dag=dag,
)

dbt_run = BashOperator(
    task_id="dbt_run",
    bash_command="""
    dbt run --models "aggregate_transaction_features"
    """,
    dag=dag,
)

@task()
def materialize(data_interval_start=None, data_interval_end=None):
    repo_config = RepoConfig(
        registry=RegistryConfig(
            registry_type="sql",
            path="postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast",
        ),
        project="feast_demo_local",
        provider="local",
        offline_store=SnowflakeOfflineStoreConfig(
            account=os.getenv("SNOWFLAKE_DEPLOYMENT_URL"),
            user=os.getenv("SNOWFLAKE_USER"),
            password=os.getenv("SNOWFLAKE_PASSWORD"),
            role=os.getenv("SNOWFLAKE_ROLE"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
            database=os.getenv("SNOWFLAKE_DATABASE"),
            schema_=os.getenv("SNOWFLAKE_SCHEMA"),
        ),
        online_store=RedisOnlineStoreConfig(connection_string="localhost:6379"),
        entity_key_serialization_version=2,
    )
    store = FeatureStore(config=repo_config)
    # Materialize with a 1 hour overlap to catch late-arriving data
    store.materialize(data_interval_start.subtract(hours=1), data_interval_end)

# Set up the DAG dependencies
dbt_test >> dbt_run >> materialize()
```

#### Q: What if different feature views have different freshness requirements?

There's no built-in mechanism for this, but you could store this logic in the feature view tags (e.g. a `batch_schedule` tag).

Then, you can parse these feature view tags in your Airflow job. You could, for example, have one DAG that runs all feature views with a daily `batch_schedule`, and another DAG that runs all feature views with an hourly `batch_schedule`.

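A minimal sketch of such filtering, assuming a hypothetical `batch_schedule` tag on each feature view (the stub class stands in for Feast's `FeatureView` so the example is self-contained):

```python
from dataclasses import dataclass, field

@dataclass
class FeatureViewStub:
    """Stand-in for a Feast FeatureView; only the fields this sketch needs."""
    name: str
    tags: dict = field(default_factory=dict)

def views_for_schedule(feature_views, schedule: str):
    """Select the feature views whose tags request the given batch schedule."""
    return [fv for fv in feature_views if fv.tags.get("batch_schedule") == schedule]

views = [
    FeatureViewStub("driver_daily_features", {"batch_schedule": "daily"}),
    FeatureViewStub("driver_hourly_stats", {"batch_schedule": "hourly"}),
]
daily = views_for_schedule(views, "daily")
# daily contains only driver_daily_features
```

In a real DAG you would pass `store.list_feature_views()` instead of the stubs and call `store.materialize` once per group.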
### Step 4e: Enable the Airflow DAG
Now go to `localhost:8080`, use Airflow's auto-generated admin password to log in, and toggle on the `materialize_dag`. It should run one task automatically.

### Step 4f (optional): Run a backfill
To run a backfill (i.e. process previous days of the above while letting Airflow manage state), you can run the following (from the `airflow_demo` directory):

> **Warning:** This works correctly with the Redis online store because it conditionally writes. This logic has not been implemented for other online stores yet, and so can result in incorrect behavior.

```bash
export AIRFLOW_HOME=$(pwd)/airflow_home
airflow dags backfill \
    --start-date 2021-07-01 \
    --end-date 2021-07-15 \
    feature_dag
```

## Step 5: Streaming
- TODO(danny): re-add streaming

# Conclusion
By the end of this module, you will have learned how to build a full feature platform, with orchestrated batch transformations (using dbt + Airflow), orchestrated materialization (with Feast + Airflow), and streaming ingestion (with Spark + Feast).

Feast abstracts away the need to think about data modeling in the online store and helps you:
- maintain fresh features in the online store by
  - ingesting batch features into the online store (via `feast materialize` or `feast materialize-incremental`)
  - ingesting streaming features into the online store (e.g. through `feature_store.push` or a push server endpoint (`/push`))
- serve features (e.g. through `feature_store.get_online_features` or through feature servers)

# FAQ

### How do you synchronize materialized features with pushed features from streaming?
This depends on the individual online store implementation. The existing Redis online store implementation, for example, checks the timestamps of incoming events and prefers the latest version.

This event timestamp checking is expensive, though, and slows down writes, so in many cases it is not preferred. Databases often support storing multiple versions of the same value, so you can leverage that (+ TTLs) to query the most recent version at read time.

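The timestamp-preference logic can be sketched generically (illustrative only; the real Redis online store does this inside its write path):

```python
def maybe_write(existing_ts, incoming_ts, write):
    """Apply an incoming event only if it is at least as new as the stored one.

    existing_ts: timestamp of the value currently in the online store (or None)
    incoming_ts: event timestamp of the incoming write
    write: callback that performs the actual write
    """
    if existing_ts is None or incoming_ts >= existing_ts:
        write()
        return True
    return False

applied = []
maybe_write(None, 5, lambda: applied.append(5))  # no existing value: writes
maybe_write(5, 3, lambda: applied.append(3))     # stale event: skipped
# applied == [5]
```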
### Does Feast allow pushing features to the offline store?
Yes! See more details at https://docs.feast.dev/reference/data-sources/push#pushing-data

### Can feature / push servers refresh their registry in response to an event? e.g. after a PR merges and `feast apply` is run?
Unfortunately, the servers don't currently support this. Feel free to contribute a PR to enable it, though! The tricky part is that Feast would need to keep track of these servers in the registry (or in some other way), which is not how Feast is currently designed.