Skip to content

Commit 8b3a97a

Browse files
authored
Refactor data source classes to fix import issues (#1723)
* Refactor data source classes to fix import issues Signed-off-by: Achal Shah <achals@gmail.com> * make format and lint Signed-off-by: Achal Shah <achals@gmail.com> * remove unneded __init__ files Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 8cfe914 commit 8b3a97a

File tree

9 files changed

+664
-630
lines changed

9 files changed

+664
-630
lines changed

sdk/python/feast/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
from pkg_resources import DistributionNotFound, get_distribution
44

5-
from feast.infra.offline_stores.bigquery import BigQuerySource
6-
from feast.infra.offline_stores.file import FileSource
7-
from feast.infra.offline_stores.redshift import RedshiftSource
5+
from feast.infra.offline_stores.bigquery_source import BigQuerySource
6+
from feast.infra.offline_stores.file_source import FileSource
7+
from feast.infra.offline_stores.redshift_source import RedshiftSource
88

99
from .client import Client
1010
from .data_source import KafkaSource, KinesisSource, SourceType
@@ -29,18 +29,18 @@
2929
pass
3030

3131
__all__ = [
32-
"BigQuerySource",
3332
"Client",
3433
"Entity",
3534
"KafkaSource",
3635
"KinesisSource",
37-
"RedshiftSource",
3836
"Feature",
3937
"FeatureStore",
4038
"FeatureTable",
4139
"FeatureView",
4240
"RepoConfig",
4341
"SourceType",
4442
"ValueType",
43+
"BigQuerySource",
4544
"FileSource",
45+
"RedshiftSource",
4646
]

sdk/python/feast/data_source.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,17 +317,17 @@ def from_proto(data_source: DataSourceProto):
317317
return cls.from_proto(data_source)
318318

319319
if data_source.file_options.file_format and data_source.file_options.file_url:
320-
from feast.infra.offline_stores.file import FileSource
320+
from feast.infra.offline_stores.file_source import FileSource
321321

322322
data_source_obj = FileSource.from_proto(data_source)
323323
elif (
324324
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
325325
):
326-
from feast.infra.offline_stores.bigquery import BigQuerySource
326+
from feast.infra.offline_stores.bigquery_source import BigQuerySource
327327

328328
data_source_obj = BigQuerySource.from_proto(data_source)
329329
elif data_source.redshift_options.table or data_source.redshift_options.query:
330-
from feast.infra.offline_stores.redshift import RedshiftSource
330+
from feast.infra.offline_stores.redshift_source import RedshiftSource
331331

332332
data_source_obj = RedshiftSource.from_proto(data_source)
333333
elif (
@@ -378,6 +378,14 @@ def get_table_column_names_and_types(
378378

379379

380380
class KafkaSource(DataSource):
381+
def validate(self, config: RepoConfig):
382+
pass
383+
384+
def get_table_column_names_and_types(
385+
self, config: RepoConfig
386+
) -> Iterable[Tuple[str, str]]:
387+
pass
388+
381389
def __init__(
382390
self,
383391
event_timestamp_column: str,
@@ -463,6 +471,14 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
463471

464472

465473
class KinesisSource(DataSource):
474+
def validate(self, config: RepoConfig):
475+
pass
476+
477+
def get_table_column_names_and_types(
478+
self, config: RepoConfig
479+
) -> Iterable[Tuple[str, str]]:
480+
pass
481+
466482
@staticmethod
467483
def from_proto(data_source: DataSourceProto):
468484
return KinesisSource(

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 4 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33
from dataclasses import asdict, dataclass
44
from datetime import date, datetime, timedelta
5-
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
5+
from typing import List, Optional, Set, Union
66

77
import pandas
88
import pyarrow
@@ -12,12 +12,11 @@
1212
from pydantic.typing import Literal
1313
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
1414

15-
from feast import errors, type_map
15+
from feast import errors
1616
from feast.data_source import DataSource
1717
from feast.errors import (
1818
BigQueryJobCancelled,
1919
BigQueryJobStillRunning,
20-
DataSourceNotFoundException,
2120
FeastProviderLoginError,
2221
)
2322
from feast.feature_view import FeatureView
@@ -26,10 +25,10 @@
2625
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
2726
_get_requested_feature_views_to_features_dict,
2827
)
29-
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
3028
from feast.registry import Registry
3129
from feast.repo_config import FeastConfigBaseModel, RepoConfig
32-
from feast.value_type import ValueType
30+
31+
from .bigquery_source import BigQuerySource
3332

3433
try:
3534
from google.api_core.exceptions import NotFound
@@ -707,201 +706,3 @@ def _get_bigquery_client(project: Optional[str] = None):
707706
) USING ({{featureview.name}}__entity_row_unique_id)
708707
{% endfor %}
709708
"""
710-
711-
712-
class BigQuerySource(DataSource):
713-
def __init__(
714-
self,
715-
event_timestamp_column: Optional[str] = "",
716-
table_ref: Optional[str] = None,
717-
created_timestamp_column: Optional[str] = "",
718-
field_mapping: Optional[Dict[str, str]] = None,
719-
date_partition_column: Optional[str] = "",
720-
query: Optional[str] = None,
721-
):
722-
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)
723-
724-
super().__init__(
725-
event_timestamp_column,
726-
created_timestamp_column,
727-
field_mapping,
728-
date_partition_column,
729-
)
730-
731-
def __eq__(self, other):
732-
if not isinstance(other, BigQuerySource):
733-
raise TypeError(
734-
"Comparisons should only involve BigQuerySource class objects."
735-
)
736-
737-
return (
738-
self.bigquery_options.table_ref == other.bigquery_options.table_ref
739-
and self.bigquery_options.query == other.bigquery_options.query
740-
and self.event_timestamp_column == other.event_timestamp_column
741-
and self.created_timestamp_column == other.created_timestamp_column
742-
and self.field_mapping == other.field_mapping
743-
)
744-
745-
@property
746-
def table_ref(self):
747-
return self._bigquery_options.table_ref
748-
749-
@property
750-
def query(self):
751-
return self._bigquery_options.query
752-
753-
@property
754-
def bigquery_options(self):
755-
"""
756-
Returns the bigquery options of this data source
757-
"""
758-
return self._bigquery_options
759-
760-
@bigquery_options.setter
761-
def bigquery_options(self, bigquery_options):
762-
"""
763-
Sets the bigquery options of this data source
764-
"""
765-
self._bigquery_options = bigquery_options
766-
767-
@staticmethod
768-
def from_proto(data_source: DataSourceProto):
769-
770-
assert data_source.HasField("bigquery_options")
771-
772-
return BigQuerySource(
773-
field_mapping=dict(data_source.field_mapping),
774-
table_ref=data_source.bigquery_options.table_ref,
775-
event_timestamp_column=data_source.event_timestamp_column,
776-
created_timestamp_column=data_source.created_timestamp_column,
777-
date_partition_column=data_source.date_partition_column,
778-
query=data_source.bigquery_options.query,
779-
)
780-
781-
def to_proto(self) -> DataSourceProto:
782-
data_source_proto = DataSourceProto(
783-
type=DataSourceProto.BATCH_BIGQUERY,
784-
field_mapping=self.field_mapping,
785-
bigquery_options=self.bigquery_options.to_proto(),
786-
)
787-
788-
data_source_proto.event_timestamp_column = self.event_timestamp_column
789-
data_source_proto.created_timestamp_column = self.created_timestamp_column
790-
data_source_proto.date_partition_column = self.date_partition_column
791-
792-
return data_source_proto
793-
794-
def validate(self, config: RepoConfig):
795-
if not self.query:
796-
from google.api_core.exceptions import NotFound
797-
from google.cloud import bigquery
798-
799-
client = bigquery.Client()
800-
try:
801-
client.get_table(self.table_ref)
802-
except NotFound:
803-
raise DataSourceNotFoundException(self.table_ref)
804-
805-
def get_table_query_string(self) -> str:
806-
"""Returns a string that can directly be used to reference this table in SQL"""
807-
if self.table_ref:
808-
return f"`{self.table_ref}`"
809-
else:
810-
return f"({self.query})"
811-
812-
@staticmethod
813-
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
814-
return type_map.bq_to_feast_value_type
815-
816-
def get_table_column_names_and_types(
817-
self, config: RepoConfig
818-
) -> Iterable[Tuple[str, str]]:
819-
from google.cloud import bigquery
820-
821-
client = bigquery.Client()
822-
if self.table_ref is not None:
823-
table_schema = client.get_table(self.table_ref).schema
824-
if not isinstance(table_schema[0], bigquery.schema.SchemaField):
825-
raise TypeError("Could not parse BigQuery table schema.")
826-
827-
name_type_pairs = [(field.name, field.field_type) for field in table_schema]
828-
else:
829-
bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
830-
queryRes = client.query(bq_columns_query).result()
831-
name_type_pairs = [
832-
(schema_field.name, schema_field.field_type)
833-
for schema_field in queryRes.schema
834-
]
835-
836-
return name_type_pairs
837-
838-
839-
class BigQueryOptions:
840-
"""
841-
DataSource BigQuery options used to source features from BigQuery query
842-
"""
843-
844-
def __init__(self, table_ref: Optional[str], query: Optional[str]):
845-
self._table_ref = table_ref
846-
self._query = query
847-
848-
@property
849-
def query(self):
850-
"""
851-
Returns the BigQuery SQL query referenced by this source
852-
"""
853-
return self._query
854-
855-
@query.setter
856-
def query(self, query):
857-
"""
858-
Sets the BigQuery SQL query referenced by this source
859-
"""
860-
self._query = query
861-
862-
@property
863-
def table_ref(self):
864-
"""
865-
Returns the table ref of this BQ table
866-
"""
867-
return self._table_ref
868-
869-
@table_ref.setter
870-
def table_ref(self, table_ref):
871-
"""
872-
Sets the table ref of this BQ table
873-
"""
874-
self._table_ref = table_ref
875-
876-
@classmethod
877-
def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
878-
"""
879-
Creates a BigQueryOptions from a protobuf representation of a BigQuery option
880-
881-
Args:
882-
bigquery_options_proto: A protobuf representation of a DataSource
883-
884-
Returns:
885-
Returns a BigQueryOptions object based on the bigquery_options protobuf
886-
"""
887-
888-
bigquery_options = cls(
889-
table_ref=bigquery_options_proto.table_ref,
890-
query=bigquery_options_proto.query,
891-
)
892-
893-
return bigquery_options
894-
895-
def to_proto(self) -> DataSourceProto.BigQueryOptions:
896-
"""
897-
Converts an BigQueryOptionsProto object to its protobuf representation.
898-
899-
Returns:
900-
BigQueryOptionsProto protobuf
901-
"""
902-
903-
bigquery_options_proto = DataSourceProto.BigQueryOptions(
904-
table_ref=self.table_ref, query=self.query,
905-
)
906-
907-
return bigquery_options_proto

0 commit comments

Comments
 (0)