|
| 1 | +# Streaming feature computation with Denormalized |
| 2 | + |
| 3 | +Denormalized makes it easy to compute real-time features and write them directly to your Feast feature store. This guide will walk you through setting up a streaming pipeline that computes feature aggregations and pushes them to Feast in real-time. |
| 4 | + |
| 5 | + |
| 6 | + |
| 7 | +## Prerequisites |
| 8 | + |
| 9 | +- Python 3.8+ |
| 10 | +- Kafka cluster (local or remote) |
| 11 | + |
| 12 | +For a full working demo, check out the [feast-example](https://github.com/probably-nothing-labs/feast-example) repo. |
| 13 | + |
| 14 | +## Quick Start |
| 15 | + |
| 16 | +1. First, create a new Python project or use our template: |
| 17 | +```bash |
| 18 | +mkdir my-feature-project |
| 19 | +cd my-feature-project |
| 20 | +python -m venv .venv |
| 21 | +source .venv/bin/activate # or `.venv\Scripts\activate` on Windows |
| 22 | +pip install denormalized[feast] feast |
| 23 | +``` |
| 24 | + |
| 25 | +2. Set up your Feast feature repository: |
| 26 | +```bash |
| 27 | +feast init feature_repo |
| 28 | +``` |
| 29 | + |
| 30 | +## Project Structure |
| 31 | + |
| 32 | +Your project should look something like this: |
| 33 | +``` |
| 34 | +my-feature-project/ |
| 35 | +├── feature_repo/ |
| 36 | +│ ├── feature_store.yaml |
| 37 | +│ └── sensor_data.py # Feature definitions |
| 38 | +├── stream_job.py # Denormalized pipeline |
| 39 | +└── main.py # Pipeline runner |
| 40 | +``` |
| 41 | + |
| 42 | +## Define Your Features |
| 43 | + |
| 44 | +In `feature_repo/sensor_data.py`, define your feature view and entity: |
| 45 | + |
| 46 | +```python |
| 47 | +from feast import Entity, FeatureView, PushSource, Field |
| 48 | +from feast.types import Float64, String |
| 49 | + |
| 50 | +# Define your entity |
| 51 | +sensor = Entity( |
| 52 | + name="sensor", |
| 53 | + join_keys=["sensor_name"], |
| 54 | +) |
| 55 | + |
| 56 | +# Create a push source for real-time features |
| 57 | +source = PushSource( |
| 58 | + name="push_sensor_statistics", |
| 59 | + batch_source=your_batch_source # Define your batch source |
| 60 | +) |
| 61 | + |
| 62 | +# Define your feature view |
| 63 | +stats_view = FeatureView( |
| 64 | + name="sensor_statistics", |
| 65 | + entities=[sensor], |
| 66 | + schema=ds.get_feast_schema(), # Denormalized handles this for you! |
| 67 | + source=source, |
| 68 | + online=True, |
| 69 | +) |
| 70 | +``` |
| 71 | + |
| 72 | +## Create Your Streaming Pipeline |
| 73 | + |
| 74 | +In `stream_job.py`, define your streaming computations: |
| 75 | + |
| 76 | +```python |
| 77 | +from denormalized import Context, FeastDataStream |
| 78 | +from denormalized.datafusion import col, functions as f |
| 79 | +from feast import FeatureStore |
| 80 | + |
| 81 | +sample_event = { |
| 82 | + "occurred_at_ms": 100, |
| 83 | + "sensor_name": "foo", |
| 84 | + "reading": 0.0, |
| 85 | +} |
| 86 | + |
| 87 | +# Create a stream from your Kafka topic |
| 88 | +ds = FeastDataStream(Context().from_topic("temperature", json.dumps(sample_event), "localhost:9092")) |
| 89 | + |
| 90 | +# Define your feature computations |
| 91 | +ds = ds.window( |
| 92 | + [col("sensor_name")], # Group by sensor |
| 93 | + [ |
| 94 | + f.count(col("reading")).alias("count"), |
| 95 | + f.min(col("reading")).alias("min"), |
| 96 | + f.max(col("reading")).alias("max"), |
| 97 | + f.avg(col("reading")).alias("average"), |
| 98 | + ], |
| 99 | + 1000, # Window size in ms |
| 100 | + None # Slide interval (None = tumbling window) |
| 101 | +) |
| 102 | + |
| 103 | +feature_store = FeatureStore(repo_path="feature_repo/") |
| 104 | + |
| 105 | +# This single line connects Denormalized to Feast! |
| 106 | +ds.write_feast_feature(feature_store, "push_sensor_statistics") |
| 107 | +``` |
| 108 | + |
| 109 | +## Need Help? |
| 110 | + |
| 111 | +- Email us at hello@denormalized.io |
| 112 | +- Check out more examples on our [GitHub](https://github.com/probably-nothing-labs/denormalized) |
0 commit comments