From 611ead0e6f5d525f58a89fb5ab9c370c462ed8f8 Mon Sep 17 00:00:00 2001 From: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com> Date: Thu, 1 Jan 2026 23:07:54 +0100 Subject: [PATCH 1/2] fix: build Spark DataFrame from Arrow with schema and empty handling (#5594) Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com> --- .../feast/infra/compute_engines/spark/nodes.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/nodes.py b/sdk/python/feast/infra/compute_engines/spark/nodes.py index fa5a7bd6208..57f4c439845 100644 --- a/sdk/python/feast/infra/compute_engines/spark/nodes.py +++ b/sdk/python/feast/infra/compute_engines/spark/nodes.py @@ -4,6 +4,7 @@ import pandas as pd from pyspark.sql import DataFrame, SparkSession, Window from pyspark.sql import functions as F +from pyspark.sql.pandas.types import from_arrow_schema from feast import BatchFeatureView, StreamFeatureView from feast.aggregation import Aggregation @@ -80,7 +81,15 @@ def execute(self, context: ExecutionContext) -> DAGValue: if isinstance(retrieval_job, SparkRetrievalJob): spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df() else: - spark_df = self.spark_session.createDataFrame(retrieval_job.to_arrow()) + arrow_table = retrieval_job.to_arrow() + if arrow_table.num_rows == 0: + spark_schema = from_arrow_schema(arrow_table.schema) + spark_df = self.spark_session.createDataFrame( + self.spark_session.sparkContext.emptyRDD(), schema=spark_schema + ) + else: + spark_df = self.spark_session.createDataFrame(arrow_table.to_pandas()) + return DAGValue( data=spark_df, format=DAGFormat.SPARK, @@ -94,7 +103,6 @@ def execute(self, context: ExecutionContext) -> DAGValue: }, ) - class SparkAggregationNode(DAGNode): def __init__( self, From 237a65dbc88ccef498bbf042def9343586560571 Mon Sep 17 00:00:00 2001 From: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com> Date: Thu, 1 Jan 2026 23:17:43 +0100 Subject: [PATCH 2/2] fix: 
build Spark DataFrame from Arrow with schema and empty handling (#5594) Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com> --- sdk/python/feast/infra/compute_engines/spark/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/nodes.py b/sdk/python/feast/infra/compute_engines/spark/nodes.py index 57f4c439845..124ce65ff90 100644 --- a/sdk/python/feast/infra/compute_engines/spark/nodes.py +++ b/sdk/python/feast/infra/compute_engines/spark/nodes.py @@ -90,7 +90,6 @@ def execute(self, context: ExecutionContext) -> DAGValue: else: spark_df = self.spark_session.createDataFrame(arrow_table.to_pandas()) - return DAGValue( data=spark_df, format=DAGFormat.SPARK, @@ -103,6 +102,7 @@ def execute(self, context: ExecutionContext) -> DAGValue: }, ) + class SparkAggregationNode(DAGNode): def __init__( self,