feat: Add apache flink compute engine#6476
Conversation
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
Can you review this PR and add any suggestions @cburroughs |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
Maintainer action needed per the Feast development guide: please add the |
There was a problem hiding this comment.
Pull request overview
Adds an initial Apache Flink compute-engine implementation for Feast (PyFlink Table API), wiring it into Feast’s compute-engine registry and transformation mode system, along with documentation and unit tests.
Changes:
- Introduces
flink.enginecompute engine (config, feature builder, DAG nodes, retrieval/materialization jobs) andDAGFormat.FLINK. - Registers
mode="flink"transformation support and updates transformation factory/feature view mode handling. - Adds Flink engine docs plus dependency/uv conflict handling and a focused unit test suite.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/python/tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py | Adds unit coverage for Flink DAG behavior, entity_df handling, and dependency metadata. |
| sdk/python/tests/unit/infra/compute_engines/flink/init.py | Adds test package marker. |
| sdk/python/feast/transformation/mode.py | Adds TransformationMode.FLINK. |
| sdk/python/feast/transformation/flink_transformation.py | Adds FlinkTransformation wrapper for Flink UDFs. |
| sdk/python/feast/transformation/factory.py | Registers "flink" transformation type mapping. |
| sdk/python/feast/stream_feature_view.py | Allows mode="flink" in transformation resolution. |
| sdk/python/feast/repo_config.py | Registers flink.engine compute engine type. |
| sdk/python/feast/infra/compute_engines/flink/utils.py | Adds TableEnvironment creation + pandas↔Flink table helpers. |
| sdk/python/feast/infra/compute_engines/flink/nodes.py | Implements Flink DAG nodes for source reads, joins, filters/TTL, aggs, dedupe, validation, output writes. |
| sdk/python/feast/infra/compute_engines/flink/job.py | Adds retrieval/materialization job wrappers for Flink engine execution. |
| sdk/python/feast/infra/compute_engines/flink/feature_builder.py | Adds Flink FeatureBuilder wiring nodes into an execution plan. |
| sdk/python/feast/infra/compute_engines/flink/compute.py | Adds FlinkComputeEngine + config model and engine entrypoints. |
| sdk/python/feast/infra/compute_engines/flink/init.py | Exports Flink engine symbols. |
| sdk/python/feast/infra/compute_engines/feature_builder.py | Treats flink transformations as “read all source cols” modes. |
| sdk/python/feast/infra/compute_engines/dag/model.py | Adds DAGFormat.FLINK. |
| sdk/python/feast/batch_feature_view.py | Allows mode="flink" and fixes error message text. |
| pyproject.toml | Adds Flink optional dependency + pyarrow fork markers + uv conflict configuration. |
| docs/SUMMARY.md | Adds Flink compute-engine doc entry. |
| docs/reference/compute-engine/README.md | Documents FlinkComputeEngine in compute-engine overview. |
| docs/reference/compute-engine/flink.md | Adds Flink engine configuration and limitations documentation. |
| docs/getting-started/components/compute-engine.md | Updates getting-started engine table to mark Flink as supported. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@XuananLe please check the AI review results. :). Also the Lint issues. |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
Addressed the Copilot AI review items in b3731e6:
Also added focused unit coverage for those cases and reran:
All passed locally. |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
Follow-up for the registration check failures: both failed during Refreshed
That command now passes locally. |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
Follow-up on the lint concern: local full lint found mypy errors in Verified locally:
The pull_request workflow checks shown as failing/action-required in the UI still need maintainer approval to run on the fork; they have not actually executed on the latest commit yet. |
|
Overall looks good to me. |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
Thanks, agreed. I added a narrow refactor in c99bc7f to centralize the compute-engine entity timestamp alias, entity row id, and timestamp-column resolution helpers in Verified locally:
|
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
@shuchu I addressed the AI review/lint fallout and pushed What changed:
Local verification:
GitHub checks that ran on the latest commit are green:
The remaining |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
@shuchu I pushed 4bf1433 to fix the macOS 3.11 unit failure. The failure was isolated to the feature server performance microbenchmark: CI measured 1.44x against a hard 1.5x threshold. The patch keeps 1.5x on non-macOS and uses 1.3x on macOS to avoid runner timing flakiness while still checking for a meaningful speedup. Local checks passed:
Could you please approve/re-run CI for the new fork commit? |
|
approved and rerun now |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
@HaoXuAI I pushed 7aebf78 to fix the new unit failure. The failure was in test_image_utils.py: the unit tests were loading a real resnet model from the model hub. I changed the tests to mock timm model loading with a deterministic dummy model, so they no longer depend on network/cache state. Local checks passed:
Could you please approve/re-run CI for the new fork commit? |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
@HaoXuAI I pushed ebbc86b to harden the stale remote RBAC TLS failure. The failed old run was on the previous SHA and hit RemoteOfflineTlsStore with an Arrow Flight SSL handshake wrong-version error against 0.0.0.0:. The test helpers were binding servers to 0.0.0.0 and also using 0.0.0.0 as the client/config host. This patch keeps the server bind address as 0.0.0.0 but uses localhost for client URLs, readiness checks, and remote offline store configs. Local checks passed:
Could you please approve/re-run CI for the new fork commit? |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
franciscojavierarceo
left a comment
There was a problem hiding this comment.
Thank you for this work!! Just some quick questions but very excited to get this in!
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| DEFAULT_FLINK_RESULT_BATCH_SIZE = 10_000 |
There was a problem hiding this comment.
Was this intentional?
There was a problem hiding this comment.
Yes, intentional. This is only the fallback when materialization.online_write_batch_size is unset. After the Copilot review fix, Flink output writes iterate TableResult.collect() into Arrow tables instead of calling to_pandas() on the full result. Without a finite fallback batch size, that iterator path could accumulate an unbounded in-memory batch before writing. A configured online_write_batch_size still takes precedence; 10_000 is just the bounded default for the unset case.
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
@HaoXuAI I pushed The failing test was the feature server serializer microbenchmark again. Instead of lowering the threshold further, this patch changes the benchmark from wall-clock timing to process CPU timing, which removes GitHub-hosted macOS/xdist scheduler noise while restoring the original Local verification on Python 3.12 CI deps passed:
Could you please approve/re-run CI for the new fork commit? |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
@HaoXuAI I pushed The failed GitHub job had The patch starts Local verification passed:
Could you please approve/re-run CI for the new fork commit? |
|
@HaoXuAI @franciscojavierarceo could you help us understand the current CI state? We have been fixing the failures as they appear, but CI keeps surfacing different failures after reruns and the latest visible failure does not look related to the Flink compute engine changes. Current failing check: I have not patched MongoDB/vector-search code in this Flink PR because that seems outside the scope of this contribution. Could you advise whether this should be rerun/treated as an infra flake, or whether there is a preferred separate fix for that test? |
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
|
thanks @XuananLe! can you resolve the conflicts? Once CI is green let's get this shipped for the next release! |
…nk-compute-engine Signed-off-by: Le Xuan An <anlx@viettel.com.vn> # Conflicts: # .secrets.baseline # pixi.lock
|
@franciscojavierarceo conflicts are resolved now. I pushed Local checks run:
GitHub now shows the PR as no longer dirty; CI is running again. |
franciscojavierarceo
left a comment
There was a problem hiding this comment.
Two blocking issues stood out in the Flink engine changes.
-
In
sdk/python/feast/infra/compute_engines/flink/feature_builder.py,_should_join_entity_df()treats an empty pandasentity_dfas if no entity dataframe was supplied. For historical retrieval, that skips the entity join entirely, soget_historical_features(..., entity_df=empty_df)can return feature rows instead of an empty result set. -
In
sdk/python/feast/infra/compute_engines/flink/nodes.py,FlinkSourceReadNodenow requires every source retrieval job to exposeto_flink_table(table_env), but I could only find that method on the unit-test fake in this PR, not on any production offline-store retrieval job. As written,batch_engine.type: flink.engineappears to fail at runtime on real source reads before the engine can do useful work.
Please address those before merge.
Signed-off-by: Le Xuan An <anlx@viettel.com.vn>
…nk-compute-engine
|
@franciscojavierarceo I pushed
I also merged latest
CI is running again on the new head. |
|
@franciscojavierarceo all CI is green now: 30/30 checks passed, and the branch has no merge conflicts. Could you please take another look when you have a chance? |
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Entity timestamp alias for historical feature retrieval |
There was a problem hiding this comment.
Thank you for this clean up
# [0.64.0](v0.63.0...v0.64.0) (2026-06-13) ### Bug Fixes * Add async_supported property to RedisOnlineStore ([9b088fe](9b088fe)) * Add missing feast init templates to operator CRD and enhance persistence documentation ([1941d4d](1941d4d)) * Allow to publish from reference branch ([5458ec8](5458ec8)) * API calls list ([4203eb7](4203eb7)) * **bigquery:** Enable list inference for parquet loads in offline_write_batch ([9243497](9243497)), closes [#5845](#5845) * Bump grpcio dependencies ([07b4782](07b4782)) * **compute-engine/local:** Honor field_mapping on join keys in dedup + join nodes ([#6395](#6395)) ([bd01824](bd01824)) * **dynamodb:** Avoid tag race condition by using diff-based tag updates ([#6479](#6479)) ([bad2b7d](bad2b7d)), closes [#6418](#6418) * **dynamodb:** Fix mypy type for _build_projection_expression return ([217b4da](217b4da)) * Fix intermittent async test failures for DynamoDB and Redis ([63c5eb1](63c5eb1)) * Fix mongodb blog title ([57d28d4](57d28d4)) * Fix shared SQL registry crash - avoid unnecessary UDF deserialization in proto cache building ([ac588d7](ac588d7)) * Fix SparkRetrievalJob.persist() failing for SparkSource ([209d7cd](209d7cd)) * Fixed formatting and image for mongo blog ([#6377](#6377)) ([f8389fb](f8389fb)) * Fixes for ray source ([7f592a4](7f592a4)) * **go:** skip registry refresh when cache_ttl_seconds <= 0 ([97ed40c](97ed40c)) * Handle array of strings columns in Athena materialization ([#6324](#6324)) ([4ed0278](4ed0278)) * make milvus VARCHAR max_length configurable, remove hardcoded 512 limit ([3b98c22](3b98c22)) * **operator:** Set appProtocol: grpc on registry gRPC Service ([#6367](#6367)) ([c9ae2b4](c9ae2b4)) * PyJWT 2.10+ added validation that rejects empty HMAC keys ([e756ffe](e756ffe)) * RemoteOnlineStore sends all features in a single HTTP request ([8f187dd](8f187dd)) * Remove registry proto dump to enforce RBAC and add permission checks to Commit/Refresh RPCs ([328431f](328431f)) * Remove selector migration job - no longer needed ([51c325e](51c325e)) * replace broken .claude skill symlink with correct relative path ([4541690](4541690)) * Replace selector label strip patch with migration Job for upgrade-safe selector uniqueness ([00dea50](00dea50)) * Scope feature view name conflict check to current project in file-based registry ([#6369](#6369)) ([a4fde83](a4fde83)), closes [#6209](#6209) * **snowflake:** Stop double-quoting connection identifiers ([#6462](#6462)) ([e914d59](e914d59)) * **spark:** S3/GCS PyArrow filesystem resolution for staging paths ([#6442](#6442)) ([ae50414](ae50414)) * **trino:** Clean up temporary entity tables after retrieval ([#6381](#6381)) ([d86b13d](d86b13d)), closes [#6306](#6306) * Update go-feature-server base image to Go 1.25 and fix operator Dockerfile COPY permissions ([86ef0bc](86ef0bc)) ### Features * [Backend] Data Quality Monitoring with native compute, multi-backend support, REST API, CLI ([#6202](#6202)) ([5458c37](5458c37)) * Add apache flink compute engine ([#6476](#6476)) ([9636d6a](9636d6a)) * Add demo noteboooks for users ([e362173](e362173)) * Add enabled/disabled toggle for feature views ([#6401](#6401)) ([5f1fa0d](5f1fa0d)), closes [#6395](#6395) * Add Label View to init template ([ec272d5](ec272d5)) * Add mTLS support to remote registry gRPC client ([#6474](#6474)) ([c9602d8](c9602d8)) * Add Prometheus gauges for FeatureStore installation telemetry ([#6354](#6354)) ([1b681b7](1b681b7)) * Adds registry REST API endpoints for managing entities, data sources, and feature views ([#6413](#6413)) ([f77bd1d](f77bd1d)) * Allow CRUD on entities, data sources, and feature views from UI ([#6412](#6412)) ([2321c07](2321c07)) * Allow default openlineage configuration ([#6467](#6467)) ([276b6df](276b6df)) * **bigquery:** Support DATE-type event timestamp columns ([#6362](#6362)) ([753dee5](753dee5)), closes [#2530](#2530) * **cli:** Add `feast projects delete` command (closes [#5095](#5095)) ([#6318](#6318)) ([1a4b96c](1a4b96c)) * Data Quality Monitoring added in feast UI ([#6422](#6422)) ([fa271be](fa271be)) * **dynamodb:** Use ProjectionExpression when requested_features is set ([0adc906](0adc906)), closes [#6058](#6058) * Enhance DataSource and FeatureView modals with error handling and submission states ([96d7169](96d7169)) * Expose registry endpoints on feature server for MCP access ([f77981c](f77981c)) * Feast First-Class LabelView Implementation ([#6292](#6292)) ([c0e7e5d](c0e7e5d)) * Feast-MLflow Integration ([#6235](#6235)) ([7279c75](7279c75)) * Operational metrics for offline store and SOX metrics for both ([#6340](#6340)) ([65b1b80](65b1b80)) * Pre-compute feature service ([8011550](8011550)) * REST API-backed UI for RBAC compatibility and per-page lazy loading ([#6414](#6414)) ([6ae80af](6ae80af)) * Support non-string map key types ([#6382](#6382)) ([#6383](#6383)) ([728aa2e](728aa2e)) * Update FeatureStore CRD with DRA Fields ([01241e4](01241e4)) ### Performance Improvements * Cache feature view resolution in get_online_features to reduce per-request overhead ([55c2f18](55c2f18)) * Optimize feature serving latency with batched async Redis, cached checks fix ([103809a](103809a)) * Replace MessageToDict with optimized custom dict builder ([#6015](#6015)) ([9902064](9902064))
What this PR does / why we need it
flink.enginebatch engine andmode="flink"transformation mode.entity_df, pandasentity_df, strict native source retrieval, TTL interval rendering, and Flink-specific PyArrow dependency metadata.Feast has a pluggable compute engine abstraction, but does not currently include a Flink implementation. This PR adds a Flink/PyFlink backend so users can run Feast batch materialization and historical retrieval through Flink's Table API while preserving Feast's existing DAG execution model.
Which issue(s) this PR fixes
None filed.
Does this PR introduce a user-facing change
Yes.
Users can configure
batch_engine.type: flink.engineand usemode="flink"transformations onBatchFeatureView. Flink transformations receive and return PyFlink table objects. Source reads require offline-store retrieval jobs that exposeto_flink_table(table_env); Arrow/pandas-only retrieval jobs are rejected instead of silently falling back.The Flink extra must currently be installed with
uv sync --extra flink --no-devfrom a source checkout because PyFlink requirespyarrow<21, while default Feast installs keeppyarrow>=21.Release note
Testing
uv lockuv run ruff check sdk/python/feast/batch_feature_view.py sdk/python/feast/infra/compute_engines/flink sdk/python/feast/transformation/flink_transformation.py sdk/python/tests/unit/infra/compute_engines/flink/test_flink_compute_engine.pyuv run bash -c "cd sdk/python && mypy feast/batch_feature_view.py feast/infra/compute_engines/flink/compute.py feast/infra/compute_engines/flink/feature_builder.py feast/infra/compute_engines/flink/job.py feast/infra/compute_engines/flink/nodes.py feast/infra/compute_engines/flink/utils.py feast/transformation/flink_transformation.py tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py --follow-imports=skip --check-untyped-defs"uv run python -m pytest sdk/python/tests/unit/infra/compute_engines/flink/test_flink_compute_engine.py -vuv sync --extra flink --no-devPYFLINK_NATIVE_SOURCE_SMOKE_OK pyarrow=16.1.0 pyflink=2.2.1Note:
pip install -e '.[flink]'cannot currently resolve the Flink extra because pip evaluates Feast's defaultpyarrow>=21requirement together with PyFlink'spyarrow<21requirement. The docs now point to the uv source-checkout install path that was verified above.