Skip to content

fix(spark): Replace mapInArrow with foreachPartition and bound write memory#6441

Open
abhijeet-dhumal wants to merge 6 commits into
feast-dev:masterfrom
abhijeet-dhumal:fix/spark-foreachpartition-materialization
Open

fix(spark): Replace mapInArrow with foreachPartition and bound write memory#6441
abhijeet-dhumal wants to merge 6 commits into
feast-dev:masterfrom
abhijeet-dhumal:fix/spark-foreachpartition-materialization

Conversation

@abhijeet-dhumal
Copy link
Copy Markdown
Contributor

@abhijeet-dhumal abhijeet-dhumal commented May 27, 2026

What this PR does / why we need it

Spark 3.5 inserts WindowGroupLimitExec upstream of MapInArrowExec when UDFs use Window operations, causing materialization to fail with:

AttributeError: 'list' object has no attribute 'dtype'

Fix 1: Replace mapInArrow with foreachPartition (pickle-based, no Arrow bridge) to eliminate the serializer mismatch.

Fix 2: Re-apply spark.sql.* / spark.hadoop.* session configs after SparkSession.getOrCreate() — these are silently dropped when reusing a warm session in K8s-mode, causing S3 access failures.

Fix 3: Chunk foreachPartition writes (chunk_size=1000) to prevent MemoryError / OOMKill on large feature views (10M+ rows).

Which issue(s) this PR fixes

Fixes BatchFeatureView materialization with Spark 3.5+ and vector stores (Milvus, Redis).

Checks

  • I've made sure the tests are passing.
  • My commits are signed off (git commit -s)
  • My PR title follows conventional commits format

Testing Strategy

  • Unit tests — session config forwarding, chunked write, empty partition handling
  • Manual tests — 26M+ keys materialized to Redis across 4 K8s GPU executors

…config forwarding for vector store materialization

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…MemoryError/OOMKill on large feature views

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@abhijeet-dhumal abhijeet-dhumal changed the title Fix/spark foreachpartition materialization fix(spark): Replace mapInArrow with foreachPartition and bound write memory May 27, 2026
…leted _apply_bfv_transformations

Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
@abhijeet-dhumal abhijeet-dhumal marked this pull request as ready for review May 28, 2026 06:23
@abhijeet-dhumal abhijeet-dhumal requested a review from a team as a code owner May 28, 2026 06:23
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 6 additional findings in Devin Review.

Open in Devin Review

spark_session, feature_views, query_context
)

spark_query_context = [
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Removal of _apply_bfv_transformations silently returns untransformed features via SparkOfflineStore.get_historical_features()

The _apply_bfv_transformations call was the only mechanism applying BatchFeatureView UDF transformations in the SparkOfflineStore.get_historical_features() codepath. That path is still the one used by the standard FeatureStore.get_historical_features() API (which routes through passthrough_provider.get_historical_features() at sdk/python/feast/infra/passthrough_provider.py:481SparkOfflineStore.get_historical_features()). The compute engine DAG path (SparkComputeEngine.get_historical_features) handles transformations via SparkTransformationNode, but nothing in FeatureStore.get_historical_features() routes through that path.

After this removal, any user with a BatchFeatureView that has a UDF transformation, using the Spark offline store, calling fs.get_historical_features() will silently receive raw untransformed feature values. The table_subquery in the query context now always points to the raw source table, and the PIT join query reads raw columns without applying the UDF.

Prompt for agents
The removal of _apply_bfv_transformations from SparkOfflineStore.get_historical_features() breaks BFV UDF transformations for the standard FeatureStore.get_historical_features() API path, which routes through passthrough_provider → SparkOfflineStore rather than the SparkComputeEngine DAG path.

Options to fix:
1. Restore the _apply_bfv_transformations call in SparkOfflineStore.get_historical_features() until the FeatureStore API is updated to route through the compute engine for historical features.
2. Update FeatureStore.get_historical_features() (or PassthroughProvider.get_historical_features()) to detect when a BatchFeatureView has a UDF and route through SparkComputeEngine.get_historical_features() instead of the offline store directly.
3. If the intent is to deprecate this path, add an explicit warning or error when a BFV with a UDF is passed through the offline store path.

Relevant files: sdk/python/feast/infra/passthrough_provider.py (get_historical_features at line 470), sdk/python/feast/feature_store.py (get_historical_features at line 1574), sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py (get_historical_features at line 160).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhijeet-dhumal why this removal ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants