Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
* [Snowflake](reference/compute-engine/snowflake.md)
* [AWS Lambda (alpha)](reference/compute-engine/lambda.md)
* [Spark (contrib)](reference/compute-engine/spark.md)
* [Apache Flink](reference/compute-engine/flink.md)
* [Ray (contrib)](reference/compute-engine/ray.md)
* [Feature repository](reference/feature-repository/README.md)
* [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md)
Expand Down
4 changes: 2 additions & 2 deletions docs/getting-started/components/compute-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ engines.
| SparkComputeEngine | Runs on Apache Spark, designed for large-scale distributed feature generation. || |
| SnowflakeComputeEngine | Runs on Snowflake, designed for scalable feature generation using Snowflake SQL. || |
| LambdaComputeEngine | Runs on AWS Lambda, designed for serverless feature generation. || |
| FlinkComputeEngine | Runs on Apache Flink, designed for stream processing and real-time feature generation. | | |
| FlinkComputeEngine | Runs on Apache Flink, designed for distributed feature generation through PyFlink Table API. | | |
| RayComputeEngine | Runs on Ray, designed for distributed feature generation and machine learning workloads. || |
```

Expand Down Expand Up @@ -156,4 +156,4 @@ DAG nodes are defined as follows:
+----------------+ +----------------+
| OnlineStoreWrite| OfflineStoreWrite|
+----------------+ +----------------+
```
```
8 changes: 8 additions & 0 deletions docs/reference/compute-engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ An example of built output from FeatureBuilder:
- Supports point-in-time joins and large-scale materialization
- Integrates with `SparkOfflineStore` and `SparkMaterializationJob`

### 🌊 FlinkComputeEngine

{% page-ref page="flink.md" %}

- Distributed DAG execution through Apache Flink's PyFlink Table API
- Supports materialization and historical retrieval with Feast offline stores
- Integrates with `FlinkMaterializationJob` and `FlinkDAGRetrievalJob`

### ⚡ RayComputeEngine (contrib)

- Distributed DAG execution via Ray
Expand Down
123 changes: 123 additions & 0 deletions docs/reference/compute-engine/flink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Apache Flink

## Description

The Apache Flink compute engine provides a distributed execution engine for
feature pipelines through the PyFlink Table API. It implements Feast's unified
`ComputeEngine` interface and can be used for batch materialization operations
(`materialize` and `materialize-incremental`) and historical retrieval
(`get_historical_features`).

The engine reads data through the configured Feast offline store and executes
the Feast DAG as PyFlink tables. Offline stores that expose a native
`to_flink_table(table_env)` retrieval job hand Flink tables directly to the
engine. The engine then uses Flink Table/SQL operations for join, filter,
aggregate, dedupe, and projection steps, and writes materialization results to
the configured online and/or offline store.

## Configuration

Install the Flink extra from a Feast source checkout with `uv` before using the
engine:

```bash
uv sync --extra flink --no-dev
```

The `flink` extra installs PyFlink directly. PyFlink currently requires
`pyarrow<21`, while the default Feast install keeps `pyarrow>=21`; Feast's uv
lock resolves the Flink extra in a separate dependency fork so normal Feast
installs do not downgrade Arrow.

Configure the engine in `feature_store.yaml`:

```yaml
project: my_project
registry: data/registry.db
provider: local
offline_store:
type: file
online_store:
type: sqlite
path: data/online_store.db
batch_engine:
type: flink.engine
execution_mode: batch
parallelism: 4
table_config:
pipeline.name: "Feast Flink Compute Engine"
pandas_split_num: 4
```

## Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `type` | string | `flink.engine` | Must be `flink.engine`. |
| `execution_mode` | string | `batch` | PyFlink execution mode: `batch` or `streaming`. |
| `parallelism` | integer | `null` | Default Flink parallelism for jobs created by the engine. |
| `table_config` | map | `null` | Additional PyFlink table configuration entries. |
| `pandas_split_num` | integer | `1` | Number of PyFlink Arrow source splits when converting pandas entity DataFrames into Flink tables. |

## Flink Transformations

Use `mode="flink"` when a `BatchFeatureView` transformation should receive and
return PyFlink table objects:

```python
from feast import BatchFeatureView, Field
from feast.types import Float32


def double_rates(table):
# In production this can use PyFlink Table API operations and return a table.
return table


driver_stats = BatchFeatureView(
name="driver_stats",
entities=[driver],
mode="flink",
udf=double_rates,
schema=[Field(name="conv_rate", dtype=Float32)],
source=driver_stats_source,
online=True,
)
```

Flink transformations must return PyFlink table objects. pandas-returning UDFs
are not accepted by the Flink compute engine.

## DAG Support

The Flink engine implements Feast's compute DAG with Flink-specific nodes:

- Source reads from Feast offline stores, preferring native Flink tables when a
retrieval job supports `to_flink_table(table_env)`.
- Transform nodes pass PyFlink tables to `mode="flink"` UDFs and preserve native
Flink table outputs.
- Join nodes use Flink SQL temporary views for feature joins and entity joins.
- Filter nodes apply point-in-time, TTL, and custom filter expressions in Flink
SQL.
- Aggregate nodes support non-windowed Feast aggregations using Flink SQL
aggregate functions.
- Dedupe nodes use `ROW_NUMBER()` over entity keys or internal entity-row ids so
historical retrieval keeps one latest feature row per entity row.
- Validation nodes check required output columns. JSON value validation must be
handled upstream in Flink SQL.
- Output nodes write only for materialization tasks; historical retrieval is
read-only.
- Historical retrieval accepts pandas entity DataFrames and SQL-string entity
DataFrames. SQL strings are interpreted as Flink SQL queries against the
configured TableEnvironment/catalog and must select an `event_timestamp`
column.

## Current Limitations

- Windowed aggregations are not yet implemented in the Flink compute engine. Use
non-windowed Feast aggregations or pre-window upstream in Flink.
- Offline store retrieval jobs must implement `to_flink_table(table_env)`.
Arrow/pandas-only retrieval jobs are rejected instead of converted.
- JSON value validation is not implemented inside the Flink compute engine
because the engine does not collect intermediate data out of Flink for
validation.
24 changes: 23 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dependencies = [
"mmh3",
"numpy>=2.0.0,<3",
"pandas>=1.4.3,<3",
"pyarrow>=21.0.0",
"pyarrow>=21.0.0; extra != 'flink'",
"pyarrow>=16.1.0,<21.0.0; extra == 'flink'",
"pydantic>=2.10.6",
"pygments>=2.12.0,<3",
"PyYAML>=5.4.0,<7",
Expand Down Expand Up @@ -63,6 +64,7 @@ docling = ["docling==2.27.0"]
duckdb = ["ibis-framework[duckdb]>=10.0.0"]
elasticsearch = ["elasticsearch>=8.13.0"]
faiss = ["faiss-cpu>=1.7.0,<=1.10.0"]
flink = ["apache-flink>=2.2.1,<3"]
gcp = [
"google-api-core>=1.23.0,<3",
"googleapis-common-protos>=1.52.0,<2",
Expand Down Expand Up @@ -278,6 +280,26 @@ dev = [
"pytest-xdist>=3.8.0",
]

[tool.uv]
conflicts = [
[
{ extra = "flink" },
{ extra = "ge" },
],
[
{ extra = "flink" },
{ extra = "ci" },
],
[
{ extra = "flink" },
{ extra = "dev" },
],
[
{ extra = "flink" },
{ extra = "docs" },
],
]

# Pixi configuration
[tool.pixi.workspace]
channels = ["conda-forge"]
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/batch_feature_view.py
Comment thread
XuananLe marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,14 @@ def get_feature_transformation(self) -> Optional[Transformation]:
TransformationMode.PYTHON,
TransformationMode.SQL,
TransformationMode.RAY,
) or self.mode in ("pandas", "python", "sql", "ray"):
TransformationMode.FLINK,
) or self.mode in ("pandas", "python", "sql", "ray", "flink"):
return Transformation(
mode=self.mode, udf=self.udf, udf_string=self.udf_string or ""
)
else:
raise ValueError(
f"Unsupported transformation mode: {self.mode} for StreamFeatureView"
f"Unsupported transformation mode: {self.mode} for BatchFeatureView"
)


Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/compute_engines/dag/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ class DAGFormat(str, Enum):
PANDAS = "pandas"
ARROW = "arrow"
RAY = "ray"
FLINK = "flink"
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/compute_engines/feature_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,13 @@ def get_column_info(
# we need to read ALL source columns, not just the output feature columns.
# This is specifically for transformations that create new columns or need raw data.
mode = getattr(getattr(view, "feature_transformation", None), "mode", None)
if mode in ("ray", "pandas", "python") or getattr(mode, "value", None) in (
if mode in ("ray", "pandas", "python", "flink") or getattr(
mode, "value", None
) in (
"ray",
"pandas",
"python",
"flink",
):
# Signal to read all columns by passing empty list for feature_cols.
# "python" (BatchFeatureView) transformations need all raw source columns — the
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/infra/compute_engines/flink/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from __future__ import annotations

from feast.infra.compute_engines.flink.compute import (
FlinkComputeEngine,
FlinkComputeEngineConfig,
)

__all__ = [
"FlinkComputeEngine",
"FlinkComputeEngineConfig",
]
Loading