
feat: Add apache flink compute engine #6476

Open
XuananLe wants to merge 2 commits into feast-dev:master from XuananLe:feat/add-apache-flink-compute-engine

XuananLe commented Jun 6, 2026

What this PR does / why we need it

  • Adds an initial Apache Flink compute engine backed by the PyFlink Table API.
  • Implements Feast compute DAG nodes for source reads, joins, filters, aggregations, deduplication, transformations, validation, and output writes.
  • Registers the flink.engine batch engine and the mode="flink" transformation mode.
  • Adds docs/navigation for configuring the Flink engine and documents the current PyFlink/PyArrow install constraint.
  • Adds focused unit coverage for Flink DAG behavior, SQL-string entity_df, pandas entity_df, strict native source retrieval, TTL interval rendering, and Flink-specific PyArrow dependency metadata.

Feast has a pluggable compute engine abstraction, but does not currently include a Flink implementation. This PR adds a Flink/PyFlink backend so users can run Feast batch materialization and historical retrieval through Flink's Table API while preserving Feast's existing DAG execution model.

Which issue(s) this PR fixes

None filed.

Does this PR introduce a user-facing change

Yes.

Users can configure batch_engine.type: flink.engine and use mode="flink" transformations on BatchFeatureView. Flink transformations receive and return PyFlink table objects. Source reads require offline-store retrieval jobs that expose to_flink_table(table_env); Arrow/pandas-only retrieval jobs are rejected instead of silently falling back.
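
The strict source-read rule described above can be illustrated with a minimal sketch. Only the method name to_flink_table(table_env) comes from the PR description; the protocol, the helper read_source_as_flink_table, and the two job classes are hypothetical stand-ins and are not the code in this PR.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsFlinkTable(Protocol):
    """Hypothetical protocol: a retrieval job that can return a PyFlink table."""

    def to_flink_table(self, table_env: Any) -> Any: ...


def read_source_as_flink_table(retrieval_job: Any, table_env: Any) -> Any:
    """Return a Flink table from the job, or fail loudly with no pandas/Arrow fallback."""
    if not isinstance(retrieval_job, SupportsFlinkTable):
        raise TypeError(
            f"{type(retrieval_job).__name__} does not expose "
            "to_flink_table(table_env); refusing to fall back to Arrow or pandas."
        )
    return retrieval_job.to_flink_table(table_env)


class NativeJob:
    """Stand-in for an offline-store job with native Flink support."""

    def to_flink_table(self, table_env: Any) -> Any:
        return ("flink_table", table_env)


class ArrowOnlyJob:
    """Stand-in for a job that can only produce Arrow data."""

    def to_arrow(self) -> Any:
        return "arrow_table"
```

Raising instead of converting through pandas keeps an unsupported offline store from quietly pulling the whole source into local memory.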

The Flink extra must currently be installed with uv sync --extra flink --no-dev from a source checkout because PyFlink requires pyarrow<21, while default Feast installs keep pyarrow>=21.

Release note

Add an Apache Flink compute engine backed by PyFlink Table API for batch materialization and historical retrieval.

Testing

  • uv lock
  • uv run ruff check sdk/python/feast/batch_feature_view.py sdk/python/feast/infra/compute_engines/flink sdk/python/feast/transformation/flink_transformation.py sdk/python/tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py
  • uv run bash -c "cd sdk/python && mypy feast/batch_feature_view.py feast/infra/compute_engines/flink/compute.py feast/infra/compute_engines/flink/feature_builder.py feast/infra/compute_engines/flink/job.py feast/infra/compute_engines/flink/nodes.py feast/infra/compute_engines/flink/utils.py feast/transformation/flink_transformation.py tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py --follow-imports=skip --check-untyped-defs"
  • uv run python -m pytest sdk/python/tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py -v
  • Temp source checkout: uv sync --extra flink --no-dev
  • Real PyFlink smoke test: PYFLINK_NATIVE_SOURCE_SMOKE_OK pyarrow=16.1.0 pyflink=2.2.1

Note: pip install -e '.[flink]' cannot currently resolve the Flink extra because pip evaluates Feast's default pyarrow>=21 requirement together with PyFlink's pyarrow<21 requirement. The docs now point to the uv source-checkout install path that was verified above.
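
The conflict in the note above comes down to two version ranges with an empty intersection. The sketch below shows this with a simplified version comparison; the helper names and the candidate list are illustrative assumptions, and real resolvers apply the full PEP 440 rules rather than this tuple comparison.

```python
def parse(version: str) -> tuple[int, ...]:
    """Simplified parser for plain dotted versions such as '16.1.0'."""
    return tuple(int(part) for part in version.split("."))


def satisfies_feast_default(version: str) -> bool:
    # Default Feast requirement from the PR description: pyarrow>=21
    return parse(version) >= (21,)


def satisfies_pyflink(version: str) -> bool:
    # PyFlink requirement from the PR description: pyarrow<21
    return parse(version) < (21,)


# Illustrative candidates; 16.1.0 is the version reported in the smoke test.
candidates = ["16.1.0", "20.0.0", "21.0.0", "22.0.0"]

pyflink_ok = [v for v in candidates if satisfies_pyflink(v)]
both_ok = [
    v for v in candidates
    if satisfies_feast_default(v) and satisfies_pyflink(v)
]
```

No version can land in both_ok, which is why the two requirements can only be met when they are resolved separately, as the uv source-checkout path does.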



Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
@XuananLe XuananLe requested a review from a team as a code owner June 6, 2026 08:25

devin-ai-integration Bot left a comment


Devin Review found 2 potential issues.

View 6 additional findings in Devin Review.


Comment thread sdk/python/feast/batch_feature_view.py
Comment thread pyproject.toml Outdated

XuananLe commented Jun 6, 2026

@cburroughs, could you review this PR and add any suggestions?

Signed-off-by: Le Xuan An <anlx@viettel.com.vn>

XuananLe commented Jun 6, 2026

Maintainer action needed per the Feast development guide: please add the kind/feature label and approve/run the fork PR workflows (or apply ok-to-test if that is the preferred process). I do not have repository permissions to add labels or approve Actions runs from the fork.


Copilot AI left a comment


Pull request overview

Adds an initial Apache Flink compute-engine implementation for Feast (PyFlink Table API), wiring it into Feast’s compute-engine registry and transformation mode system, along with documentation and unit tests.

Changes:

  • Introduces flink.engine compute engine (config, feature builder, DAG nodes, retrieval/materialization jobs) and DAGFormat.FLINK.
  • Registers mode="flink" transformation support and updates transformation factory/feature view mode handling.
  • Adds Flink engine docs plus dependency/uv conflict handling and a focused unit test suite.

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 4 comments.

Show a summary per file
| File | Description |
| --- | --- |
| sdk/python/tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py | Adds unit coverage for Flink DAG behavior, entity_df handling, and dependency metadata. |
| sdk/python/tests/unit/infra/compute_engines/flink/__init__.py | Adds test package marker. |
| sdk/python/feast/transformation/mode.py | Adds TransformationMode.FLINK. |
| sdk/python/feast/transformation/flink_transformation.py | Adds FlinkTransformation wrapper for Flink UDFs. |
| sdk/python/feast/transformation/factory.py | Registers "flink" transformation type mapping. |
| sdk/python/feast/stream_feature_view.py | Allows mode="flink" in transformation resolution. |
| sdk/python/feast/repo_config.py | Registers flink.engine compute engine type. |
| sdk/python/feast/infra/compute_engines/flink/utils.py | Adds TableEnvironment creation + pandas↔Flink table helpers. |
| sdk/python/feast/infra/compute_engines/flink/nodes.py | Implements Flink DAG nodes for source reads, joins, filters/TTL, aggs, dedupe, validation, output writes. |
| sdk/python/feast/infra/compute_engines/flink/job.py | Adds retrieval/materialization job wrappers for Flink engine execution. |
| sdk/python/feast/infra/compute_engines/flink/feature_builder.py | Adds Flink FeatureBuilder wiring nodes into an execution plan. |
| sdk/python/feast/infra/compute_engines/flink/compute.py | Adds FlinkComputeEngine + config model and engine entrypoints. |
| sdk/python/feast/infra/compute_engines/flink/__init__.py | Exports Flink engine symbols. |
| sdk/python/feast/infra/compute_engines/feature_builder.py | Treats flink transformations as “read all source cols” modes. |
| sdk/python/feast/infra/compute_engines/dag/model.py | Adds DAGFormat.FLINK. |
| sdk/python/feast/batch_feature_view.py | Allows mode="flink" and fixes error message text. |
| pyproject.toml | Adds Flink optional dependency + pyarrow fork markers + uv conflict configuration. |
| docs/SUMMARY.md | Adds Flink compute-engine doc entry. |
| docs/reference/compute-engine/README.md | Documents FlinkComputeEngine in compute-engine overview. |
| docs/reference/compute-engine/flink.md | Adds Flink engine configuration and limitations documentation. |
| docs/getting-started/components/compute-engine.md | Updates getting-started engine table to mark Flink as supported. |


Comment on lines +38 to +41
def pandas_to_flink_table(table_env: Any, df: pd.DataFrame, split_num: int = 1) -> Any:
    """Convert a pandas DataFrame to a PyFlink table."""
    schema = list(df.columns)
    return table_env.from_pandas(df, schema=schema, splits_num=split_num)
Comment on lines +711 to +713
output_df = flink_table_to_pandas(output_table)
output_arrow = pa.Table.from_pandas(output_df)

Comment on lines +107 to +110
def _register_table(table_env: Any, table: Any, prefix: str) -> str:
    view_name = f"__feast_{prefix}_{uuid.uuid4().hex}"
    table_env.create_temporary_view(view_name, table)
    return view_name
Comment on lines +449 to +455
if self.ttl:
    ttl_interval = _flink_interval_literal(self.ttl)
    conditions.append(
        f"{_quote_identifier(timestamp_column)} >= "
        f"{_quote_identifier(ENTITY_TS_ALIAS)} - "
        f"({ttl_interval})"
    )
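
The body of _flink_interval_literal is not shown in this excerpt. As a rough sketch of what TTL interval rendering could look like, the hypothetical helpers below turn a timedelta into a SQL interval literal and build a condition of the same shape as the snippet above. The DAY(p) TO SECOND(3) form reflects my understanding of Flink SQL interval syntax and should be checked against the target Flink version; the function names and the backtick quoting are assumptions.

```python
from datetime import timedelta


def render_interval_literal(ttl: timedelta) -> str:
    """Hypothetical renderer: timedelta -> SQL day-time interval literal."""
    if ttl < timedelta(0):
        raise ValueError("TTL must be non-negative")
    total_ms = ttl // timedelta(milliseconds=1)  # sub-millisecond part is dropped
    days, rem = divmod(total_ms, 86_400_000)
    hours, rem = divmod(rem, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    seconds, millis = divmod(rem, 1_000)
    # Widen the leading DAY precision so large TTLs are not rejected.
    day_precision = len(str(days))
    return (
        f"INTERVAL '{days} {hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}' "
        f"DAY({day_precision}) TO SECOND(3)"
    )


def ttl_condition(timestamp_column: str, entity_ts_column: str, ttl: timedelta) -> str:
    """Hypothetical helper mirroring the shape of the condition in the snippet above."""
    return (
        f"`{timestamp_column}` >= `{entity_ts_column}` - "
        f"({render_interval_literal(ttl)})"
    )
```

Rendering every TTL in a single day-to-second form avoids choosing a unit per value, at the cost of a longer literal.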