From 68d18d2421f5af74e9ba8dd14fdd9474ee0ce93a Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 00:28:03 +0800 Subject: [PATCH 01/13] allowing follow-imports + fixing on_demand_feature_view.py Signed-off-by: Chester Ong --- Makefile | 2 +- sdk/python/feast/on_demand_feature_view.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 6736e64078..6fe0d62f37 100644 --- a/Makefile +++ b/Makefile @@ -310,7 +310,7 @@ format-python: cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests lint-python: - cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast + cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ feast cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index fcafeaa2bc..e347953d09 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import copy import functools import warnings from datetime import datetime from types import FunctionType -from typing import Any, Dict, List, Optional, Type, Union +from typing import Dict, List, Optional, Type, Union import dill import pandas as pd @@ -16,6 +18,7 @@ from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.field import Field, from_value_type +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) @@ -335,14 +338,14 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return 
df_with_transformed_features.rename(columns=rename_columns) - def infer_features(self): + def infer_features(self) -> None: """ Infers the set of features associated to this feature view from the input source. Raises: RegistryInferenceFailure: The set of features could not be inferred. """ - rand_df_value: Dict[str, Any] = { + rand_df_value = { "float": 1.0, "int": 1, "str": "hello world", @@ -365,7 +368,7 @@ def infer_features(self): dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) sample_val = rand_df_value[dtype] if dtype in rand_df_value else None df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) - output_df: pd.DataFrame = self.udf.__call__(df) + output_df = self.udf.__call__(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): inferred_features.append( @@ -396,11 +399,11 @@ def infer_features(self): ) @staticmethod - def get_requested_odfvs(feature_refs, project, registry): + def get_requested_odfvs(feature_refs: List[str], project: str, registry: BaseRegistry) -> List[OnDemandFeatureView]: all_on_demand_feature_views = registry.list_on_demand_feature_views( project, allow_cache=True ) - requested_on_demand_feature_views: List[OnDemandFeatureView] = [] + requested_on_demand_feature_views = [] for odfv in all_on_demand_feature_views: for feature in odfv.features: if f"{odfv.name}:{feature.name}" in feature_refs: From 73bba2c62bf3b5af2c007f4a3ca08be00cf2d15e Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 00:42:26 +0800 Subject: [PATCH 02/13] creating base type `SnowflakeStoreConfig` to fix circular imports Signed-off-by: Chester Ong --- .../feast/infra/offline_stores/snowflake.py | 42 +-------------- .../feast/infra/online_stores/snowflake.py | 37 ++----------- .../infra/utils/snowflake/snowflake_utils.py | 53 +++++++++++++++++-- sdk/python/feast/on_demand_feature_view.py | 4 +- 4 files changed, 57 insertions(+), 79 deletions(-) diff --git 
a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index dd13ffc96c..9446ead3df 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -46,6 +46,7 @@ from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils.snowflake.snowflake_utils import ( GetSnowflakeConnection, + SnowflakeStoreConfig, execute_snowflake_statement, write_pandas, write_parquet, @@ -78,51 +79,12 @@ warnings.filterwarnings("ignore", category=DeprecationWarning) -class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): +class SnowflakeOfflineStoreConfig(SnowflakeStoreConfig): """Offline store config for Snowflake""" type: Literal["snowflake.offline"] = "snowflake.offline" """ Offline store type selector """ - config_path: Optional[str] = os.path.expanduser("~/.snowsql/config") - """ Snowflake config path -- absolute path required (Cant use ~)""" - - account: Optional[str] = None - """ Snowflake deployment identifier -- drop .snowflakecomputing.com """ - - user: Optional[str] = None - """ Snowflake user name """ - - password: Optional[str] = None - """ Snowflake password """ - - role: Optional[str] = None - """ Snowflake role name """ - - warehouse: Optional[str] = None - """ Snowflake warehouse name """ - - authenticator: Optional[str] = None - """ Snowflake authenticator name """ - - database: StrictStr - """ Snowflake database name """ - - schema_: Optional[str] = Field("PUBLIC", alias="schema") - """ Snowflake schema name """ - - storage_integration_name: Optional[str] = None - """ Storage integration name in snowflake """ - - blob_export_location: Optional[str] = None - """ Location (in S3, Google storage or Azure storage) where data is offloaded """ - - convert_timestamp_columns: Optional[bool] = None - """ Convert timestamp columns on export to a Parquet-supported format """ - - class Config: - allow_population_by_field_name = True - class 
SnowflakeOfflineStore(OfflineStore): @staticmethod diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py index c1a03a2862..6d6c313029 100644 --- a/sdk/python/feast/infra/online_stores/snowflake.py +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -1,11 +1,9 @@ import itertools -import os from binascii import hexlify from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple import pandas as pd -from pydantic import Field, StrictStr from pydantic.schema import Literal from feast.entity import Entity @@ -14,53 +12,24 @@ from feast.infra.online_stores.online_store import OnlineStore from feast.infra.utils.snowflake.snowflake_utils import ( GetSnowflakeConnection, + SnowflakeStoreConfig, execute_snowflake_statement, get_snowflake_online_store_path, write_pandas_binary, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.repo_config import RepoConfig from feast.usage import log_exceptions_and_usage from feast.utils import to_naive_utc -class SnowflakeOnlineStoreConfig(FeastConfigBaseModel): +class SnowflakeOnlineStoreConfig(SnowflakeStoreConfig): """Online store config for Snowflake""" type: Literal["snowflake.online"] = "snowflake.online" """ Online store type selector """ - config_path: Optional[str] = os.path.expanduser("~/.snowsql/config") - """ Snowflake config path -- absolute path required (Can't use ~)""" - - account: Optional[str] = None - """ Snowflake deployment identifier -- drop .snowflakecomputing.com """ - - user: Optional[str] = None - """ Snowflake user name """ - - password: Optional[str] = None - """ Snowflake password """ - - role: Optional[str] = None - """ Snowflake role name """ - - warehouse: Optional[str] = None - """ Snowflake warehouse name """ - - authenticator: 
Optional[str] = None - """ Snowflake authenticator name """ - - database: StrictStr - """ Snowflake database name """ - - schema_: Optional[str] = Field("PUBLIC", alias="schema") - """ Snowflake schema name """ - - class Config: - allow_population_by_field_name = True - class SnowflakeOnlineStore(OnlineStore): @log_exceptions_and_usage(online_store="snowflake") diff --git a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py index 8eb5177ac2..28953b945e 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py +++ b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py @@ -12,6 +12,7 @@ import pyarrow from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization +from pydantic import Field, StrictStr from tenacity import ( retry, retry_if_exception_type, @@ -22,7 +23,7 @@ import feast from feast.errors import SnowflakeIncompleteConfig, SnowflakeQueryUnknownError from feast.feature_view import FeatureView -from feast.repo_config import RepoConfig +from feast.repo_config import FeastConfigBaseModel, RepoConfig try: import snowflake.connector @@ -33,7 +34,6 @@ raise FeastExtrasDependencyImportError("snowflake", str(e)) - getLogger("snowflake.connector.cursor").disabled = True getLogger("snowflake.connector.connection").disabled = True getLogger("snowflake.connector.network").disabled = True @@ -42,10 +42,56 @@ _cache = {} +class SnowflakeStoreConfig(FeastConfigBaseModel): + """Store config for Snowflake""" + + type: str + """ Online or Offline store type selector """ + + config_path: Optional[str] = os.path.expanduser("~/.snowsql/config") + """ Snowflake config path -- absolute path required (Can't use ~)""" + + account: Optional[str] = None + """ Snowflake deployment identifier -- drop .snowflakecomputing.com """ + + user: Optional[str] = None + """ Snowflake user name """ + + password: Optional[str] = None + """ Snowflake password """ 
+ + role: Optional[str] = None + """ Snowflake role name """ + + warehouse: Optional[str] = None + """ Snowflake warehouse name """ + + authenticator: Optional[str] = None + """ Snowflake authenticator name """ + + database: StrictStr + """ Snowflake database name """ + + schema_: Optional[str] = Field("PUBLIC", alias="schema") + """ Snowflake schema name """ + + storage_integration_name: Optional[str] = None + """ Storage integration name in snowflake """ + + blob_export_location: Optional[str] = None + """ Location (in S3, Google storage or Azure storage) where data is offloaded """ + + convert_timestamp_columns: Optional[bool] = None + """ Convert timestamp columns on export to a Parquet-supported format """ + + class Config: + allow_population_by_field_name = True + + class GetSnowflakeConnection: def __init__( self, - config: str, + config: SnowflakeStoreConfig, autocommit=True, ): self.config = config @@ -516,7 +562,6 @@ def chunk_helper(lst: pd.DataFrame, n: int) -> Iterator[Tuple[int, pd.DataFrame] def parse_private_key_path(key_path: str, private_key_passphrase: str) -> bytes: - with open(key_path, "rb") as key: p_key = serialization.load_pem_private_key( key.read(), diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index e347953d09..3e7f46a72d 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -399,7 +399,9 @@ def infer_features(self) -> None: ) @staticmethod - def get_requested_odfvs(feature_refs: List[str], project: str, registry: BaseRegistry) -> List[OnDemandFeatureView]: + def get_requested_odfvs( + feature_refs: List[str], project: str, registry: BaseRegistry + ) -> List[OnDemandFeatureView]: all_on_demand_feature_views = registry.list_on_demand_feature_views( project, allow_cache=True ) From d80acd86424150397a7bba3912023622bca833fc Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 00:57:19 +0800 Subject: [PATCH 03/13] fixing 
circular imports Signed-off-by: Chester Ong --- .../feast/infra/registry/base_registry.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index f89b079478..7bf9ad5c0a 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import json from abc import ABC, abstractmethod from collections import defaultdict @@ -20,13 +22,13 @@ from google.protobuf.json_format import MessageToJson from proto import Message +import feast.feature_service +import feast.on_demand_feature_view from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity -from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra -from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.request_feature_view import RequestFeatureView @@ -150,7 +152,10 @@ def list_data_sources( # Feature service operations @abstractmethod def apply_feature_service( - self, feature_service: FeatureService, project: str, commit: bool = True + self, + feature_service: feast.feature_service.FeatureService, + project: str, + commit: bool = True, ): """ Registers a single feature service with Feast @@ -174,7 +179,7 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): @abstractmethod def get_feature_service( self, name: str, project: str, allow_cache: bool = False - ) -> FeatureService: + ) -> 
feast.feature_service.FeatureService: """ Retrieves a feature service. @@ -191,7 +196,7 @@ def get_feature_service( @abstractmethod def list_feature_services( self, project: str, allow_cache: bool = False - ) -> List[FeatureService]: + ) -> List[feast.feature_service.FeatureService]: """ Retrieve a list of feature services from the registry @@ -265,7 +270,7 @@ def list_stream_feature_views( @abstractmethod def get_on_demand_feature_view( self, name: str, project: str, allow_cache: bool = False - ) -> OnDemandFeatureView: + ) -> feast.on_demand_feature_view.OnDemandFeatureView: """ Retrieves an on demand feature view. @@ -282,7 +287,7 @@ def get_on_demand_feature_view( @abstractmethod def list_on_demand_feature_views( self, project: str, allow_cache: bool = False - ) -> List[OnDemandFeatureView]: + ) -> List[feast.on_demand_feature_view.OnDemandFeatureView]: """ Retrieve a list of on demand feature views from the registry From 3ecff669dc4f1c817c7a7804feca495f5ff35bae Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 01:25:04 +0800 Subject: [PATCH 04/13] converting create_data_source to take in arbitrary arguments Signed-off-by: Chester Ong --- .../athena_offline_store/tests/data_source.py | 12 ++++--- .../mssql_offline_store/tests/data_source.py | 11 +++--- .../tests/data_source.py | 13 ++++--- .../spark_offline_store/tests/data_source.py | 11 +++--- .../trino_offline_store/tests/data_source.py | 13 ++++--- .../universal/data_source_creator.py | 14 ++------ .../universal/data_sources/bigquery.py | 11 +++--- .../universal/data_sources/file.py | 35 ++++++++++++------- .../universal/data_sources/redshift.py | 12 ++++--- .../universal/data_sources/snowflake.py | 14 ++++---- .../universal/online_store/bigtable.py | 14 ++++---- .../universal/online_store/datastore.py | 5 +-- .../universal/online_store/dynamodb.py | 13 +++---- 13 files changed, 99 insertions(+), 79 deletions(-) diff --git 
a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index f68e109d6c..4d16e2a9a5 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -47,12 +47,14 @@ def __init__(self, project_name: str, *args, **kwargs): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) table_name = destination_name s3_target = ( diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py index 2604cf7c18..98f6cb0d25 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py @@ -63,12 +63,15 @@ def create_offline_store_config(self) -> MsSqlServerOfflineStoreConfig: def create_data_source( self, df: pd.DataFrame, - destination_name: str, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + 
created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) + # Make sure the field mapping is correct and convert the datetime datasources. if timestamp_field in df: df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True).fillna( diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py index 224fcea30f..c9d0ac0060 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py @@ -81,12 +81,15 @@ def __init__( def create_data_source( self, df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) + destination_name = self.get_prefixed_table_name(destination_name) if self.offline_store_config: diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py index 7b4fda3b5f..f0cb5a7bd9 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py @@ -67,12 +67,15 @@ def create_offline_store_config(self): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - 
timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) + if timestamp_field in df: df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True) # Make sure the field mapping is correct and convert the datetime datasources. diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py index a5aa53df7a..162e12cbd4 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py @@ -80,12 +80,15 @@ def teardown(self): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) + destination_name = self.get_prefixed_table_name(destination_name) self.client.execute_query( f"CREATE SCHEMA IF NOT EXISTS memory.{self.project_name}" diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index 
d64463606f..95419409fe 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -17,11 +17,7 @@ def __init__(self, project_name: str, *args, **kwargs): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - event_timestamp_column="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, - timestamp_field: Optional[str] = None, + **kwargs, ) -> DataSource: """ Create a data source based on the dataframe. Implementing this method requires the underlying implementation to @@ -30,13 +26,7 @@ def create_data_source( Args: df: The dataframe to be used to create the data source. - destination_name: This str is used by the implementing classes to - isolate the multiple dataframes from each other. - event_timestamp_column: (Deprecated) Pass through for the underlying data source. - created_timestamp_column: Pass through for the underlying data source. - field_mapping: Pass through for the underlying data source. - timestamp_field: Pass through for the underlying data source. - + kwargs: Additional arguments to be passed to the underlying data source. 
Returns: A Data source object, pointing to a table or file that is uploaded/persisted for the purpose of the diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 215d19ba7f..7464f28103 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -63,13 +63,14 @@ def create_offline_store_config(self): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, **kwargs, ) -> DataSource: - + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) destination_name = self.get_prefixed_table_name(destination_name) self.create_dataset() diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 3263785683..6e51bdc757 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -38,11 +38,14 @@ def __init__(self, project_name: str, *args, **kwargs): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", 
"ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) destination_name = self.get_prefixed_table_name(destination_name) @@ -93,11 +96,14 @@ class FileParquetDatasetSourceCreator(FileDataSourceCreator): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) destination_name = self.get_prefixed_table_name(destination_name) @@ -167,12 +173,15 @@ def _upload_parquet_file(self, df, file_name, minio_endpoint): def create_data_source( self, df: pd.DataFrame, - destination_name: Optional[str] = None, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) + filename = f"{destination_name}.parquet" port = self.minio.get_exposed_port("9000") host = self.minio.get_container_host_ip() diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index e6f20d6125..c400fac69b 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ 
b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -47,12 +47,14 @@ def __init__(self, project_name: str, *args, **kwargs): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_field = kwargs.get("timestamp_field", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) destination_name = self.get_prefixed_table_name(destination_name) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index 1414291a18..0d07fdaaf3 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -47,12 +47,14 @@ def __init__(self, project_name: str, *args, **kwargs): def create_data_source( self, df: pd.DataFrame, - destination_name: str, - suffix: Optional[str] = None, - timestamp_field="ts", - created_timestamp_column="created_ts", - field_mapping: Optional[Dict[str, str]] = None, + **kwargs, ) -> DataSource: + destination_name = kwargs.get("destination_name") + if not destination_name: + raise ValueError("destination_name is required") + timestamp_column = kwargs.get("timestamp_column", "ts") + created_timestamp_column = kwargs.get("created_timestamp_column", "created_ts") + field_mapping = kwargs.get("field_mapping", None) destination_name = self.get_prefixed_table_name(destination_name) @@ -63,7 +65,7 @@ def create_data_source( return SnowflakeSource( table=destination_name, - 
timestamp_field=timestamp_field, + timestamp_field=timestamp_column, created_timestamp_column=created_timestamp_column, field_mapping=field_mapping or {"ts_1": "ts"}, ) diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/bigtable.py b/sdk/python/tests/integration/feature_repos/universal/online_store/bigtable.py index c06143e245..0314624c8c 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/bigtable.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/bigtable.py @@ -1,10 +1,10 @@ import os -from typing import Dict from google.cloud import bigtable from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -28,7 +28,7 @@ def __init__(self, project_name: str, **kwargs): .with_exposed_ports(self.port) ) - def create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() log_string_to_wait_for = r"\[bigtable\] Cloud Bigtable emulator running" wait_for_logs( @@ -36,11 +36,11 @@ def create_online_store(self) -> Dict[str, str]: ) exposed_port = self.container.get_exposed_port(self.port) os.environ[bigtable.client.BIGTABLE_EMULATOR] = f"{self.host}:{exposed_port}" - return { - "type": "bigtable", - "project_id": self.gcp_project, - "instance": self.bt_instance, - } + return FeastConfigBaseModel( + type="bigtable", + project_id=self.gcp_project, + instance=self.bt_instance, + ) def teardown(self): del os.environ[bigtable.client.BIGTABLE_EMULATOR] diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py b/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py index b5bbb94f7c..26954ca0fe 100644 --- 
a/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py @@ -5,6 +5,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -23,7 +24,7 @@ def __init__(self, project_name: str, **kwargs): .with_exposed_ports("8081") ) - def create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() log_string_to_wait_for = r"\[datastore\] Dev App Server is now running" wait_for_logs( @@ -31,7 +32,7 @@ def create_online_store(self) -> Dict[str, str]: ) exposed_port = self.container.get_exposed_port("8081") os.environ[datastore.client.DATASTORE_EMULATOR_HOST] = f"0.0.0.0:{exposed_port}" - return {"type": "datastore", "project_id": "test-project"} + return FeastConfigBaseModel(type="datastore", project_id="test-project") def teardown(self): del os.environ[datastore.client.DATASTORE_EMULATOR_HOST] diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py b/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py index 1aefdffb24..8445d5393f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py @@ -3,6 +3,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -15,7 +16,7 @@ def __init__(self, project_name: str, **kwargs): "amazon/dynamodb-local:latest" ).with_exposed_ports("8000") - def 
create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() log_string_to_wait_for = ( "Initializing DynamoDB Local with the following configuration:" @@ -24,11 +25,11 @@ def create_online_store(self) -> Dict[str, str]: container=self.container, predicate=log_string_to_wait_for, timeout=10 ) exposed_port = self.container.get_exposed_port("8000") - return { - "type": "dynamodb", - "endpoint_url": f"http://localhost:{exposed_port}", - "region": "us-west-2", - } + return FeastConfigBaseModel( + type="dynamodb", + endpoint_url=f"http://localhost:{exposed_port}", + region="us-west-2", + ) def teardown(self): self.container.stop() From 39fb45b4c47c63df6a69e07b46c8a130fc80f35c Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 01:33:19 +0800 Subject: [PATCH 05/13] lint fix on base function `create_online_store` Signed-off-by: Chester Ong --- .../contrib/mssql_offline_store/mssql.py | 4 ++-- .../mssql_offline_store/tests/data_source.py | 1 + .../universal/online_store/cassandra.py | 15 ++++++++------- .../universal/online_store/hazelcast.py | 13 +++++++------ .../universal/online_store/hbase.py | 9 +++++++-- .../universal/online_store/mysql.py | 17 +++++++++-------- .../universal/online_store/redis.py | 7 +++++-- 7 files changed, 39 insertions(+), 27 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py index 849d5cc797..c9c775cd1a 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py @@ -32,7 +32,7 @@ from feast.infra.provider import RetrievalJob from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView -from feast.repo_config import FeastBaseModel, RepoConfig +from feast.repo_config import 
FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.type_map import pa_to_mssql_type from feast.usage import log_exceptions_and_usage @@ -43,7 +43,7 @@ EntitySchema = Dict[str, np.dtype] -class MsSqlServerOfflineStoreConfig(FeastBaseModel): +class MsSqlServerOfflineStoreConfig(FeastConfigBaseModel): """Offline store config for SQL Server""" type: Literal["mssql"] = "mssql" diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py index 98f6cb0d25..8507414da6 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py @@ -14,6 +14,7 @@ from feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source import ( MsSqlServerSource, ) +from feast.repo_config import FeastConfigBaseModel from feast.saved_dataset import SavedDatasetStorage from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py b/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py index 190d94a830..da1fbcb40f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py @@ -20,6 +20,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -32,7 +33,7 @@ def __init__(self, project_name: str, **kwargs): "9042" ) - def create_online_store(self) -> Dict[str, object]: + def create_online_store(self) -> 
FeastConfigBaseModel: self.container.start() log_string_to_wait_for = "Startup complete" # on a modern machine it takes about 45-60 seconds for the container @@ -45,12 +46,12 @@ def create_online_store(self) -> Dict[str, object]: self.container.exec(f'cqlsh -e "{keyspace_creation_command}"') time.sleep(2) exposed_port = int(self.container.get_exposed_port("9042")) - return { - "type": "cassandra", - "hosts": ["127.0.0.1"], - "port": exposed_port, - "keyspace": keyspace_name, - } + return FeastConfigBaseModel( + type="cassandra", + hosts=["127.0.0.1"], + port=exposed_port, + keyspace=keyspace_name, + ) def teardown(self): self.container.stop() diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py index 65d74135ae..6d3624f715 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py @@ -6,6 +6,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -29,7 +30,7 @@ def __init__(self, project_name: str, **kwargs): .with_exposed_ports(5701) ) - def create_online_store(self) -> Dict[str, Any]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() cluster_member = ( self.container.get_container_host_ip() @@ -38,11 +39,11 @@ def create_online_store(self) -> Dict[str, Any]: ) log_string_to_wait_for = r"Cluster name: " + self.cluster_name wait_for_logs(self.container, predicate=log_string_to_wait_for, timeout=10) - return { - "type": "hazelcast", - "cluster_name": self.cluster_name, - "cluster_members": [cluster_member], - } + return FeastConfigBaseModel( + type="hazelcast", + 
cluster_name=self.cluster_name, + cluster_members=[cluster_member], + ) def teardown(self): self.container.stop() diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py index dba611b30b..461e29b173 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py @@ -3,6 +3,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -13,7 +14,7 @@ def __init__(self, project_name: str, **kwargs): super().__init__(project_name) self.container = DockerContainer("harisekhon/hbase").with_exposed_ports("9090") - def create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() log_string_to_wait_for = ( "Initializing Hbase Local with the following configuration:" @@ -22,7 +23,11 @@ def create_online_store(self) -> Dict[str, str]: container=self.container, predicate=log_string_to_wait_for, timeout=10 ) exposed_port = self.container.get_exposed_port("9090") - return {"type": "hbase", "host": "127.0.0.1", "port": exposed_port} + return FeastConfigBaseModel( + type="hbase", + host="127.0.0.1", + port=exposed_port, + ) def teardown(self): self.container.stop() diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py index 093295c86b..f0ee61a04f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py @@ -2,6 +2,7 @@ from testcontainers.mysql import MySqlContainer +from 
feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -18,16 +19,16 @@ def __init__(self, project_name: str, **kwargs): .with_env("MYSQL_DATABASE", "test") ) - def create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() exposed_port = self.container.get_exposed_port(3306) - return { - "type": "mysql", - "user": "root", - "password": "test", - "database": "test", - "port": exposed_port, - } + return FeastConfigBaseModel( + type="mysql", + user="root", + password="test", + database="test", + port=exposed_port, + ) def teardown(self): self.container.stop() diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py b/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py index 11d62d9d30..cc27a83ca7 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py @@ -3,6 +3,7 @@ from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs +from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.online_store_creator import ( OnlineStoreCreator, ) @@ -13,14 +14,16 @@ def __init__(self, project_name: str, **kwargs): super().__init__(project_name) self.container = DockerContainer("redis").with_exposed_ports("6379") - def create_online_store(self) -> Dict[str, str]: + def create_online_store(self) -> FeastConfigBaseModel: self.container.start() log_string_to_wait_for = "Ready to accept connections" wait_for_logs( container=self.container, predicate=log_string_to_wait_for, timeout=10 ) exposed_port = self.container.get_exposed_port("6379") - return {"type": "redis", "connection_string": f"localhost:{exposed_port},db=0"} + return FeastConfigBaseModel( + type="redis", 
connection_string=f"localhost:{exposed_port},db=0" + ) def teardown(self): self.container.stop() From 83818e065fe3dbe71f14a7379c7a865186d05ddc Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 01:34:25 +0800 Subject: [PATCH 06/13] lint fix on ingest_stream_feature_view Signed-off-by: Chester Ong --- .../feast/infra/contrib/spark_kafka_processor.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index bac1c28b06..1c1fb07c41 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -68,13 +68,10 @@ def __init__( # data_source type has been checked to be an instance of KafkaSource. self.data_source: KafkaSource = self.data_source # type: ignore - def ingest_stream_feature_view( - self, to: PushMode = PushMode.ONLINE - ) -> StreamingQuery: + def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE): ingested_stream_df = self._ingest_stream_data() transformed_df = self._construct_transformation_plan(ingested_stream_df) - online_store_query = self._write_stream_data(transformed_df, to) - return online_store_query + self._write_stream_data(transformed_df, to) def _ingest_stream_data(self) -> StreamTable: """Only supports json and avro formats currently.""" @@ -129,7 +126,7 @@ def _ingest_stream_data(self) -> StreamTable: def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: return self.sfv.udf.__call__(df) if self.sfv.udf else df - def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery: + def _write_stream_data(self, df: StreamTable, to: PushMode): # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. 
def batch_write(row: DataFrame, batch_id: int): rows: pd.DataFrame = row.toPandas() @@ -168,4 +165,3 @@ def batch_write(row: DataFrame, batch_id: int): ) query.awaitTermination(timeout=self.query_timeout) - return query From 2fa14b7ea3fef74ac899ac0cf86f7fdf8a642702 Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 02:07:57 +0800 Subject: [PATCH 07/13] lint fixes in configuration Signed-off-by: Chester Ong --- .../feature_repos/repo_configuration.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 027dea2c58..4937379fa0 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -99,14 +99,14 @@ "host": os.getenv("ROCKSET_APISERVER", "api.rs2.usw2.rockset.com"), } -OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, DataSourceCreator] = { +OFFLINE_STORE_TO_PROVIDER_CONFIG = { "file": ("local", FileDataSourceCreator), "bigquery": ("gcp", BigQueryDataSourceCreator), "redshift": ("aws", RedshiftDataSourceCreator), "snowflake": ("aws", SnowflakeDataSourceCreator), } -AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [ +AVAILABLE_OFFLINE_STORES: List[Tuple[str, Any]] = [ ("local", FileDataSourceCreator), ] @@ -118,13 +118,10 @@ # Only configure Cloud DWH if running full integration tests if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": - AVAILABLE_OFFLINE_STORES.extend( - [ - ("gcp", BigQueryDataSourceCreator), - ("aws", RedshiftDataSourceCreator), - ("aws", SnowflakeDataSourceCreator), - ] - ) + + AVAILABLE_OFFLINE_STORES.append(("gcp", BigQueryDataSourceCreator)) + AVAILABLE_OFFLINE_STORES.append(("aws", RedshiftDataSourceCreator)) + AVAILABLE_OFFLINE_STORES.append(("aws", SnowflakeDataSourceCreator)) AVAILABLE_ONLINE_STORES["redis"] = (REDIS_CONFIG, None) 
AVAILABLE_ONLINE_STORES["dynamodb"] = (DYNAMO_CONFIG, None) From a96c266b575f1895f1a6727b9df1af97700402b4 Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 02:09:46 +0800 Subject: [PATCH 08/13] isort linting Signed-off-by: Chester Ong --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 1 - .../contrib/athena_offline_store/tests/data_source.py | 2 +- .../contrib/mssql_offline_store/tests/data_source.py | 3 +-- .../contrib/postgres_offline_store/tests/data_source.py | 2 +- .../contrib/spark_offline_store/tests/data_source.py | 2 +- .../contrib/trino_offline_store/tests/data_source.py | 2 +- sdk/python/feast/infra/offline_stores/snowflake.py | 4 +--- .../feature_repos/universal/data_source_creator.py | 1 - .../feature_repos/universal/data_sources/bigquery.py | 2 +- .../integration/feature_repos/universal/data_sources/file.py | 2 +- .../feature_repos/universal/data_sources/redshift.py | 2 +- .../feature_repos/universal/data_sources/snowflake.py | 2 +- .../feature_repos/universal/online_store/cassandra.py | 1 - .../feature_repos/universal/online_store/datastore.py | 1 - .../feature_repos/universal/online_store/dynamodb.py | 2 -- .../feature_repos/universal/online_store/hazelcast.py | 1 - .../integration/feature_repos/universal/online_store/hbase.py | 2 -- .../integration/feature_repos/universal/online_store/mysql.py | 2 -- .../integration/feature_repos/universal/online_store/redis.py | 2 -- 19 files changed, 10 insertions(+), 26 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 1c1fb07c41..829b43ee48 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -5,7 +5,6 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.functions import col, from_json -from pyspark.sql.streaming import StreamingQuery from 
feast.data_format import AvroFormat, JsonFormat from feast.data_source import KafkaSource, PushMode diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index 4d16e2a9a5..4d29cf064d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -1,6 +1,6 @@ import os import uuid -from typing import Dict, List, Optional +from typing import List import pandas as pd diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py index 8507414da6..04efbbb35b 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import List import pandas as pd import pytest @@ -14,7 +14,6 @@ from feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source import ( MsSqlServerSource, ) -from feast.repo_config import FeastConfigBaseModel from feast.saved_dataset import SavedDatasetStorage from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py index c9d0ac0060..1e378b7a25 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, Optional +from typing 
import Dict import pandas as pd import pytest diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py index f0cb5a7bd9..d14d07de85 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py @@ -2,7 +2,7 @@ import shutil import tempfile import uuid -from typing import Dict, List, Optional +from typing import List import pandas as pd from pyspark import SparkConf diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py index 162e12cbd4..850a596586 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py @@ -1,6 +1,6 @@ import pathlib import uuid -from typing import Dict, List, Optional +from typing import List import pandas as pd import pytest diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 9446ead3df..088f0105f8 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -1,6 +1,5 @@ import contextlib import json -import os import uuid import warnings from datetime import datetime @@ -23,7 +22,6 @@ import numpy as np import pandas as pd import pyarrow -from pydantic import Field, StrictStr from pydantic.typing import Literal from pytz import utc @@ -51,7 +49,7 @@ write_pandas, write_parquet, ) -from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.types import ( Array, diff --git 
a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index 95419409fe..3e2d14f5c3 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -from typing import Dict, Optional import pandas as pd diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 7464f28103..2a0e0bc9f6 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -1,6 +1,6 @@ import os import uuid -from typing import Dict, List, Optional +from typing import List, Optional import pandas as pd from google.cloud import bigquery diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 6e51bdc757..aad8543224 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -2,7 +2,7 @@ import shutil import tempfile import uuid -from typing import Any, Dict, List, Optional +from typing import Any, List import pandas as pd import pyarrow as pa diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index c400fac69b..eb65e9b53c 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -1,6 +1,6 @@ import os import uuid -from typing import Dict, List, Optional +from 
typing import List import pandas as pd diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index 0d07fdaaf3..a318b2a253 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -1,6 +1,6 @@ import os import uuid -from typing import Dict, List, Optional +from typing import List import pandas as pd diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py b/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py index da1fbcb40f..13dac69397 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/cassandra.py @@ -15,7 +15,6 @@ # import time -from typing import Dict from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py b/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py index 26954ca0fe..52d0e8243a 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/datastore.py @@ -1,5 +1,4 @@ import os -from typing import Dict from google.cloud import datastore from testcontainers.core.container import DockerContainer diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py b/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py index 8445d5393f..35781cccb1 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/dynamodb.py @@ 
-1,5 +1,3 @@ -from typing import Dict - from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py index 6d3624f715..233021e185 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hazelcast.py @@ -1,7 +1,6 @@ import logging import random import string -from typing import Any, Dict from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py index 461e29b173..430a54c28c 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hbase.py @@ -1,5 +1,3 @@ -from typing import Dict - from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py index f0ee61a04f..6c54c0119e 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/mysql.py @@ -1,5 +1,3 @@ -from typing import Dict - from testcontainers.mysql import MySqlContainer from feast.repo_config import FeastConfigBaseModel diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py b/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py index cc27a83ca7..62e8b28262 100644 --- 
a/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py @@ -1,5 +1,3 @@ -from typing import Dict - from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs From f0f7fec79447d73b731238368c4958b690563638 Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 02:17:01 +0800 Subject: [PATCH 09/13] mypy checking of all files in sdk/python folder Signed-off-by: Chester Ong --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 6fe0d62f37..fd061cd08c 100644 --- a/Makefile +++ b/Makefile @@ -310,7 +310,7 @@ format-python: cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests lint-python: - cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ feast + cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests From 28010b7861405c4f9a708b3afc8974defc90a5a0 Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Mon, 12 Feb 2024 02:30:30 +0800 Subject: [PATCH 10/13] checking only feast directory for mypy errors Signed-off-by: Chester Ong --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fd061cd08c..ce9951d89d 100644 --- a/Makefile +++ b/Makefile @@ -310,7 +310,7 @@ format-python: cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests lint-python: - cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ + cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ feast/ cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests From 
5dbe13d8abcd33ce199aca9d22a38866d3f820cd Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Tue, 13 Feb 2024 16:30:54 +0800 Subject: [PATCH 11/13] allow extras for FeastConfigBaseModel Signed-off-by: Chester Ong --- sdk/python/feast/repo_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 3461ae058b..092b48c40f 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -103,7 +103,7 @@ class FeastConfigBaseModel(BaseModel): class Config: arbitrary_types_allowed = True - extra = "forbid" + extra = "allow" class RegistryConfig(FeastBaseModel): From 004949ddd1d63f42594dd323df99a6c44929e985 Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Tue, 13 Feb 2024 17:01:17 +0800 Subject: [PATCH 12/13] fixing tests Signed-off-by: Chester Ong --- sdk/python/tests/conftest.py | 2 +- .../integration/materialization/contrib/spark/test_spark.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 728bd9b34f..19c877477b 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -347,7 +347,7 @@ def e2e_data_sources(environment: Environment): df = create_basic_driver_dataset() data_source = environment.data_source_creator.create_data_source( df, - environment.feature_store.project, + destination_name=environment.feature_store.project, field_mapping={"ts_1": "ts"}, ) diff --git a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py index c7028a09ef..015b6d0593 100644 --- a/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py +++ b/sdk/python/tests/integration/materialization/contrib/spark/test_spark.py @@ -38,7 +38,7 @@ def test_spark_materialization_consistency(): ds = spark_environment.data_source_creator.create_data_source( df, - 
spark_environment.feature_store.project, + destination_name=spark_environment.feature_store.project, field_mapping={"ts_1": "ts"}, ) From 2696cc337b4a64983bc043b065b7c0d2a6c18cb7 Mon Sep 17 00:00:00 2001 From: Chester Ong Date: Tue, 13 Feb 2024 17:02:26 +0800 Subject: [PATCH 13/13] adding extra field for online store config Signed-off-by: Chester Ong --- sdk/python/tests/unit/infra/scaffolding/test_repo_config.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py b/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py index 42229f8683..7a9b22292a 100644 --- a/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py +++ b/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py @@ -154,8 +154,7 @@ def test_extra_field(): path: "online_store.db" """ ), - expect_error="__root__ -> online_store -> that_field_should_not_be_here\n" - " extra fields not permitted (type=value_error.extra)", + expect_error=None, # extra fields are allowed )