Merged
148 changes: 148 additions & 0 deletions docs/getting-started/concepts/batch-feature-view.md
@@ -0,0 +1,148 @@
# 🧬 BatchFeatureView in Feast

`BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even other `FeatureView`s, enabling composable and reusable feature pipelines. It is an extension of the `FeatureView` class, with support for user-defined transformations, aggregations, and recursive chaining of feature logic.

---

## ✅ Key Capabilities

- **Composable DAG of FeatureViews**: Supports defining a `BatchFeatureView` on top of one or more other `FeatureView`s.
- **Transformations**: Apply user-defined transformation logic (`feature_transformation` or `udf`), such as PySpark or Pandas functions, to the raw data source; transformations can also combine multiple data sources.
- **Aggregations**: Define time-windowed aggregations (e.g. `sum`, `avg`) over event-timestamped data.
- **Feature resolution & execution**: Automatically resolves and executes DAGs of dependent views during materialization or retrieval. More details in the [Compute engine documentation](../../reference/compute-engine/README.md).
- **Materialization Sink Customization**: Specify a custom `sink_source` to define where derived feature data should be persisted.

---

## 📐 Class Signature

```python
class BatchFeatureView(FeatureView):
    def __init__(
        self,
        *,
        name: str,
        source: Union[DataSource, FeatureView, List[FeatureView]],
        sink_source: Optional[DataSource] = None,
        schema: Optional[List[Field]] = None,
        entities: Optional[List[Entity]] = None,
        aggregations: Optional[List[Aggregation]] = None,
        udf: Optional[Callable[[DataFrame], DataFrame]] = None,
        udf_string: Optional[str] = None,
        ttl: Optional[timedelta] = timedelta(days=0),
        online: bool = True,
        offline: bool = False,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = "",
    )
```

---

## 🧠 Usage

### 1. Simple Feature View from Data Source

```python
from feast import BatchFeatureView, Field
from feast.types import Float32, Int32
from feast import FileSource
from feast.aggregation import Aggregation
from datetime import timedelta

source = FileSource(
    path="s3://bucket/path/data.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    aggregations=[
        Aggregation(column="conv_rate", function="avg", time_window=timedelta(days=1)),
    ],
    source=source,
)
```
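To make the aggregation semantics concrete, here is a plain-Python sketch (with hypothetical rows) of what a one-day time window keeps and computes. The actual computation is performed by the compute engine, not client-side; this is only an illustration:

```python
from datetime import datetime, timedelta

# Hypothetical rows for a single driver; a windowed aggregation only
# considers events inside the window that ends at materialization time.
rows = [
    {"driver_id": 1, "conv_rate": 0.2, "ts": datetime(2025, 1, 2, 1)},
    {"driver_id": 1, "conv_rate": 0.4, "ts": datetime(2025, 1, 2, 9)},
    {"driver_id": 1, "conv_rate": 0.9, "ts": datetime(2024, 12, 25)},  # outside the window
]

end = datetime(2025, 1, 2, 12)
window = timedelta(days=1)

in_window = [r["conv_rate"] for r in rows if end - window <= r["ts"] <= end]
avg_conv_rate = round(sum(in_window) / len(in_window), 2)
print(avg_conv_rate)  # 0.3
```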

---

### 2. Derived Feature View from Another View

```python
from feast import BatchFeatureView, Field
from pyspark.sql import DataFrame
from feast.types import Float32, Int32
from feast import FileSource

def transform(df: DataFrame) -> DataFrame:
    return df.withColumn("conv_rate", df["conv_rate"] * 2)

daily_driver_stats = BatchFeatureView(
    name="daily_driver_stats",
    entities=["driver_id"],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    udf=transform,
    source=driver_fv,
    sink_source=FileSource(  # Required to specify where to sink the derived view
        name="daily_driver_stats_sink",
        path="s3://bucket/daily_stats/",
        file_format="parquet",
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    ),
)
```

---

## 🔄 Execution Flow

Feast automatically resolves the DAG of `BatchFeatureView` dependencies during:

- `materialize()`: recursively resolves and executes the feature view graph.
- `get_historical_features()`: builds the execution plan for retrieving point-in-time correct features.
- `apply()`: registers the feature view DAG structure to the registry.

Each transformation and aggregation is turned into a DAG node (e.g., `SparkTransformationNode`, `SparkAggregationNode`) executed by the compute engine (e.g., `SparkComputeEngine`).
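As an illustration of this execution model, the following is a minimal, self-contained sketch of dependency-ordered node execution with cached outputs. The class and function names here are illustrative, not the actual Feast DAG node API:

```python
from typing import Callable, Dict, List, Optional

class Node:
    """Illustrative stand-in for a compute-engine DAG node."""
    def __init__(self, name: str, fn: Callable[[List[int]], int],
                 inputs: Optional[List["Node"]] = None):
        self.name = name
        self.fn = fn
        self.inputs = inputs or []

def execute(node: Node, outputs: Dict[str, int]) -> int:
    # Execute dependencies first; cache each node's output by name so
    # shared upstream nodes run only once.
    if node.name not in outputs:
        vals = [execute(dep, outputs) for dep in node.inputs]
        outputs[node.name] = node.fn(vals)
    return outputs[node.name]

read = Node("read", lambda _: 10)
transform = Node("transform", lambda v: v[0] * 2, [read])
aggregate = Node("aggregate", lambda v: v[0] + 1, [transform])

outputs: Dict[str, int] = {}
print(execute(aggregate, outputs))  # 21
```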

---

## ⚙️ How Materialization Works

- If the `BatchFeatureView` is backed by a base source (`FileSource`, `BigQuerySource`, `SparkSource`, etc.), its `batch_source` is used directly.
- If the source is another feature view (i.e., chained views), the `sink_source` must be provided to define the materialization target data source.
- During DAG planning, `SparkWriteNode` uses the `sink_source` as the batch sink.
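The sink-selection rule above can be sketched as follows; `pick_batch_sink` is a hypothetical helper written for illustration, not a Feast API:

```python
from types import SimpleNamespace

def pick_batch_sink(view):
    """Derived views (those with source views) must declare a sink_source;
    views backed by a base data source write through their batch_source."""
    if getattr(view, "source_views", None):
        if view.sink_source is None:
            raise ValueError(f"sink_source is required for derived view '{view.name}'")
        return view.sink_source
    return view.batch_source

# Hypothetical stand-ins for feature view objects.
base_view = SimpleNamespace(name="driver_hourly_stats", source_views=None,
                            sink_source=None, batch_source="file_source")
derived_view = SimpleNamespace(name="daily_driver_stats",
                               source_views=["driver_hourly_stats"],
                               sink_source="daily_sink", batch_source=None)

print(pick_batch_sink(base_view))     # file_source
print(pick_batch_sink(derived_view))  # daily_sink
```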

---

## 🧪 Example Tests

See:

- `test_spark_dag_materialize_recursive_view()`: Validates chaining of two feature views and verifies the materialized output.
- `test_spark_compute_engine_materialize()`: Validates transformation and write of features into offline and online stores.

---

## 🛑 Gotchas

- `sink_source` is **required** when chaining views (i.e., `source` is another FeatureView or list of them).
- Schema fields must be consistent with the `sink_source` (and with `batch_source.field_mapping`, if field mappings exist).
- Aggregation logic must reference columns present in the raw source or transformed inputs.

---

## 🔮 Future Directions

- Support additional offline stores (e.g., Snowflake, Redshift) with auto-generated sink sources.
- Enable fully declarative transform logic (SQL + UDF mix).
- Introduce optimization passes for DAG pruning and fusion.
76 changes: 60 additions & 16 deletions docs/reference/compute-engine/README.md
@@ -13,16 +13,28 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations

## 🧠 Core Concepts

| Component | Description |
|--------------------|----------------------------------------------------------------------|
| `ComputeEngine` | Interface for executing materialization and retrieval tasks |
| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend |
| `DAGNode` | Represents a logical operation (read, aggregate, join, etc.) |
| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs |
| Component | Description | API |
|--------------------|----------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| `ComputeEngine` | Interface for executing materialization and retrieval tasks | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/base.py) |
| `FeatureBuilder` | Constructs a DAG from Feature View definition for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
| `FeatureResolver` | Resolves feature DAG by topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
| `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionPlan`    | Executes nodes in dependency order and stores intermediate outputs   | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)                  |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs        | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md)                  |

---

## Feature resolver and builder
The `FeatureBuilder` initializes a `FeatureResolver` that extracts a DAG from the `FeatureView` definitions, resolving dependencies and ensuring correct execution order. \
The FeatureView represents a logical data source, while DataSource represents the physical data source (e.g., BigQuery, Spark, etc.). \
When defining a FeatureView, the source can be a physical DataSource, a derived FeatureView, or a list of FeatureViews.
The FeatureResolver walks through the FeatureView sources, topologically sorts the DAG nodes based on dependencies, and returns a head node that represents the final output of the DAG. \
Then the `FeatureBuilder` builds the DAG nodes from the resolved head node, creating a `DAGNode` for each operation (read, join, filter, aggregate, etc.).
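A minimal sketch of that resolution step, using illustrative classes rather than the real `FeatureResolver` API:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class View:
    """Illustrative stand-in for a FeatureView with upstream sources."""
    name: str
    sources: List["View"] = field(default_factory=list)

def resolve_order(head: View) -> List[str]:
    """Post-order walk: dependencies come before the views that consume them."""
    order, seen = [], set()
    def visit(v: View):
        if v.name in seen:
            return
        seen.add(v.name)
        for src in v.sources:
            visit(src)
        order.append(v.name)
    visit(head)
    return order

raw = View("driver_hourly_stats")
daily = View("daily_driver_stats", sources=[raw])
weekly = View("weekly_driver_stats", sources=[daily])

print(resolve_order(weekly))
# ['driver_hourly_stats', 'daily_driver_stats', 'weekly_driver_stats']
```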

## Diagram
![feature_dag.png](feature_dag.png)


## ✨ Available Engines

### 🔥 SparkComputeEngine
@@ -44,7 +56,7 @@ This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations
SourceReadNode
|
v
TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
|
v
FilterNode (Always included; applies TTL or user-defined filters)
@@ -56,9 +68,6 @@ AggregationNode (If aggregations are defined in FeatureView)
DeduplicationNode (If no aggregation is defined for get_historical_features)
|
v
ValidationNode (If enable_validation = True)
|
v
@@ -79,20 +88,54 @@ To create your own compute engine:

```python
from feast.infra.compute_engines.base import ComputeEngine
from typing import Sequence, Union

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.common.materialization_job import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.stream_feature_view import StreamFeatureView


class MyComputeEngine(ComputeEngine):
    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        ...

    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        **kwargs,
    ) -> MaterializationJob:
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
        ...

```

2. Create a FeatureBuilder
```python
from feast.infra.compute_engines.feature_builder import FeatureBuilder


class CustomFeatureBuilder(FeatureBuilder):
    def build_source_node(self): ...
    def build_aggregation_node(self, input_node): ...
@@ -101,6 +144,7 @@ class CustomFeatureBuilder(FeatureBuilder):
    def build_dedup_node(self, input_node): ...
    def build_transformation_node(self, input_node): ...
    def build_output_nodes(self, input_node): ...
    def build_validation_node(self, input_node): ...
```

3. Define DAGNode subclasses
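For example, a filter node might look like the following minimal sketch. The real `DAGNode` base class in Feast has a richer interface; the classes and field names here are simplified for illustration:

```python
from typing import Any, List, Optional

class DAGNode:
    """Simplified stand-in for the Feast DAGNode base class."""
    def __init__(self, name: str, inputs: Optional[List["DAGNode"]] = None):
        self.name = name
        self.inputs = inputs or []

    def execute(self, context: dict) -> Any:
        raise NotImplementedError

class SourceNode(DAGNode):
    """Reads raw rows (here, an in-memory list standing in for a data source)."""
    def __init__(self, name: str, rows: List[dict]):
        super().__init__(name)
        self.rows = rows

    def execute(self, context: dict) -> List[dict]:
        return self.rows

class FilterNode(DAGNode):
    """Keeps only rows newer than a TTL cutoff, like the TTL filter step."""
    def __init__(self, name: str, input_node: DAGNode, ttl_cutoff: int):
        super().__init__(name, [input_node])
        self.ttl_cutoff = ttl_cutoff

    def execute(self, context: dict) -> List[dict]:
        rows = self.inputs[0].execute(context)
        return [r for r in rows if r["event_ts"] >= self.ttl_cutoff]

rows = [{"event_ts": 1}, {"event_ts": 5}]
plan = FilterNode("filter", SourceNode("read", rows), ttl_cutoff=3)
print(plan.execute({}))  # [{'event_ts': 5}]
```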
@@ -114,7 +158,7 @@
## 🚧 Roadmap
- [x] Modular, backend-agnostic DAG execution framework
- [x] Spark engine with native support for materialization + PIT joins
- [x] PyArrow + Pandas engine for local compute
- [x] Native multi-feature-view DAG optimization
- [ ] DAG validation, metrics, and debug output
- [ ] Scalable distributed backend via Ray or Polars
Binary file added docs/reference/compute-engine/feature_dag.png
52 changes: 46 additions & 6 deletions sdk/python/feast/feature_view.py
@@ -258,11 +258,14 @@ def __copy__(self):
        fv = FeatureView(
            name=self.name,
            ttl=self.ttl,
            source=self.source_views
            if self.source_views
            else (self.stream_source if self.stream_source else self.batch_source),
            schema=self.schema,
            tags=self.tags,
            online=self.online,
            offline=self.offline,
            sink_source=self.batch_source if self.source_views else None,
        )

# This is deliberately set outside of the FV initialization as we do not have the Entity objects.
@@ -289,6 +292,7 @@ def __eq__(self, other):
            or self.batch_source != other.batch_source
            or self.stream_source != other.stream_source
            or sorted(self.entity_columns) != sorted(other.entity_columns)
            or self.source_views != other.source_views
        ):
            return False

@@ -371,7 +375,25 @@ def to_proto(self) -> FeatureViewProto:
        Returns:
            A FeatureViewProto protobuf.
        """
        return self._to_proto_internal(seen={})

    def _to_proto_internal(self, seen: Dict[str, FeatureViewProto]) -> FeatureViewProto:
        if self.name in seen:
            if seen[self.name] is not None:
                raise ValueError(
                    f"Cycle detected during serialization of FeatureView: {self.name}"
                )
            return seen[self.name]

        seen[self.name] = None  # type: ignore[assignment]

        spec = self.to_proto_spec(seen)
        meta = self.to_proto_meta()
        proto = FeatureViewProto(spec=spec, meta=meta)
        seen[self.name] = proto
        return proto

    def to_proto_spec(self, seen: Dict[str, FeatureViewProto]) -> FeatureViewSpecProto:
        ttl_duration = self.get_ttl_duration()

        batch_source_proto = None
@@ -385,8 +407,10 @@ def to_proto(self) -> FeatureViewProto:
            stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}"
        source_view_protos = None
        if self.source_views:
            source_view_protos = [
                view._to_proto_internal(seen).spec for view in self.source_views
            ]
        return FeatureViewSpecProto(
            name=self.name,
            entities=self.entities,
            entity_columns=[field.to_proto() for field in self.entity_columns],
@@ -402,8 +426,6 @@ def to_proto(self) -> FeatureViewProto:
            source_views=source_view_protos,
        )

    def to_proto_meta(self):
        meta = FeatureViewMetaProto(materialization_intervals=[])
        if self.created_timestamp:
@@ -425,16 +447,33 @@ def get_ttl_duration(self):
        return ttl_duration

    @classmethod
    def from_proto(cls, feature_view_proto: FeatureViewProto) -> "FeatureView":
        return cls._from_proto_internal(feature_view_proto, seen={})

    @classmethod
    def _from_proto_internal(
        cls, feature_view_proto: FeatureViewProto, seen: Dict[str, "FeatureView"]
    ) -> "FeatureView":
        """
        Creates a feature view from a protobuf representation of a feature view.

        Args:
            feature_view_proto: A protobuf representation of a feature view.
            seen: A dictionary to keep track of already seen feature views to avoid recursion.

        Returns:
            A FeatureView object based on the feature view protobuf.
        """
        feature_view_name = feature_view_proto.spec.name

        if feature_view_name in seen:
            if seen[feature_view_name] is None:
                raise ValueError(
                    f"Cycle detected while deserializing FeatureView: {feature_view_name}"
                )
            return seen[feature_view_name]
        seen[feature_view_name] = None  # type: ignore[assignment]

        batch_source = (
            DataSource.from_proto(feature_view_proto.spec.batch_source)
            if feature_view_proto.spec.HasField("batch_source")
@@ -509,6 +548,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
            )
        )

        seen[feature_view_name] = feature_view
        return feature_view

@property
Expand Down