
Commit d81d140

Add a module 3 which demonstrates dbt for batch transformations
Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent 4c46068 commit d81d140

22 files changed: +1012, -9 lines changed

.gitignore

Lines changed: 2 additions & 1 deletion

@@ -7,4 +7,5 @@ terraform.tfstate.backup
 .terraform*
 *.iml
 **/feast-postgres-data/*
-**/airflow_demo/airflow_home/*
+**/airflow_demo/airflow_home/*
+.vscode/*

module_1/docker-compose.yml

Lines changed: 0 additions & 8 deletions

@@ -51,10 +51,6 @@ services:
         condition: service_healthy
     ports:
       - "6567:6566"
-    volumes:
-      - type: bind
-        source: ./feature_repo/data/local_registry.db
-        target: /data/local_registry.db
     links:
       - redis
@@ -68,10 +64,6 @@ services:
         condition: service_healthy
     ports:
       - "6566:6566"
-    volumes:
-      - type: bind
-        source: ./feature_repo/data/local_registry.db
-        target: /data/local_registry.db
     links:
       - redis
     healthcheck:

module_3/README.md

Lines changed: 202 additions & 0 deletions

<h1>Module 3: Scheduled batch transformations (Snowflake)</h1>

This module is very similar to module 1. The key difference is that we'll now use a data warehouse (Snowflake) in combination with dbt + Airflow to ensure that batch features are regularly generated.

- **Data sources**: Snowflake
- **Online store**: Redis
- **Use case**: Predicting churn for drivers in real time.

<h2>Table of Contents</h2>

- [Workshop](#workshop)
  - [Step 1: Install Feast](#step-1-install-feast)
  - [Step 2: Inspect the `feature_store.yaml`](#step-2-inspect-the-feature_storeyaml)
  - [Step 4: Spin up services](#step-4-spin-up-services)
    - [Step 4a: Kafka + Redis + Feast SQL Registry + Feast services](#step-4a-kafka--redis--feast-sql-registry--feast-services)
    - [Step 4b: Set up dbt](#step-4b-set-up-dbt)
    - [Step 4c: Setting up Airflow](#step-4c-setting-up-airflow)
    - [Step 4d: Examine the Airflow DAG](#step-4d-examine-the-airflow-dag)
      - [Q: What if different feature views have different freshness requirements?](#q-what-if-different-feature-views-have-different-freshness-requirements)
    - [Step 4e: Enable the Airflow DAG](#step-4e-enable-the-airflow-dag)
    - [Step 4f (optional): Run a backfill](#step-4f-optional-run-a-backfill)
  - [Step 5: Streaming](#step-5-streaming)
- [Conclusion](#conclusion)
- [FAQ](#faq)
  - [How do you synchronize materialized features with pushed features from streaming?](#how-do-you-synchronize-materialized-features-with-pushed-features-from-streaming)
  - [Does Feast allow pushing features to the offline store?](#does-feast-allow-pushing-features-to-the-offline-store)
  - [Can feature / push servers refresh their registry in response to an event? e.g. after a PR merges and `feast apply` is run?](#can-feature--push-servers-refresh-their-registry-in-response-to-an-event-eg-after-a-pr-merges-and-feast-apply-is-run)
# Workshop

## Step 1: Install Feast

First, we install Feast with Snowflake, Postgres, and Redis support:

```bash
pip install "feast[snowflake,postgres,redis]"
```

## Step 2: Inspect the `feature_store.yaml`

```yaml
project: feast_demo_local
provider: local
registry:
  registry_type: sql
  path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
online_store:
  type: redis
  connection_string: localhost:6379
offline_store:
  type: snowflake.offline
  account: ${SNOWFLAKE_DEPLOYMENT_URL}
  user: ${SNOWFLAKE_USER}
  password: ${SNOWFLAKE_PASSWORD}
  role: ${SNOWFLAKE_ROLE}
  warehouse: ${SNOWFLAKE_WAREHOUSE}
  database: TECTON_DEMO_DATA
  schema: FRAUD
entity_key_serialization_version: 2
```

Now we're using a test database in Snowflake as the offline store.

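The `${...}` entries above are placeholders resolved from environment variables. A minimal sketch of that kind of substitution, using only Python's stdlib (illustrative only — this is not how Feast itself loads the file, and the credential values are made up):

```python
import os

# A fragment of the feature_store.yaml above, with env-var placeholders
yaml_snippet = """\
offline_store:
  type: snowflake.offline
  account: ${SNOWFLAKE_DEPLOYMENT_URL}
  user: ${SNOWFLAKE_USER}
"""

# Hypothetical values, for illustration only
os.environ["SNOWFLAKE_DEPLOYMENT_URL"] = "xy12345.us-east-1"
os.environ["SNOWFLAKE_USER"] = "feast_demo_user"

# os.path.expandvars replaces ${VAR} with the environment's value
resolved = os.path.expandvars(yaml_snippet)
print(resolved)
```

Keeping credentials out of the YAML this way lets the same config file work locally and in CI, with secrets injected by the environment.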
## Step 4: Spin up services

### Step 4a: Kafka + Redis + Feast SQL Registry + Feast services

We use Docker Compose to spin up the services we need.
- This is similar to the previous module: it feeds data from `feature_repo/data/driver_stats.parquet` into a Kafka topic
- This also deploys an instance of Redis, Postgres for a registry, and a Feast feature server + push server.

Start up the Docker daemon and then use Docker Compose to spin up the services as described above:
- You may need to run `sudo docker-compose up` if you run into a Docker permission denied error

```console
$ docker-compose up

Creating network "module_1_default" with the default driver
Creating zookeeper ... done
Creating redis ... done
Creating broker ... done
Creating feast_feature_server ... done
Creating feast_push_server ... done
Creating kafka_events ... done
Creating registry ... done
Attaching to zookeeper, redis, broker, feast_push_server, feast_feature_server, kafka_events, registry
...
```

### Step 4b: Set up dbt
> **TODO(adchia):** Generate parquet file to upload for public Snowflake dataset for features

There's already a dbt model that generates batch transformations. You just need to initialize it:

> **Note:** You'll need to install dbt-snowflake as well! `brew tap dbt-labs/dbt` and `brew install dbt-snowflake`

To initialize dbt with your own credentials, run:
```bash
cd dbt/feast_demo; dbt init
```

### Step 4c: Setting up Airflow

We set up a standalone version of Airflow with a `PythonOperator` (Airflow now prefers `@task` for this) and a `BashOperator`, which will run incremental dbt models. We use dbt to define batch transformations on Snowflake, and once the incremental model is tested / run, we run materialization.

```bash
cd airflow_demo; sh setup_airflow.sh
```

### Step 4d: Examine the Airflow DAG

The example DAG is going to run on a daily basis and materialize *all* feature views based on the start and end interval. Note that there is a 1 hr overlap in the start time to account for potential late-arriving data in the offline store.

With dbt incremental models, the model itself in incremental mode selects overlapping windows of data to account for late-arriving data. Feast materialization similarly has a late-arriving threshold.

```python
dbt_test = BashOperator(
    task_id="dbt_test",
    bash_command="""
        dbt test --models "aggregate_transaction_features"
        """,
    dag=dag,
)

dbt_run = BashOperator(
    task_id="dbt_run",
    bash_command="""
        dbt run --models "aggregate_transaction_features"
        """,
    dag=dag,
)

@task()
def materialize(data_interval_start=None, data_interval_end=None):
    repo_config = RepoConfig(
        registry=RegistryConfig(
            registry_type="sql",
            path="postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast",
        ),
        project="feast_demo_local",
        provider="local",
        offline_store=SnowflakeOfflineStoreConfig(
            account=os.getenv("SNOWFLAKE_DEPLOYMENT_URL"),
            user=os.getenv("SNOWFLAKE_USER"),
            password=os.getenv("SNOWFLAKE_PASSWORD"),
            role=os.getenv("SNOWFLAKE_ROLE"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
            database=os.getenv("SNOWFLAKE_DATABASE"),
            schema_=os.getenv("SNOWFLAKE_SCHEMA"),
        ),
        online_store=RedisOnlineStoreConfig(connection_string="localhost:6379"),
        entity_key_serialization_version=2,
    )
    store = FeatureStore(config=repo_config)
    store.materialize(data_interval_start.subtract(hours=1), data_interval_end)

# Setup DAG
dbt_test >> dbt_run >> materialize()
```
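The overlap logic in `materialize` amounts to shifting the window start back by one hour before handing it to Feast. A minimal stdlib sketch of that window computation (`materialization_window` is a hypothetical helper, not part of this repo):

```python
from datetime import datetime, timedelta, timezone

def materialization_window(data_interval_start, data_interval_end,
                           overlap=timedelta(hours=1)):
    """Widen the window's start by `overlap` so rows that arrived late,
    near the previous interval boundary, get re-materialized."""
    return data_interval_start - overlap, data_interval_end

# One daily interval, as Airflow would pass it to the task
start, end = materialization_window(
    datetime(2021, 7, 2, tzinfo=timezone.utc),
    datetime(2021, 7, 3, tzinfo=timezone.utc),
)
print(start.isoformat())  # 2021-07-01T23:00:00+00:00
```

Because materialization is an upsert keyed by entity and timestamp, re-processing the overlapping hour is safe: fresher values simply overwrite themselves.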

#### Q: What if different feature views have different freshness requirements?

There's no built-in mechanism for this, but you could store this logic in the feature view tags (e.g. a `batch_schedule`).

Then, you can parse these feature views in your Airflow job. You could, for example, have one DAG that runs all the daily `batch_schedule` feature views, and another DAG that runs all feature views with an hourly `batch_schedule`.

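The tag-based grouping described above can be sketched with plain Python. Here `FeatureViewStub` is a hypothetical stand-in for `feast.FeatureView`, and the view names and tags are made up for illustration:

```python
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class FeatureViewStub:  # stand-in for feast.FeatureView, illustration only
    name: str
    tags: dict = field(default_factory=dict)

def group_by_schedule(feature_views):
    """Bucket feature views by their `batch_schedule` tag so each bucket
    can be materialized by its own DAG at its own cadence."""
    groups = defaultdict(list)
    for fv in feature_views:
        groups[fv.tags.get("batch_schedule", "daily")].append(fv.name)
    return dict(groups)

views = [
    FeatureViewStub("txn_stats", {"batch_schedule": "hourly"}),
    FeatureViewStub("user_profile", {"batch_schedule": "daily"}),
    FeatureViewStub("driver_stats"),  # no tag -> default to daily
]
print(group_by_schedule(views))
# {'hourly': ['txn_stats'], 'daily': ['user_profile', 'driver_stats']}
```

An hourly DAG would then call `store.materialize` only for the views in the `hourly` bucket, and the daily DAG for the rest.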

### Step 4e: Enable the Airflow DAG
Now go to `localhost:8080`, use Airflow's auto-generated admin password to log in, and toggle on the `feature_dag`. It should run one task automatically.

### Step 4f (optional): Run a backfill
To run a backfill (i.e. process previous days of the above while letting Airflow manage state), run the following (from the `airflow_demo` directory):

> **Warning:** This works correctly with the Redis online store because it conditionally writes. This logic has not been implemented for other online stores yet, and so can result in incorrect behavior.

```bash
export AIRFLOW_HOME=$(pwd)/airflow_home
airflow dags backfill \
    --start-date 2021-07-01 \
    --end-date 2021-07-15 \
    feature_dag
```

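Conceptually, a backfill of a `@daily` DAG just replays one `(data_interval_start, data_interval_end)` pair per day in the range, invoking the materialize task for each. A stdlib sketch of that interval enumeration (a simplification of Airflow's actual scheduler, shown only to illustrate the shape of the work):

```python
from datetime import date, timedelta

def backfill_intervals(start, end):
    """Yield (interval_start, interval_end) pairs, one per day, roughly as
    an Airflow backfill would schedule them for a @daily DAG."""
    day = start
    while day < end:
        yield day, day + timedelta(days=1)
        day += timedelta(days=1)

intervals = list(backfill_intervals(date(2021, 7, 1), date(2021, 7, 4)))
print(len(intervals))   # 3
print(intervals[0])     # (datetime.date(2021, 7, 1), datetime.date(2021, 7, 2))
```

This is why the conditional-write warning above matters: replayed intervals can re-send old feature values, and the online store must not let them clobber newer ones.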

## Step 5: Streaming
- TODO(danny): re-add streaming

# Conclusion
By the end of this module, you will have learned how to build a full feature platform, with orchestrated batch transformations (using dbt + Airflow), orchestrated materialization (with Feast + Airflow), and streaming ingestion (with Spark + Feast).

Feast abstracts away the need to think about data modeling in the online store and helps you:
- maintain fresh features in the online store by
  - ingesting batch features into the online store (via `feast materialize` or `feast materialize-incremental`)
  - ingesting streaming features into the online store (e.g. through `feature_store.push` or a push server endpoint (`/push`))
- serve features (e.g. through `feature_store.get_online_features` or through feature servers)

# FAQ

### How do you synchronize materialized features with pushed features from streaming?
This relies on individual online store implementations. The existing Redis online store implementation, for example, will check the timestamps of incoming events and prefer the latest version.

Doing this event timestamp checking is expensive, though, and slows down writes. In many cases, this is not preferred. Databases often support storing multiple versions of the same value, so you can leverage that (+ TTLs) to query the most recent version at read time.

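The timestamp check described above can be sketched as a conditional upsert. This is an illustrative in-memory model only (the real behavior lives in Feast's Redis online store implementation), with made-up keys and values:

```python
from datetime import datetime, timezone

def conditional_write(store, key, value, event_ts):
    """Write `value` only if it is newer than what the store already holds,
    so replayed or out-of-order events never clobber fresher features."""
    existing = store.get(key)
    if existing is None or event_ts > existing[1]:
        store[key] = (value, event_ts)
        return True
    return False

store = {}
t1 = datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc)
t0 = datetime(2022, 1, 1, 11, 0, tzinfo=timezone.utc)

conditional_write(store, "driver:1001", {"conv_rate": 0.85}, t1)
# A late-arriving, older event is ignored:
accepted = conditional_write(store, "driver:1001", {"conv_rate": 0.10}, t0)
print(accepted, store["driver:1001"][0])  # False {'conv_rate': 0.85}
```

The read-time alternative mentioned above avoids this per-write comparison by storing every version and filtering to the newest at query time.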

### Does Feast allow pushing features to the offline store?
Yes! See more details at https://docs.feast.dev/reference/data-sources/push#pushing-data

### Can feature / push servers refresh their registry in response to an event? e.g. after a PR merges and `feast apply` is run?
Unfortunately, the servers don't currently support this. Feel free to contribute a PR to enable it, though! The tricky part is that Feast would need to keep track of these servers in the registry (or in some other way), which is not how Feast is currently designed.

module_3/airflow_demo/dag.py

Lines changed: 65 additions & 0 deletions

import os
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from feast import RepoConfig, FeatureStore
from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.redis import RedisOnlineStoreConfig
import pendulum

with DAG(
    dag_id='feature_dag',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    description='A dbt + Feast DAG',
    schedule="@daily",
    catchup=False,
    tags=["feast"],
) as dag:
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="""
            cd ${AIRFLOW_HOME}; dbt test --models "aggregate_transaction_features"
            """,
        dag=dag,
    )

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="""
            cd ${AIRFLOW_HOME}; dbt run --models "aggregate_transaction_features"
            """,
        dag=dag,
    )

    @task()
    def materialize(data_interval_start=None, data_interval_end=None):
        repo_config = RepoConfig(
            registry=RegistryConfig(
                registry_type="sql",
                path="postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast",
            ),
            project="feast_demo_local",
            provider="local",
            offline_store=SnowflakeOfflineStoreConfig(
                account=os.getenv("SNOWFLAKE_DEPLOYMENT_URL"),
                user=os.getenv("SNOWFLAKE_USER"),
                password=os.getenv("SNOWFLAKE_PASSWORD"),
                role=os.getenv("SNOWFLAKE_ROLE"),
                warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
                database=os.getenv("SNOWFLAKE_DATABASE"),
                schema_=os.getenv("SNOWFLAKE_SCHEMA"),
            ),
            online_store=RedisOnlineStoreConfig(connection_string="localhost:6379"),
            entity_key_serialization_version=2,
        )
        # Needed for Mac OS users because of a seg fault in requests for standalone Airflow (not needed in prod)
        os.environ["NO_PROXY"] = "*"
        store = FeatureStore(config=repo_config)
        # Add a 1 hr overlap to account for late data.
        # Note: normally, you'll probably have different feature views with different
        # freshness requirements, instead of materializing all feature views every day.
        store.materialize(data_interval_start.subtract(hours=1), data_interval_end)

    dbt_test >> dbt_run >> materialize()
module_3/airflow_demo/setup_airflow.sh

Lines changed: 31 additions & 0 deletions

# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=$(pwd)/airflow_home
export AIRFLOW__CORE__LOAD_EXAMPLES=False

# Clean up previous state, if it exists
rm -rf $AIRFLOW_HOME

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.4.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.0/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Set up Feast dags
mkdir -p $AIRFLOW_HOME/dags
cp dag.py $AIRFLOW_HOME/dags

# Set up dbt dags
cd ../dbt/feast_demo
cp -R * $AIRFLOW_HOME
cd $AIRFLOW_HOME

# The standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to log in.

module_3/architecture.png

100 KB

module_3/dbt/feast_demo/.gitignore

Lines changed: 4 additions & 0 deletions

target/
dbt_packages/
logs/
module_3/dbt/feast_demo/dbt_project.yml

Lines changed: 26 additions & 0 deletions

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'feast_demo'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'feast_demo'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]

target-path: "target"  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  feast_demo:
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view
aggregate_transaction_features.sql

Lines changed: 10 additions & 0 deletions

{{ config(materialized='incremental') }}

SELECT
    NAMEORIG AS USER_ID,
    TIMESTAMP,
    AVG(AMOUNT) OVER (PARTITION BY NAMEORIG ORDER BY TIMESTAMP ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS "7D_AVG_AMT"
FROM TRANSACTIONS
{% if is_incremental() %}
WHERE TIMESTAMP > (SELECT DATEADD(day, -3, MAX(TIMESTAMP)::date) FROM {{ this }})
{% endif %}
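The window function above computes, for each transaction, the average of that user's current and previous six amounts. The same per-user rolling average can be sketched in stdlib Python (the sample rows are made up for illustration):

```python
from collections import defaultdict, deque

def rolling_avg_amt(rows, window=7):
    """For each (user_id, amount) row in timestamp order, emit the average of
    that user's last `window` amounts, mirroring
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW."""
    history = defaultdict(lambda: deque(maxlen=window))
    out = []
    for user_id, amount in rows:
        buf = history[user_id]
        buf.append(amount)  # deque(maxlen=7) drops the oldest automatically
        out.append((user_id, sum(buf) / len(buf)))
    return out

rows = [("C1", 100.0), ("C1", 200.0), ("C2", 50.0), ("C1", 300.0)]
print(rolling_avg_amt(rows))
# [('C1', 100.0), ('C1', 150.0), ('C2', 50.0), ('C1', 200.0)]
```

Note the "7D" in the column name is a 7-row window, not a 7-day window; a true time-based window would need a `RANGE`-style condition on `TIMESTAMP` instead.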
Lines changed: 11 additions & 0 deletions

version: 2

models:
  - name: aggregate_transaction_features
    description: ""
    columns:
      - name: "USER_ID"
        description: "The primary key for this table"
        tests:
          - not_null
