Skip to content

Commit 58d0325

Browse files
authored
fix: Spark Materialization Engine Cannot Infer Schema (#5806)
* fix: build Spark DataFrame from Arrow with schema and empty handling (#5594)
  Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com>
* fix: build Spark DataFrame from Arrow with schema and empty handling (#5594)
  Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com>
---------
Signed-off-by: Jacob Weinhold <29459386+jfw-ppi@users.noreply.github.com>
1 parent 103c5e9 commit 58d0325

File tree

1 file changed

+9
-1
lines changed
  • sdk/python/feast/infra/compute_engines/spark

1 file changed

+9
-1
lines changed

sdk/python/feast/infra/compute_engines/spark/nodes.py

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
import pandas as pd
55
from pyspark.sql import DataFrame, SparkSession, Window
66
from pyspark.sql import functions as F
7+
from pyspark.sql.pandas.types import from_arrow_schema
78

89
from feast import BatchFeatureView, StreamFeatureView
910
from feast.aggregation import Aggregation
@@ -80,7 +81,14 @@ def execute(self, context: ExecutionContext) -> DAGValue:
8081
if isinstance(retrieval_job, SparkRetrievalJob):
8182
spark_df = cast(SparkRetrievalJob, retrieval_job).to_spark_df()
8283
else:
83-
spark_df = self.spark_session.createDataFrame(retrieval_job.to_arrow())
84+
arrow_table = retrieval_job.to_arrow()
85+
if arrow_table.num_rows == 0:
86+
spark_schema = from_arrow_schema(arrow_table.schema)
87+
spark_df = self.spark_session.createDataFrame(
88+
self.spark_session.sparkContext.emptyRDD(), schema=spark_schema
89+
)
90+
else:
91+
spark_df = self.spark_session.createDataFrame(arrow_table.to_pandas())
8492

8593
return DAGValue(
8694
data=spark_df,

0 commit comments

Comments
 (0)