fix(spark): Replace mapInArrow with foreachPartition and bound write memory#6441
Conversation
…config forwarding for vector store materialization Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…MemoryError/OOMKill on large feature views Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
…leted _apply_bfv_transformations Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
| spark_session, feature_views, query_context | ||
| ) | ||
|
|
||
| spark_query_context = [ |
There was a problem hiding this comment.
🔴 Removal of _apply_bfv_transformations silently returns untransformed features via SparkOfflineStore.get_historical_features()
The _apply_bfv_transformations call was the only mechanism applying BatchFeatureView UDF transformations in the SparkOfflineStore.get_historical_features() codepath. That path is still the one used by the standard FeatureStore.get_historical_features() API (which routes through passthrough_provider.get_historical_features() at sdk/python/feast/infra/passthrough_provider.py:481 → SparkOfflineStore.get_historical_features()). The compute engine DAG path (SparkComputeEngine.get_historical_features) handles transformations via SparkTransformationNode, but nothing in FeatureStore.get_historical_features() routes through that path.
After this removal, any user with a BatchFeatureView that has a UDF transformation, using the Spark offline store, calling fs.get_historical_features() will silently receive raw untransformed feature values. The table_subquery in the query context now always points to the raw source table, and the PIT join query reads raw columns without applying the UDF.
Prompt for agents
The removal of _apply_bfv_transformations from SparkOfflineStore.get_historical_features() breaks BFV UDF transformations for the standard FeatureStore.get_historical_features() API path, which routes through passthrough_provider → SparkOfflineStore rather than the SparkComputeEngine DAG path.
Options to fix:
1. Restore the _apply_bfv_transformations call in SparkOfflineStore.get_historical_features() until the FeatureStore API is updated to route through the compute engine for historical features.
2. Update FeatureStore.get_historical_features() (or PassthroughProvider.get_historical_features()) to detect when a BatchFeatureView has a UDF and route through SparkComputeEngine.get_historical_features() instead of the offline store directly.
3. If the intent is to deprecate this path, add an explicit warning or error when a BFV with a UDF is passed through the offline store path.
Relevant files: sdk/python/feast/infra/passthrough_provider.py (get_historical_features at line 470), sdk/python/feast/feature_store.py (get_historical_features at line 1574), sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py (get_historical_features at line 160).
Was this helpful? React with 👍 or 👎 to provide feedback.
What this PR does / why we need it
Spark 3.5 inserts
WindowGroupLimitExecupstream ofMapInArrowExecwhen UDFs use Window operations, causing materialization to fail with:Fix 1: Replace
mapInArrowwithforeachPartition(pickle-based, no Arrow bridge) to eliminate the serializer mismatch.Fix 2: Re-apply
spark.sql.*/spark.hadoop.*session configs afterSparkSession.getOrCreate()— these are silently dropped when reusing a warm session in K8s-mode, causing S3 access failures.Fix 3: Chunk
foreachPartitionwrites (chunk_size=1000) to preventMemoryError/ OOMKill on large feature views (10M+ rows).Which issue(s) this PR fixes
Fixes
BatchFeatureViewmaterialization with Spark 3.5+ and vector stores (Milvus, Redis).Checks
git commit -s)Testing Strategy