Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
2366e85
State of feast
kevjumba Feb 25, 2022
20e82ea
Clean up changes
kevjumba Feb 26, 2022
55e7915
Fix random incorrect changes
kevjumba Feb 26, 2022
43794f7
Fix lint
kevjumba Feb 26, 2022
343ed00
Fix build errors
kevjumba Feb 26, 2022
55c458d
Fix lint
kevjumba Feb 26, 2022
b8ace43
Add spark offline store components to test against current integratio…
kevjumba Mar 1, 2022
d13119f
Fix lint
kevjumba Mar 1, 2022
4b56f55
Rename to pass checks
kevjumba Mar 1, 2022
6e278c4
Fix issues
kevjumba Mar 1, 2022
4bc67d8
Fix type checking issues
kevjumba Mar 1, 2022
c934edc
Fix lint
kevjumba Mar 1, 2022
e01d023
Clean up print statements for first review
kevjumba Mar 1, 2022
01ac14a
Fix lint
kevjumba Mar 1, 2022
26c8a01
Fix flake 8 lint tests
kevjumba Mar 1, 2022
6f8ce3c
Add warnings for alpha version release
kevjumba Mar 2, 2022
551eea1
Format
kevjumba Mar 2, 2022
1e7c2b4
Address review
kevjumba Mar 2, 2022
8e3e9a4
Address review
kevjumba Mar 2, 2022
cc1651e
Fix lint
kevjumba Mar 2, 2022
acf1c28
Add file store functionality
kevjumba Mar 2, 2022
65b113a
lint
kevjumba Mar 2, 2022
7adb8d2
Add example feature repo
kevjumba Mar 2, 2022
648f935
Update data source creator
kevjumba Mar 2, 2022
7b84ac1
Make cli work for feast init with spark
kevjumba Mar 2, 2022
b066a6f
Update the docs
kevjumba Mar 2, 2022
e0099ae
Clean up code
kevjumba Mar 2, 2022
86e74c0
Clean up more code
kevjumba Mar 2, 2022
6fe5b9e
Uncomment repo configs
kevjumba Mar 2, 2022
92c4f87
Fix setup.py
kevjumba Mar 2, 2022
18a2892
Update dependencies
kevjumba Mar 2, 2022
c644388
Fix ci dependencies
kevjumba Mar 3, 2022
9333130
Screwed up rebase
kevjumba Mar 3, 2022
6272f05
Screwed up rebase
kevjumba Mar 3, 2022
cf6bae1
Screwed up rebase
kevjumba Mar 3, 2022
0569b6d
Realign with master
kevjumba Mar 3, 2022
b02e51e
Fix accidental changes
kevjumba Mar 3, 2022
a161fad
Make type map change cleaner
kevjumba Mar 3, 2022
f7c618a
Address review comments
kevjumba Mar 3, 2022
c81fe31
Fix tests accidentally broken
kevjumba Mar 3, 2022
d790a1c
Add comments
kevjumba Mar 3, 2022
1408b8f
Reformat
kevjumba Mar 3, 2022
bf071b3
Fix logger
kevjumba Mar 3, 2022
62a92ac
Remove unused imports
kevjumba Mar 3, 2022
3ec6d22
Fix imports
kevjumba Mar 3, 2022
62ff185
Fix CI dependencies
adchia Mar 3, 2022
0dbc4e7
Prefix destinations with project name
kevjumba Mar 3, 2022
40cb4f8
Update comment
kevjumba Mar 3, 2022
4f5359a
Fix 3.8
kevjumba Mar 3, 2022
513d5bc
temporary fix
kevjumba Mar 3, 2022
8805884
rollback
kevjumba Mar 4, 2022
3ac3b71
update
kevjumba Mar 4, 2022
cfbaef5
Update ci?
kevjumba Mar 4, 2022
6abea5f
Move third party to contrib
kevjumba Mar 4, 2022
05aaeb8
Fix imports
kevjumba Mar 4, 2022
29a60d7
Remove third_party refactor
kevjumba Mar 4, 2022
b43417e
Revert ci requirements and update comment in type map
kevjumba Mar 4, 2022
1acc088
Revert 3.8-requirements
kevjumba Mar 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address review
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Mar 4, 2022
commit 1e7c2b441d381180503a6bfb5274a31a539cc3a6
1 change: 1 addition & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
from feast.infra.offline_stores.third_party.spark_source import SparkSource

from .data_source import KafkaSource, KinesisSource, SourceType
from .entity import Entity
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ def get_historical_features(
all_request_feature_views,
all_on_demand_feature_views,
) = self._get_feature_views_to_use(features)

# TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider.
# This is a weird interface quirk - we should revisit the `get_historical_features` to
# pass in the on demand feature views as well.
Expand All @@ -767,7 +768,6 @@ def get_historical_features(
all_request_feature_views,
all_on_demand_feature_views,
)

feature_views = list(view for view, _ in fvs)
on_demand_feature_views = list(view for view, _ in odfvs)
request_feature_views = list(view for view, _ in request_fvs)
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
FileSource,
RedshiftSource,
SnowflakeSource,
SparkSource,
)
from feast.data_source import DataSource
from feast.errors import RegistryInferenceFailure
Expand Down Expand Up @@ -84,7 +85,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
):
# prepare right match pattern for data source
ts_column_type_regex_pattern = ""
if isinstance(data_source, FileSource):
if isinstance(data_source, FileSource) or isinstance(data_source, SparkSource):
ts_column_type_regex_pattern = r"^timestamp"
elif isinstance(data_source, BigQuerySource):
ts_column_type_regex_pattern = "TIMESTAMP|DATETIME"
Expand All @@ -97,7 +98,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
"DataSource",
"""
DataSource inferencing of event_timestamp_column is currently only supported
for FileSource and BigQuerySource.
for FileSource, SparkSource, BigQuerySource, RedshiftSource, and SnowflakeSource.
""",
)
# for informing the type checker
Expand Down
43 changes: 26 additions & 17 deletions sdk/python/feast/infra/offline_stores/third_party/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import pandas as pd
import pyarrow
import pyspark
from feast_spark_offline_store.spark_source import SparkSource
from feast_spark_offline_store.spark_type_map import spark_schema_to_np_dtypes
from feast.infra.offline_stores.third_party.spark_source import SparkSource
from feast.type_map import spark_schema_to_np_dtypes
from pydantic import StrictStr
from pyspark import SparkConf
from pyspark.sql import SparkSession
Expand Down Expand Up @@ -37,7 +37,6 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel):

spark_conf: Optional[Dict[str, str]] = None
""" Configuration overlay for the spark session """
# to ensure sparksession is the correct config, if not created yet
# sparksession is not serializable and we dont want to pass it around as an argument


Expand Down Expand Up @@ -161,11 +160,6 @@ def get_historical_features(
full_feature_names=full_feature_names,
)

# TODO: Figure out what this is used for
# on_demand_feature_views = OnDemandFeatureView.get_requested_odfvs(
# feature_refs=feature_refs, project=project, registry=registry
# )

return SparkRetrievalJob(
spark_session=spark_session,
query=query,
Expand Down Expand Up @@ -196,22 +190,37 @@ def pull_all_from_table_or_query(
have all already been mapped to column names of the source table and those column names are the values passed
into this function.
"""
assert isinstance(data_source, SparkSource)
warnings.warn(
"The spark offline store is an experimental feature in alpha development. "
"This API is unstable and it could and most probably will be changed in the future.",
RuntimeWarning,
)
from_expression = data_source.get_table_query_string()

field_string = (
'"'
+ '", "'.join(
join_key_columns + feature_name_columns + [event_timestamp_column]
)
+ '"'
)
start_date = start_date.astimezone(tz=utc)
end_date = end_date.astimezone(tz=utc)

return SparkOfflineStore.pull_latest_from_table_or_query(
query = f"""
SELECT {field_string}
FROM {from_expression}
WHERE "{event_timestamp_column}" BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
"""
spark_session = get_spark_session_or_start_new_with_repoconfig(
store_config=config.offline_store
)
return SparkRetrievalJob(
spark_session=spark_session,
query=query,
config=config,
data_source=data_source,
join_key_columns=join_key_columns
+ [event_timestamp_column], # avoid deduplication
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=None,
start_date=start_date,
end_date=end_date,
full_feature_names=False,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import warnings
from typing import Any, Callable, Dict, Iterable, Optional, Tuple

from feast_spark_offline_store.spark_type_map import spark_to_feast_value_type
from feast.type_map import spark_to_feast_value_type
from pyspark.sql.utils import AnalysisException

from feast.data_source import DataSource
Expand Down
Original file line number Diff line number Diff line change
@@ -1,54 +1,2 @@
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple

from numpy import dtype

from feast import ValueType


def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType:
    """Convert a Spark SQL type name (e.g. "bigint", "array<string>") to a Feast ValueType.

    Matching is case-insensitive. Non-string inputs and unrecognized type
    names map to ValueType.NULL.
    """
    # TODO not all spark types are convertible
    type_map: Dict[str, ValueType] = {
        "null": ValueType.UNKNOWN,
        "byte": ValueType.BYTES,
        "string": ValueType.STRING,
        "int": ValueType.INT32,
        "bigint": ValueType.INT64,
        "long": ValueType.INT64,
        "double": ValueType.DOUBLE,
        "float": ValueType.FLOAT,
        "boolean": ValueType.BOOL,
        "timestamp": ValueType.UNIX_TIMESTAMP,
        "array<byte>": ValueType.BYTES_LIST,
        "array<string>": ValueType.STRING_LIST,
        "array<int>": ValueType.INT32_LIST,
        "array<bigint>": ValueType.INT64_LIST,
        "array<double>": ValueType.DOUBLE_LIST,
        "array<float>": ValueType.FLOAT_LIST,
        "array<boolean>": ValueType.BOOL_LIST,
        "array<timestamp>": ValueType.UNIX_TIMESTAMP_LIST,
    }
    if not isinstance(spark_type_as_str, str):
        return ValueType.NULL
    # Normalize BEFORE the membership test. The original code tested the raw
    # string for membership but looked up the lowercased string, so mixed-case
    # names (e.g. "STRING") were wrongly rejected as NULL.
    return type_map.get(spark_type_as_str.lower(), ValueType.NULL)


def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[dtype]:
    """Lazily map Spark ``(column_name, type_name)`` pairs to numpy dtypes.

    Any Spark type without an explicit mapping falls back to the generic
    object dtype ``dtype("O")``.
    """
    # TODO recheck all typing (also tz for timestamp)
    # https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics
    known_dtypes = {
        "boolean": dtype("bool"),
        "double": dtype("float64"),
        "float": dtype("float64"),
        "int": dtype("int64"),
        "bigint": dtype("int64"),
        "smallint": dtype("int64"),
        "timestamp": dtype("datetime64[ns]"),
    }
    object_dtype = dtype("O")
    for _column_name, spark_type in dtypes:
        yield known_dtypes.get(spark_type, object_dtype)
49 changes: 49 additions & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
Tuple,
Type,
Union,
Iterator,
cast,
)

import numpy as np
from collections import defaultdict
import pandas as pd
import pyarrow
from google.protobuf.timestamp_pb2 import Timestamp
Expand Down Expand Up @@ -575,3 +577,50 @@ def _non_empty_value(value: Any) -> bool:
return value is not None and (
not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str)
)

def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType:
    """Convert a Spark SQL type name (e.g. "bigint", "array<string>") to a Feast ValueType.

    Matching is case-insensitive. Non-string inputs and unrecognized type
    names map to ValueType.NULL.
    """
    # TODO not all spark types are convertible
    type_map: Dict[str, ValueType] = {
        "null": ValueType.UNKNOWN,
        "byte": ValueType.BYTES,
        "string": ValueType.STRING,
        "int": ValueType.INT32,
        "bigint": ValueType.INT64,
        "long": ValueType.INT64,
        "double": ValueType.DOUBLE,
        "float": ValueType.FLOAT,
        "boolean": ValueType.BOOL,
        "timestamp": ValueType.UNIX_TIMESTAMP,
        "array<byte>": ValueType.BYTES_LIST,
        "array<string>": ValueType.STRING_LIST,
        "array<int>": ValueType.INT32_LIST,
        "array<bigint>": ValueType.INT64_LIST,
        "array<double>": ValueType.DOUBLE_LIST,
        "array<float>": ValueType.FLOAT_LIST,
        "array<boolean>": ValueType.BOOL_LIST,
        "array<timestamp>": ValueType.UNIX_TIMESTAMP_LIST,
    }
    if not isinstance(spark_type_as_str, str):
        return ValueType.NULL
    # Normalize BEFORE the membership test. The original code tested the raw
    # string for membership but looked up the lowercased string, so mixed-case
    # names (e.g. "STRING") were wrongly rejected as NULL.
    return type_map.get(spark_type_as_str.lower(), ValueType.NULL)


def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[np.dtype]:
    """Lazily map Spark ``(column_name, type_name)`` pairs to numpy dtypes.

    Any Spark type without an explicit mapping falls back to the generic
    object dtype ``np.dtype("O")``.
    """
    # TODO recheck all typing (also tz for timestamp)
    # https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics
    known_dtypes = {
        "boolean": np.dtype("bool"),
        "double": np.dtype("float64"),
        "float": np.dtype("float64"),
        "int": np.dtype("int64"),
        "bigint": np.dtype("int64"),
        "smallint": np.dtype("int64"),
        "timestamp": np.dtype("datetime64[ns]"),
    }
    object_dtype = np.dtype("O")
    for _column_name, spark_type in dtypes:
        yield known_dtypes.get(spark_type, object_dtype)
67 changes: 35 additions & 32 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from tests.integration.feature_repos.universal.data_sources.snowflake import (
SnowflakeDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.spark_data_source_creator import (
SparkDataSourceCreator,
)
from tests.integration.feature_repos.universal.feature_views import (
conv_rate_plus_100_feature_view,
create_conv_rate_request_data_source,
Expand Down Expand Up @@ -71,40 +74,40 @@
DEFAULT_FULL_REPO_CONFIGS.extend(
[
# Redis configurations
IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG),
# IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
# IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG),
# GCP configurations
# IntegrationTestRepoConfig(
# provider="local", offline_store_creator=SparkDataSourceCreator,
# )
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store="datastore",
),
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store=REDIS_CONFIG,
),
# AWS configurations
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=DYNAMO_CONFIG,
python_feature_server=True,
),
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=REDIS_CONFIG,
),
# Snowflake configurations
IntegrationTestRepoConfig(
Comment thread
kevjumba marked this conversation as resolved.
Outdated
provider="aws", # no list features, no feature server
offline_store_creator=SnowflakeDataSourceCreator,
online_store=REDIS_CONFIG,
),
provider="local", offline_store_creator=SparkDataSourceCreator,
)
# IntegrationTestRepoConfig(
# provider="gcp",
# offline_store_creator=BigQueryDataSourceCreator,
# online_store="datastore",
# ),
# IntegrationTestRepoConfig(
# provider="gcp",
# offline_store_creator=BigQueryDataSourceCreator,
# online_store=REDIS_CONFIG,
# ),
# # AWS configurations
# IntegrationTestRepoConfig(
# provider="aws",
# offline_store_creator=RedshiftDataSourceCreator,
# online_store=DYNAMO_CONFIG,
# python_feature_server=True,
# ),
# IntegrationTestRepoConfig(
# provider="aws",
# offline_store_creator=RedshiftDataSourceCreator,
# online_store=REDIS_CONFIG,
# ),
# # Snowflake configurations
# IntegrationTestRepoConfig(
# provider="aws", # no list features, no feature server
# offline_store_creator=SnowflakeDataSourceCreator,
# online_store=REDIS_CONFIG,
# ),
]
)
full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def create_data_source(
field_mapping: Dict[str, str] = None,
**kwargs,
) -> DataSource:
# df["event_timestamp"] = pd.to_datetime(df["event_timestamp"], utc=True)
if event_timestamp_column in df:
df[event_timestamp_column] = pd.to_datetime(
df[event_timestamp_column], utc=True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,9 @@ def test_feature_get_historical_features_types_match(offline_types_test_fixtures
historical_features = fs.get_historical_features(
entity_df=entity_df, features=features,
)

# Note: Pandas doesn't play well with nan values in ints. BQ will also coerce to floats if there are NaNs
historical_features_df = historical_features.to_df()

print(historical_features_df)
if config.feature_is_list:
assert_feature_list_types(
environment.test_repo_config.provider,
Expand Down Expand Up @@ -285,6 +284,7 @@ def create_feature_view(
value_type = ValueType.BOOL
elif feature_dtype == "datetime":
value_type = ValueType.UNIX_TIMESTAMP

return driver_feature_view(data_source, name=name, value_type=value_type,)


Expand Down Expand Up @@ -347,6 +347,7 @@ def assert_expected_arrow_types(
):
print("Asserting historical feature arrow types")
historical_features_arrow = historical_features.to_arrow()
print(historical_features_arrow)
feature_list_dtype_to_expected_historical_feature_arrow_type = {
"int32": pa.types.is_int64,
"int64": pa.types.is_int64,
Expand Down