feat: Add apache flink compute engine #6476
Open
XuananLe wants to merge 2 commits into feast-dev:master from XuananLe:feat/add-apache-flink-compute-engine
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| # Apache Flink | ||
|
|
||
| ## Description | ||
|
|
||
| The Apache Flink compute engine provides a distributed execution engine for | ||
| feature pipelines through the PyFlink Table API. It implements Feast's unified | ||
| `ComputeEngine` interface and can be used for batch materialization operations | ||
| (`materialize` and `materialize-incremental`) and historical retrieval | ||
| (`get_historical_features`). | ||
|
|
||
| The engine reads data through the configured Feast offline store and executes | ||
| the Feast DAG as PyFlink tables. Offline stores that expose a native | ||
| `to_flink_table(table_env)` retrieval job hand Flink tables directly to the | ||
| engine. The engine then uses Flink Table/SQL operations for join, filter, | ||
| aggregate, dedupe, and projection steps, and writes materialization results to | ||
| the configured online and/or offline store. | ||
|
|
||
| ## Configuration | ||
|
|
||
| Install the Flink extra from a Feast source checkout with `uv` before using the | ||
| engine: | ||
|
|
||
| ```bash | ||
| uv sync --extra flink --no-dev | ||
| ``` | ||
|
|
||
| The `flink` extra installs PyFlink directly. PyFlink currently requires | ||
| `pyarrow<21`, while the default Feast install keeps `pyarrow>=21`; Feast's uv | ||
| lock resolves the Flink extra in a separate dependency fork so normal Feast | ||
| installs do not downgrade Arrow. | ||
|
|
||
| Configure the engine in `feature_store.yaml`: | ||
|
|
||
| ```yaml | ||
| project: my_project | ||
| registry: data/registry.db | ||
| provider: local | ||
| offline_store: | ||
|   type: file | ||
| online_store: | ||
|   type: sqlite | ||
|   path: data/online_store.db | ||
| batch_engine: | ||
|   type: flink.engine | ||
|   execution_mode: batch | ||
|   parallelism: 4 | ||
|   table_config: | ||
|     pipeline.name: "Feast Flink Compute Engine" | ||
|   pandas_split_num: 4 | ||
| ``` | ||
|
|
||
| ## Configuration Options | ||
|
|
||
| | Option | Type | Default | Description | | ||
| | --- | --- | --- | --- | | ||
| | `type` | string | `flink.engine` | Must be `flink.engine`. | | ||
| | `execution_mode` | string | `batch` | PyFlink execution mode: `batch` or `streaming`. | | ||
| | `parallelism` | integer | `null` | Default Flink parallelism for jobs created by the engine. | | ||
| | `table_config` | map | `null` | Additional PyFlink table configuration entries. | | ||
| | `pandas_split_num` | integer | `1` | Number of PyFlink Arrow source splits when converting pandas entity DataFrames into Flink tables. | | ||
|
|
||
| ## Flink Transformations | ||
|
|
||
| Use `mode="flink"` when a `BatchFeatureView` transformation should receive and | ||
| return PyFlink table objects: | ||
|
|
||
| ```python | ||
| from feast import BatchFeatureView, Field | ||
| from feast.types import Float32 | ||
|
|
||
|
|
||
| def double_rates(table): | ||
|     # In production this can use PyFlink Table API operations and return a table. | ||
|     return table | ||
|
|
||
|
|
||
| driver_stats = BatchFeatureView( | ||
|     name="driver_stats", | ||
|     entities=[driver], | ||
|     mode="flink", | ||
|     udf=double_rates, | ||
|     schema=[Field(name="conv_rate", dtype=Float32)], | ||
|     source=driver_stats_source, | ||
|     online=True, | ||
| ) | ||
| ``` | ||
|
|
||
| Flink transformations must return PyFlink table objects. The Flink compute | ||
| engine does not accept UDFs that return pandas DataFrames. | ||
|
|
||
| ## DAG Support | ||
|
|
||
| The Flink engine implements Feast's compute DAG with Flink-specific nodes: | ||
|
|
||
| - Source nodes read from Feast offline stores, preferring native Flink tables | ||
| when a retrieval job supports `to_flink_table(table_env)`. | ||
| - Transform nodes pass PyFlink tables to `mode="flink"` UDFs and preserve native | ||
| Flink table outputs. | ||
| - Join nodes use Flink SQL temporary views for feature joins and entity joins. | ||
| - Filter nodes apply point-in-time, TTL, and custom filter expressions in Flink | ||
| SQL. | ||
| - Aggregate nodes support non-windowed Feast aggregations using Flink SQL | ||
| aggregate functions. | ||
| - Dedupe nodes use `ROW_NUMBER()` over entity keys or internal entity-row ids so | ||
| historical retrieval keeps one latest feature row per entity row. | ||
| - Validation nodes check required output columns. JSON value validation must be | ||
| handled upstream in Flink SQL. | ||
| - Output nodes write only for materialization tasks; historical retrieval is | ||
| read-only. | ||
| - Historical retrieval accepts pandas entity DataFrames and SQL-string entity | ||
| DataFrames. SQL strings are interpreted as Flink SQL queries against the | ||
| configured TableEnvironment/catalog and must select an `event_timestamp` | ||
| column. | ||
|
|
||
| ## Current Limitations | ||
|
|
||
| - Windowed aggregations are not yet implemented in the Flink compute engine. Use | ||
| non-windowed Feast aggregations or pre-window upstream in Flink. | ||
| - Offline store retrieval jobs must implement `to_flink_table(table_env)`. | ||
| Arrow/pandas-only retrieval jobs are rejected instead of converted. | ||
| - JSON value validation is not implemented inside the Flink compute engine | ||
| because the engine does not collect intermediate data out of Flink for | ||
| validation. |
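The Dedupe bullet in the documentation diff above describes using `ROW_NUMBER()` to keep one latest feature row per entity row. The engine's actual SQL is not shown in this excerpt, so the following is only a minimal sketch of the standard Flink SQL deduplication pattern. The helper name `build_dedupe_sql`, the `_feast_row_num` column, and the default `event_timestamp` ordering column are assumptions made for illustration, not part of Feast or PyFlink.

```python
from typing import Sequence


def build_dedupe_sql(
    view_name: str,
    partition_keys: Sequence[str],
    timestamp_column: str = "event_timestamp",
) -> str:
    """Return a Flink SQL query that keeps the latest row per partition key.

    Hypothetical helper for illustration only; it is not the engine's code.
    """
    if not partition_keys:
        raise ValueError("at least one partition key is required")
    # Flink SQL quotes identifiers with backticks.
    keys = ", ".join(f"`{key}`" for key in partition_keys)
    return (
        "SELECT * FROM ("
        f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {keys} "
        f"ORDER BY `{timestamp_column}` DESC) AS `_feast_row_num` "
        f"FROM `{view_name}`"
        ") WHERE `_feast_row_num` = 1"
    )


if __name__ == "__main__":
    # "features" stands in for a temporary view registered on the TableEnvironment.
    print(build_dedupe_sql("features", ["driver_id"]))
```

The outer `SELECT *` also returns the helper column `_feast_row_num`; a real pipeline would likely project it away before writing results.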
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -6,3 +6,4 @@ class DAGFormat(str, Enum): | ||
| PANDAS = "pandas" | ||
| ARROW = "arrow" | ||
| RAY = "ray" | ||
| FLINK = "flink" | ||
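Because `DAGFormat` mixes in `str`, the new `FLINK` member compares equal to the plain string `"flink"` and can be looked up by value. The sketch below reproduces only the members visible in the hunk; any members defined above the hunk in the real file are omitted. `resolve_format` is a hypothetical helper, and whether Feast resolves `mode="flink"` this way is an assumption.

```python
from enum import Enum


class DAGFormat(str, Enum):
    # Only the members visible in the hunk above; the real enum may define
    # additional members before PANDAS.
    PANDAS = "pandas"
    ARROW = "arrow"
    RAY = "ray"
    FLINK = "flink"


def resolve_format(mode: str) -> DAGFormat:
    """Hypothetical helper: map a mode string to its DAGFormat member.

    Raises ValueError when the string matches no member.
    """
    return DAGFormat(mode)


if __name__ == "__main__":
    fmt = resolve_format("flink")
    # The str mixin makes the member compare equal to its string value.
    print(fmt is DAGFormat.FLINK, fmt == "flink")
```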
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from feast.infra.compute_engines.flink.compute import ( | ||
| FlinkComputeEngine, | ||
| FlinkComputeEngineConfig, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "FlinkComputeEngine", | ||
| "FlinkComputeEngineConfig", | ||
| ] |
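The body of `FlinkComputeEngineConfig` is not shown in this excerpt. As a reading aid for the Configuration Options table in the documentation diff, here is a minimal stdlib sketch of the documented fields and defaults. The class name `FlinkEngineOptionsSketch` is hypothetical, and the lower bounds on `parallelism` and `pandas_split_num` are assumptions rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Dict, Optional

_ALLOWED_MODES = ("batch", "streaming")


@dataclass(frozen=True)
class FlinkEngineOptionsSketch:
    """Illustrative stand-in for the documented options; not the Feast class."""

    type: str = "flink.engine"
    execution_mode: str = "batch"
    parallelism: Optional[int] = None
    table_config: Optional[Dict[str, str]] = None
    pandas_split_num: int = 1

    def __post_init__(self) -> None:
        if self.type != "flink.engine":
            raise ValueError("type must be 'flink.engine'")
        if self.execution_mode not in _ALLOWED_MODES:
            raise ValueError(f"execution_mode must be one of {_ALLOWED_MODES}")
        # Assumed constraints: both values must be positive when set.
        if self.parallelism is not None and self.parallelism < 1:
            raise ValueError("parallelism must be a positive integer")
        if self.pandas_split_num < 1:
            raise ValueError("pandas_split_num must be a positive integer")


if __name__ == "__main__":
    # Mirrors the batch_engine block from the feature_store.yaml example.
    print(
        FlinkEngineOptionsSketch(
            parallelism=4,
            table_config={"pipeline.name": "Feast Flink Compute Engine"},
            pandas_split_num=4,
        )
    )
```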