Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def __init__(

if _message_format is None:
raise ValueError("Message format must be specified for Kafka source")
print("Asdfasdf")

super().__init__(
event_timestamp_column=_event_timestamp_column,
created_timestamp_column=created_timestamp_column,
Expand Down Expand Up @@ -467,7 +467,9 @@ def from_proto(data_source: DataSourceProto):
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
batch_source=DataSource.from_proto(data_source.batch_source),
batch_source=DataSource.from_proto(data_source.batch_source)
if data_source.batch_source
else None,
)

def to_proto(self) -> DataSourceProto:
Expand Down Expand Up @@ -500,17 +502,20 @@ class RequestSource(DataSource):
"""
RequestSource that can be used to provide input features for on demand transforms

Args:
Attributes:
name: Name of the request data source
schema Union[Dict[str, ValueType], List[Field]]: Schema mapping from the input feature name to a ValueType
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the request data source, typically the email of the primary
schema: Schema mapping from the input feature name to a ValueType
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the request data source, typically the email of the primary
maintainer.
"""

name: str
schema: List[Field]
description: str
tags: Dict[str, str]
owner: str

def __init__(
self,
Expand Down Expand Up @@ -697,7 +702,9 @@ def from_proto(data_source: DataSourceProto):
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
batch_source=DataSource.from_proto(data_source.batch_source),
batch_source=DataSource.from_proto(data_source.batch_source)
if data_source.batch_source
else None,
)

@staticmethod
Expand Down
54 changes: 8 additions & 46 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,9 @@ def __eq__(self, other):
)

return (
self.name == other.name
and self.bigquery_options.table == other.bigquery_options.table
and self.bigquery_options.query == other.bigquery_options.query
and self.timestamp_field == other.timestamp_field
and self.created_timestamp_column == other.created_timestamp_column
and self.field_mapping == other.field_mapping
and self.description == other.description
and self.tags == other.tags
and self.owner == other.owner
super().__eq__(other)
and self.table == other.table
and self.query == other.query
)

@property
Expand All @@ -120,7 +114,6 @@ def query(self):

@staticmethod
def from_proto(data_source: DataSourceProto):

assert data_source.HasField("bigquery_options")

return BigQuerySource(
Expand All @@ -144,11 +137,10 @@ def to_proto(self) -> DataSourceProto:
description=self.description,
tags=self.tags,
owner=self.owner,
timestamp_field=self.timestamp_field,
created_timestamp_column=self.created_timestamp_column,
)

data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column

return data_source_proto

def validate(self, config: RepoConfig):
Expand Down Expand Up @@ -200,42 +192,14 @@ def get_table_column_names_and_types(

class BigQueryOptions:
"""
DataSource BigQuery options used to source features from BigQuery query
Configuration options for a BigQuery data source.
"""

def __init__(
self, table: Optional[str], query: Optional[str],
):
self._table = table
self._query = query

@property
def query(self):
"""
Returns the BigQuery SQL query referenced by this source
"""
return self._query

@query.setter
def query(self, query):
"""
Sets the BigQuery SQL query referenced by this source
"""
self._query = query

@property
def table(self):
"""
Returns the table ref of this BQ table
"""
return self._table

@table.setter
def table(self, table):
"""
Sets the table ref of this BQ table
"""
self._table = table
self.table = table or ""
self.query = query or ""

@classmethod
def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
Expand All @@ -248,7 +212,6 @@ def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
Returns:
Returns a BigQueryOptions object based on the bigquery_options protobuf
"""

bigquery_options = cls(
table=bigquery_options_proto.table, query=bigquery_options_proto.query,
)
Expand All @@ -262,7 +225,6 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions:
Returns:
BigQueryOptionsProto protobuf
"""

bigquery_options_proto = DataSourceProto.BigQueryOptions(
table=self.table, query=self.query,
)
Expand Down
70 changes: 11 additions & 59 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,11 @@ def __eq__(self, other):
raise TypeError("Comparisons should only involve FileSource class objects.")

return (
self.name == other.name
super().__eq__(other)
and self.path == other.path
and self.file_options.file_format == other.file_options.file_format
and self.timestamp_field == other.timestamp_field
and self.created_timestamp_column == other.created_timestamp_column
and self.field_mapping == other.field_mapping
and self.file_options.s3_endpoint_override
== other.file_options.s3_endpoint_override
and self.description == other.description
and self.tags == other.tags
and self.owner == other.owner
)

@property
Expand Down Expand Up @@ -203,7 +198,7 @@ def get_table_query_string(self) -> str:

class FileOptions:
"""
DataSource File options used to source features from a file
Configuration options for a file data source.
"""

def __init__(
Expand All @@ -213,66 +208,23 @@ def __init__(
uri: Optional[str],
):
"""
FileOptions initialization method
Initializes a FileOptions object.

Args:
file_format (FileFormat, optional): file source format eg. parquet
s3_endpoint_override (str, optional): custom s3 endpoint (used only with s3 uri)
uri (str, optional): file source url eg. s3:// or local file

"""
self._file_format = file_format
self._uri = uri
self._s3_endpoint_override = s3_endpoint_override

@property
def file_format(self):
"""
Returns the file format of this file
"""
return self._file_format

@file_format.setter
def file_format(self, file_format):
"""
Sets the file format of this file
"""
self._file_format = file_format

@property
def uri(self):
"""
Returns the file url of this file
file_format (optional): File source format, e.g. parquet.
s3_endpoint_override (optional): Custom s3 endpoint (used only with s3 uri).
uri (optional): File source url, e.g. s3:// or local file.
"""
return self._uri

@uri.setter
def uri(self, uri):
"""
Sets the file url of this file
"""
self._uri = uri

@property
def s3_endpoint_override(self):
"""
Returns the s3 endpoint override
"""
return None if self._s3_endpoint_override == "" else self._s3_endpoint_override

@s3_endpoint_override.setter
def s3_endpoint_override(self, s3_endpoint_override):
"""
Sets the s3 endpoint override
"""
self._s3_endpoint_override = s3_endpoint_override
self.file_format = file_format
self.uri = uri or ""
self.s3_endpoint_override = s3_endpoint_override or ""

@classmethod
def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
"""
Creates a FileOptions from a protobuf representation of a file option

args:
Args:
file_options_proto: a protobuf representation of a datasource

Returns:
Expand Down
65 changes: 10 additions & 55 deletions sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def from_proto(data_source: DataSourceProto):
A RedshiftSource object based on the data_source protobuf.
"""
return RedshiftSource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
table=data_source.redshift_options.table,
schema=data_source.redshift_options.schema,
Expand All @@ -129,17 +130,11 @@ def __eq__(self, other):
)

return (
self.name == other.name
super().__eq__(other)
and self.redshift_options.table == other.redshift_options.table
and self.redshift_options.schema == other.redshift_options.schema
and self.redshift_options.query == other.redshift_options.query
and self.redshift_options.database == other.redshift_options.database
and self.timestamp_field == other.timestamp_field
and self.created_timestamp_column == other.created_timestamp_column
and self.field_mapping == other.field_mapping
and self.description == other.description
and self.tags == other.tags
and self.owner == other.owner
)

@property
Expand Down Expand Up @@ -170,17 +165,17 @@ def to_proto(self) -> DataSourceProto:
A DataSourceProto object.
"""
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.BATCH_REDSHIFT,
field_mapping=self.field_mapping,
redshift_options=self.redshift_options.to_proto(),
description=self.description,
tags=self.tags,
owner=self.owner,
timestamp_field=self.timestamp_field,
created_timestamp_column=self.created_timestamp_column,
)

data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column

return data_source_proto

def validate(self, config: RepoConfig):
Expand Down Expand Up @@ -256,7 +251,7 @@ def get_table_column_names_and_types(

class RedshiftOptions:
"""
DataSource Redshift options used to source features from Redshift query.
Configuration options for a Redshift data source.
"""

def __init__(
Expand All @@ -266,50 +261,10 @@ def __init__(
query: Optional[str],
database: Optional[str],
):
self._table = table
self._schema = schema
self._query = query
self._database = database

@property
def query(self):
"""Returns the Redshift SQL query referenced by this source."""
return self._query

@query.setter
def query(self, query):
"""Sets the Redshift SQL query referenced by this source."""
self._query = query

@property
def table(self):
"""Returns the table name of this Redshift table."""
return self._table

@table.setter
def table(self, table_name):
"""Sets the table ref of this Redshift table."""
self._table = table_name

@property
def schema(self):
"""Returns the schema name of this Redshift table."""
return self._schema

@schema.setter
def schema(self, schema):
"""Sets the schema of this Redshift table."""
self._schema = schema

@property
def database(self):
"""Returns the schema name of this Redshift table."""
return self._database

@database.setter
def database(self, database):
"""Sets the database name of this Redshift table."""
self._database = database
self.table = table or ""
self.schema = schema or ""
self.query = query or ""
self.database = database or ""

@classmethod
def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
Expand Down
Loading