|
2 | 2 | import uuid |
3 | 3 | from dataclasses import asdict, dataclass |
4 | 4 | from datetime import date, datetime, timedelta |
5 | | -from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union |
| 5 | +from typing import List, Optional, Set, Union |
6 | 6 |
|
7 | 7 | import pandas |
8 | 8 | import pyarrow |
|
12 | 12 | from pydantic.typing import Literal |
13 | 13 | from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed |
14 | 14 |
|
15 | | -from feast import errors, type_map |
| 15 | +from feast import errors |
16 | 16 | from feast.data_source import DataSource |
17 | 17 | from feast.errors import ( |
18 | 18 | BigQueryJobCancelled, |
19 | 19 | BigQueryJobStillRunning, |
20 | | - DataSourceNotFoundException, |
21 | 20 | FeastProviderLoginError, |
22 | 21 | ) |
23 | 22 | from feast.feature_view import FeatureView |
|
26 | 25 | DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, |
27 | 26 | _get_requested_feature_views_to_features_dict, |
28 | 27 | ) |
29 | | -from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto |
30 | 28 | from feast.registry import Registry |
31 | 29 | from feast.repo_config import FeastConfigBaseModel, RepoConfig |
32 | | -from feast.value_type import ValueType |
| 30 | + |
| 31 | +from .bigquery_source import BigQuerySource |
33 | 32 |
|
34 | 33 | try: |
35 | 34 | from google.api_core.exceptions import NotFound |
@@ -707,201 +706,3 @@ def _get_bigquery_client(project: Optional[str] = None): |
707 | 706 | ) USING ({{featureview.name}}__entity_row_unique_id) |
708 | 707 | {% endfor %} |
709 | 708 | """ |
710 | | - |
711 | | - |
712 | | -class BigQuerySource(DataSource): |
713 | | - def __init__( |
714 | | - self, |
715 | | - event_timestamp_column: Optional[str] = "", |
716 | | - table_ref: Optional[str] = None, |
717 | | - created_timestamp_column: Optional[str] = "", |
718 | | - field_mapping: Optional[Dict[str, str]] = None, |
719 | | - date_partition_column: Optional[str] = "", |
720 | | - query: Optional[str] = None, |
721 | | - ): |
722 | | - self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query) |
723 | | - |
724 | | - super().__init__( |
725 | | - event_timestamp_column, |
726 | | - created_timestamp_column, |
727 | | - field_mapping, |
728 | | - date_partition_column, |
729 | | - ) |
730 | | - |
731 | | - def __eq__(self, other): |
732 | | - if not isinstance(other, BigQuerySource): |
733 | | - raise TypeError( |
734 | | - "Comparisons should only involve BigQuerySource class objects." |
735 | | - ) |
736 | | - |
737 | | - return ( |
738 | | - self.bigquery_options.table_ref == other.bigquery_options.table_ref |
739 | | - and self.bigquery_options.query == other.bigquery_options.query |
740 | | - and self.event_timestamp_column == other.event_timestamp_column |
741 | | - and self.created_timestamp_column == other.created_timestamp_column |
742 | | - and self.field_mapping == other.field_mapping |
743 | | - ) |
744 | | - |
745 | | - @property |
746 | | - def table_ref(self): |
747 | | - return self._bigquery_options.table_ref |
748 | | - |
749 | | - @property |
750 | | - def query(self): |
751 | | - return self._bigquery_options.query |
752 | | - |
753 | | - @property |
754 | | - def bigquery_options(self): |
755 | | - """ |
756 | | - Returns the bigquery options of this data source |
757 | | - """ |
758 | | - return self._bigquery_options |
759 | | - |
760 | | - @bigquery_options.setter |
761 | | - def bigquery_options(self, bigquery_options): |
762 | | - """ |
763 | | - Sets the bigquery options of this data source |
764 | | - """ |
765 | | - self._bigquery_options = bigquery_options |
766 | | - |
767 | | - @staticmethod |
768 | | - def from_proto(data_source: DataSourceProto): |
769 | | - |
770 | | - assert data_source.HasField("bigquery_options") |
771 | | - |
772 | | - return BigQuerySource( |
773 | | - field_mapping=dict(data_source.field_mapping), |
774 | | - table_ref=data_source.bigquery_options.table_ref, |
775 | | - event_timestamp_column=data_source.event_timestamp_column, |
776 | | - created_timestamp_column=data_source.created_timestamp_column, |
777 | | - date_partition_column=data_source.date_partition_column, |
778 | | - query=data_source.bigquery_options.query, |
779 | | - ) |
780 | | - |
781 | | - def to_proto(self) -> DataSourceProto: |
782 | | - data_source_proto = DataSourceProto( |
783 | | - type=DataSourceProto.BATCH_BIGQUERY, |
784 | | - field_mapping=self.field_mapping, |
785 | | - bigquery_options=self.bigquery_options.to_proto(), |
786 | | - ) |
787 | | - |
788 | | - data_source_proto.event_timestamp_column = self.event_timestamp_column |
789 | | - data_source_proto.created_timestamp_column = self.created_timestamp_column |
790 | | - data_source_proto.date_partition_column = self.date_partition_column |
791 | | - |
792 | | - return data_source_proto |
793 | | - |
794 | | - def validate(self, config: RepoConfig): |
795 | | - if not self.query: |
796 | | - from google.api_core.exceptions import NotFound |
797 | | - from google.cloud import bigquery |
798 | | - |
799 | | - client = bigquery.Client() |
800 | | - try: |
801 | | - client.get_table(self.table_ref) |
802 | | - except NotFound: |
803 | | - raise DataSourceNotFoundException(self.table_ref) |
804 | | - |
805 | | - def get_table_query_string(self) -> str: |
806 | | - """Returns a string that can directly be used to reference this table in SQL""" |
807 | | - if self.table_ref: |
808 | | - return f"`{self.table_ref}`" |
809 | | - else: |
810 | | - return f"({self.query})" |
811 | | - |
812 | | - @staticmethod |
813 | | - def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: |
814 | | - return type_map.bq_to_feast_value_type |
815 | | - |
816 | | - def get_table_column_names_and_types( |
817 | | - self, config: RepoConfig |
818 | | - ) -> Iterable[Tuple[str, str]]: |
819 | | - from google.cloud import bigquery |
820 | | - |
821 | | - client = bigquery.Client() |
822 | | - if self.table_ref is not None: |
823 | | - table_schema = client.get_table(self.table_ref).schema |
824 | | - if not isinstance(table_schema[0], bigquery.schema.SchemaField): |
825 | | - raise TypeError("Could not parse BigQuery table schema.") |
826 | | - |
827 | | - name_type_pairs = [(field.name, field.field_type) for field in table_schema] |
828 | | - else: |
829 | | - bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1" |
830 | | - queryRes = client.query(bq_columns_query).result() |
831 | | - name_type_pairs = [ |
832 | | - (schema_field.name, schema_field.field_type) |
833 | | - for schema_field in queryRes.schema |
834 | | - ] |
835 | | - |
836 | | - return name_type_pairs |
837 | | - |
838 | | - |
839 | | -class BigQueryOptions: |
840 | | - """ |
841 | | - DataSource BigQuery options used to source features from BigQuery query |
842 | | - """ |
843 | | - |
844 | | - def __init__(self, table_ref: Optional[str], query: Optional[str]): |
845 | | - self._table_ref = table_ref |
846 | | - self._query = query |
847 | | - |
848 | | - @property |
849 | | - def query(self): |
850 | | - """ |
851 | | - Returns the BigQuery SQL query referenced by this source |
852 | | - """ |
853 | | - return self._query |
854 | | - |
855 | | - @query.setter |
856 | | - def query(self, query): |
857 | | - """ |
858 | | - Sets the BigQuery SQL query referenced by this source |
859 | | - """ |
860 | | - self._query = query |
861 | | - |
862 | | - @property |
863 | | - def table_ref(self): |
864 | | - """ |
865 | | - Returns the table ref of this BQ table |
866 | | - """ |
867 | | - return self._table_ref |
868 | | - |
869 | | - @table_ref.setter |
870 | | - def table_ref(self, table_ref): |
871 | | - """ |
872 | | - Sets the table ref of this BQ table |
873 | | - """ |
874 | | - self._table_ref = table_ref |
875 | | - |
876 | | - @classmethod |
877 | | - def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions): |
878 | | - """ |
879 | | - Creates a BigQueryOptions from a protobuf representation of a BigQuery option |
880 | | -
|
881 | | - Args: |
882 | | - bigquery_options_proto: A protobuf representation of a DataSource |
883 | | -
|
884 | | - Returns: |
885 | | - Returns a BigQueryOptions object based on the bigquery_options protobuf |
886 | | - """ |
887 | | - |
888 | | - bigquery_options = cls( |
889 | | - table_ref=bigquery_options_proto.table_ref, |
890 | | - query=bigquery_options_proto.query, |
891 | | - ) |
892 | | - |
893 | | - return bigquery_options |
894 | | - |
895 | | - def to_proto(self) -> DataSourceProto.BigQueryOptions: |
896 | | - """ |
897 | | - Converts an BigQueryOptionsProto object to its protobuf representation. |
898 | | -
|
899 | | - Returns: |
900 | | - BigQueryOptionsProto protobuf |
901 | | - """ |
902 | | - |
903 | | - bigquery_options_proto = DataSourceProto.BigQueryOptions( |
904 | | - table_ref=self.table_ref, query=self.query, |
905 | | - ) |
906 | | - |
907 | | - return bigquery_options_proto |
0 commit comments