From ce78c62125c642a77b694ebde23925245d2c9be5 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 19 Mar 2020 09:35:07 +0800 Subject: [PATCH 1/4] Create new configuration class --- sdk/python/feast/config.py | 249 +++++++++++++++++--------------- sdk/python/tests/test_config.py | 118 +++++++++++++++ 2 files changed, 249 insertions(+), 118 deletions(-) create mode 100644 sdk/python/tests/test_config.py diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index 061bf24c3b7..8fa627b7284 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -13,152 +13,165 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +from configparser import ConfigParser +from os.path import expanduser, join import logging import os import sys -from os.path import expanduser, join -from typing import Dict -from urllib.parse import ParseResult, urlparse - +from typing import Dict, Union +from urllib.parse import urlparse +from urllib.parse import ParseResult +from typing import TextIO import toml +from typing import Optional _logger = logging.getLogger(__name__) -feast_configuration_properties = {"core_url": "URL", "serving_url": "URL"} +FEAST_DEFAULT_OPTIONS = { + "CORE_URL": "localhost:6565", + "SERVING_URL": "localhost:6565" +} CONFIGURATION_FILE_DIR = os.environ.get("FEAST_CONFIG", ".feast") -CONFIGURATION_FILE_NAME = "config.toml" - - -def _get_or_create_config() -> Dict: - """Get user configuration file or create it and return""" - - user_config_file_dir, user_config_file_path = _get_config_file_locations() - user_config_file_dir = user_config_file_dir.rstrip("/") + "/" - if not os.path.exists(os.path.dirname(user_config_file_dir)): - os.makedirs(os.path.dirname(user_config_file_dir)) - - if not os.path.isfile(user_config_file_path): - _save_config(user_config_file_path, _props_to_dict()) - - try: - return toml.load(user_config_file_path) - except FileNotFoundError: - _logger.error( - "Could not find Feast configuration file " + user_config_file_path - ) - sys.exit(1) - except toml.decoder.TomlDecodeError: - _logger.error( - "Could not decode Feast configuration file " + user_config_file_path - ) - sys.exit(1) - except Exception as e: - _logger.error(e) - sys.exit(1) - - -def set_property(prop: str, value: str): +CONFIGURATION_FILE_NAME = "config" +CONFIGURATION_FILE_SECTION = "general" +FEAST_ENV_VAR_PREFIX = 'FEAST_' + + +def _init_config(path: str): """ - Sets a single property in the Feast users local configuration file + Returns a ConfigParser that reads in a feast configuration file. If the + file does not exist it will be created. Args: - prop: Feast property name - value: Feast property value + path: Optional path to initialize as Feast configuration + + Returns: ConfigParser of the Feast configuration file, with defaults + preloaded + """ - if _is_valid_property(prop, value): - active_feast_config = _get_or_create_config() - active_feast_config[prop] = value - _, user_config_file_path = _get_config_file_locations() - _save_config(user_config_file_path, active_feast_config) - print("Updated property [%s]" % prop) - else: - _logger.error("Invalid property selected") - sys.exit(1) - - -def get_config_property_or_fail(prop: str, force_config: Dict[str, str] = None) -> str: + # Create the configuration file directory if needed + config_dir = os.path.dirname(path) + config_dir = config_dir.rstrip("/") + "/" + + if not os.path.exists(os.path.dirname(config_dir)): + os.makedirs(os.path.dirname(config_dir)) + + # Create the configuration file itself + config = ConfigParser(defaults=FEAST_DEFAULT_OPTIONS) + if os.path.exists(path): + config.read(path) + + # Store all configuration in a single section + if not config.has_section(CONFIGURATION_FILE_SECTION): + config.add_section(CONFIGURATION_FILE_SECTION) + + # Save the current configuration + config.write(open(path, 'w')) + + return config + + +def _get_feast_env_vars(): """ - Gets a single property in the users configuration + Get environmental variables that start with FEAST_ + Returns: Dict of Feast environmental variables (stripped of prefix) + """ + feast_env_vars = {} + for key in os.environ.keys(): + if key.upper().startswith(FEAST_ENV_VAR_PREFIX): + feast_env_vars[key[len(FEAST_ENV_VAR_PREFIX):]] = os.environ[key] + return feast_env_vars - Args: - prop: Property to retrieve - force_config: Configuration dictionary containing properties that should - be overridden. This will take precedence over file based properties. - Returns: - Returns a string property +class Config: """ - if ( - isinstance(force_config, dict) - and prop in force_config - and force_config[prop] is not None + Maintains and provides access to Feast configuration + + Configuration is stored as key/value pairs. The user can specify options + through either input arguments to this class, environmental variables, or + by setting the config in a configuration file + + """ + + def __init__(self, + options: Optional[Dict[str, str]] = None, + path: Optional[str] = None, ): - return force_config[prop] + """ + Configuration options are returned as follows (higher replaces lower) + 1. Initialized options ("options" argument) + 2. Environmental variables (reloaded on every "get") + 3. Configuration file options (loaded once) + 4. Default options (loaded once from memory) - active_feast_config = _get_or_create_config() - if _is_valid_property(prop, active_feast_config[prop]): - return active_feast_config[prop] - _logger.error("Could not load Feast property from configuration: %s" % prop) - sys.exit(1) + Args: + options: (optional) A list of initialized/hardcoded options. + path: (optional) File path to configuration file + """ + if not path: + path = join(expanduser("~"), CONFIGURATION_FILE_DIR, + CONFIGURATION_FILE_NAME) + config = _init_config(path) -def _props_to_dict() -> Dict[str, str]: - """Create empty dictionary of all Feast properties""" - prop_dict = {} - for prop in feast_configuration_properties: - prop_dict[prop] = "" - return prop_dict + self._options = {} + if options and isinstance(options, dict): + self._options = options + self._config = config # type: ConfigParser + def get(self, option): + """ + Returns a single configuration option as a string -def _is_valid_property(prop: str, value: str) -> bool: - """ - Validates both a Feast property as well as value + Args: + option: Name of the option - Args: - prop: Feast property name - value: Feast property value + Returns: String option that is returned - Returns: - Returns True if property and value are valid - """ - if prop not in feast_configuration_properties: - _logger.error("You are trying to set an invalid property") - sys.exit(1) + """ + return self._config.get(CONFIGURATION_FILE_SECTION, option, + vars={**_get_feast_env_vars(), + **self._options}) - prop_type = feast_configuration_properties[prop] + def getboolean(self, option): + """ + Returns a single configuration option as a boolean - if prop_type == "URL": - if "//" not in value: - value = "%s%s" % ("grpc://", value) - parsed_value = urlparse(value) # type: ParseResult - if parsed_value.netloc: - return True + Args: + option: Name of the option - _logger.error("The property you are trying to set could not be identified") - sys.exit(1) + Returns: Boolean option value that is returned + """ + return self._config.getboolean(CONFIGURATION_FILE_SECTION, option, + vars={**_get_feast_env_vars(), + **self._options}) -def _save_config(user_config_file_path: str, config_string: Dict[str, str]): - """ - Saves Feast configuration + def getint(self, option): + """ + Returns a single configuration option as an integer - Args: - user_config_file_path: Local file system path to save configuration - config_string: Contents in dictionary format to save to path - """ - try: - with open(user_config_file_path, "w+") as f: - toml.dump(config_string, f) - except Exception as e: - _logger.error("Could not update configuration file for Feast") - print(e) - sys.exit(1) - - -def _get_config_file_locations() -> (str, str): - """Gets the local user configuration directory and file path""" - user_config_file_dir = join(expanduser("~"), CONFIGURATION_FILE_DIR) - user_config_file_path = join(user_config_file_dir, CONFIGURATION_FILE_NAME) - return user_config_file_dir, user_config_file_path + Args: + option: Name of the option + + Returns: Integer option value that is returned + + """ + return self._config.getint(CONFIGURATION_FILE_SECTION, option, + vars={**_get_feast_env_vars(), + **self._options}) + + def getfloat(self, option): + """ + Returns a single configuration option as an integer + + Args: + option: Name of the option + + Returns: Float option value that is returned + + """ + return self._config.getfloat(CONFIGURATION_FILE_SECTION, option, + vars={**_get_feast_env_vars(), + **self._options}) diff --git a/sdk/python/tests/test_config.py b/sdk/python/tests/test_config.py new file mode 100644 index 00000000000..7824c289fc3 --- /dev/null +++ b/sdk/python/tests/test_config.py @@ -0,0 +1,118 @@ +# Copyright 2020 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from feast.config import Config +from tempfile import mkstemp +import os + + +class TestConfig: + @pytest.fixture + def normal_config(self): + fd, path = mkstemp() + return Config(path=path) + + def test_init_config_file_with_path(self): + configuration_string = \ + '[general]\nCORE_URL = grpc://127.0.0.1:6565' + + fd, path = mkstemp() + with open(fd, 'w') as f: + f.write(configuration_string) + config = Config(path=path) + assert config.get("core_url") == "grpc://127.0.0.1:6565" + + def test_load_environmental_variable(self, normal_config): + import os + serving_url = 'http://196.25.1.1' + os.environ['FEAST_SERVING_URL'] = serving_url + assert normal_config.get('SERVING_URL') == serving_url + del os.environ['FEAST_SERVING_URL'] + + def test_env_var_not_case_sensitive(self, normal_config): + import os + serving_url = 'http://196.25.1.1' + os.environ['FEAST_SerVING_url'] = serving_url + assert normal_config.get('SERVING_URL') == serving_url + + def test_force_options(self): + fd, path = mkstemp() + options = {'feast_config_1': "one", "random_config_two": 2} + config = Config(options, path) + assert config.get('feast_config_1') == 'one' + + def test_init_options_precedence(self): + """ + Init options > env var > file options > default options + """ + fd, path = mkstemp() + os.environ['FEAST_CORE_URL'] = 'env' + options = {'core_url': "init", "serving_url": "init"} + configuration_string = \ + '[general]\nCORE_URL = file\n' + with open(fd, 'w') as f: + f.write(configuration_string) + config = Config(options, path) + assert config.get('core_url') == 'init' + del os.environ['FEAST_CORE_URL'] + + def test_env_var_precedence(self): + """ + Env vars > file options > default options + """ + fd, path = mkstemp() + os.environ['FEAST_CORE_URL'] = 'env' + configuration_string = \ + '[general]\nCORE_URL = file\n' + with open(fd, 'w') as f: + f.write(configuration_string) + config = Config(path=path) + assert config.get('CORE_URL') == 'env' + + del os.environ['FEAST_CORE_URL'] + + def test_file_option_precedence(self): + """ + file options > default options + """ + fd, path = mkstemp() + configuration_string = \ + '[general]\nCORE_URL = file\n' + with open(fd, 'w') as f: + f.write(configuration_string) + config = Config(path=path) + assert config.get('CORE_URL') == 'file' + + def test_default_options(self): + """ + default options + """ + fd, path = mkstemp() + config = Config(path=path) + assert config.get('CORE_URL') == 'localhost:6565' + + def test_type_casting(self): + """ + Test type casting of strings to other types + """ + fd, path = mkstemp() + os.environ['FEAST_INT_VAR'] = '1' + os.environ['FEAST_FLOAT_VAR'] = '1.0' + os.environ['FEAST_BOOLEAN_VAR'] = 'True' + config = Config(path=path) + + assert config.getint('INT_VAR') == 1 + assert config.getfloat('FLOAT_VAR') == 1.0 + assert config.getboolean('BOOLEAN_VAR') is True From 870f17b0c2cfbf04ce5b9fd496aa6ac265214c8c Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 19 Mar 2020 11:25:22 +0800 Subject: [PATCH 2/4] Add new configuration class to CLI and Client class --- sdk/python/feast/cli.py | 67 +++------ sdk/python/feast/client.py | 139 +++++++++--------- sdk/python/feast/config.py | 93 ++++++++---- sdk/python/feast/constants.py | 17 ++- sdk/python/tests/test_client.py | 243 ++++++++++++++++---------------- sdk/python/tests/test_config.py | 21 +++ 6 files changed, 316 insertions(+), 264 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 27b98b1086e..b8f4ec30b69 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -17,6 +17,7 @@ import sys import click +from feast.config import Config import pkg_resources import toml import yaml @@ -25,6 +26,9 @@ from feast.client import Client from feast.feature_set import FeatureSet from feast.loaders.yaml import yaml_loader +import yaml +import json +from feast.constants import * _logger = logging.getLogger(__name__) @@ -50,7 +54,8 @@ def cli(): @cli.command() @click.option( - "--client-only", "-c", is_flag=True, help="Print only the version of the CLI" + "--client-only", "-c", is_flag=True, + help="Print only the version of the CLI" ) @common_options def version(client_only: bool, **kwargs): @@ -64,14 +69,7 @@ def version(client_only: bool, **kwargs): } if not client_only: - feast_client = Client( - core_url=feast_config.get_config_property_or_fail( - "core_url", force_config=kwargs - ), - serving_url=feast_config.get_config_property_or_fail( - "serving_url", force_config=kwargs - ), - ) + feast_client = Client(**kwargs) feast_versions_dict.update(feast_client.version()) print(json.dumps(feast_versions_dict)) @@ -94,13 +92,8 @@ def config_list(): """ List Feast properties for the currently active configuration """ - try: - feast_config_string = toml.dumps(feast_config._get_or_create_config()) - if not feast_config_string.strip(): - print("Configuration has not been set") - else: - print(feast_config_string.replace('""', "").strip()) + print(Config()) except Exception as e: _logger.error("Error occurred when reading Feast configuration file") _logger.exception(e) @@ -115,7 +108,9 @@ def config_set(prop, value): Set a Feast properties for the currently active configuration """ try: - feast_config.set_property(prop.strip(), value.strip()) + conf = Config() + conf.set(option=prop.strip(), value=value.strip()) + conf.save() except Exception as e: _logger.error("Error in reading config file") _logger.exception(e) @@ -135,9 +130,7 @@ def feature_set_list(): """ List all feature sets """ - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client + feast_client = Client() # type: Client table = [] for fs in feast_client.list_feature_sets(): @@ -160,12 +153,9 @@ def feature_set_create(filename): Create or update a feature set """ - feature_sets = [FeatureSet.from_dict(fs_dict) for fs_dict in yaml_loader(filename)] - - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client - + feature_sets = [FeatureSet.from_dict(fs_dict) for fs_dict in + yaml_loader(filename)] + feast_client = Client() # type: Client feast_client.apply(feature_sets) @@ -176,10 +166,7 @@ def feature_set_describe(name: str, version: int): """ Describe a feature set """ - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client - + feast_client = Client() # type: Client fs = feast_client.get_feature_set(name=name, version=version) if not fs: print( @@ -187,7 +174,8 @@ def feature_set_describe(name: str, version: int): ) return - print(yaml.dump(yaml.safe_load(str(fs)), default_flow_style=False, sort_keys=False)) + print(yaml.dump(yaml.safe_load(str(fs)), default_flow_style=False, + sort_keys=False)) @cli.group(name="projects") @@ -204,9 +192,7 @@ def project_create(name: str): """ Create a project """ - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client + feast_client = Client() # type: Client feast_client.create_project(name) @@ -216,10 +202,8 @@ def project_archive(name: str): """ Archive a project """ - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client - feast_client.archive_project(name) + feast_client = # type: Client + Client().archive_project(name) @project.command(name="list") @@ -227,9 +211,7 @@ def project_list(): """ List all projects """ - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client + feast_client = Client() # type: Client table = [] for project in feast_client.list_projects(): @@ -265,10 +247,7 @@ def ingest(name, version, filename, file_type): Ingest feature data into a feature set """ - feast_client = Client( - core_url=feast_config.get_config_property_or_fail("core_url") - ) # type: Client - + feast_client = Client() # type: Client feature_set = feast_client.get_feature_set(name=name, version=version) feature_set.ingest_file(file_path=filename) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index ffdb71743d0..2acbf78f88e 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + import logging import os import shutil @@ -19,6 +20,7 @@ import time from collections import OrderedDict from math import ceil +from typing import Dict, List, Tuple, Union, Optional from typing import Dict, List, Optional, Tuple, Union import grpc @@ -60,17 +62,11 @@ GetOnlineFeaturesResponse, ) from feast.serving.ServingService_pb2_grpc import ServingServiceStub +from feast.config import Config +from feast.constants import * _logger = logging.getLogger(__name__) -GRPC_CONNECTION_TIMEOUT_DEFAULT = 3 # type: int -GRPC_CONNECTION_TIMEOUT_APPLY = 600 # type: int -FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" -FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" -FEAST_PROJECT_ENV_KEY = "FEAST_PROJECT" -FEAST_CORE_SECURE_ENV_KEY = "FEAST_CORE_SECURE" -FEAST_SERVING_SECURE_ENV_KEY = "FEAST_SERVING_SECURE" -BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300 CPU_COUNT = os.cpu_count() # type: int @@ -80,13 +76,7 @@ class Client: """ def __init__( - self, - core_url: str = None, - serving_url: str = None, - project: str = None, - core_secure: bool = None, - serving_secure: bool = None, - ): + self, options: Optional[Dict[str, str]] = None, **kwargs): """ The Feast Client should be initialized with at least one service url @@ -96,12 +86,16 @@ def __init__( project: Sets the active project. This field is optional. core_secure: Use client-side SSL/TLS for Core gRPC API serving_secure: Use client-side SSL/TLS for Serving gRPC API + options: Configuration options to initialize client with + **kwargs: Additional keyword arguments that will be used as + configuration options along with "options" """ - self._core_url: str = core_url - self._serving_url: str = serving_url - self._project: str = project - self._core_secure: bool = core_secure - self._serving_secure: bool = serving_secure + + if options is None: + options = dict() + self._config = Config(options={**options, + **kwargs}) + self.__core_channel: grpc.Channel = None self.__serving_channel: grpc.Channel = None self._core_service_stub: CoreServiceStub = None @@ -115,12 +109,7 @@ def core_url(self) -> str: Returns: Feast Core URL string """ - - if self._core_url is not None: - return self._core_url - if os.getenv(FEAST_CORE_URL_ENV_KEY) is not None: - return os.getenv(FEAST_CORE_URL_ENV_KEY) - return "" + return self._config.get(CONFIG_CORE_URL_KEY) @core_url.setter def core_url(self, value: str): @@ -130,7 +119,7 @@ def core_url(self, value: str): Args: value: Feast Core URL """ - self._core_url = value + self._config.set(CONFIG_CORE_URL_KEY, value) @property def serving_url(self) -> str: @@ -140,11 +129,7 @@ def serving_url(self) -> str: Returns: Feast Serving URL string """ - if self._serving_url is not None: - return self._serving_url - if os.getenv(FEAST_SERVING_URL_ENV_KEY) is not None: - return os.getenv(FEAST_SERVING_URL_ENV_KEY) - return "" + return self._config.get(CONFIG_SERVING_URL_KEY) @serving_url.setter def serving_url(self, value: str): @@ -154,7 +139,7 @@ def serving_url(self, value: str): Args: value: Feast Serving URL """ - self._serving_url = value + self._config.set(CONFIG_SERVING_URL_KEY, value) @property def core_secure(self) -> bool: @@ -164,10 +149,7 @@ def core_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - - if self._core_secure is not None: - return self._core_secure - return os.getenv(FEAST_CORE_SECURE_ENV_KEY, "").lower() == "true" + return self._config.getboolean(CONFIG_CORE_SECURE_KEY) @core_secure.setter def core_secure(self, value: bool): @@ -177,7 +159,7 @@ def core_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._core_secure = value + self._config.set(CONFIG_CORE_SECURE_KEY, value) @property def serving_secure(self) -> bool: @@ -187,10 +169,7 @@ def serving_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - - if self._serving_secure is not None: - return self._serving_secure - return os.getenv(FEAST_SERVING_SECURE_ENV_KEY, "").lower() == "true" + return self._config.getboolean(CONFIG_SERVING_SECURE_KEY) @serving_secure.setter def serving_secure(self, value: bool): @@ -200,7 +179,7 @@ def serving_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._serving_secure = value + self._config.set(CONFIG_SERVING_SECURE_KEY, value) def version(self): """ @@ -211,14 +190,19 @@ def version(self): if self.serving_url: self._connect_serving() serving_version = self._serving_service_stub.GetFeastServingInfo( - GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + GetFeastServingInfoRequest(), + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ).version - result["serving"] = {"url": self.serving_url, "version": serving_version} + result["serving"] = {"url": self.serving_url, + "version": serving_version} if self.core_url: self._connect_core() core_version = self._core_service_stub.GetFeastCoreVersion( - GetFeastCoreVersionRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + GetFeastCoreVersionRequest(), + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ).version result["core"] = {"url": self.core_url, "version": core_version} @@ -239,15 +223,15 @@ def _connect_core(self, skip_if_connected: bool = True): if self.__core_channel is None: if self.core_secure or self.core_url.endswith(":443"): - self.__core_channel = grpc.secure_channel( - self.core_url, grpc.ssl_channel_credentials() - ) + self.__core_channel = grpc.secure_channel(self.core_url, + grpc.ssl_channel_credentials()) else: self.__core_channel = grpc.insecure_channel(self.core_url) try: grpc.channel_ready_future(self.__core_channel).result( - timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) except grpc.FutureTimeoutError: raise ConnectionError( @@ -273,15 +257,15 @@ def _connect_serving(self, skip_if_connected=True): if self.__serving_channel is None: if self.serving_secure or self.serving_url.endswith(":443"): - self.__serving_channel = grpc.secure_channel( - self.serving_url, grpc.ssl_channel_credentials() - ) + self.__serving_channel = grpc.secure_channel(self.serving_url, + grpc.ssl_channel_credentials()) else: self.__serving_channel = grpc.insecure_channel(self.serving_url) try: grpc.channel_ready_future(self.__serving_channel).result( - timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) except grpc.FutureTimeoutError: raise ConnectionError( @@ -289,7 +273,8 @@ def _connect_serving(self, skip_if_connected=True): f"Serving gRPC server {self.serving_url} " ) else: - self._serving_service_stub = ServingServiceStub(self.__serving_channel) + self._serving_service_stub = ServingServiceStub( + self.__serving_channel) @property def project(self) -> Union[str, None]: @@ -299,11 +284,7 @@ def project(self) -> Union[str, None]: Returns: Project name """ - if self._project is not None: - return self._project - if os.getenv(FEAST_PROJECT_ENV_KEY) is not None: - return os.getenv(FEAST_PROJECT_ENV_KEY) - return None + return self._config.get("project") def set_project(self, project: str): """ @@ -312,7 +293,7 @@ def set_project(self, project: str): Args: project: Project to set as active """ - self._project = project + self._config.set("project", project) def list_projects(self) -> List[str]: """ @@ -324,7 +305,8 @@ def list_projects(self) -> List[str]: """ self._connect_core() response = self._core_service_stub.ListProjects( - ListProjectsRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + ListProjectsRequest(), timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) # type: ListProjectsResponse return list(response.projects) @@ -338,7 +320,9 @@ def create_project(self, project: str): self._connect_core() self._core_service_stub.CreateProject( - CreateProjectRequest(name=project), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + CreateProjectRequest(name=project), + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) # type: CreateProjectResponse def archive_project(self, project): @@ -353,7 +337,9 @@ def archive_project(self, project): self._connect_core() self._core_service_stub.ArchiveProject( - ArchiveProjectRequest(name=project), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + ArchiveProjectRequest(name=project), + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) # type: ArchiveProjectResponse if self._project == project: @@ -402,7 +388,8 @@ def _apply_feature_set(self, feature_set: FeatureSet): try: apply_fs_response = self._core_service_stub.ApplyFeatureSet( ApplyFeatureSetRequest(feature_set=feature_set_proto), - timeout=GRPC_CONNECTION_TIMEOUT_APPLY, + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: ApplyFeatureSetResponse except grpc.RpcError as e: raise grpc.RpcError(e.details()) @@ -573,7 +560,9 @@ def get_batch_features( # Retrieve serving information to determine store type and # staging location serving_info = self._serving_service_stub.GetFeastServingInfo( - GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + GetFeastServingInfoRequest(), + timeout=self._config.getint( + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) # type: GetFeastServingInfoResponse if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH: @@ -587,7 +576,8 @@ def get_batch_features( # Remove timezone from datetime column if isinstance( - entity_rows["datetime"].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype + entity_rows["datetime"].dtype, + pd.core.dtypes.dtypes.DatetimeTZDtype ): entity_rows["datetime"] = pd.DatetimeIndex( entity_rows["datetime"] @@ -614,7 +604,8 @@ def get_batch_features( features=feature_references, dataset_source=DatasetSource( file_source=DatasetSource.FileSource( - file_uris=staged_files, data_format=DataFormat.DATA_FORMAT_AVRO + file_uris=staged_files, + data_format=DataFormat.DATA_FORMAT_AVRO ) ), ) @@ -723,7 +714,8 @@ def ingest( raise Exception(f"Feature set name must be provided") # Read table and get row count - dir_path, dest_path = _read_table_from_source(source, chunk_size, max_workers) + dir_path, dest_path = _read_table_from_source(source, chunk_size, + max_workers) pq_file = pq.ParquetFile(dest_path) @@ -742,7 +734,8 @@ def ingest( print("Waiting for feature set to be ready for ingestion...") while True: if timeout is not None and time.time() - current_time >= timeout: - raise TimeoutError("Timed out waiting for feature set to be ready") + raise TimeoutError( + "Timed out waiting for feature set to be ready") feature_set = self.get_feature_set(name, version) if ( feature_set is not None @@ -850,7 +843,8 @@ def _build_feature_references( f'Could not parse feature ref {feature_ref}, expecting "project/feature:version"' ) - features.append(FeatureReference(project=project, name=name, version=version)) + features.append( + FeatureReference(project=project, name=name, version=version)) return features @@ -905,7 +899,8 @@ def _read_table_from_source( else: table = pq.read_table(file_path) else: - raise ValueError(f"Unknown data source provided for ingestion: {source}") + raise ValueError( + f"Unknown data source provided for ingestion: {source}") # Ensure that PyArrow table is initialised assert isinstance(table, pa.lib.Table) diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index 8fa627b7284..77d81d867e6 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -13,30 +13,27 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from configparser import ConfigParser +from configparser import ConfigParser, NoOptionError from os.path import expanduser, join import logging import os -import sys -from typing import Dict, Union -from urllib.parse import urlparse -from urllib.parse import ParseResult -from typing import TextIO -import toml +from typing import Dict from typing import Optional +from feast.constants import * _logger = logging.getLogger(__name__) FEAST_DEFAULT_OPTIONS = { - "CORE_URL": "localhost:6565", - "SERVING_URL": "localhost:6565" + CONFIG_PROJECT_KEY: "default", + CONFIG_CORE_URL_KEY: "localhost:6565", + CONFIG_CORE_SECURE_KEY: "False", + CONFIG_SERVING_URL_KEY: "localhost:6565", + CONFIG_SERVING_SECURE_KEY: 'False', + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: '3', + CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: '600', + CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: '600', } -CONFIGURATION_FILE_DIR = os.environ.get("FEAST_CONFIG", ".feast") -CONFIGURATION_FILE_NAME = "config" -CONFIGURATION_FILE_SECTION = "general" -FEAST_ENV_VAR_PREFIX = 'FEAST_' - def _init_config(path: str): """ @@ -63,8 +60,8 @@ def _init_config(path: str): config.read(path) # Store all configuration in a single section - if not config.has_section(CONFIGURATION_FILE_SECTION): - config.add_section(CONFIGURATION_FILE_SECTION) + if not config.has_section(CONFIG_FILE_SECTION): + config.add_section(CONFIG_FILE_SECTION) # Save the current configuration config.write(open(path, 'w')) @@ -79,8 +76,9 @@ def _get_feast_env_vars(): """ feast_env_vars = {} for key in os.environ.keys(): - if key.upper().startswith(FEAST_ENV_VAR_PREFIX): - feast_env_vars[key[len(FEAST_ENV_VAR_PREFIX):]] = os.environ[key] + if key.upper().startswith(CONFIG_FEAST_ENV_VAR_PREFIX): + feast_env_vars[key[len(CONFIG_FEAST_ENV_VAR_PREFIX):]] = os.environ[ + key] return feast_env_vars @@ -110,15 +108,19 @@ def __init__(self, path: (optional) File path to configuration file """ if not path: - path = join(expanduser("~"), CONFIGURATION_FILE_DIR, - CONFIGURATION_FILE_NAME) + path = join(expanduser("~"), + os.environ.get(FEAST_CONFIG_FILE_ENV_KEY, + CONFIG_FILE_DEFAULT_DIRECTORY), + CONFIG_FILE_NAME) config = _init_config(path) self._options = {} if options and isinstance(options, dict): self._options = options + self._config = config # type: ConfigParser + self._path = path # type: str def get(self, option): """ @@ -130,7 +132,7 @@ def get(self, option): Returns: String option that is returned """ - return self._config.get(CONFIGURATION_FILE_SECTION, option, + return self._config.get(CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}) @@ -144,7 +146,7 @@ def getboolean(self, option): Returns: Boolean option value that is returned """ - return self._config.getboolean(CONFIGURATION_FILE_SECTION, option, + return self._config.getboolean(CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}) @@ -158,7 +160,7 @@ def getint(self, option): Returns: Integer option value that is returned """ - return self._config.getint(CONFIGURATION_FILE_SECTION, option, + return self._config.getint(CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}) @@ -172,6 +174,49 @@ def getfloat(self, option): Returns: Float option value that is returned """ - return self._config.getfloat(CONFIGURATION_FILE_SECTION, option, + return self._config.getfloat(CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}) + + def set(self, option, value): + """ + Sets a configuration option. Must be serializable to string + Args: + option: Option name to use as key + value: Value to store under option + """ + self._config.set(CONFIG_FILE_SECTION, option, + value=str(value)) + + def exists(self, option): + """ + Tests whether a specific option is available + + Args: + option: Name of the option to check + + Returns: Boolean true/false whether the option is set + + """ + try: + self.get(option=option) + return True + except NoOptionError: + return False + + def save(self): + """ + Save the current configuration to disk. This does not include + environmental variables or initialized options + """ + self._config.write(open(self._path, 'w')) + + def __str__(self): + result = '' + for section_name in self._config.sections(): + result += '\n[' + section_name + ']\n' + + for name, value in self._config.items(section_name): + result += name + ' = ' + value + '\n' + + return result diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 9b001ac4067..115b14d2dea 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -14,4 +14,19 @@ # limitations under the License. # -DATETIME_COLUMN = "datetime" # type: str +DATETIME_COLUMN = 'datetime' +FEAST_CONFIG_FILE_ENV_KEY = "FEAST_CONFIG" +CONFIG_FEAST_ENV_VAR_PREFIX = 'FEAST_' +CONFIG_FILE_DEFAULT_DIRECTORY = ".feast" +CONFIG_FILE_NAME = "config" +CONFIG_FILE_SECTION = "general" + +CONFIG_CORE_URL_KEY = "core_url" +CONFIG_SERVING_URL_KEY = "serving_url" +CONFIG_PROJECT_KEY = "project" +CONFIG_CORE_SECURE_KEY = "core_secure" +CONFIG_SERVING_SECURE_KEY = "serving_secure" +CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = 'grpc_connection_timeout_default' +CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = 'grpc_connection_timeout_apply_key' +CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = 'batch_feature_request_wait_time_seconds' + diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 1478256c066..223d903ab2b 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -94,7 +94,8 @@ def mock_client(self, mocker): @pytest.fixture def server_credentials(self): private_key = pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH) - certificate_chain = pkgutil.get_data(__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) + certificate_chain = pkgutil.get_data(__name__, + _CERTIFICATE_CHAIN_RESOURCE_PATH) return grpc.ssl_server_credentials(((private_key, certificate_chain),)) @pytest.fixture @@ -136,36 +137,28 @@ def secure_serving_server(self, server_credentials): @pytest.fixture def secure_client(self, secure_core_server, secure_serving_server): - root_certificate_credentials = pkgutil.get_data( - __name__, _ROOT_CERTIFICATE_RESOURCE_PATH - ) + root_certificate_credentials = pkgutil.get_data(__name__, + _ROOT_CERTIFICATE_RESOURCE_PATH) # this is needed to establish a secure connection using self-signed certificates, for the purpose of the test ssl_channel_credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificate_credentials - ) - with mock.patch( - "grpc.ssl_channel_credentials", - MagicMock(return_value=ssl_channel_credentials), - ): - yield Client( - core_url="localhost:50053", - serving_url="localhost:50054", - core_secure=True, - serving_secure=True, - ) + root_certificates=root_certificate_credentials) + with mock.patch("grpc.ssl_channel_credentials", + MagicMock(return_value=ssl_channel_credentials)): + yield Client(core_url="localhost:50053", + serving_url="localhost:50054", core_secure=True, + serving_secure=True) @pytest.fixture def client(self, core_server, serving_server): return Client(core_url="localhost:50051", serving_url="localhost:50052") - @pytest.mark.parametrize( - "mocked_client", - [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], - ) + @pytest.mark.parametrize("mocked_client", + [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) def test_version(self, mocked_client, mocker): mocked_client._core_service_stub = Core.CoreServiceStub( - grpc.insecure_channel("") - ) + grpc.insecure_channel("")) mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) @@ -190,10 +183,10 @@ def test_version(self, mocked_client, mocker): and status["serving"]["version"] == "0.3.2" ) - @pytest.mark.parametrize( - "mocked_client", - [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], - ) + @pytest.mark.parametrize("mocked_client", + [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) def test_get_online_features(self, mocked_client, mocker): ROW_COUNT = 300 @@ -203,7 +196,8 @@ def test_get_online_features(self, mocked_client, mocker): fields = dict() for feature_num in range(1, 10): - fields[f"my_project/feature_{str(feature_num)}:1"] = ValueProto.Value( + fields[ + f"my_project/feature_{str(feature_num)}:1"] = ValueProto.Value( int64_val=feature_num ) field_values = GetOnlineFeaturesResponse.FieldValues(fields=fields) @@ -214,7 +208,8 @@ def test_get_online_features(self, mocked_client, mocker): response.field_values.append(field_values) entity_rows.append( GetOnlineFeaturesRequest.EntityRow( - fields={"customer_id": ValueProto.Value(int64_val=row_number)} + fields={ + "customer_id": ValueProto.Value(int64_val=row_number)} ) ) @@ -240,18 +235,19 @@ def test_get_online_features(self, mocked_client, mocker): ) # type: GetOnlineFeaturesResponse assert ( - response.field_values[0].fields["my_project/feature_1:1"].int64_val == 1 - and response.field_values[0].fields["my_project/feature_9:1"].int64_val == 9 + response.field_values[0].fields[ + "my_project/feature_1:1"].int64_val == 1 + and response.field_values[0].fields[ + "my_project/feature_9:1"].int64_val == 9 ) - @pytest.mark.parametrize( - "mocked_client", - [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], - ) + @pytest.mark.parametrize("mocked_client", + [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) def test_get_feature_set(self, mocked_client, mocker): mocked_client._core_service_stub = Core.CoreServiceStub( - grpc.insecure_channel("") - ) + grpc.insecure_channel("")) from google.protobuf.duration_pb2 import Duration @@ -283,7 +279,8 @@ def test_get_feature_set(self, mocked_client, mocker): source=Source( type=SourceType.KAFKA, kafka_source_config=KafkaSourceConfig( - bootstrap_servers="localhost:9092", topic="topic" + bootstrap_servers="localhost:9092", + topic="topic" ), ), ), @@ -305,18 +302,17 @@ def test_get_feature_set(self, mocked_client, mocker): and len(feature_set.entities) == 1 ) - @pytest.mark.parametrize( - "mocked_client", - [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], - ) + @pytest.mark.parametrize("mocked_client", + [pytest.lazy_fixture("mock_client"), + pytest.lazy_fixture("secure_mock_client") + ]) def test_get_batch_features(self, mocked_client, mocker): mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) mocked_client._core_service_stub = Core.CoreServiceStub( - grpc.insecure_channel("") - ) + grpc.insecure_channel("")) mocker.patch.object( mocked_client._core_service_stub, @@ -329,7 +325,8 @@ def test_get_batch_features(self, mocked_client, mocker): project="my_project", entities=[ EntitySpecProto( - name="customer", value_type=ValueProto.ValueType.INT64 + name="customer", + value_type=ValueProto.ValueType.INT64 ), EntitySpecProto( name="transaction", @@ -347,7 +344,8 @@ def test_get_batch_features(self, mocked_client, mocker): ), ], ), - meta=FeatureSetMetaProto(status=FeatureSetStatusProto.STATUS_READY), + meta=FeatureSetMetaProto( + status=FeatureSetStatusProto.STATUS_READY), ) ), ) @@ -407,7 +405,8 @@ def test_get_batch_features(self, mocked_client, mocker): entity_rows=pd.DataFrame( { "datetime": [ - pd.datetime.now(tz=timezone("Asia/Singapore")) for _ in range(3) + pd.datetime.now(tz=timezone("Asia/Singapore")) for _ in + range(3) ], "customer": [1001, 1002, 1003], "transaction": [1001, 1002, 1003], @@ -424,17 +423,19 @@ def test_get_batch_features(self, mocked_client, mocker): actual_dataframe = response.to_dataframe() assert actual_dataframe[ - ["my_project/customer_feature_1:1", "my_project/customer_feature_2:1"] + ["my_project/customer_feature_1:1", + "my_project/customer_feature_2:1"] ].equals( expected_dataframe[ - ["my_project/customer_feature_1:1", "my_project/customer_feature_2:1"] + ["my_project/customer_feature_1:1", + "my_project/customer_feature_2:1"] ] ) - @pytest.mark.parametrize( - "test_client", - [pytest.lazy_fixture("client"), pytest.lazy_fixture("secure_client")], - ) + @pytest.mark.parametrize("test_client", [pytest.lazy_fixture("client"), + pytest.lazy_fixture( + "secure_client") + ]) def test_apply_feature_set_success(self, test_client): test_client.set_project("project1") @@ -465,17 +466,15 @@ def test_apply_feature_set_success(self, test_client): and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST ) - @pytest.mark.parametrize( - "dataframe,test_client", - [ - (dataframes.GOOD, pytest.lazy_fixture("client")), - (dataframes.GOOD, pytest.lazy_fixture("secure_client")), - ], - ) + @pytest.mark.parametrize("dataframe,test_client", + [(dataframes.GOOD, pytest.lazy_fixture("client")), + (dataframes.GOOD, + pytest.lazy_fixture("secure_client"))]) def test_feature_set_ingest_success(self, dataframe, test_client, mocker): test_client.set_project("project1") driver_fs = FeatureSet( - "driver-feature-set", source=KafkaSource(brokers="kafka:9092", topic="test") + "driver-feature-set", + source=KafkaSource(brokers="kafka:9092", topic="test") ) driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT)) driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING)) @@ -498,13 +497,11 @@ def test_feature_set_ingest_success(self, dataframe, test_client, mocker): # Ingest data into Feast test_client.ingest("driver-feature-set", dataframe) - @pytest.mark.parametrize( - "dataframe,exception,test_client", - [ - (dataframes.GOOD, TimeoutError, pytest.lazy_fixture("client")), - (dataframes.GOOD, TimeoutError, pytest.lazy_fixture("secure_client")), - ], - ) + @pytest.mark.parametrize("dataframe,exception,test_client", + [(dataframes.GOOD, TimeoutError, + pytest.lazy_fixture("client")), + (dataframes.GOOD, TimeoutError, + pytest.lazy_fixture("secure_client"))]) def test_feature_set_ingest_fail_if_pending( self, dataframe, exception, test_client, mocker ): @@ -538,29 +535,25 @@ def test_feature_set_ingest_fail_if_pending( @pytest.mark.parametrize( "dataframe,exception,test_client", [ - (dataframes.BAD_NO_DATETIME, Exception, pytest.lazy_fixture("client")), + (dataframes.BAD_NO_DATETIME, Exception, + pytest.lazy_fixture("client")), + (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, + pytest.lazy_fixture("client")), ( - dataframes.BAD_INCORRECT_DATETIME_TYPE, - Exception, - pytest.lazy_fixture("client"), - ), - (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("client")), + dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("client")), (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("client")), - ( - dataframes.BAD_NO_DATETIME, - Exception, - pytest.lazy_fixture("secure_client"), - ), - ( - dataframes.BAD_INCORRECT_DATETIME_TYPE, - Exception, - pytest.lazy_fixture("secure_client"), - ), - (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("secure_client")), - (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("secure_client")), + (dataframes.BAD_NO_DATETIME, Exception, + pytest.lazy_fixture("secure_client")), + (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, + pytest.lazy_fixture("secure_client")), + (dataframes.BAD_NO_ENTITY, Exception, + pytest.lazy_fixture("secure_client")), + (dataframes.NO_FEATURES, Exception, + pytest.lazy_fixture("secure_client")), ], ) - def test_feature_set_ingest_failure(self, test_client, dataframe, exception): + def test_feature_set_ingest_failure(self, test_client, dataframe, + exception): with pytest.raises(exception): # Create feature set driver_fs = FeatureSet("driver-feature-set") @@ -574,13 +567,9 @@ def test_feature_set_ingest_failure(self, test_client, dataframe, exception): # Ingest data into Feast test_client.ingest(driver_fs, dataframe=dataframe) - @pytest.mark.parametrize( - "dataframe,test_client", - [ - (dataframes.ALL_TYPES, pytest.lazy_fixture("client")), - (dataframes.ALL_TYPES, pytest.lazy_fixture("secure_client")), - ], - ) + @pytest.mark.parametrize("dataframe,test_client", [ + (dataframes.ALL_TYPES, pytest.lazy_fixture("client")), + (dataframes.ALL_TYPES, pytest.lazy_fixture("secure_client"))]) def test_feature_set_types_success(self, test_client, dataframe, mocker): test_client.set_project("project1") @@ -599,12 +588,14 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST), Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST), Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST), - Feature(name="string_list_feature", dtype=ValueType.STRING_LIST), + Feature(name="string_list_feature", + dtype=ValueType.STRING_LIST), Feature(name="bytes_list_feature", dtype=ValueType.BYTES_LIST), # Feature(name="bool_list_feature", # dtype=ValueType.BOOL_LIST), # TODO: Add support for this # type again https://github.com/gojek/feast/issues/341 - Feature(name="double_list_feature", dtype=ValueType.DOUBLE_LIST), + Feature(name="double_list_feature", + dtype=ValueType.DOUBLE_LIST), ], max_age=Duration(seconds=3600), ) @@ -615,7 +606,8 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): mocker.patch.object( test_client._core_service_stub, "GetFeatureSet", - return_value=GetFeatureSetResponse(feature_set=all_types_fs.to_proto()), + return_value=GetFeatureSetResponse( + feature_set=all_types_fs.to_proto()), ) # Need to create a mock producer @@ -625,40 +617,45 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): @patch("grpc.channel_ready_future") def test_secure_channel_creation_with_secure_client(self, _mocked_obj): - client = Client( - core_url="localhost:50051", - serving_url="localhost:50052", - serving_secure=True, - core_secure=True, - ) - with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( - "grpc.ssl_channel_credentials", MagicMock(return_value="test") - ) as _mocked_credentials: + client = Client(core_url="localhost:50051", + serving_url="localhost:50052", serving_secure=True, + core_secure=True) + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", + MagicMock(return_value="test")) as _mocked_credentials: client._connect_serving() - _grpc_mock.assert_called_with( - client.serving_url, _mocked_credentials.return_value - ) + _grpc_mock.assert_called_with(client.serving_url, + _mocked_credentials.return_value) @mock.patch("grpc.channel_ready_future") - def test_secure_channel_creation_with_secure_serving_url( - self, _mocked_obj, - ): + def test_secure_channel_creation_with_secure_serving_url(self, + _mocked_obj, ): client = Client(core_url="localhost:50051", serving_url="localhost:443") - with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( - "grpc.ssl_channel_credentials", MagicMock(return_value="test") - ) as _mocked_credentials: + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", + MagicMock(return_value="test")) as _mocked_credentials: client._connect_serving() - _grpc_mock.assert_called_with( - client.serving_url, _mocked_credentials.return_value - ) + _grpc_mock.assert_called_with(client.serving_url, + _mocked_credentials.return_value) + + @patch("grpc.channel_ready_future") + def test_secure_channel_creation_with_secure_client(self, _mocked_obj): + client = Client(core_url="localhost:50053", + serving_url="localhost:50054", serving_secure=True, + core_secure=True) + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", + MagicMock(return_value="test")) as _mocked_credentials: + client._connect_core() + _grpc_mock.assert_called_with(client.core_url, + _mocked_credentials.return_value) @patch("grpc.channel_ready_future") def test_secure_channel_creation_with_secure_core_url(self, _mocked_obj): client = Client(core_url="localhost:443", serving_url="localhost:50054") - with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( - "grpc.ssl_channel_credentials", MagicMock(return_value="test") - ) as _mocked_credentials: + with mock.patch("grpc.secure_channel") as _grpc_mock, \ + mock.patch("grpc.ssl_channel_credentials", + MagicMock(return_value="test")) as _mocked_credentials: client._connect_core() - _grpc_mock.assert_called_with( - client.core_url, _mocked_credentials.return_value - ) + _grpc_mock.assert_called_with(client.core_url, + _mocked_credentials.return_value) diff --git a/sdk/python/tests/test_config.py b/sdk/python/tests/test_config.py index 7824c289fc3..b850ed0e588 100644 --- a/sdk/python/tests/test_config.py +++ b/sdk/python/tests/test_config.py @@ -116,3 +116,24 @@ def test_type_casting(self): assert config.getint('INT_VAR') == 1 assert config.getfloat('FLOAT_VAR') == 1.0 assert config.getboolean('BOOLEAN_VAR') is True + + def test_set_value(self): + """ + Test type casting of strings to other types + """ + fd, path = mkstemp() + config = Config(path=path) + config.set("my_val", 1) + + assert config.getint('my_val') == 1 + + def test_exists(self): + """ + Test type casting of strings to other types + """ + fd, path = mkstemp() + config = Config(path=path) + config.set("my_val_exist", 1) + + assert config.exists('my_val_exist') is True + assert config.exists('my_val_not_exist') is False From 552fd0ccee95dc0a017f0a07891eea5ed161659e Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 19 Mar 2020 11:54:56 +0800 Subject: [PATCH 3/4] Add new configuration class to CLI and Client class --- sdk/python/feast/cli.py | 20 +-- sdk/python/feast/client.py | 79 +++++----- sdk/python/feast/config.py | 96 +++++++------ sdk/python/feast/constants.py | 26 +++- sdk/python/tests/test_client.py | 245 ++++++++++++++++---------------- sdk/python/tests/test_config.py | 82 +++++------ 6 files changed, 276 insertions(+), 272 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index b8f4ec30b69..dc4784b3025 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -17,18 +17,13 @@ import sys import click -from feast.config import Config import pkg_resources -import toml import yaml -from feast import config as feast_config from feast.client import Client +from feast.config import Config from feast.feature_set import FeatureSet from feast.loaders.yaml import yaml_loader -import yaml -import json -from feast.constants import * _logger = logging.getLogger(__name__) @@ -54,8 +49,7 @@ def cli(): @cli.command() @click.option( - "--client-only", "-c", is_flag=True, - help="Print only the version of the CLI" + "--client-only", "-c", is_flag=True, help="Print only the version of the CLI" ) @common_options def version(client_only: bool, **kwargs): @@ -153,8 +147,7 @@ def feature_set_create(filename): Create or update a feature set """ - feature_sets = [FeatureSet.from_dict(fs_dict) for fs_dict in - yaml_loader(filename)] + feature_sets = [FeatureSet.from_dict(fs_dict) for fs_dict in yaml_loader(filename)] feast_client = Client() # type: Client feast_client.apply(feature_sets) @@ -174,8 +167,7 @@ def feature_set_describe(name: str, version: int): ) return - print(yaml.dump(yaml.safe_load(str(fs)), default_flow_style=False, - sort_keys=False)) + print(yaml.dump(yaml.safe_load(str(fs)), default_flow_style=False, sort_keys=False)) @cli.group(name="projects") @@ -202,8 +194,8 @@ def project_archive(name: str): """ Archive a project """ - feast_client = # type: Client - Client().archive_project(name) + feast_client = Client() # type: Client + feast_client.archive_project(name) @project.command(name="list") diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 2acbf78f88e..bd86fdca1cb 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -20,7 +20,6 @@ import time from collections import OrderedDict from math import ceil -from typing import Dict, List, Tuple, Union, Optional from typing import Dict, List, Optional, Tuple, Union import grpc @@ -28,6 +27,14 @@ import pyarrow as pa import pyarrow.parquet as pq +from feast.config import Config +from feast.constants import ( + CONFIG_CORE_SECURE_KEY, + CONFIG_CORE_URL_KEY, + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, + CONFIG_SERVING_SECURE_KEY, + CONFIG_SERVING_URL_KEY, +) from feast.core.CoreService_pb2 import ( ApplyFeatureSetRequest, ApplyFeatureSetResponse, @@ -62,8 +69,6 @@ GetOnlineFeaturesResponse, ) from feast.serving.ServingService_pb2_grpc import ServingServiceStub -from feast.config import Config -from feast.constants import * _logger = logging.getLogger(__name__) @@ -75,8 +80,7 @@ class Client: Feast Client: Used for creating, managing, and retrieving features. """ - def __init__( - self, options: Optional[Dict[str, str]] = None, **kwargs): + def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): """ The Feast Client should be initialized with at least one service url @@ -93,8 +97,7 @@ def __init__( if options is None: options = dict() - self._config = Config(options={**options, - **kwargs}) + self._config = Config(options={**options, **kwargs}) self.__core_channel: grpc.Channel = None self.__serving_channel: grpc.Channel = None @@ -191,18 +194,15 @@ def version(self): self._connect_serving() serving_version = self._serving_service_stub.GetFeastServingInfo( GetFeastServingInfoRequest(), - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ).version - result["serving"] = {"url": self.serving_url, - "version": serving_version} + result["serving"] = {"url": self.serving_url, "version": serving_version} if self.core_url: self._connect_core() core_version = self._core_service_stub.GetFeastCoreVersion( GetFeastCoreVersionRequest(), - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ).version result["core"] = {"url": self.core_url, "version": core_version} @@ -223,15 +223,15 @@ def _connect_core(self, skip_if_connected: bool = True): if self.__core_channel is None: if self.core_secure or self.core_url.endswith(":443"): - self.__core_channel = grpc.secure_channel(self.core_url, - grpc.ssl_channel_credentials()) + self.__core_channel = grpc.secure_channel( + self.core_url, grpc.ssl_channel_credentials() + ) else: self.__core_channel = grpc.insecure_channel(self.core_url) try: grpc.channel_ready_future(self.__core_channel).result( - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) except grpc.FutureTimeoutError: raise ConnectionError( @@ -257,15 +257,15 @@ def _connect_serving(self, skip_if_connected=True): if self.__serving_channel is None: if self.serving_secure or self.serving_url.endswith(":443"): - self.__serving_channel = grpc.secure_channel(self.serving_url, - grpc.ssl_channel_credentials()) + self.__serving_channel = grpc.secure_channel( + self.serving_url, grpc.ssl_channel_credentials() + ) else: self.__serving_channel = grpc.insecure_channel(self.serving_url) try: grpc.channel_ready_future(self.__serving_channel).result( - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) ) except grpc.FutureTimeoutError: raise ConnectionError( @@ -273,8 +273,7 @@ def _connect_serving(self, skip_if_connected=True): f"Serving gRPC server {self.serving_url} " ) else: - self._serving_service_stub = ServingServiceStub( - self.__serving_channel) + self._serving_service_stub = ServingServiceStub(self.__serving_channel) @property def project(self) -> Union[str, None]: @@ -305,8 +304,8 @@ def list_projects(self) -> List[str]: """ self._connect_core() response = self._core_service_stub.ListProjects( - ListProjectsRequest(), timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + ListProjectsRequest(), + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: ListProjectsResponse return list(response.projects) @@ -321,8 +320,7 @@ def create_project(self, project: str): self._connect_core() self._core_service_stub.CreateProject( CreateProjectRequest(name=project), - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: CreateProjectResponse def archive_project(self, project): @@ -338,8 +336,7 @@ def archive_project(self, project): self._connect_core() self._core_service_stub.ArchiveProject( ArchiveProjectRequest(name=project), - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: ArchiveProjectResponse if self._project == project: @@ -388,8 +385,7 @@ def _apply_feature_set(self, feature_set: FeatureSet): try: apply_fs_response = self._core_service_stub.ApplyFeatureSet( ApplyFeatureSetRequest(feature_set=feature_set_proto), - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: ApplyFeatureSetResponse except grpc.RpcError as e: raise grpc.RpcError(e.details()) @@ -561,8 +557,7 @@ def get_batch_features( # staging location serving_info = self._serving_service_stub.GetFeastServingInfo( GetFeastServingInfoRequest(), - timeout=self._config.getint( - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY) + timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), ) # type: GetFeastServingInfoResponse if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH: @@ -576,8 +571,7 @@ def get_batch_features( # Remove timezone from datetime column if isinstance( - entity_rows["datetime"].dtype, - pd.core.dtypes.dtypes.DatetimeTZDtype + entity_rows["datetime"].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype ): entity_rows["datetime"] = pd.DatetimeIndex( entity_rows["datetime"] @@ -604,8 +598,7 @@ def get_batch_features( features=feature_references, dataset_source=DatasetSource( file_source=DatasetSource.FileSource( - file_uris=staged_files, - data_format=DataFormat.DATA_FORMAT_AVRO + file_uris=staged_files, data_format=DataFormat.DATA_FORMAT_AVRO ) ), ) @@ -714,8 +707,7 @@ def ingest( raise Exception(f"Feature set name must be provided") # Read table and get row count - dir_path, dest_path = _read_table_from_source(source, chunk_size, - max_workers) + dir_path, dest_path = _read_table_from_source(source, chunk_size, max_workers) pq_file = pq.ParquetFile(dest_path) @@ -734,8 +726,7 @@ def ingest( print("Waiting for feature set to be ready for ingestion...") while True: if timeout is not None and time.time() - current_time >= timeout: - raise TimeoutError( - "Timed out waiting for feature set to be ready") + raise TimeoutError("Timed out waiting for feature set to be ready") feature_set = self.get_feature_set(name, version) if ( feature_set is not None @@ -843,8 +834,7 @@ def _build_feature_references( f'Could not parse feature ref {feature_ref}, expecting "project/feature:version"' ) - features.append( - FeatureReference(project=project, name=name, version=version)) + features.append(FeatureReference(project=project, name=name, version=version)) return features @@ -899,8 +889,7 @@ def _read_table_from_source( else: table = pq.read_table(file_path) else: - raise ValueError( - f"Unknown data source provided for ingestion: {source}") + raise ValueError(f"Unknown data source provided for ingestion: {source}") # Ensure that PyArrow table is initialised assert isinstance(table, pa.lib.Table) diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index 77d81d867e6..fd35b6e5d87 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -13,26 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from configparser import ConfigParser, NoOptionError -from os.path import expanduser, join import logging import os -from typing import Dict -from typing import Optional -from feast.constants import * +from configparser import ConfigParser, NoOptionError +from os.path import expanduser, join +from typing import Dict, Optional -_logger = logging.getLogger(__name__) +from feast.constants import ( + CONFIG_FEAST_ENV_VAR_PREFIX, + CONFIG_FILE_DEFAULT_DIRECTORY, + CONFIG_FILE_NAME, + CONFIG_FILE_SECTION, + FEAST_CONFIG_FILE_ENV_KEY, +) +from feast.constants import FEAST_DEFAULT_OPTIONS as DEFAULTS -FEAST_DEFAULT_OPTIONS = { - CONFIG_PROJECT_KEY: "default", - CONFIG_CORE_URL_KEY: "localhost:6565", - CONFIG_CORE_SECURE_KEY: "False", - CONFIG_SERVING_URL_KEY: "localhost:6565", - CONFIG_SERVING_SECURE_KEY: 'False', - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: '3', - CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: '600', - CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: '600', -} +_logger = logging.getLogger(__name__) def _init_config(path: str): @@ -55,7 +51,7 @@ def _init_config(path: str): os.makedirs(os.path.dirname(config_dir)) # Create the configuration file itself - config = ConfigParser(defaults=FEAST_DEFAULT_OPTIONS) + config = ConfigParser(defaults=DEFAULTS) if os.path.exists(path): config.read(path) @@ -64,7 +60,7 @@ def _init_config(path: str): config.add_section(CONFIG_FILE_SECTION) # Save the current configuration - config.write(open(path, 'w')) + config.write(open(path, "w")) return config @@ -77,8 +73,7 @@ def _get_feast_env_vars(): feast_env_vars = {} for key in os.environ.keys(): if key.upper().startswith(CONFIG_FEAST_ENV_VAR_PREFIX): - feast_env_vars[key[len(CONFIG_FEAST_ENV_VAR_PREFIX):]] = os.environ[ - key] + feast_env_vars[key[len(CONFIG_FEAST_ENV_VAR_PREFIX) :]] = os.environ[key] return feast_env_vars @@ -92,9 +87,8 @@ class Config: """ - def __init__(self, - options: Optional[Dict[str, str]] = None, - path: Optional[str] = None, + def __init__( + self, options: Optional[Dict[str, str]] = None, path: Optional[str] = None, ): """ Configuration options are returned as follows (higher replaces lower) @@ -108,10 +102,13 @@ def __init__(self, path: (optional) File path to configuration file """ if not path: - path = join(expanduser("~"), - os.environ.get(FEAST_CONFIG_FILE_ENV_KEY, - CONFIG_FILE_DEFAULT_DIRECTORY), - CONFIG_FILE_NAME) + path = join( + expanduser("~"), + os.environ.get( + FEAST_CONFIG_FILE_ENV_KEY, CONFIG_FILE_DEFAULT_DIRECTORY, + ), + CONFIG_FILE_NAME, + ) config = _init_config(path) @@ -132,9 +129,11 @@ def get(self, option): Returns: String option that is returned """ - return self._config.get(CONFIG_FILE_SECTION, option, - vars={**_get_feast_env_vars(), - **self._options}) + return self._config.get( + CONFIG_FILE_SECTION, + option, + vars={**_get_feast_env_vars(), **self._options}, + ) def getboolean(self, option): """ @@ -146,9 +145,11 @@ def getboolean(self, option): Returns: Boolean option value that is returned """ - return self._config.getboolean(CONFIG_FILE_SECTION, option, - vars={**_get_feast_env_vars(), - **self._options}) + return self._config.getboolean( + CONFIG_FILE_SECTION, + option, + vars={**_get_feast_env_vars(), **self._options}, + ) def getint(self, option): """ @@ -160,9 +161,11 @@ def getint(self, option): Returns: Integer option value that is returned """ - return self._config.getint(CONFIG_FILE_SECTION, option, - vars={**_get_feast_env_vars(), - **self._options}) + return self._config.getint( + CONFIG_FILE_SECTION, + option, + vars={**_get_feast_env_vars(), **self._options}, + ) def getfloat(self, option): """ @@ -174,9 +177,11 @@ def getfloat(self, option): Returns: Float option value that is returned """ - return self._config.getfloat(CONFIG_FILE_SECTION, option, - vars={**_get_feast_env_vars(), - **self._options}) + return self._config.getfloat( + CONFIG_FILE_SECTION, + option, + vars={**_get_feast_env_vars(), **self._options}, + ) def set(self, option, value): """ @@ -185,8 +190,7 @@ def set(self, option, value): option: Option name to use as key value: Value to store under option """ - self._config.set(CONFIG_FILE_SECTION, option, - value=str(value)) + self._config.set(CONFIG_FILE_SECTION, option, value=str(value)) def exists(self, option): """ @@ -209,14 +213,12 @@ def save(self): Save the current configuration to disk. This does not include environmental variables or initialized options """ - self._config.write(open(self._path, 'w')) + self._config.write(open(self._path, "w")) def __str__(self): - result = '' + result = "" for section_name in self._config.sections(): - result += '\n[' + section_name + ']\n' - + result += "\n[" + section_name + "]\n" for name, value in self._config.items(section_name): - result += name + ' = ' + value + '\n' - + result += name + " = " + value + "\n" return result diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 115b14d2dea..c4bde75404a 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -14,19 +14,35 @@ # limitations under the License. # -DATETIME_COLUMN = 'datetime' +# General constants +DATETIME_COLUMN = "datetime" FEAST_CONFIG_FILE_ENV_KEY = "FEAST_CONFIG" -CONFIG_FEAST_ENV_VAR_PREFIX = 'FEAST_' +CONFIG_FEAST_ENV_VAR_PREFIX = "FEAST_" CONFIG_FILE_DEFAULT_DIRECTORY = ".feast" CONFIG_FILE_NAME = "config" CONFIG_FILE_SECTION = "general" + +# Feast configuration options CONFIG_CORE_URL_KEY = "core_url" CONFIG_SERVING_URL_KEY = "serving_url" CONFIG_PROJECT_KEY = "project" CONFIG_CORE_SECURE_KEY = "core_secure" CONFIG_SERVING_SECURE_KEY = "serving_secure" -CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = 'grpc_connection_timeout_default' -CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = 'grpc_connection_timeout_apply_key' -CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = 'batch_feature_request_wait_time_seconds' +CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" +CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply_key" +CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( + "batch_feature_request_wait_time_seconds" +) +# Configuration option default values +FEAST_DEFAULT_OPTIONS = { + CONFIG_PROJECT_KEY: "default", + CONFIG_CORE_URL_KEY: "localhost:6565", + CONFIG_CORE_SECURE_KEY: "False", + CONFIG_SERVING_URL_KEY: "localhost:6565", + CONFIG_SERVING_SECURE_KEY: "False", + CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3", + CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: "600", + CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: "600", +} diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 223d903ab2b..b41500125cd 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + import pkgutil import tempfile from concurrent import futures @@ -94,8 +96,7 @@ def mock_client(self, mocker): @pytest.fixture def server_credentials(self): private_key = pkgutil.get_data(__name__, _PRIVATE_KEY_RESOURCE_PATH) - certificate_chain = pkgutil.get_data(__name__, - _CERTIFICATE_CHAIN_RESOURCE_PATH) + certificate_chain = pkgutil.get_data(__name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) return grpc.ssl_server_credentials(((private_key, certificate_chain),)) @pytest.fixture @@ -137,28 +138,36 @@ def secure_serving_server(self, server_credentials): @pytest.fixture def secure_client(self, secure_core_server, secure_serving_server): - root_certificate_credentials = pkgutil.get_data(__name__, - _ROOT_CERTIFICATE_RESOURCE_PATH) + root_certificate_credentials = pkgutil.get_data( + __name__, _ROOT_CERTIFICATE_RESOURCE_PATH + ) # this is needed to establish a secure connection using self-signed certificates, for the purpose of the test ssl_channel_credentials = grpc.ssl_channel_credentials( - root_certificates=root_certificate_credentials) - with mock.patch("grpc.ssl_channel_credentials", - MagicMock(return_value=ssl_channel_credentials)): - yield Client(core_url="localhost:50053", - serving_url="localhost:50054", core_secure=True, - serving_secure=True) + root_certificates=root_certificate_credentials + ) + with mock.patch( + "grpc.ssl_channel_credentials", + MagicMock(return_value=ssl_channel_credentials), + ): + yield Client( + core_url="localhost:50053", + serving_url="localhost:50054", + core_secure=True, + serving_secure=True, + ) @pytest.fixture def client(self, core_server, serving_server): return Client(core_url="localhost:50051", serving_url="localhost:50052") - @pytest.mark.parametrize("mocked_client", - [pytest.lazy_fixture("mock_client"), - pytest.lazy_fixture("secure_mock_client") - ]) + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) def test_version(self, mocked_client, mocker): mocked_client._core_service_stub = Core.CoreServiceStub( - grpc.insecure_channel("")) + grpc.insecure_channel("") + ) mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) @@ -183,10 +192,10 @@ def test_version(self, mocked_client, mocker): and status["serving"]["version"] == "0.3.2" ) - @pytest.mark.parametrize("mocked_client", - [pytest.lazy_fixture("mock_client"), - pytest.lazy_fixture("secure_mock_client") - ]) + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) def test_get_online_features(self, mocked_client, mocker): ROW_COUNT = 300 @@ -196,8 +205,7 @@ def test_get_online_features(self, mocked_client, mocker): fields = dict() for feature_num in range(1, 10): - fields[ - f"my_project/feature_{str(feature_num)}:1"] = ValueProto.Value( + fields[f"my_project/feature_{str(feature_num)}:1"] = ValueProto.Value( int64_val=feature_num ) field_values = GetOnlineFeaturesResponse.FieldValues(fields=fields) @@ -208,8 +216,7 @@ def test_get_online_features(self, mocked_client, mocker): response.field_values.append(field_values) entity_rows.append( GetOnlineFeaturesRequest.EntityRow( - fields={ - "customer_id": ValueProto.Value(int64_val=row_number)} + fields={"customer_id": ValueProto.Value(int64_val=row_number)} ) ) @@ -235,19 +242,18 @@ def test_get_online_features(self, mocked_client, mocker): ) # type: GetOnlineFeaturesResponse assert ( - response.field_values[0].fields[ - "my_project/feature_1:1"].int64_val == 1 - and response.field_values[0].fields[ - "my_project/feature_9:1"].int64_val == 9 + response.field_values[0].fields["my_project/feature_1:1"].int64_val == 1 + and response.field_values[0].fields["my_project/feature_9:1"].int64_val == 9 ) - @pytest.mark.parametrize("mocked_client", - [pytest.lazy_fixture("mock_client"), - pytest.lazy_fixture("secure_mock_client") - ]) + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) def test_get_feature_set(self, mocked_client, mocker): mocked_client._core_service_stub = Core.CoreServiceStub( - grpc.insecure_channel("")) + grpc.insecure_channel("") + ) from google.protobuf.duration_pb2 import Duration @@ -279,8 +285,7 @@ def test_get_feature_set(self, mocked_client, mocker): source=Source( type=SourceType.KAFKA, kafka_source_config=KafkaSourceConfig( - bootstrap_servers="localhost:9092", - topic="topic" + bootstrap_servers="localhost:9092", topic="topic" ), ), ), @@ -302,17 +307,18 @@ def test_get_feature_set(self, mocked_client, mocker): and len(feature_set.entities) == 1 ) - @pytest.mark.parametrize("mocked_client", - [pytest.lazy_fixture("mock_client"), - pytest.lazy_fixture("secure_mock_client") - ]) + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) def test_get_batch_features(self, mocked_client, mocker): mocked_client._serving_service_stub = Serving.ServingServiceStub( grpc.insecure_channel("") ) mocked_client._core_service_stub = Core.CoreServiceStub( - grpc.insecure_channel("")) + grpc.insecure_channel("") + ) mocker.patch.object( mocked_client._core_service_stub, @@ -325,8 +331,7 @@ def test_get_batch_features(self, mocked_client, mocker): project="my_project", entities=[ EntitySpecProto( - name="customer", - value_type=ValueProto.ValueType.INT64 + name="customer", value_type=ValueProto.ValueType.INT64 ), EntitySpecProto( name="transaction", @@ -344,8 +349,7 @@ def test_get_batch_features(self, mocked_client, mocker): ), ], ), - meta=FeatureSetMetaProto( - status=FeatureSetStatusProto.STATUS_READY), + meta=FeatureSetMetaProto(status=FeatureSetStatusProto.STATUS_READY), ) ), ) @@ -405,8 +409,7 @@ def test_get_batch_features(self, mocked_client, mocker): entity_rows=pd.DataFrame( { "datetime": [ - pd.datetime.now(tz=timezone("Asia/Singapore")) for _ in - range(3) + pd.datetime.now(tz=timezone("Asia/Singapore")) for _ in range(3) ], "customer": [1001, 1002, 1003], "transaction": [1001, 1002, 1003], @@ -423,19 +426,17 @@ def test_get_batch_features(self, mocked_client, mocker): actual_dataframe = response.to_dataframe() assert actual_dataframe[ - ["my_project/customer_feature_1:1", - "my_project/customer_feature_2:1"] + ["my_project/customer_feature_1:1", "my_project/customer_feature_2:1"] ].equals( expected_dataframe[ - ["my_project/customer_feature_1:1", - "my_project/customer_feature_2:1"] + ["my_project/customer_feature_1:1", "my_project/customer_feature_2:1"] ] ) - @pytest.mark.parametrize("test_client", [pytest.lazy_fixture("client"), - pytest.lazy_fixture( - "secure_client") - ]) + @pytest.mark.parametrize( + "test_client", + [pytest.lazy_fixture("client"), pytest.lazy_fixture("secure_client")], + ) def test_apply_feature_set_success(self, test_client): test_client.set_project("project1") @@ -466,15 +467,17 @@ def test_apply_feature_set_success(self, test_client): and feature_sets[1].features[1].dtype == ValueType.BYTES_LIST ) - @pytest.mark.parametrize("dataframe,test_client", - [(dataframes.GOOD, pytest.lazy_fixture("client")), - (dataframes.GOOD, - pytest.lazy_fixture("secure_client"))]) + @pytest.mark.parametrize( + "dataframe,test_client", + [ + (dataframes.GOOD, pytest.lazy_fixture("client")), + (dataframes.GOOD, pytest.lazy_fixture("secure_client")), + ], + ) def test_feature_set_ingest_success(self, dataframe, test_client, mocker): test_client.set_project("project1") driver_fs = FeatureSet( - "driver-feature-set", - source=KafkaSource(brokers="kafka:9092", topic="test") + "driver-feature-set", source=KafkaSource(brokers="kafka:9092", topic="test") ) driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT)) driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING)) @@ -497,11 +500,13 @@ def test_feature_set_ingest_success(self, dataframe, test_client, mocker): # Ingest data into Feast test_client.ingest("driver-feature-set", dataframe) - @pytest.mark.parametrize("dataframe,exception,test_client", - [(dataframes.GOOD, TimeoutError, - pytest.lazy_fixture("client")), - (dataframes.GOOD, TimeoutError, - pytest.lazy_fixture("secure_client"))]) + @pytest.mark.parametrize( + "dataframe,exception,test_client", + [ + (dataframes.GOOD, TimeoutError, pytest.lazy_fixture("client")), + (dataframes.GOOD, TimeoutError, pytest.lazy_fixture("secure_client")), + ], + ) def test_feature_set_ingest_fail_if_pending( self, dataframe, exception, test_client, mocker ): @@ -535,25 +540,29 @@ def test_feature_set_ingest_fail_if_pending( @pytest.mark.parametrize( "dataframe,exception,test_client", [ - (dataframes.BAD_NO_DATETIME, Exception, - pytest.lazy_fixture("client")), - (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, - pytest.lazy_fixture("client")), + (dataframes.BAD_NO_DATETIME, Exception, pytest.lazy_fixture("client")), ( - dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("client")), + dataframes.BAD_INCORRECT_DATETIME_TYPE, + Exception, + pytest.lazy_fixture("client"), + ), + (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("client")), (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("client")), - (dataframes.BAD_NO_DATETIME, Exception, - pytest.lazy_fixture("secure_client")), - (dataframes.BAD_INCORRECT_DATETIME_TYPE, Exception, - pytest.lazy_fixture("secure_client")), - (dataframes.BAD_NO_ENTITY, Exception, - pytest.lazy_fixture("secure_client")), - (dataframes.NO_FEATURES, Exception, - pytest.lazy_fixture("secure_client")), + ( + dataframes.BAD_NO_DATETIME, + Exception, + pytest.lazy_fixture("secure_client"), + ), + ( + dataframes.BAD_INCORRECT_DATETIME_TYPE, + Exception, + pytest.lazy_fixture("secure_client"), + ), + (dataframes.BAD_NO_ENTITY, Exception, pytest.lazy_fixture("secure_client")), + (dataframes.NO_FEATURES, Exception, pytest.lazy_fixture("secure_client")), ], ) - def test_feature_set_ingest_failure(self, test_client, dataframe, - exception): + def test_feature_set_ingest_failure(self, test_client, dataframe, exception): with pytest.raises(exception): # Create feature set driver_fs = FeatureSet("driver-feature-set") @@ -567,9 +576,13 @@ def test_feature_set_ingest_failure(self, test_client, dataframe, # Ingest data into Feast test_client.ingest(driver_fs, dataframe=dataframe) - @pytest.mark.parametrize("dataframe,test_client", [ - (dataframes.ALL_TYPES, pytest.lazy_fixture("client")), - (dataframes.ALL_TYPES, pytest.lazy_fixture("secure_client"))]) + @pytest.mark.parametrize( + "dataframe,test_client", + [ + (dataframes.ALL_TYPES, pytest.lazy_fixture("client")), + (dataframes.ALL_TYPES, pytest.lazy_fixture("secure_client")), + ], + ) def test_feature_set_types_success(self, test_client, dataframe, mocker): test_client.set_project("project1") @@ -588,14 +601,12 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST), Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST), Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST), - Feature(name="string_list_feature", - dtype=ValueType.STRING_LIST), + Feature(name="string_list_feature", dtype=ValueType.STRING_LIST), Feature(name="bytes_list_feature", dtype=ValueType.BYTES_LIST), # Feature(name="bool_list_feature", # dtype=ValueType.BOOL_LIST), # TODO: Add support for this # type again https://github.com/gojek/feast/issues/341 - Feature(name="double_list_feature", - dtype=ValueType.DOUBLE_LIST), + Feature(name="double_list_feature", dtype=ValueType.DOUBLE_LIST), ], max_age=Duration(seconds=3600), ) @@ -606,8 +617,7 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): mocker.patch.object( test_client._core_service_stub, "GetFeatureSet", - return_value=GetFeatureSetResponse( - feature_set=all_types_fs.to_proto()), + return_value=GetFeatureSetResponse(feature_set=all_types_fs.to_proto()), ) # Need to create a mock producer @@ -617,45 +627,40 @@ def test_feature_set_types_success(self, test_client, dataframe, mocker): @patch("grpc.channel_ready_future") def test_secure_channel_creation_with_secure_client(self, _mocked_obj): - client = Client(core_url="localhost:50051", - serving_url="localhost:50052", serving_secure=True, - core_secure=True) - with mock.patch("grpc.secure_channel") as _grpc_mock, \ - mock.patch("grpc.ssl_channel_credentials", - MagicMock(return_value="test")) as _mocked_credentials: + client = Client( + core_url="localhost:50051", + serving_url="localhost:50052", + serving_secure=True, + core_secure=True, + ) + with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( + "grpc.ssl_channel_credentials", MagicMock(return_value="test") + ) as _mocked_credentials: client._connect_serving() - _grpc_mock.assert_called_with(client.serving_url, - _mocked_credentials.return_value) + _grpc_mock.assert_called_with( + client.serving_url, _mocked_credentials.return_value + ) @mock.patch("grpc.channel_ready_future") - def test_secure_channel_creation_with_secure_serving_url(self, - _mocked_obj, ): + def test_secure_channel_creation_with_secure_serving_url( + self, _mocked_obj, + ): client = Client(core_url="localhost:50051", serving_url="localhost:443") - with mock.patch("grpc.secure_channel") as _grpc_mock, \ - mock.patch("grpc.ssl_channel_credentials", - MagicMock(return_value="test")) as _mocked_credentials: + with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( + "grpc.ssl_channel_credentials", MagicMock(return_value="test") + ) as _mocked_credentials: client._connect_serving() - _grpc_mock.assert_called_with(client.serving_url, - _mocked_credentials.return_value) - - @patch("grpc.channel_ready_future") - def test_secure_channel_creation_with_secure_client(self, _mocked_obj): - client = Client(core_url="localhost:50053", - serving_url="localhost:50054", serving_secure=True, - core_secure=True) - with mock.patch("grpc.secure_channel") as _grpc_mock, \ - mock.patch("grpc.ssl_channel_credentials", - MagicMock(return_value="test")) as _mocked_credentials: - client._connect_core() - _grpc_mock.assert_called_with(client.core_url, - _mocked_credentials.return_value) + _grpc_mock.assert_called_with( + client.serving_url, _mocked_credentials.return_value + ) @patch("grpc.channel_ready_future") def test_secure_channel_creation_with_secure_core_url(self, _mocked_obj): client = Client(core_url="localhost:443", serving_url="localhost:50054") - with mock.patch("grpc.secure_channel") as _grpc_mock, \ - mock.patch("grpc.ssl_channel_credentials", - MagicMock(return_value="test")) as _mocked_credentials: + with mock.patch("grpc.secure_channel") as _grpc_mock, mock.patch( + "grpc.ssl_channel_credentials", MagicMock(return_value="test") + ) as _mocked_credentials: client._connect_core() - _grpc_mock.assert_called_with(client.core_url, - _mocked_credentials.return_value) + _grpc_mock.assert_called_with( + client.core_url, _mocked_credentials.return_value + ) diff --git a/sdk/python/tests/test_config.py b/sdk/python/tests/test_config.py index b850ed0e588..9ed34a736a2 100644 --- a/sdk/python/tests/test_config.py +++ b/sdk/python/tests/test_config.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +from tempfile import mkstemp + import pytest + from feast.config import Config -from tempfile import mkstemp -import os class TestConfig: @@ -25,75 +27,73 @@ def normal_config(self): return Config(path=path) def test_init_config_file_with_path(self): - configuration_string = \ - '[general]\nCORE_URL = grpc://127.0.0.1:6565' + configuration_string = "[general]\nCORE_URL = grpc://127.0.0.1:6565" fd, path = mkstemp() - with open(fd, 'w') as f: + with open(fd, "w") as f: f.write(configuration_string) config = Config(path=path) assert config.get("core_url") == "grpc://127.0.0.1:6565" def test_load_environmental_variable(self, normal_config): import os - serving_url = 'http://196.25.1.1' - os.environ['FEAST_SERVING_URL'] = serving_url - assert normal_config.get('SERVING_URL') == serving_url - del os.environ['FEAST_SERVING_URL'] + + serving_url = "http://196.25.1.1" + os.environ["FEAST_SERVING_URL"] = serving_url + assert normal_config.get("SERVING_URL") == serving_url + del os.environ["FEAST_SERVING_URL"] def test_env_var_not_case_sensitive(self, normal_config): import os - serving_url = 'http://196.25.1.1' - os.environ['FEAST_SerVING_url'] = serving_url - assert normal_config.get('SERVING_URL') == serving_url + + serving_url = "http://196.25.1.1" + os.environ["FEAST_SerVING_url"] = serving_url + assert normal_config.get("SERVING_URL") == serving_url def test_force_options(self): fd, path = mkstemp() - options = {'feast_config_1': "one", "random_config_two": 2} + options = {"feast_config_1": "one", "random_config_two": 2} config = Config(options, path) - assert config.get('feast_config_1') == 'one' + assert config.get("feast_config_1") == "one" def test_init_options_precedence(self): """ Init options > env var > file options > default options """ fd, path = mkstemp() - os.environ['FEAST_CORE_URL'] = 'env' - options = {'core_url': "init", "serving_url": "init"} - configuration_string = \ - '[general]\nCORE_URL = file\n' - with open(fd, 'w') as f: + os.environ["FEAST_CORE_URL"] = "env" + options = {"core_url": "init", "serving_url": "init"} + configuration_string = "[general]\nCORE_URL = file\n" + with open(fd, "w") as f: f.write(configuration_string) config = Config(options, path) - assert config.get('core_url') == 'init' - del os.environ['FEAST_CORE_URL'] + assert config.get("core_url") == "init" + del os.environ["FEAST_CORE_URL"] def test_env_var_precedence(self): """ Env vars > file options > default options """ fd, path = mkstemp() - os.environ['FEAST_CORE_URL'] = 'env' - configuration_string = \ - '[general]\nCORE_URL = file\n' - with open(fd, 'w') as f: + os.environ["FEAST_CORE_URL"] = "env" + configuration_string = "[general]\nCORE_URL = file\n" + with open(fd, "w") as f: f.write(configuration_string) config = Config(path=path) - assert config.get('CORE_URL') == 'env' + assert config.get("CORE_URL") == "env" - del os.environ['FEAST_CORE_URL'] + del os.environ["FEAST_CORE_URL"] def test_file_option_precedence(self): """ file options > default options """ fd, path = mkstemp() - configuration_string = \ - '[general]\nCORE_URL = file\n' - with open(fd, 'w') as f: + configuration_string = "[general]\nCORE_URL = file\n" + with open(fd, "w") as f: f.write(configuration_string) config = Config(path=path) - assert config.get('CORE_URL') == 'file' + assert config.get("CORE_URL") == "file" def test_default_options(self): """ @@ -101,21 +101,21 @@ def test_default_options(self): """ fd, path = mkstemp() config = Config(path=path) - assert config.get('CORE_URL') == 'localhost:6565' + assert config.get("CORE_URL") == "localhost:6565" def test_type_casting(self): """ Test type casting of strings to other types """ fd, path = mkstemp() - os.environ['FEAST_INT_VAR'] = '1' - os.environ['FEAST_FLOAT_VAR'] = '1.0' - os.environ['FEAST_BOOLEAN_VAR'] = 'True' + os.environ["FEAST_INT_VAR"] = "1" + os.environ["FEAST_FLOAT_VAR"] = "1.0" + os.environ["FEAST_BOOLEAN_VAR"] = "True" config = Config(path=path) - assert config.getint('INT_VAR') == 1 - assert config.getfloat('FLOAT_VAR') == 1.0 - assert config.getboolean('BOOLEAN_VAR') is True + assert config.getint("INT_VAR") == 1 + assert config.getfloat("FLOAT_VAR") == 1.0 + assert config.getboolean("BOOLEAN_VAR") is True def test_set_value(self): """ @@ -125,7 +125,7 @@ def test_set_value(self): config = Config(path=path) config.set("my_val", 1) - assert config.getint('my_val') == 1 + assert config.getint("my_val") == 1 def test_exists(self): """ @@ -135,5 +135,5 @@ def test_exists(self): config = Config(path=path) config.set("my_val_exist", 1) - assert config.exists('my_val_exist') is True - assert config.exists('my_val_not_exist') is False + assert config.exists("my_val_exist") is True + assert config.exists("my_val_not_exist") is False From 88bfd846f456c710ad223c818f337f1c7e553685 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 19 Mar 2020 12:04:13 +0800 Subject: [PATCH 4/4] Add project key to config in client --- sdk/python/feast/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index bd86fdca1cb..2a0b636b373 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -32,6 +32,7 @@ CONFIG_CORE_SECURE_KEY, CONFIG_CORE_URL_KEY, CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, + CONFIG_PROJECT_KEY, CONFIG_SERVING_SECURE_KEY, CONFIG_SERVING_URL_KEY, ) @@ -283,7 +284,7 @@ def project(self) -> Union[str, None]: Returns: Project name """ - return self._config.get("project") + return self._config.get(CONFIG_PROJECT_KEY) def set_project(self, project: str): """ @@ -292,7 +293,7 @@ def set_project(self, project: str): Args: project: Project to set as active """ - self._config.set("project", project) + self._config.set(CONFIG_PROJECT_KEY, project) def list_projects(self) -> List[str]: """