Skip to content

Commit 2f18957

Browse files
feat: Get Snowflake Query Output As Pyspark Dataframe (#2504) (#3358)
1. Added feature to offline_store -> snowflake.py to return results of a Snowflake query as a PySpark data frame. This helps Spark-based users distribute data that often doesn't fit in driver nodes through pandas output. 2. Also added a relevant error class to notify the user of a missing Spark session, particular to this use case. Signed-off-by: amithadiraju1694 <amith.adiraju@gmail.com> Signed-off-by: amithadiraju1694 <amith.adiraju@gmail.com>
1 parent 4d6932c commit 2f18957

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

sdk/python/feast/errors.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ def __init__(self, name, project=None):
5656
super().__init__(f"Feature view {name} does not exist")
5757

5858

59+
class InvalidSparkSessionException(Exception):
60+
def __init__(self, spark_arg):
61+
super().__init__(
62+
f" Need Spark Session to convert results to spark data frame\
63+
recieved {type(spark_arg)} instead. "
64+
)
65+
66+
5967
class OnDemandFeatureViewNotFoundException(FeastObjectNotFoundException):
6068
def __init__(self, name, project=None):
6169
if project:

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import contextlib
22
import os
33
import uuid
4+
import warnings
45
from datetime import datetime
6+
from functools import reduce
57
from pathlib import Path
68
from typing import (
79
Any,
@@ -21,11 +23,16 @@
2123
import pyarrow
2224
from pydantic import Field, StrictStr
2325
from pydantic.typing import Literal
26+
from pyspark.sql import DataFrame, SparkSession
2427
from pytz import utc
2528

2629
from feast import OnDemandFeatureView
2730
from feast.data_source import DataSource
28-
from feast.errors import EntitySQLEmptyResults, InvalidEntityType
31+
from feast.errors import (
32+
EntitySQLEmptyResults,
33+
InvalidEntityType,
34+
InvalidSparkSessionException,
35+
)
2936
from feast.feature_logging import LoggingConfig, LoggingSource
3037
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
3138
from feast.infra.offline_stores import offline_utils
@@ -57,6 +64,8 @@
5764

5865
raise FeastExtrasDependencyImportError("snowflake", str(e))
5966

67+
warnings.filterwarnings("ignore", category=DeprecationWarning)
68+
6069

6170
class SnowflakeOfflineStoreConfig(FeastConfigBaseModel):
6271
"""Offline store config for Snowflake"""
@@ -447,6 +456,41 @@ def to_sql(self) -> str:
447456
with self._query_generator() as query:
448457
return query
449458

459+
def to_spark_df(self, spark_session: SparkSession) -> DataFrame:
460+
"""
461+
Method to convert snowflake query results to pyspark data frame.
462+
463+
Args:
464+
spark_session: spark Session variable of current environment.
465+
466+
Returns:
467+
spark_df: A pyspark dataframe.
468+
"""
469+
470+
if isinstance(spark_session, SparkSession):
471+
with self._query_generator() as query:
472+
473+
arrow_batches = execute_snowflake_statement(
474+
self.snowflake_conn, query
475+
).fetch_arrow_batches()
476+
477+
if arrow_batches:
478+
spark_df = reduce(
479+
DataFrame.unionAll,
480+
[
481+
spark_session.createDataFrame(batch.to_pandas())
482+
for batch in arrow_batches
483+
],
484+
)
485+
486+
return spark_df
487+
488+
else:
489+
raise EntitySQLEmptyResults(query)
490+
491+
else:
492+
raise InvalidSparkSessionException(spark_session)
493+
450494
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
451495
assert isinstance(storage, SavedDatasetSnowflakeStorage)
452496
self.to_snowflake(table_name=storage.snowflake_options.table)

0 commit comments

Comments
 (0)