Merged
Changes from 1 commit
58 commits
2366e85
State of feast
kevjumba Feb 25, 2022
20e82ea
Clean up changes
kevjumba Feb 26, 2022
55e7915
Fix random incorrect changes
kevjumba Feb 26, 2022
43794f7
Fix lint
kevjumba Feb 26, 2022
343ed00
Fix build errors
kevjumba Feb 26, 2022
55c458d
Fix lint
kevjumba Feb 26, 2022
b8ace43
Add spark offline store components to test against current integratio…
kevjumba Mar 1, 2022
d13119f
Fix lint
kevjumba Mar 1, 2022
4b56f55
Rename to pass checks
kevjumba Mar 1, 2022
6e278c4
Fix issues
kevjumba Mar 1, 2022
4bc67d8
Fix type checking issues
kevjumba Mar 1, 2022
c934edc
Fix lint
kevjumba Mar 1, 2022
e01d023
Clean up print statements for first review
kevjumba Mar 1, 2022
01ac14a
Fix lint
kevjumba Mar 1, 2022
26c8a01
Fix flake 8 lint tests
kevjumba Mar 1, 2022
6f8ce3c
Add warnings for alpha version release
kevjumba Mar 2, 2022
551eea1
Format
kevjumba Mar 2, 2022
1e7c2b4
Address review
kevjumba Mar 2, 2022
8e3e9a4
Address review
kevjumba Mar 2, 2022
cc1651e
Fix lint
kevjumba Mar 2, 2022
acf1c28
Add file store functionality
kevjumba Mar 2, 2022
65b113a
lint
kevjumba Mar 2, 2022
7adb8d2
Add example feature repo
kevjumba Mar 2, 2022
648f935
Update data source creator
kevjumba Mar 2, 2022
7b84ac1
Make cli work for feast init with spark
kevjumba Mar 2, 2022
b066a6f
Update the docs
kevjumba Mar 2, 2022
e0099ae
Clean up code
kevjumba Mar 2, 2022
86e74c0
Clean up more code
kevjumba Mar 2, 2022
6fe5b9e
Uncomment repo configs
kevjumba Mar 2, 2022
92c4f87
Fix setup.py
kevjumba Mar 2, 2022
18a2892
Update dependencies
kevjumba Mar 2, 2022
c644388
Fix ci dependencies
kevjumba Mar 3, 2022
9333130
Screwed up rebase
kevjumba Mar 3, 2022
6272f05
Screwed up rebase
kevjumba Mar 3, 2022
cf6bae1
Screwed up rebase
kevjumba Mar 3, 2022
0569b6d
Realign with master
kevjumba Mar 3, 2022
b02e51e
Fix accidental changes
kevjumba Mar 3, 2022
a161fad
Make type map change cleaner
kevjumba Mar 3, 2022
f7c618a
Address review comments
kevjumba Mar 3, 2022
c81fe31
Fix tests accidentally broken
kevjumba Mar 3, 2022
d790a1c
Add comments
kevjumba Mar 3, 2022
1408b8f
Reformat
kevjumba Mar 3, 2022
bf071b3
Fix logger
kevjumba Mar 3, 2022
62a92ac
Remove unused imports
kevjumba Mar 3, 2022
3ec6d22
Fix imports
kevjumba Mar 3, 2022
62ff185
Fix CI dependencies
adchia Mar 3, 2022
0dbc4e7
Prefix destinations with project name
kevjumba Mar 3, 2022
40cb4f8
Update comment
kevjumba Mar 3, 2022
4f5359a
Fix 3.8
kevjumba Mar 3, 2022
513d5bc
temporary fix
kevjumba Mar 3, 2022
8805884
rollback
kevjumba Mar 4, 2022
3ac3b71
update
kevjumba Mar 4, 2022
cfbaef5
Update ci?
kevjumba Mar 4, 2022
6abea5f
Move third party to contrib
kevjumba Mar 4, 2022
05aaeb8
Fix imports
kevjumba Mar 4, 2022
29a60d7
Remove third_party refactor
kevjumba Mar 4, 2022
b43417e
Revert ci requirements and update comment in type map
kevjumba Mar 4, 2022
1acc088
Revert 3.8-requirements
kevjumba Mar 4, 2022
Address review comments
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
kevjumba committed Mar 4, 2022
commit f7c618a1bddbbea3c971351800d501d34d8b156b
5 changes: 1 addition & 4 deletions docs/reference/data-sources/spark.md
@@ -1,4 +1,4 @@
# Snowflake
# Spark

## Description

@@ -43,6 +43,3 @@ my_spark_source = SparkSource(
created_timestamp_column="created",
)
```


Configuration options are available [here](https://rtd.feast.dev/en/latest/index.html#feast.data_source.SnowflakeSource).
2 changes: 1 addition & 1 deletion docs/reference/offline-stores/README.md
@@ -10,4 +10,4 @@ Please see [Offline Store](../../getting-started/architecture-and-components/off

{% page-ref page="redshift.md" %}

{% page-ref page="snowflake.md" %}
{% page-ref page="spark.md" %}
4 changes: 1 addition & 3 deletions docs/reference/offline-stores/spark.md
@@ -8,15 +8,13 @@ The Spark offline store is an offline store currently in alpha development that

This Spark offline store still does not achieve full test coverage and continues to fail some integration tests when integrating with the feast universal test suite. Please do NOT assume complete stability of the API.

As of 3/1/2022, 179/194 integration tests pass.

* Spark tables and views are allowed as sources that are loaded in from some Spark store (e.g. in Hive or in memory).
* Entity dataframes can be provided as a SQL query or can be provided as a Pandas dataframe. Pandas dataframes will be converted to a Spark dataframe and processed as a temporary view.
* A `SparkRetrievalJob` is returned when calling `get_historical_features()`.
* This allows you to call
* `to_df` to retrieve the pandas dataframe.
* `to_arrow` to retrieve the dataframe as a pyarrow Table.
* `to_spark-df` to retrieve the dataframe the spark.
* `to_spark_df` to retrieve the dataframe as a Spark dataframe.
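Based on the `SparkOfflineStoreConfig` and its `spark_conf` field in this PR, a `feature_store.yaml` fragment for enabling the store might look like this sketch (the `type` string and the conf values are assumptions, not confirmed by this diff):

```yaml
project: my_project            # hypothetical project name
registry: data/registry.db
provider: local
offline_store:
  type: spark                  # assumed type string for the contrib Spark store
  spark_conf:                  # forwarded to SparkConf().setAll(...)
    spark.master: "local[*]"
    spark.app.name: "feast-spark-offline-store"
```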

## Example

(next changed file; filename not shown)
@@ -62,7 +62,7 @@ def pull_latest_from_table_or_query(

warnings.warn(
"The spark offline store is an experimental feature in alpha development. "
"This API is unstable and it could and most probably will be changed in the future.",
"Some functionality may still be unstable and behavior can change in the future.",
RuntimeWarning,
)
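The alpha warning above is plain stdlib `warnings`; this self-contained sketch (the function name is a hypothetical stand-in for the store's entry points) shows how callers would observe it:

```python
import warnings

def call_experimental_api():
    # Hypothetical stand-in for the contrib Spark offline store entry points,
    # which emit a RuntimeWarning while the API is in alpha.
    warnings.warn(
        "The spark offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable and behavior can change in the future.",
        RuntimeWarning,
    )

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is always recorded
    call_experimental_api()

print(caught[0].category.__name__)  # RuntimeWarning
```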

@@ -116,7 +116,7 @@ def get_historical_features(
assert isinstance(config.offline_store, SparkOfflineStoreConfig)
warnings.warn(
"The spark offline store is an experimental feature in alpha development. "
"This API is unstable and it could and most probably will be changed in the future.",
"Some functionality may still be unstable and behavior can change in the future.",
RuntimeWarning,
)
spark_session = get_spark_session_or_start_new_with_repoconfig(
@@ -223,7 +223,6 @@ def pull_all_from_table_or_query(
)


# TODO fix internal abstract methods _to_df_internal _to_arrow_internal
class SparkRetrievalJob(RetrievalJob):
def __init__(
self,
@@ -236,17 +235,9 @@ def __init__(
super().__init__()
self.spark_session = spark_session
self.query = query
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views
self._metadata = metadata

@property
def full_feature_names(self) -> bool:
return self._full_feature_names

@property
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views
self.full_feature_names = full_feature_names
self.on_demand_feature_views = on_demand_feature_views
self.metadata = metadata

def to_spark_df(self) -> pyspark.sql.DataFrame:
statements = self.query.split(
@@ -270,14 +261,6 @@ def persist(self, storage: SavedDatasetStorage):
"""
pass

@property
def metadata(self) -> Optional[RetrievalMetadata]:
"""
Return metadata information about retrieval.
Should be available even before materializing the dataset itself.
"""
return self._metadata


def get_spark_session_or_start_new_with_repoconfig(
store_config: SparkOfflineStoreConfig,
@@ -289,12 +272,12 @@ def get_spark_session_or_start_new_with_repoconfig(
if spark_conf:
spark_builder = spark_builder.config(
conf=SparkConf().setAll([(k, v) for k, v in spark_conf.items()])
) # noqa
)

spark_session = spark_builder.getOrCreate()
spark_session.conf.set(
"spark.sql.parser.quotedRegexColumnNames", "true"
) # important!
)
return spark_session


@@ -319,6 +302,7 @@ def _get_entity_df_event_timestamp_range(
# If the entity_df is a string (SQL query), determine range
# from table
df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col)
# TODO(kzhang132): need utc conversion here.
entity_df_event_timestamp_range = (
df.agg({entity_df_event_timestamp_col: "max"}).collect()[0][0],
df.agg({entity_df_event_timestamp_col: "min"}).collect()[0][0],
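The `agg(max)/agg(min)` pair above builds a `(max, min)` tuple over the entity dataframe's timestamp column; reduced to plain Python (the sample timestamps are made up), the shape is:

```python
from datetime import datetime

# Hypothetical event timestamps standing in for the entity dataframe column.
timestamps = [
    datetime(2022, 3, 1, 12, 0),
    datetime(2022, 3, 2, 9, 30),
    datetime(2022, 3, 1, 8, 15),
]

# Note the (max, min) ordering, matching the Spark implementation above.
entity_df_event_timestamp_range = (max(timestamps), min(timestamps))
print(entity_df_event_timestamp_range)
```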
@@ -358,35 +342,6 @@ def _format_datetime(t: datetime) -> str:
return dt


def _get_feature_view_query_context(
entity_df: Union[pd.DataFrame, str],
entity_df_event_timestamp_col: str,
feature_refs: List[str],
feature_views: List[FeatureView],
spark_session: SparkSession,
table_name: str,
registry: Registry,
project: str,
) -> List[FeatureViewQueryContext]:
# interface of offline_utils.get_feature_view_query_context changed in feast==0.17
arg_spec = inspect.getfullargspec(func=offline_utils.get_feature_view_query_context)
if "entity_df_timestamp_range" in arg_spec.args:
# for feast>=0.17
entity_df_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df=entity_df,
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
spark_session=spark_session,
)
query_context = offline_utils.get_feature_view_query_context(
feature_refs=feature_refs,
feature_views=feature_views,
registry=registry,
project=project,
entity_df_timestamp_range=entity_df_timestamp_range,
)
return query_context


MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """/*
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
(next changed file; filename not shown)
@@ -1,3 +1,5 @@
import logging
import traceback
import pickle
import warnings
from enum import Enum
@@ -16,6 +18,7 @@
from feast.type_map import spark_to_feast_value_type
from feast.value_type import ValueType

logger = logging.getLogger(__name__)

class SparkSourceFormat(Enum):
csv = "csv"
@@ -64,55 +67,40 @@ def __init__(
f"'file_format' should be one of {self.allowed_formats}"
)

self._spark_options = SparkOptions(
self.spark_options = SparkOptions(
table=table, query=query, path=path, file_format=file_format,
)

@property
def spark_options(self):
"""
Returns the spark options of this data source
"""
return self._spark_options

@spark_options.setter
def spark_options(self, spark_options):
"""
Sets the spark options of this data source
"""
self._spark_options = spark_options

@property
def table(self):
"""
Returns the table of this feature data source
"""
return self._spark_options.table
return self.spark_options.table

@property
def query(self):
"""
Returns the query of this feature data source
"""
return self._spark_options.query
return self.spark_options.query

@property
def path(self):
"""
Returns the path of the spark data source file.
"""
return self._spark_options.path
return self.spark_options.path

@property
def file_format(self):
"""
Returns the file format of this feature data source.
"""
return self._spark_options.file_format
return self.spark_options.file_format

@staticmethod
def from_proto(data_source: DataSourceProto) -> Any:

assert data_source.HasField("custom_options")

spark_options = SparkOptions.from_proto(data_source.custom_options)
@@ -166,6 +154,7 @@ def get_table_column_names_and_types(
def get_table_query_string(self) -> str:
"""Returns a string that can directly be used to reference this table in SQL"""
if self.table:
# Backticks make sure that Spark SQL knows this is a table reference.
return f"`{self.table}`"
if self.query:
return f"({self.query})"
@@ -174,8 +163,11 @@ def get_table_query_string(self) -> str:
spark_session = SparkSession.getActiveSession()
if spark_session is None:
raise AssertionError("Could not find an active spark session.")
df = spark_session.read.format(self.file_format).load(self.path)

try:
df = spark_session.read.format(self.file_format).load(self.path)
except Exception:
# logger.exception logs the message plus the active traceback;
# re-raise so df is never used uninitialized below.
logger.exception("Spark read of file source failed.")
raise
tmp_table_name = get_temp_entity_table_name()
df.createOrReplaceTempView(tmp_table_name)
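The table/query branches of `get_table_query_string` can be sketched dependency-free; `table_query_string` here is a hypothetical helper mirroring the method's logic, not part of the PR:

```python
from typing import Optional

def table_query_string(table: Optional[str] = None, query: Optional[str] = None) -> str:
    """Hypothetical mirror of SparkSource.get_table_query_string's first two branches."""
    if table:
        # Backticks make sure Spark SQL treats the name as a table reference.
        return f"`{table}`"
    if query:
        # Parentheses let the query be used as a subquery in a larger statement.
        return f"({query})"
    raise ValueError("either table or query must be set")

print(table_query_string(table="driver_stats"))  # `driver_stats`
print(table_query_string(query="SELECT 1"))      # (SELECT 1)
```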

@@ -190,66 +182,10 @@ def __init__(
path: Optional[str] = None,
file_format: Optional[str] = None,
):
self._table = table
self._query = query
self._path = path
self._file_format = file_format

@property
def table(self):
"""
Returns the table
"""
return self._table

@table.setter
def table(self, table):
"""
Sets the table
"""
self._table = table

@property
def query(self):
"""
Returns the query
"""
return self._query

@query.setter
def query(self, query):
"""
Sets the query
"""
self._query = query

@property
def path(self):
"""
Returns the path
"""
return self._path

@path.setter
def path(self, path):
"""
Sets the path
"""
self._path = path

@property
def file_format(self):
"""
Returns the file_format
"""
return self._file_format

@file_format.setter
def file_format(self, file_format):
"""
Sets the file_format
"""
self._file_format = file_format
self.table = table
self.query = query
self.path = path
self.file_format = file_format
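The review change above replaces getter/setter property boilerplate with plain attributes; the resulting shape, sketched as a standalone dataclass (names mirror `SparkOptions` but this is illustrative only, not the PR's class):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SparkOptionsSketch:
    """Illustrative stand-in for SparkOptions after the refactor."""
    table: Optional[str] = None
    query: Optional[str] = None
    path: Optional[str] = None
    file_format: Optional[str] = None

# Plain attributes give the same read/write access the properties provided,
# with far less code.
opts = SparkOptionsSketch(path="data/driver_stats.parquet", file_format="parquet")
print(opts.file_format)  # parquet
```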

@classmethod
def from_proto(cls, spark_options_proto: DataSourceProto.CustomSourceOptions):
@@ -294,10 +230,7 @@ def __init__(self, table_ref: Optional[str] = None, query: Optional[str] = None)

@staticmethod
def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage:
# options = SparkOptions.from_proto(
# storage_proto
# )
# spark_options = SparkOptions(table=options.table, query=options.query)
# TODO: implementation is not correct. Needs fix and update to protos.
return SavedDatasetSparkStorage(table_ref="", query=None)

def to_proto(self) -> SavedDatasetStorageProto:
(next changed file; filename not shown)
@@ -33,9 +33,6 @@
from tests.integration.feature_repos.universal.data_sources.snowflake import (
SnowflakeDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.spark_data_source_creator import (
SparkDataSourceCreator,
)
from tests.integration.feature_repos.universal.feature_views import (
conv_rate_plus_100_feature_view,
create_conv_rate_request_data_source,
@@ -55,7 +52,7 @@
"type": "redis",
"redis_type": "redis_cluster",
# Redis Cluster Port Forwarding is setup in "pr_integration_tests.yaml" under "Setup Redis Cluster".
"connection_string": "127.0.0.1:32001,127.0.0.1:32002,127.0.0.1:32003",
"connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003",
}

# FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store,
@@ -77,9 +74,6 @@
IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG),
# GCP configurations
IntegrationTestRepoConfig(
provider="local", offline_store_creator=SparkDataSourceCreator,
),
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
(next changed file; filename not shown)
@@ -55,7 +55,6 @@ def create_offline_store_config(self):
self.spark_offline_store_config.spark_conf = self.spark_conf
return self.spark_offline_store_config

# abstract
def create_data_source(
self,
df: pd.DataFrame,