Skip to content

Commit 4d65e4a

Browse files
committed
add airflow setup
Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent 5be6151 commit 4d65e4a

File tree

4 files changed

+26
-11
lines changed

4 files changed

+26
-11
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ terraform.tfstate
66
terraform.tfstate.backup
77
.terraform*
88
*.iml
9-
**/feast-postgres-data/*
9+
**/feast-postgres-data/*
10+
**/airflow_demo/airflow_home/*

module_1/airflow_demo/dag.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1+
import os
12
from airflow.decorators import dag, task
23
from datetime import datetime
3-
from feast import RepoConfig, RegistryConfig, FeatureStore, RedisOnlineStoreConfig
4+
from feast import RepoConfig, FeatureStore
5+
from feast.repo_config import RegistryConfig
6+
from feast.infra.online_stores.redis import RedisOnlineStoreConfig
7+
import pendulum
48

59

610
@dag(
711
schedule="@daily",
812
catchup=False,
13+
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
914
tags=["feast"],
1015
)
1116
def materialize_dag():
1217
@task()
13-
def materialize():
18+
def materialize(data_interval_start=None, data_interval_end=None):
1419
repo_config = RepoConfig(
1520
registry=RegistryConfig(
1621
registry_type="sql",
@@ -20,11 +25,15 @@ def materialize():
2025
provider="local",
2126
offline_store="file",
2227
online_store=RedisOnlineStoreConfig(connection_string="localhost:6379"),
28+
entity_key_serialization_version=2,
2329
)
30+
# Needed for Mac OS users because of a seg fault in requests for standalone Airflow (not needed in prod)
31+
os.environ["NO_PROXY"] = "*"
2432
store = FeatureStore(config=repo_config)
25-
store.materialize_incremental(datetime.now())
33+
# Add 1 hr overlap to account for late data
34+
store.materialize(data_interval_start.subtract(hours=1), data_interval_end)
2635

2736
materialize()
2837

2938

30-
materialize_dag()
39+
materialization_dag = materialize_dag()

module_1/airflow_demo/setup_airflow.sh

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
export AIRFLOW_HOME=$(pwd)/airflow_home
44
export AIRFLOW__CORE__LOAD_EXAMPLES=False
55

6+
# Cleanup previous state, if it exists
7+
rm -rf $AIRFLOW_HOME
8+
69
# Install Airflow using the constraints file
710
AIRFLOW_VERSION=2.4.0
811
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
@@ -11,13 +14,13 @@ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${A
1114
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.0/constraints-3.7.txt
1215
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
1316

17+
# Setup Feast dags
18+
mkdir -p $AIRFLOW_HOME/dags
19+
cp dag.py $AIRFLOW_HOME/dags
20+
1421
# The Standalone command will initialise the database, make a user,
1522
# and start all components for you.
1623
airflow standalone
1724

1825
# Visit localhost:8080 in the browser and use the admin account details
1926
# shown on the terminal to login.
20-
21-
# Setup dags
22-
mkdir $AIRFLOW_HOME/dags
23-
cp dag.py $AIRFLOW_HOME/dags
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from feast import (
23
FileSource,
34
PushSource,
@@ -6,7 +7,7 @@
67
# Feast also supports pulling data from data warehouses like BigQuery, Snowflake, Redshift and data lakes (e.g. via Redshift Spectrum, Trino, Spark)
78
driver_stats = FileSource(
89
name="driver_stats_source",
9-
path="data/driver_stats.parquet",
10+
path=f"{os.getcwd()}/data/driver_stats.parquet",
1011
timestamp_field="event_timestamp",
1112
created_timestamp_column="created",
1213
description="A table describing the stats of a driver based on hourly logs",
@@ -15,5 +16,6 @@
1516

1617
# A push source is useful if you have upstream systems that transform features (e.g. stream processing jobs)
1718
driver_stats_push_source = PushSource(
18-
name="driver_stats_push_source", batch_source=driver_stats,
19+
name="driver_stats_push_source",
20+
batch_source=driver_stats,
1921
)

0 commit comments

Comments (0)