Commit a802cfe

Fix materialization when running on a Spark cluster.

When materialization runs with the Spark offline store configured to use a cluster (`spark.master` pointing to an actual Spark master node), `self.to_spark_df().write.parquet(temp_dir, mode="overwrite")` writes the parquet files on a worker node, but `return pq.read_table(temp_dir)` executes on the driver node, which cannot read the worker's local temp directory. This fix makes materialization work when run on a Spark cluster.

Signed-off-by: ckarwicki <104110169+ckarwicki-deloitte@users.noreply.github.com>
Signed-off-by: ckarwicki <71740096+ckarwicki@users.noreply.github.com>

1 parent d7b0c52 commit a802cfe

File tree

  • sdk/python/feast/infra/offline_stores/contrib/spark_offline_store

1 file changed: +2 additions, -5 deletions

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 2 additions & 5 deletions
@@ -325,11 +325,8 @@ def _to_df_internal(self) -> pd.DataFrame:

     def _to_arrow_internal(self) -> pyarrow.Table:
         """Return dataset as pyarrow Table synchronously"""
-
-        # write to temp parquet and then load it as pyarrow table from disk
-        with tempfile.TemporaryDirectory() as temp_dir:
-            self.to_spark_df().write.parquet(temp_dir, mode="overwrite")
-            return pq.read_table(temp_dir)
+
+        return pyarrow.Table.from_pandas(self._to_df_internal())

     def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
         """

0 commit comments
