feat: Compute Engine Initial Implementation #5223
Merged
Commits in this PR (21, all by HaoXuAI):

- 1183a9a add compute engine
- 2825ee4 fix linting
- 4210f68 fix linting
- a398075 fix linting
- 68c48dc fix linting
- 1c9ae31 add doc
- 25af94e add test
- ed0cdf4 add integration test
- 6b57e94 update API
- 227f8f4 update API
- e9362de update API
- 68cf242 update API
- 3a5cf92 update API
- c1ba3d6 fix linting
- 95f757d update doc
- e5445a2 update doc
- 263024b Merge branch 'master' into compute-engine
- 2433064 update test
- 87e51c7 update doc
- c698b64 Merge branch 'master' into compute-engine
- 0877c03 Merge branch 'master' into compute-engine
Changes shown below are from commit 1c9ae31bf73598a57345242afc08266afaf8438a ("add doc").
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
# 🧠 ComputeEngine (WIP)

The `ComputeEngine` is Feast’s pluggable abstraction for executing feature pipelines — including transformations, aggregations, joins, and materialization/`get_historical_features` — on a backend of your choice (e.g., Spark, PyArrow, Pandas, Ray).

It powers both:

- `materialize()` – for batch and stream generation of features to offline/online stores
- `get_historical_features()` – for point-in-time correct training dataset retrieval

This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations, enabling modular and scalable workflows.
---

## 🧠 Core Concepts

| Component          | Description                                                        |
|--------------------|--------------------------------------------------------------------|
| `ComputeEngine`    | Interface for executing materialization and retrieval tasks        |
| `DAGBuilder`       | Constructs a DAG for a specific backend                            |
| `DAGNode`          | Represents a logical operation (read, aggregate, join, etc.)       |
| `ExecutionPlan`    | Executes nodes in dependency order and stores intermediate outputs |
| `ExecutionContext` | Holds config, registry, stores, entity data, and node outputs      |
---

## ✨ Available Engines

### 🔥 SparkComputeEngine

- Distributed DAG execution via Apache Spark
- Supports point-in-time joins and large-scale materialization
- Integrates with `SparkOfflineStore` and `SparkMaterializationJob`

### 🧪 LocalComputeEngine (WIP)

- Runs on Arrow + Pandas (or optionally DuckDB)
- Designed for local development, testing, or lightweight feature generation
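For intuition, the point-in-time correct join that these engines implement at scale can be sketched locally with `pandas.merge_asof` (toy data; this is not Feast's actual retrieval path):

```python
import pandas as pd

# Toy point-in-time join: for each entity event, pick the latest feature
# value whose timestamp is <= the event timestamp.
entity_df = pd.DataFrame({
    "driver_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-02", "2024-01-05"]),
})
features = pd.DataFrame({
    "driver_id": [1, 1],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-04"]),
    "trips": [10, 20],
})

# merge_asof matches backward by default: each entity row gets the most
# recent feature row at or before its timestamp, per driver_id.
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
)
print(joined["trips"].tolist())  # [10, 20]
```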
---

## 🛠️ Example DAG Flow

`Read → Aggregate → Join → Transform → Write`

Each step is implemented as a `DAGNode`. An `ExecutionPlan` executes these nodes in topological order, caching `DAGValue` outputs.
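A minimal sketch of this flow, using simplified stand-ins rather than Feast's actual classes, shows how a plan can walk nodes in topological order and cache each node's output:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class DAGNode:
    # A logical operation; `inputs` lists the upstream nodes it depends on.
    name: str
    fn: Callable[..., Any]
    inputs: List["DAGNode"] = field(default_factory=list)


class ExecutionPlan:
    # Executes nodes in dependency order, caching each output so that
    # downstream nodes can reuse it (the "DAGValue" role).
    def __init__(self, nodes: List[DAGNode]):
        self.nodes = nodes  # assumed already topologically sorted

    def execute(self, context: Dict[str, Any]) -> Any:
        outputs: Dict[str, Any] = {}
        for node in self.nodes:
            args = [outputs[dep.name] for dep in node.inputs]
            outputs[node.name] = node.fn(context, *args)
        context["node_outputs"] = outputs  # cached intermediate values
        return outputs[self.nodes[-1].name]


# Read -> Aggregate -> Write on toy data.
read = DAGNode("read", lambda ctx: ctx["rows"])
agg = DAGNode("aggregate", lambda ctx, rows: sum(rows), inputs=[read])
write = DAGNode("write", lambda ctx, total: {"total": total}, inputs=[agg])

plan = ExecutionPlan([read, agg, write])
result = plan.execute({"rows": [1, 2, 3]})
print(result)  # {'total': 6}
```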
---

## 🧩 Implementing a Custom Compute Engine

To create your own compute engine:

1. **Implement the interface**

```python
class MyComputeEngine(ComputeEngine):
    def materialize(self, task: MaterializationTask) -> MaterializationJob:
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> pa.Table:
        ...
```
2. **Create a `DAGBuilder`**

```python
class MyDAGBuilder(DAGBuilder):
    def build_source_node(self): ...
    def build_aggregation_node(self, input_node): ...
    def build_join_node(self, input_node): ...
    def build_transformation_node(self, input_node): ...
    def build_output_nodes(self, input_node): ...
```

3. **Define `DAGNode` subclasses**
   - `ReadNode`, `AggregationNode`, `JoinNode`, `WriteNode`, etc.
   - Each `DAGNode.execute(context)` returns a `DAGValue`.

4. **Return an `ExecutionPlan`**
   - The `ExecutionPlan` stores DAG nodes in topological order.
   - It automatically handles intermediate value caching.
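Putting these steps together, a hypothetical engine might wire a plan and execute it roughly as follows (simplified stand-ins only; Feast's real base classes, task types, and builder APIs differ):

```python
from typing import Any, Callable, Dict, List


class SimplePlan:
    # Stand-in for an ExecutionPlan: an ordered list of steps, each
    # receiving the previous step's output.
    def __init__(self, steps: List[Callable[[Any], Any]]):
        self.steps = steps

    def execute(self, data: Any) -> Any:
        for step in self.steps:
            data = step(data)
        return data


class MyComputeEngine:
    # Hypothetical engine: builds a Read -> Transform plan, then runs it.
    def get_historical_features(self, task: Dict[str, Any]) -> List[Dict[str, Any]]:
        read = lambda _: task["source_rows"]  # plays the role of a source node
        transform = lambda rows: [            # plays the role of a transformation node
            {**r, "value_doubled": r["value"] * 2} for r in rows
        ]
        plan = SimplePlan([read, transform])
        return plan.execute(None)


engine = MyComputeEngine()
rows = engine.get_historical_features(
    {"source_rows": [{"driver_id": 1, "value": 10}]}
)
print(rows)  # [{'driver_id': 1, 'value': 10, 'value_doubled': 20}]
```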
## 🚧 Roadmap

- [x] Modular, backend-agnostic DAG execution framework
- [x] Spark engine with native support for materialization + PIT joins
- [ ] PyArrow + Pandas engine for local compute
- [ ] Native multi-feature-view DAG optimization
- [ ] DAG validation, metrics, and debug output
- [ ] Scalable distributed backend via Ray or Polars
```python
from typing import List, Optional

import pyspark
from pyspark.sql import SparkSession

from feast import OnDemandFeatureView, RepoConfig
from feast.infra.compute_engines.dag.context import ExecutionContext
from feast.infra.compute_engines.dag.plan import ExecutionPlan
from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
    SparkRetrievalJob,
)
from feast.infra.offline_stores.offline_store import RetrievalMetadata


class SparkDAGRetrievalJob(SparkRetrievalJob):
    def __init__(
        self,
        spark_session: SparkSession,
        plan: ExecutionPlan,
        context: ExecutionContext,
        full_feature_names: bool,
        config: RepoConfig,
        on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
        metadata: Optional[RetrievalMetadata] = None,
    ):
        super().__init__(
            spark_session=spark_session,
            query="",
            full_feature_names=full_feature_names,
            config=config,
            on_demand_feature_views=on_demand_feature_views,
            metadata=metadata,
        )
        self._plan = plan
        self._context = context
        self._metadata = metadata
        self._spark_df = None  # Will be populated on first access

    def _ensure_executed(self):
        if self._spark_df is None:
            result = self._plan.execute(self._context)
            self._spark_df = result.data

    def to_spark_df(self) -> pyspark.sql.DataFrame:
        self._ensure_executed()
        assert self._spark_df is not None, "Execution plan did not produce a DataFrame"
        return self._spark_df

    def to_sql(self) -> str:
        self._ensure_executed()
        return self._plan.to_sql(self._context)
```
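The execute-once pattern in `_ensure_executed` above can be illustrated without a Spark session. This toy `LazyRetrievalJob` (a hypothetical name, not a Feast class) shows the same lazy caching semantics:

```python
from typing import Any, Callable, Optional


class LazyRetrievalJob:
    # Runs an expensive plan at most once; later accessors reuse the
    # cached result. None is the "not yet executed" sentinel, as in
    # the Spark job above.
    def __init__(self, plan: Callable[[], Any]):
        self._plan = plan
        self._result: Optional[Any] = None
        self.executions = 0  # instrumentation for this example only

    def _ensure_executed(self) -> None:
        if self._result is None:
            self.executions += 1
            self._result = self._plan()

    def to_df(self) -> Any:
        self._ensure_executed()
        assert self._result is not None, "plan produced no result"
        return self._result


job = LazyRetrievalJob(lambda: [{"feature": 1.0}])
first = job.to_df()
second = job.to_df()
print(job.executions)  # 1, since the plan ran only once
```

One caveat of this sentinel approach (shared with the Spark version): a plan that legitimately returns `None` would re-execute on every access.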