Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
21e47d2
test
mavysavydav Jun 12, 2021
06e0c77
refactored existing tests to test full_feature_names feature on data …
Mwad22 Jun 16, 2021
4b7dd18
removed full_feature_names usage from quickstart and README to have m…
Mwad22 Jun 16, 2021
579e08f
Update CHANGELOG for Feast v0.10.8
Jun 17, 2021
462da43
GitBook: [master] 2 pages modified
achals Jun 17, 2021
df95ee8
Schema Inferencing should happen at apply time (#1646)
mavysavydav Jun 18, 2021
e383575
GitBook: [master] 80 pages modified
woop Jun 19, 2021
dd25ad6
GitBook: [master] 80 pages modified
woop Jun 20, 2021
cef2869
Provide descriptive error on invalid table reference (#1627)
codyjlin Jun 21, 2021
c2e2b4d
Refactor OnlineStoreConfig classes into owning modules (#1649)
achals Jun 21, 2021
d2cda24
Possibility to specify a project for BigQuery queries (#1656)
MattDelac Jun 21, 2021
4ab4c60
Refactor OfflineStoreConfig classes into their owning modules (#1657)
achals Jun 22, 2021
64a2cb5
Run python unit tests in parallel (#1652)
achals Jun 22, 2021
9e4c907
Rename telemetry to usage (#1660)
Jun 22, 2021
b951282
resolved final comments on PR (variable renaming, refactor tests)
Mwad22 Jun 23, 2021
a68b12b
reformatted after merge conflict
Mwad22 Jun 23, 2021
094dbf3
Update CHANGELOG for Feast v0.11.0
woop Jun 24, 2021
0a148f9
Update charts README (#1659)
szalai1 Jun 25, 2021
0ce8210
Added Redis to list of online stores for local provider in providers …
nels Jun 25, 2021
d71e4c5
Grouped inferencing statements together in apply methods for easier r…
mavysavydav Jun 25, 2021
c14023f
Add RedshiftDataSource (#1669)
Jun 28, 2021
d138648
Provide the user with more options for setting the to_bigquery config…
codyjlin Jun 28, 2021
c02b9eb
Add streaming sources to the FeatureView API (#1664)
achals Jun 28, 2021
12dbbea
Add to_table() to RetrievalJob object (#1663)
MattDelac Jun 29, 2021
d0fe0a9
Rename to_table to to_arrow (#1671)
MattDelac Jun 29, 2021
6e8670e
Cancel BigQuery job if timeout hits (#1672)
MattDelac Jun 29, 2021
5314024
Fix Feature References example (#1674)
GregKuhlmann Jun 30, 2021
eb1da5e
Allow strings for online/offline store instead of dicts (#1673)
achals Jun 30, 2021
183a0b9
Remove default list from the FeatureView constructor (#1679)
achals Jul 1, 2021
b714a12
made changes requested by @tsotnet
Mwad22 Jul 2, 2021
c78894f
Fix unit tests that got broken by Pandas 1.3.0 release (#1683)
Jul 3, 2021
20c9461
Add support for DynamoDB and S3 registry (#1483)
leonid133 Jul 3, 2021
d36d1a0
Parallelize integration tests (#1684)
Jul 4, 2021
651bce3
BQ exception should be raised first before we check the timedout (#1675)
MattDelac Jul 5, 2021
f3b92c3
Update sdk/python/feast/infra/provider.py
Mwad22 Jul 5, 2021
f400d65
Update sdk/python/feast/feature_store.py
Mwad22 Jul 5, 2021
082fca7
made error logic/messages more descriptive
Mwad22 Jul 5, 2021
3aca976
made error logic/messages more descriptive.
Mwad22 Jul 5, 2021
79aa736
Simplified error messages
Mwad22 Jul 6, 2021
d7d08ef
ran formatter, issue in errors.py
Mwad22 Jul 7, 2021
2ab8eea
Merge branch 'master' into mwad22-1618-PR
Mwad22 Jul 7, 2021
650340d
python linter issues resolved
Mwad22 Jul 7, 2021
5d582a6
removed unnecessary default assignment in get_historical_features. de…
Mwad22 Jul 8, 2021
8724e0b
added error message assertion for feature name collisions, and other …
Mwad22 Jul 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add RedshiftDataSource (#1669)
* Add RedshiftDataSource

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Call parent __init__ first

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
Signed-off-by: Mwad22 <51929507+Mwad22@users.noreply.github.com>
  • Loading branch information
Tsotne Tabidze authored and Mwad22 committed Jul 7, 2021
commit c14023f5c15f4233f0ffd30b945f78c1106674b1
12 changes: 12 additions & 0 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ message DataSource {
BATCH_BIGQUERY = 2;
STREAM_KAFKA = 3;
STREAM_KINESIS = 4;
BATCH_REDSHIFT = 5;
}
SourceType type = 1;

Expand Down Expand Up @@ -100,11 +101,22 @@ message DataSource {
StreamFormat record_format = 3;
}

// Defines options for DataSource that sources features from a Redshift Query
message RedshiftOptions {
// Redshift table name
string table = 1;

// SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective
// entity columns
string query = 2;
}

// DataSource options.
oneof options {
FileOptions file_options = 11;
BigQueryOptions bigquery_options = 12;
KafkaOptions kafka_options = 13;
KinesisOptions kinesis_options = 14;
RedshiftOptions redshift_options = 15;
}
}
2 changes: 2 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
FileSource,
KafkaSource,
KinesisSource,
RedshiftSource,
SourceType,
)
from .entity import Entity
Expand Down Expand Up @@ -37,6 +38,7 @@
"FileSource",
"KafkaSource",
"KinesisSource",
"RedshiftSource",
"Feature",
"FeatureStore",
"FeatureTable",
Expand Down
266 changes: 260 additions & 6 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile
from tenacity import retry, retry_unless_exception_type, wait_exponential

from feast import type_map
from feast.data_format import FileFormat, StreamFormat
from feast.errors import DataSourceNotFoundException
from feast.errors import (
DataSourceNotFoundException,
RedshiftCredentialsError,
RedshiftQueryError,
)
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig
from feast.value_type import ValueType


Expand Down Expand Up @@ -477,6 +483,15 @@ def from_proto(data_source):
date_partition_column=data_source.date_partition_column,
query=data_source.bigquery_options.query,
)
elif data_source.redshift_options.table or data_source.redshift_options.query:
data_source_obj = RedshiftSource(
field_mapping=data_source.field_mapping,
table=data_source.redshift_options.table,
event_timestamp_column=data_source.event_timestamp_column,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
query=data_source.redshift_options.query,
)
elif (
data_source.kafka_options.bootstrap_servers
and data_source.kafka_options.topic
Expand Down Expand Up @@ -520,12 +535,27 @@ def to_proto(self) -> DataSourceProto:
"""
raise NotImplementedError

def validate(self):
def validate(self, config: RepoConfig):
    """
    Validates the underlying data source.

    Args:
        config: The feature store's RepoConfig; passed through so
            implementations can reach the configured offline store
            (e.g. Redshift cluster credentials).

    Raises:
        NotImplementedError: Subclasses must override this method.
    """
    raise NotImplementedError

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
    """
    Returns the callable that maps a raw column type name from this source's
    backing store to a Feast ValueType (e.g. a pyarrow/BigQuery/Redshift
    type-name converter from feast.type_map).

    Raises:
        NotImplementedError: Subclasses must override this method.
    """
    raise NotImplementedError

def get_table_column_names_and_types(
    self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
    """
    Returns an iterable of (column name, raw column type name) pairs
    describing this data source's schema.

    Args:
        config: The feature store's RepoConfig; used by implementations that
            must query the configured offline store to inspect the schema.

    Raises:
        NotImplementedError: Subclasses must override this method.
    """
    raise NotImplementedError


class FileSource(DataSource):
def __init__(
Expand Down Expand Up @@ -622,15 +652,17 @@ def to_proto(self) -> DataSourceProto:

return data_source_proto

def validate(self):
def validate(self, config: RepoConfig):
# TODO: validate a FileSource
pass

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.pa_to_feast_value_type

def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
schema = ParquetFile(self.path).schema_arrow
return zip(schema.names, map(str, schema.types))

Expand Down Expand Up @@ -703,7 +735,7 @@ def to_proto(self) -> DataSourceProto:

return data_source_proto

def validate(self):
def validate(self, config: RepoConfig):
if not self.query:
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
Expand All @@ -725,7 +757,9 @@ def get_table_query_string(self) -> str:
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.bq_to_feast_value_type

def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
from google.cloud import bigquery

client = bigquery.Client()
Expand Down Expand Up @@ -875,3 +909,223 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.date_partition_column = self.date_partition_column

return data_source_proto


class RedshiftOptions:
    """
    Configuration options for a Redshift-backed DataSource.

    Either a Redshift table name or a SQL query (whose result set supplies
    the feature data) is provided; the other is left unset.
    """

    def __init__(self, table: Optional[str], query: Optional[str]):
        # Internal storage, exposed via the `table`/`query` properties below.
        self._table = table
        self._query = query

    @property
    def table(self):
        """Name of the Redshift table backing this source."""
        return self._table

    @table.setter
    def table(self, table_name):
        """Sets the table ref of this Redshift table."""
        self._table = table_name

    @property
    def query(self):
        """SQL query producing the feature data for this source."""
        return self._query

    @query.setter
    def query(self, query):
        """Sets the Redshift SQL query referenced by this source."""
        self._query = query

    @classmethod
    def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
        """
        Builds a RedshiftOptions from its protobuf representation.

        Args:
            redshift_options_proto: A DataSourceProto.RedshiftOptions message.

        Returns:
            An equivalent RedshiftOptions object.
        """
        return cls(
            table=redshift_options_proto.table,
            query=redshift_options_proto.query,
        )

    def to_proto(self) -> DataSourceProto.RedshiftOptions:
        """
        Serializes this RedshiftOptions to its protobuf representation.

        Returns:
            A DataSourceProto.RedshiftOptions message.
        """
        return DataSourceProto.RedshiftOptions(table=self.table, query=self.query)


class RedshiftSource(DataSource):
    """A batch DataSource backed by an Amazon Redshift table or SQL query."""

    def __init__(
        self,
        event_timestamp_column: Optional[str] = "",
        table: Optional[str] = None,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
        query: Optional[str] = None,
    ):
        """
        Args:
            event_timestamp_column: Column holding the event timestamp.
            table: Redshift table name. Provide either `table` or `query`.
            created_timestamp_column: Column holding the row-creation timestamp.
            field_mapping: Mapping of source column names to feature names.
            date_partition_column: Column used for date partitioning.
            query: SQL query whose result set supplies the feature data.
        """
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )

        self._redshift_options = RedshiftOptions(table=table, query=query)

    def __eq__(self, other):
        if not isinstance(other, RedshiftSource):
            raise TypeError(
                "Comparisons should only involve RedshiftSource class objects."
            )

        return (
            self.redshift_options.table == other.redshift_options.table
            and self.redshift_options.query == other.redshift_options.query
            and self.event_timestamp_column == other.event_timestamp_column
            and self.created_timestamp_column == other.created_timestamp_column
            and self.field_mapping == other.field_mapping
        )

    @property
    def table(self):
        """The Redshift table name, or None if this source is query-based."""
        return self._redshift_options.table

    @property
    def query(self):
        """The SQL query supplying feature data, or None if table-based."""
        return self._redshift_options.query

    @property
    def redshift_options(self):
        """
        Returns the Redshift options of this data source
        """
        return self._redshift_options

    @redshift_options.setter
    def redshift_options(self, _redshift_options):
        """
        Sets the Redshift options of this data source
        """
        self._redshift_options = _redshift_options

    def to_proto(self) -> DataSourceProto:
        """Converts this RedshiftSource to its protobuf representation."""
        data_source_proto = DataSourceProto(
            type=DataSourceProto.BATCH_REDSHIFT,
            field_mapping=self.field_mapping,
            redshift_options=self.redshift_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto

    def validate(self, config: RepoConfig):
        # As long as the query gets successfully executed, or the table exists,
        # the data source is validated. We don't need the results, so the
        # return value is intentionally discarded. (Fixed: previously this was
        # a leftover debug `print(...)` guarded by a TODO, which leaked output
        # into user-facing commands.)
        self.get_table_column_names_and_types(config)

    def get_table_query_string(self) -> str:
        """Returns a string that can directly be used to reference this table in SQL"""
        if self.table:
            # Redshift follows PostgreSQL and quotes identifiers with double
            # quotes; backticks are BigQuery syntax and are invalid here.
            return f'"{self.table}"'
        else:
            return f"({self.query})"

    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        return type_map.redshift_to_feast_value_type

    def get_table_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        """
        Returns the list of (column name, Redshift type name) pairs for this
        source, using the Redshift Data API: `describe_table` when a table
        name is configured, otherwise by running
        `SELECT * FROM (<query>) LIMIT 1` and reading the column metadata.

        Args:
            config: RepoConfig whose offline_store must be a
                RedshiftOfflineStoreConfig (cluster_id, database, user, region).

        Raises:
            DataSourceNotFoundException: If the configured table has no columns
                (the API returns an empty column list for a missing table).
            RedshiftCredentialsError: If the Data API rejects the credentials.
            RedshiftQueryError: If the query finishes in a failed state.
        """
        import boto3
        from botocore.config import Config
        from botocore.exceptions import ClientError

        from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig

        assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

        client = boto3.client(
            "redshift-data", config=Config(region_name=config.offline_store.region)
        )

        try:
            if self.table is not None:
                table = client.describe_table(
                    ClusterIdentifier=config.offline_store.cluster_id,
                    Database=config.offline_store.database,
                    DbUser=config.offline_store.user,
                    Table=self.table,
                )
                # The API returns valid JSON with empty column list when the table doesn't exist
                if len(table["ColumnList"]) == 0:
                    raise DataSourceNotFoundException(self.table)

                columns = table["ColumnList"]
            else:
                statement = client.execute_statement(
                    ClusterIdentifier=config.offline_store.cluster_id,
                    Database=config.offline_store.database,
                    DbUser=config.offline_store.user,
                    Sql=f"SELECT * FROM ({self.query}) LIMIT 1",
                )

                # Need to retry client.describe_statement(...) until the task is finished. We don't want to bombard
                # Redshift with queries, and neither do we want to wait for a long time on the initial call.
                # The solution is exponential backoff. The backoff starts with 0.1 seconds and doubles exponentially
                # until reaching 30 seconds, at which point the backoff is fixed.
                @retry(
                    wait=wait_exponential(multiplier=0.1, max=30),
                    retry=retry_unless_exception_type(RedshiftQueryError),
                )
                def wait_for_statement():
                    desc = client.describe_statement(Id=statement["Id"])
                    if desc["Status"] in ("SUBMITTED", "STARTED", "PICKED"):
                        raise Exception  # Retry
                    if desc["Status"] != "FINISHED":
                        raise RedshiftQueryError(desc)  # Don't retry. Raise exception.

                wait_for_statement()

                result = client.get_statement_result(Id=statement["Id"])

                columns = result["ColumnMetadata"]
        except ClientError as e:
            if e.response["Error"]["Code"] == "ValidationException":
                raise RedshiftCredentialsError() from e
            raise

        return [(column["name"], column["typeName"].upper()) for column in columns]
10 changes: 10 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,13 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
f"Inference to fill in missing information for {repo_obj_type} failed. {specific_issue}. "
"Try filling the information explicitly."
)


class RedshiftCredentialsError(Exception):
    """Raised when the Redshift Data API rejects the configured credentials."""

    def __init__(self):
        message = "Redshift API failed due to incorrect credentials"
        super().__init__(message)


class RedshiftQueryError(Exception):
    """Raised when a Redshift SQL statement fails to complete successfully."""

    def __init__(self, details):
        message = "Redshift SQL Query failed to finish. Details: " + str(details)
        super().__init__(message)
Loading