Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Simplify DataSource.from_proto logic
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Mar 22, 2022
commit 07212b574a90d8fceb5a49835f1b8584be532fea
64 changes: 31 additions & 33 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
return kinesis_options_proto


# Maps a DataSourceProto "options" oneof field name to the fully-qualified
# dotted class path of the batch DataSource subclass that parses that proto.
# Values are kept as strings and resolved at dispatch time via
# get_data_source_class_from_type — NOTE(review): this appears intended to
# avoid importing the offline-store modules eagerly at module load; confirm
# against the dispatch site in DataSource.from_proto.
_BATCH_DATA_SOURCE_TYPES = {
"file_options": "feast.infra.offline_stores.file_source.FileSource",
"bigquery_options": "feast.infra.offline_stores.bigquery_source.BigQuerySource",
"redshift_options": "feast.infra.offline_stores.redshift_source.RedshiftSource",
"snowflake_options": "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
}


# Oneof field names for the non-batch sources (request, custom, and the
# Kafka/Kinesis stream sources). Membership here only marks the oneof as a
# recognized source type; dispatch to the concrete class (RequestDataSource,
# KafkaSource, KinesisSource, or a custom class) happens in a separate
# if/elif chain in DataSource.from_proto, not through this set.
_NON_BATCH_DATA_SOURCE_TYPES = {
"request_data_options",
"custom_options",
"kafka_options",
"kinesis_options",
}


class DataSource(ABC):
"""
DataSource that can be used to source features.
Expand Down Expand Up @@ -210,44 +226,26 @@ def from_proto(data_source: DataSourceProto) -> Any:
Raises:
ValueError: The type of DataSource could not be identified.
"""
if data_source.data_source_class_type:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
return cls.from_proto(data_source)

if data_source.request_data_options and data_source.request_data_options.schema:
data_source_obj = RequestDataSource.from_proto(data_source)
elif data_source.file_options.file_format and data_source.file_options.file_url:
from feast.infra.offline_stores.file_source import FileSource

data_source_obj = FileSource.from_proto(data_source)
elif (
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
option_type = data_source.WhichOneof("options")
if not option_type or (
option_type not in _BATCH_DATA_SOURCE_TYPES
and option_type not in _NON_BATCH_DATA_SOURCE_TYPES
):
from feast.infra.offline_stores.bigquery_source import BigQuerySource

data_source_obj = BigQuerySource.from_proto(data_source)
elif data_source.redshift_options.table or data_source.redshift_options.query:
from feast.infra.offline_stores.redshift_source import RedshiftSource

data_source_obj = RedshiftSource.from_proto(data_source)

elif data_source.snowflake_options.table or data_source.snowflake_options.query:
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
raise ValueError("Could not identify the source type being added.")

data_source_obj = SnowflakeSource.from_proto(data_source)
if option_type in _BATCH_DATA_SOURCE_TYPES:
cls = get_data_source_class_from_type(_BATCH_DATA_SOURCE_TYPES[option_type])
return cls.from_proto(data_source)

elif (
data_source.kafka_options.bootstrap_servers
and data_source.kafka_options.topic
and data_source.kafka_options.message_format
):
if option_type == "custom_options":
cls = get_data_source_class_from_type(data_source.data_source_class_type)
data_source_obj = cls.from_proto(data_source)
elif option_type == "kafka_options":
data_source_obj = KafkaSource.from_proto(data_source)
elif (
data_source.kinesis_options.record_format
and data_source.kinesis_options.region
and data_source.kinesis_options.stream_name
):
elif option_type == "kinesis_options":
data_source_obj = KinesisSource.from_proto(data_source)
elif option_type == "request_data_options":
data_source_obj = RequestDataSource.from_proto(data_source)
else:
raise ValueError("Could not identify the source type being added.")

Expand Down