|
28 | 28 |
|
29 | 29 | from feast import OnDemandFeatureView |
30 | 30 | from feast.data_source import DataSource |
31 | | -from feast.errors import ( |
32 | | - EntitySQLEmptyResults, |
33 | | - InvalidEntityType, |
34 | | - InvalidSparkSessionException, |
35 | | -) |
| 31 | +from feast.errors import EntitySQLEmptyResults, InvalidEntityType |
36 | 32 | from feast.feature_logging import LoggingConfig, LoggingSource |
37 | 33 | from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView |
38 | 34 | from feast.infra.offline_stores import offline_utils |
@@ -528,28 +524,22 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame": |
528 | 524 | """ |
529 | 525 |
|
530 | 526 | try: |
531 | | - from pyspark.sql import DataFrame, SparkSession |
| 527 | + from pyspark.sql import DataFrame |
532 | 528 | except ImportError as e: |
533 | 529 | from feast.errors import FeastExtrasDependencyImportError |
534 | 530 |
|
535 | 531 | raise FeastExtrasDependencyImportError("spark", str(e)) |
536 | 532 |
|
537 | | - if isinstance(spark_session, SparkSession): |
538 | | - arrow_batches = self.to_arrow_batches() |
| 533 | + spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") |
539 | 534 |
|
540 | | - if arrow_batches: |
541 | | - spark_df = reduce( |
542 | | - DataFrame.unionAll, |
543 | | - [ |
544 | | - spark_session.createDataFrame(batch.to_pandas()) |
545 | | - for batch in arrow_batches |
546 | | - ], |
547 | | - ) |
548 | | - return spark_df |
549 | | - else: |
550 | | - raise EntitySQLEmptyResults(self.to_sql()) |
551 | | - else: |
552 | | - raise InvalidSparkSessionException(spark_session) |
| 535 | + # NOTE: chunks are read and converted sequentially; parallelizing the chunk reads could speed this up
| 536 | + pandas_batches = self.to_pandas_batches() |
| 537 | + |
| 538 | + spark_df = reduce( |
| 539 | + DataFrame.unionAll, |
| 540 | + [spark_session.createDataFrame(batch) for batch in pandas_batches], |
| 541 | + ) |
| 542 | + return spark_df |
553 | 543 |
|
554 | 544 | def persist( |
555 | 545 | self, |
|
0 commit comments