Commit 5e1f0df

Merge pull request #10 from adchia/main

Update repo to work with latest Feast version and add an Airflow example

2 parents 51d1b6c + 36d8028

File tree

15 files changed: +216 −92 lines changed

.gitignore

Lines changed: 4 additions & 1 deletion

```diff
@@ -4,4 +4,7 @@
 **/__pycache__
 terraform.tfstate
 terraform.tfstate.backup
-.terraform*
+.terraform*
+*.iml
+**/feast-postgres-data/*
+**/airflow_demo/airflow_home/*
```

README.md

Lines changed: 11 additions & 11 deletions

```diff
@@ -20,7 +20,7 @@ Feast solves several common challenges teams face:
 ### Pre-requisites
 This workshop assumes you have the following installed:
 - A local development environment that supports running Jupyter notebooks (e.g. VSCode with Jupyter plugin)
-- Python 3.7+
+- Python 3.8+
 - pip
 - Docker & Docker Compose (e.g. `brew install docker docker-compose`)
 - **Module 0 pre-requisites**:
@@ -46,13 +46,13 @@ Since we'll be learning how to leverage Feast in CI/CD, you'll also need to fork
 
 These are meant mostly to be done in order, with examples building on previous concepts.
 
-| Time (min) | Description | Module |
-| :--------: | :---------------------------------------------------------------------- | :----------------------------- |
-| 30-45 | Setting up Feast projects & CI/CD + powering batch predictions | [Module 0](module_0/README.md) |
-| 15-20 | Streaming ingestion & online feature retrieval with Kafka, Spark, Redis | [Module 1](module_1/README.md) |
-| 10-15 | Real-time feature engineering with on demand transformations | [Module 2](module_2/README.md) |
-| TBD | Feature server deployment (embed, as a service, AWS Lambda) | TBD |
-| TBD | Versioning features / models in Feast | TBD |
-| TBD | Data quality monitoring in Feast | TBD |
-| TBD | Batch transformations | TBD |
-| TBD | Stream transformations | TBD |
+| Time (min) | Description | Module |
+| :--------: | :------------------------------------------------------------------------------- | :----------------------------- |
+| 30-45 | Setting up Feast projects & CI/CD + powering batch predictions | [Module 0](module_0/README.md) |
+| 15-20 | Streaming ingestion & online feature retrieval with Kafka, Spark, Airflow, Redis | [Module 1](module_1/README.md) |
+| 10-15 | Real-time feature engineering with on demand transformations | [Module 2](module_2/README.md) |
+| TBD | Feature server deployment (embed, as a service, AWS Lambda) | TBD |
+| TBD | Versioning features / models in Feast | TBD |
+| TBD | Data quality monitoring in Feast | TBD |
+| TBD | Batch transformations | TBD |
+| TBD | Stream transformations | TBD |
```

module_0/client_aws/feature_store.yaml

Lines changed: 2 additions & 1 deletion

```diff
@@ -3,4 +3,5 @@ provider: aws
 registry: s3://feast-workshop-danny/registry.pb # TODO: Replace with your bucket
 online_store: null
 offline_store:
-  type: file
+  type: file
+entity_key_serialization_version: 2
```

module_0/client_gcp/feature_store.yaml

Lines changed: 2 additions & 1 deletion

```diff
@@ -3,4 +3,5 @@ provider: gcp
 registry: gs://feast-workshop-danny/registry.pb # TODO: Replace with your bucket
 online_store: null
 offline_store:
-  type: file
+  type: file
+entity_key_serialization_version: 2
```

module_0/feature_repo_aws/feature_store.yaml

Lines changed: 2 additions & 1 deletion

```diff
@@ -3,4 +3,5 @@ provider: aws
 registry: s3://feast-workshop-danny/registry.pb # TODO: Replace with your bucket
 online_store: null
 offline_store:
-  type: file
+  type: file
+entity_key_serialization_version: 2
```

module_0/feature_repo_gcp/feature_store.yaml

Lines changed: 2 additions & 1 deletion

```diff
@@ -3,4 +3,5 @@ provider: gcp
 registry: gs://feast-workshop-danny/registry.pb # TODO: Replace with your bucket
 online_store: null
 offline_store:
-  type: file
+  type: file
+entity_key_serialization_version: 2
```

module_1/README.md

Lines changed: 77 additions & 62 deletions
```diff
@@ -14,20 +14,20 @@ In this module, we focus on building features for online serving, and keeping th
 - [Step 1: Install Feast](#step-1-install-feast)
 - [Step 2: Inspect the data](#step-2-inspect-the-data)
 - [Step 3: Inspect the `feature_store.yaml`](#step-3-inspect-the-feature_storeyaml)
-- [Step 4: Spin up Kafka + Redis + Feast services](#step-4-spin-up-kafka--redis--feast-services)
+- [Step 4: Spin up Kafka + Redis + Feast SQL Registry + Feast services](#step-4-spin-up-kafka--redis--feast-sql-registry--feast-services)
 - [Step 5: Why register streaming features in Feast?](#step-5-why-register-streaming-features-in-feast)
   - [Understanding the PushSource](#understanding-the-pushsource)
 - [Step 6: Materialize batch features & ingest streaming features](#step-6-materialize-batch-features--ingest-streaming-features)
+  - [Configuring materialization](#configuring-materialization)
   - [Scheduling materialization](#scheduling-materialization)
-    - [Airflow PythonOperator](#airflow-pythonoperator)
-    - [Airflow BashOperator](#airflow-bashoperator)
+    - [Examine the Airflow DAG](#examine-the-airflow-dag)
+    - [Q: What if different feature views have different freshness requirements?](#q-what-if-different-feature-views-have-different-freshness-requirements)
 - [A note on Feast feature servers + push servers](#a-note-on-feast-feature-servers--push-servers)
 - [Conclusion](#conclusion)
 - [FAQ](#faq)
   - [How do you synchronize materialized features with pushed features from streaming?](#how-do-you-synchronize-materialized-features-with-pushed-features-from-streaming)
   - [Does Feast allow pushing features to the offline store?](#does-feast-allow-pushing-features-to-the-offline-store)
   - [Can feature / push servers refresh their registry in response to an event? e.g. after a PR merges and `feast apply` is run?](#can-feature--push-servers-refresh-their-registry-in-response-to-an-event-eg-after-a-pr-merges-and-feast-apply-is-run)
-  - [How do I speed up or scale up materialization?](#how-do-i-speed-up-or-scale-up-materialization)
 
 # Workshop
 ## Step 1: Install Feast
```
````diff
@@ -54,50 +54,54 @@ The key thing to note is that there are now a `miles_driven` field and a `daily_
 ```yaml
 project: feast_demo_local
 provider: local
-registry:
-  path: data/local_registry.db
-  cache_ttl_seconds: 5
+registry:
+  registry_type: sql
+  path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
 online_store:
   type: redis
   connection_string: localhost:6379
 offline_store:
   type: file
+entity_key_serialization_version: 2
 ```
 
-The key thing to note for now is the online store has been configured to be Redis. This is specifically for a single Redis node. If you want to use a Redis cluster, then you'd change this to something like:
+The key thing to note for now is that the registry has been swapped for a SQL-backed registry (Postgres) and the online store has been configured to be Redis. This is specifically for a single Redis node. If you want to use a Redis cluster, then you'd change this to something like:
 
 ```yaml
 project: feast_demo_local
 provider: local
-registry:
-  path: data/local_registry.db
-  cache_ttl_seconds: 5
+registry:
+  registry_type: sql
+  path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
 online_store:
   type: redis
   redis_type: redis_cluster
   connection_string: "redis1:6379,redis2:6379,ssl=true,password=my_password"
 offline_store:
   type: file
+entity_key_serialization_version: 2
 ```
 
 Because we use `redis-py` under the hood, this means Feast also works well with hosted Redis instances like AWS Elasticache ([docs](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/ElastiCache-Getting-Started-Tutorials-Connecting.html)).
 
-## Step 4: Spin up Kafka + Redis + Feast services
+## Step 4: Spin up Kafka + Redis + Feast SQL Registry + Feast services
 
 We then use Docker Compose to spin up the services we need.
 - This leverages a script (in `kafka_demo/`) that creates a topic, reads from `feature_repo/data/driver_stats.parquet`, generates newer timestamps, and emits them to the topic.
 - This also deploys an instance of Redis.
+- **Note:** one big difference between this and the previous module is the choice of Postgres as the registry. See [Using Scalable Registry](https://docs.feast.dev/tutorials/using-scalable-registry) for details.
 - This also deploys a Feast push server (on port 6567) + a Feast feature server (on port 6566).
 - These servers embed a `feature_store.yaml` file that enables them to connect to a remote registry. The Dockerfile mostly delegates to calling the `feast serve` CLI command, which instantiates a Feast python server ([docs](https://docs.feast.dev/reference/feature-servers/python-feature-server)):
 ```dockerfile
-FROM python:3.7
+FROM python:3.8
 
-RUN pip install "feast[redis]"
+RUN pip install "feast[redis,postgres]"
 
 COPY feature_repo/feature_store.yaml feature_store.yaml
 
-# Needed to reach online store within Docker network.
+# Needed to reach online store and registry within Docker network.
 RUN sed -i 's/localhost:6379/redis:6379/g' feature_store.yaml
+RUN sed -i 's/127.0.0.1:55001/registry:5432/g' feature_store.yaml
 ENV FEAST_USAGE=False
 
 CMD ["feast", "serve", "-h", "0.0.0.0"]
````
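The `sed` rewrites in the Dockerfile exist because, inside the Compose network, the servers must reach Redis and Postgres by their service names (`redis`, `registry`) rather than the host's forwarded ports. A minimal Python sketch of the same substitution — the YAML string below is an illustrative stand-in modeled on this module's `feature_store.yaml`, not the repo's exact file:

```python
# Sketch: the endpoint rewriting the Dockerfile does with sed, in Python.
config = """\
online_store:
  type: redis
  connection_string: localhost:6379
registry:
  registry_type: sql
  path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
"""

# Inside the Docker network, services are reachable by their Compose
# service names, not by the localhost port-forwards used on the host.
rewrites = {
    "localhost:6379": "redis:6379",
    "127.0.0.1:55001": "registry:5432",
}
for old, new in rewrites.items():
    config = config.replace(old, new)

print(config)
```

The same trick generalizes: keep one `feature_store.yaml` for local development and patch the endpoints at image build time.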
````diff
@@ -115,7 +119,8 @@ Creating broker ... done
 Creating feast_feature_server ... done
 Creating feast_push_server ... done
 Creating kafka_events ... done
-Attaching to zookeeper, redis, broker, feast_push_server, feast_feature_server, kafka_events
+Creating registry ... done
+Attaching to zookeeper, redis, broker, feast_push_server, feast_feature_server, kafka_events, registry
 ...
 ```
 ## Step 5: Why register streaming features in Feast?
````
````diff
@@ -168,51 +173,78 @@ In the upcoming release, Feast will support the concept of a `StreamFeatureView`
 
 We'll switch gears into a Jupyter notebook. This will guide you through:
 - Registering a `FeatureView` that has a single schema across both a batch source (`FileSource`) with aggregate features and a stream source (`PushSource`).
-  - **Note:** Feast will, in the future, also support directly authoring a `StreamFeatureView` that contains stream transformations / aggregations (e.g. via Spark, Flink, or Bytewax)
+  - **Note:** Feast also supports directly authoring a `StreamFeatureView` that contains stream transformations / aggregations (e.g. via Spark, Flink, or Bytewax), but the onus is on you to actually execute those transformations.
 - Materializing feature view values from batch sources to the online store (e.g. Redis).
 - Ingesting feature view values from streaming sources (e.g. window aggregate features from Spark + Kafka)
 - Retrieving features at low latency from Redis through Feast.
 - Working with a Feast push server + feature server to ingest and retrieve features through HTTP endpoints (instead of needing `feature_store.yaml` and `FeatureStore` instances)
 
 Run the Jupyter notebook ([feature_repo/module_1.ipynb](feature_repo/module_1.ipynb)).
 
+### Configuring materialization
+By default, materialization pulls all the latest feature values for each unique entity into memory and then writes them to the online store.
+
+You can speed this up or scale it up in a few ways:
+- Using a more scalable materialization mechanism (e.g. the Bytewax or Spark materialization engines)
+- Running materialization jobs on a per-feature-view basis
+- Running materialization jobs in parallel
+
+To run many materialization jobs in parallel, you'll want to use the **SQL registry** (which this module already uses).
+Then you can launch multiple materialization jobs at once (e.g. `feast materialize start_time end_time --views FEATURE_VIEW_NAME`).
+
 ### Scheduling materialization
 To ensure fresh features, you'll want to schedule materialization jobs regularly. This can be as simple as a cron job that calls `feast materialize-incremental`.
 
-Users may also be interested in integrating with Airflow, in which case you can build a custom Airflow image with the Feast SDK installed, and then use a `BashOperator` (with `feast materialize-incremental`) or `PythonOperator` (with `store.materialize_incremental(datetime.datetime.now())`):
+Users may also be interested in integrating with Airflow, in which case you can build a custom Airflow image with the Feast SDK installed and then use a `PythonOperator` (with `store.materialize`).
 
-#### Airflow PythonOperator
+We set up a standalone version of Airflow to demonstrate the `PythonOperator` (Airflow now prefers the `@task` decorator for this):
 
-```python
-# Define Python callable
-def materialize():
-    repo_config = RepoConfig(
-        registry=RegistryConfig(path="s3://[YOUR BUCKET]/registry.pb"),
-        project="feast_demo_aws",
-        provider="aws",
-        offline_store="file",
-        online_store=DynamoDBOnlineStoreConfig(region="us-west-2"),
-    )
-    store = FeatureStore(config=repo_config)
-    store.materialize_incremental(datetime.datetime.now())
-
-# Use PythonOperator
-materialize_python = PythonOperator(
-    task_id='materialize_python',
-    python_callable=materialize,
-)
+```bash
+cd airflow_demo; sh setup_airflow.sh
 ```
 
-#### Airflow BashOperator
+#### Examine the Airflow DAG
+
+The example DAG runs on a daily basis and materializes *all* feature views, using the run's start and end interval. Note that there is a 1 hr overlap in the start time to account for potentially late-arriving data in the offline store.
+
 ```python
-# Use BashOperator
-materialize_bash = BashOperator(
-    task_id='materialize',
-    bash_command=f'feast materialize-incremental {datetime.datetime.now().replace(microsecond=0).isoformat()}',
+@dag(
+    schedule="@daily",
+    catchup=False,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    tags=["feast"],
 )
+def materialize_dag():
+    @task()
+    def materialize(data_interval_start=None, data_interval_end=None):
+        repo_config = RepoConfig(
+            registry=RegistryConfig(
+                registry_type="sql",
+                path="postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast",
+            ),
+            project="feast_demo_local",
+            provider="local",
+            offline_store="file",
+            online_store=RedisOnlineStoreConfig(connection_string="localhost:6379"),
+            entity_key_serialization_version=2,
+        )
+        store = FeatureStore(config=repo_config)
+        # Add a 1 hr overlap to account for late data.
+        # Note: normally, you'll have different feature views with different
+        # freshness requirements, instead of materializing all feature views every day.
+        store.materialize(data_interval_start.subtract(hours=1), data_interval_end)
+
+    materialize()
+
+materialization_dag = materialize_dag()
 ```
 
-See also [FAQ: How do I speed up or scale up materialization?](#how-do-i-speed-up-or-scale-up-materialization)
+In this test case, you can also run the single command `feast materialize-incremental $(date +%Y-%m-%d)`, which materializes up to the current time.
+
+#### Q: What if different feature views have different freshness requirements?
+
+There's no built-in mechanism for this, but you could store the schedule in the feature view tags (e.g. a `batch_schedule` tag).
+
+Then, you can parse these feature views in your Airflow job. You could, for example, have one DAG that runs all feature views with a daily `batch_schedule`, and another DAG that runs all feature views with an hourly `batch_schedule`.
 
 ### A note on Feast feature servers + push servers
 The above notebook introduces a way to curl an HTTP endpoint to push or retrieve features from Redis.
````
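The `data_interval_start.subtract(hours=1)` call in the DAG is the core of the overlap trick: each run widens its window backwards by an hour so late-arriving rows aren't skipped. A stdlib-only sketch of that window arithmetic, using `datetime` objects as stand-ins for Airflow's pendulum intervals:

```python
from datetime import datetime, timedelta, timezone

def materialization_window(data_interval_start: datetime,
                           data_interval_end: datetime,
                           overlap: timedelta = timedelta(hours=1)):
    """Widen the Airflow data interval backwards by `overlap` to catch
    late-arriving rows, mirroring data_interval_start.subtract(hours=1)."""
    return data_interval_start - overlap, data_interval_end

# One @daily run covers a single UTC day:
start, end = materialization_window(
    datetime(2022, 7, 1, tzinfo=timezone.utc),
    datetime(2022, 7, 2, tzinfo=timezone.utc),
)
print(start.isoformat(), end.isoformat())
# start is 2022-06-30T23:00:00+00:00, i.e. a 25-hour window
```

Because consecutive windows overlap, the same hour may be materialized twice; that is safe as long as online-store writes are idempotent for a given entity and event timestamp.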
```diff
@@ -253,24 +285,7 @@ This relies on individual online store implementations. The existing Redis onlin
 Doing this event timestamp checking is expensive though and slows down writes. In many cases, this is not preferred. Databases often support storing multiple versions of the same value, so you can leverage that (+ TTLs) to query the most recent version at read time.
 
 ### Does Feast allow pushing features to the offline store?
-Not yet! See more details at https://github.com/feast-dev/feast/issues/2732
-
-Many users have asked for this functionality as a quick way to get started, but often users work with two flows:
-- To power the online store, using stream processing to generate fresh features and pushing to the online store
-- To power the offline store, using some ETL / ELT pipelines that process and clean the raw data.
-
-Though this is more complex, one key advantage of this is that you can construct new features based on the data while iterating on model training. If you rely on streaming features to generate historical feature values, then you need to rely on a log-and-wait approach, which can slow down model iteration.
+Yes! See more details at https://docs.feast.dev/reference/data-sources/push#pushing-data
 
 ### Can feature / push servers refresh their registry in response to an event? e.g. after a PR merges and `feast apply` is run?
 Unfortunately, the servers don't currently support this. Feel free to contribute a PR to enable it! The tricky part is that Feast would need to keep track of these servers in the registry (or in some other way), which is not how Feast is currently designed.
-
-### How do I speed up or scale up materialization?
-Materialization in Feast by default pulls the latest feature values for each unique entity locally and writes in batches to the online store.
-
-- Feast users can materialize multiple feature views by using the CLI:
-  `feast materialize-incremental [FEATURE_VIEW_NAME]`
-  - **Caveat**: By default, Feast's registry store is a single protobuf written to a file. This means that there's the chance that metadata around materialization intervals gets lost if the registry has changed during materialization.
-    - The community is ideating on how to improve this. See [RFC-035: Scalable Materialization](https://docs.google.com/document/d/1tCZzClj3H8CfhJzccCytWK-bNDw_lkZk4e3fUbPYIP0/edit#)
-- Users often also implement their own custom providers. The provider interface has a `materialize_single_feature_view` method, which users are free to implement differently (e.g. materializing with Spark or Dataflow jobs).
-
-In general, the community is actively investigating ways to speed up materialization. Contributions are welcome!
```
0 commit comments