Skip to content

Commit 4bfaff4

Browse files
feat: Enable Arrow-based columnar data transfers when converting to pandas in the Spark offline store retrieval job
Signed-off-by: tanlocnguyen <tanlocnguyen296@gmail.com>
1 parent e6fc736 commit 4bfaff4

File tree

1 file changed

+7
-1
lines changed
  • sdk/python/feast/infra/offline_stores/contrib/spark_offline_store

1 file changed

+7
-1
lines changed

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,11 @@ def to_spark_df(self) -> pyspark.sql.DataFrame:
338338

339339
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
    """Return the dataset as a pandas DataFrame, synchronously.

    Enables Arrow-based columnar data transfer for the Spark -> pandas
    conversion performed by ``toPandas()``, which avoids the slow
    row-by-row serialization path.

    Args:
        timeout: Accepted for interface compatibility with the base
            retrieval-job API; not used by this implementation.

    Returns:
        The retrieval job's result set as a pandas DataFrame.
    """
    spark_session = get_spark_session_or_start_new_with_repoconfig(
        self._config.offline_store
    )
    # Spark 3.0 renamed the legacy "spark.sql.execution.arrow.*" keys to
    # "spark.sql.execution.arrow.pyspark.*". Use the pyspark-prefixed
    # fallback key so it matches the pyspark-prefixed enable key below;
    # the old "spark.sql.execution.arrow.fallback.enabled" name is
    # ignored on Spark 3, which would make Arrow conversion errors
    # hard-fail instead of falling back to the non-Arrow path.
    spark_session.conf.set(
        "spark.sql.execution.arrow.pyspark.fallback.enabled", "true"
    )
    spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    return self.to_spark_df().toPandas()
342347

343348
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
@@ -496,7 +501,8 @@ def _get_entity_df_event_timestamp_range(
496501

497502

498503
def _get_entity_schema(
499-
spark_session: SparkSession, entity_df: Union[pandas.DataFrame, str]
504+
spark_session: SparkSession,
505+
entity_df: Union[pandas.DataFrame, str],
500506
) -> Dict[str, np.dtype]:
501507
if isinstance(entity_df, pd.DataFrame):
502508
return dict(zip(entity_df.columns, entity_df.dtypes))

0 commit comments

Comments
 (0)