fix: FeatureView serialization with cycle detection #5502
# 🧬 BatchFeatureView in Feast

`BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even other `FeatureView`s, enabling composable and reusable feature pipelines. It is an extension of the `FeatureView` class, with support for user-defined transformations, aggregations, and recursive chaining of feature logic.

---

## ✅ Key Capabilities

- **Composable DAG of FeatureViews**: Supports defining a `BatchFeatureView` on top of one or more other `FeatureView`s.
- **Transformations**: Apply PySpark-based transformation logic (`feature_transformation` or `udf`) to a raw data source; a transformation can also combine multiple data sources.
- **Aggregations**: Define time-windowed aggregations (e.g. `sum`, `avg`) over event-timestamped data.
- **Feature resolution & execution**: Automatically resolves and executes DAGs of dependent views during materialization or retrieval. More details in the [Compute engine documentation](../../reference/compute-engine/README.md).
- **Materialization Sink Customization**: Specify a custom `sink_source` to define where derived feature data should be persisted.
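Time-windowed aggregation semantics can be illustrated without Spark. The sketch below is plain Python with a toy `events` list standing in for an event-timestamped source; it mirrors what an `Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1))` conceptually computes, not Feast's actual implementation.

```python
from datetime import datetime, timedelta

# Hypothetical event-timestamped rows: (event_timestamp, conv_rate)
events = [
    (datetime(2024, 1, 1, 6), 0.1),
    (datetime(2024, 1, 1, 18), 0.2),
    (datetime(2024, 1, 2, 6), 0.4),
]

def windowed_sum(rows, as_of, window):
    """Sum values whose timestamp falls inside (as_of - window, as_of]."""
    return sum(v for ts, v in rows if as_of - window < ts <= as_of)

# Sum of conv_rate over the day ending 2024-01-02 06:00 (0.2 + 0.4)
total = windowed_sum(events, datetime(2024, 1, 2, 6), timedelta(days=1))
```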
---

## 📐 Class Signature

```python
class BatchFeatureView(FeatureView):
    def __init__(
        self,
        *,
        name: str,
        source: Union[DataSource, FeatureView, List[FeatureView]],
        sink_source: Optional[DataSource] = None,
        schema: Optional[List[Field]] = None,
        entities: Optional[List[Entity]] = None,
        aggregations: Optional[List[Aggregation]] = None,
        udf: Optional[Callable[[DataFrame], DataFrame]] = None,
        udf_string: Optional[str] = None,
        ttl: Optional[timedelta] = timedelta(days=0),
        online: bool = True,
        offline: bool = False,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = "",
    )
```
---

## 🧠 Usage

### 1. Simple Feature View from Data Source

```python
from datetime import timedelta

from feast import BatchFeatureView, Field, FileSource
from feast.aggregation import Aggregation
from feast.types import Float32, Int32

source = FileSource(
    path="s3://bucket/path/data.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    aggregations=[
        Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)),
    ],
    source=source,
)
```
---

### 2. Derived Feature View from Another View

```python
from pyspark.sql import DataFrame

from feast import BatchFeatureView, Field, FileSource
from feast.data_format import ParquetFormat
from feast.types import Float32, Int32

def transform(df: DataFrame) -> DataFrame:
    return df.withColumn("conv_rate", df["conv_rate"] * 2)

daily_driver_stats = BatchFeatureView(
    name="daily_driver_stats",
    entities=["driver_id"],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    udf=transform,
    source=driver_fv,  # the driver_hourly_stats view defined above
    sink_source=FileSource(  # required to specify where to sink the derived view
        name="daily_driver_stats_sink",
        path="s3://bucket/daily_stats/",
        file_format=ParquetFormat(),
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    ),
)
```
---

## 🔄 Execution Flow

Feast automatically resolves the DAG of `BatchFeatureView` dependencies during:

- `materialize()`: recursively resolves and executes the feature view graph.
- `get_historical_features()`: builds the execution plan for retrieving point-in-time correct features.
- `apply()`: registers the feature view DAG structure in the registry.

Each transformation and aggregation is turned into a DAG node (e.g., `SparkTransformationNode`, `SparkAggregationNode`) executed by the compute engine (e.g., `SparkComputeEngine`).
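As a rough illustration of how such a DAG executes, nodes can be run post-order with memoized outputs. The `Node`/`execute` names below are hypothetical stand-ins, not Feast's actual classes:

```python
# Minimal sketch of dependency-ordered DAG execution (illustrative only).
class Node:
    def __init__(self, name, fn, inputs=()):
        self.name, self.fn, self.inputs = name, fn, list(inputs)

def execute(node, outputs=None):
    """Run a node's inputs first, memoizing each node's output by name."""
    outputs = {} if outputs is None else outputs
    if node.name not in outputs:
        args = [execute(dep, outputs) for dep in node.inputs]
        outputs[node.name] = node.fn(*args)
    return outputs[node.name]

# read -> transform -> aggregate, mirroring source/transformation/aggregation nodes
read = Node("read", lambda: [1.0, 2.0, 3.0])
transform = Node("transform", lambda rows: [r * 2 for r in rows], [read])
aggregate = Node("aggregate", lambda rows: sum(rows), [transform])
result = execute(aggregate)  # 12.0
```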
---

## ⚙️ How Materialization Works

- If the `BatchFeatureView` is backed by a base source (`FileSource`, `BigQuerySource`, `SparkSource`, etc.), the `batch_source` is used directly.
- If the source is another feature view (i.e., chained views), a `sink_source` must be provided to define the materialization target data source.
- During DAG planning, `SparkWriteNode` uses the `sink_source` as the batch sink.
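The first two rules can be sketched as a small helper. The classes and names below are hypothetical stand-ins for illustration, not Feast's implementation:

```python
# Illustrative sketch of the sink-selection rule during materialization planning.
class DataSource: ...

class FeatureView:
    def __init__(self, source, batch_source=None, sink_source=None):
        self.source = source
        self.batch_source = batch_source
        self.sink_source = sink_source

def resolve_sink(view):
    """Base sources materialize to batch_source; chained views require sink_source."""
    if isinstance(view.source, DataSource):
        return view.batch_source
    if view.sink_source is None:
        raise ValueError("sink_source is required when source is another FeatureView")
    return view.sink_source

base_view = FeatureView(DataSource(), batch_source="driver_stats_table")
derived_view = FeatureView(base_view, sink_source="daily_stats_sink")
```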
---

## 🧪 Example Tests

See:

- `test_spark_dag_materialize_recursive_view()`: Validates chaining of two feature views and verifies the materialized output.
- `test_spark_compute_engine_materialize()`: Validates transformation and writing of features into the offline and online stores.

---

## 🛑 Gotchas

- `sink_source` is **required** when chaining views (i.e., when `source` is another `FeatureView` or a list of them).
- Schema fields must be consistent with the `sink_source` schema, and with `batch_source.field_mapping` if field mappings exist.
- Aggregation logic must reference columns present in the raw source or transformed inputs.
---

## 🔮 Future Directions

- Support additional offline stores (e.g., Snowflake, Redshift) with auto-generated sink sources.
- Enable fully declarative transform logic (SQL + UDF mix).
- Introduce optimization passes for DAG pruning and fusion.
---

## 🧠 Core Concepts

| Component | Description | API |
|--------------------|------------------------------------------------------------------------|-----|
| `ComputeEngine` | Interface for executing materialization and retrieval tasks | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/base.py) |
| `FeatureBuilder` | Constructs a DAG from a Feature View definition for a specific backend | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_builder.py) |
| `FeatureResolver` | Resolves the feature DAG in topological order for execution | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/feature_resolver.py) |
| `DAG` | Represents a logical DAG operation (read, aggregate, join, etc.) | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionPlan` | Executes nodes in dependency order and stores intermediate outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs | [link](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/compute_engines/dag/README.md) |

---
## Feature resolver and builder

The `FeatureBuilder` initializes a `FeatureResolver` that extracts a DAG from the `FeatureView` definitions, resolving dependencies and ensuring correct execution order.

A `FeatureView` represents a logical data source, while a `DataSource` represents the physical data source (e.g., BigQuery, Spark, etc.). When defining a `FeatureView`, its source can be a physical `DataSource`, a derived `FeatureView`, or a list of `FeatureView`s.

The `FeatureResolver` walks through the `FeatureView` sources, topologically sorts the DAG nodes based on their dependencies, and returns a head node that represents the final output of the DAG. The `FeatureBuilder` then builds the DAG from the resolved head node, creating a `DAGNode` for each operation (read, join, filter, aggregate, etc.).
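The resolver's walk amounts to a topological sort with cycle detection. A minimal sketch, using plain dicts in place of `FeatureView` objects (illustrative only, not the real `FeatureResolver`):

```python
# Illustrative topological sort over feature view dependencies.
def toposort(views):
    """views: mapping of view name -> list of upstream view names."""
    order, seen, visiting = [], set(), set()

    def visit(name):
        if name in seen:
            return
        if name in visiting:  # a view transitively depends on itself
            raise ValueError(f"cycle detected at {name}")
        visiting.add(name)
        for dep in views.get(name, []):
            visit(dep)
        visiting.discard(name)
        seen.add(name)
        order.append(name)

    for name in views:
        visit(name)
    return order  # upstream views come before the views that depend on them

views = {
    "driver_hourly_stats": [],                      # backed by a raw source
    "daily_driver_stats": ["driver_hourly_stats"],  # derived view
}
resolved = toposort(views)  # ['driver_hourly_stats', 'daily_driver_stats']
```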
## Diagram

(DAG diagram image)

## ✨ Available Engines

### 🔥 SparkComputeEngine
```
SourceReadNode
 |
 v
TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
 |
 v
FilterNode (Always included; applies TTL or user-defined filters)
 |
 v
AggregationNode (If aggregations are defined in FeatureView)
 |
 v
DeduplicationNode (If no aggregation is defined for get_historical_features)
 |
 v
ValidationNode (If enable_validation = True)
 |
 v
...
```

To create your own compute engine:

1. Implement the `ComputeEngine` interface:

```python
from typing import Sequence, Union

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.common.materialization_job import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.stream_feature_view import StreamFeatureView


class MyComputeEngine(ComputeEngine):
    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        ...

    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        **kwargs,
    ) -> MaterializationJob:
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
        ...
```

2. Create a `FeatureBuilder`:

```python
from feast.infra.compute_engines.feature_builder import FeatureBuilder


class CustomFeatureBuilder(FeatureBuilder):
    def build_source_node(self): ...
    def build_aggregation_node(self, input_node): ...
    # ...
    def build_dedup_node(self, input_node): ...
    def build_transformation_node(self, input_node): ...
    def build_output_nodes(self, input_node): ...
    def build_validation_node(self, input_node): ...
```

3. Define `DAGNode` subclasses.

## 🚧 Roadmap
- [x] Modular, backend-agnostic DAG execution framework
- [x] Spark engine with native support for materialization + PIT joins
- [x] PyArrow + Pandas engine for local compute
- [x] Native multi-feature-view DAG optimization
- [ ] DAG validation, metrics, and debug output
- [ ] Scalable distributed backend via Ray or Polars