-
Notifications
You must be signed in to change notification settings - Fork 1.3k
chore: Generate environments for each individual test based on its markers/fixtures #2648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,7 @@ | ||
| from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import ( | ||
| PostgreSQLDataSourceCreator, | ||
| ) | ||
| from tests.integration.feature_repos.integration_test_repo_config import ( | ||
| IntegrationTestRepoConfig, | ||
| ) | ||
|
|
||
| FULL_REPO_CONFIGS = [ | ||
| IntegrationTestRepoConfig( | ||
| provider="local", | ||
| offline_store_creator=PostgreSQLDataSourceCreator, | ||
| online_store_creator=PostgreSQLDataSourceCreator, | ||
| ), | ||
| ] | ||
| AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)] | ||
|
|
||
| AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,8 @@ | |
| # limitations under the License. | ||
| import logging | ||
| import multiprocessing | ||
| import time | ||
| import socket | ||
| from contextlib import closing | ||
| from datetime import datetime, timedelta | ||
| from multiprocessing import Process | ||
| from sys import platform | ||
|
|
@@ -24,19 +25,22 @@ | |
| from _pytest.nodes import Item | ||
|
|
||
| from feast import FeatureStore | ||
| from feast.wait import wait_retry_backoff | ||
| from tests.data.data_creator import create_dataset | ||
| from tests.integration.feature_repos.integration_test_repo_config import ( | ||
| IntegrationTestRepoConfig, | ||
| ) | ||
| from tests.integration.feature_repos.repo_configuration import ( | ||
| FULL_REPO_CONFIGS, | ||
| REDIS_CLUSTER_CONFIG, | ||
| REDIS_CONFIG, | ||
| AVAILABLE_OFFLINE_STORES, | ||
| AVAILABLE_ONLINE_STORES, | ||
| Environment, | ||
| TestData, | ||
| construct_test_environment, | ||
| construct_universal_test_data, | ||
| ) | ||
| from tests.integration.feature_repos.universal.data_sources.file import ( | ||
| FileDataSourceCreator, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -161,86 +165,152 @@ def start_test_local_server(repo_path: str, port: int): | |
| fs.serve("localhost", port, no_access_log=True) | ||
|
|
||
|
|
||
| @pytest.fixture( | ||
| params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS] | ||
| ) | ||
| def environment(request, worker_id: str): | ||
| @pytest.fixture(scope="session") | ||
| def environment(request, worker_id): | ||
| e = construct_test_environment( | ||
| request.param, worker_id=worker_id, fixture_request=request | ||
| ) | ||
|
|
||
| yield e | ||
|
|
||
| e.feature_store.teardown() | ||
| e.data_source_creator.teardown() | ||
| if e.online_store_creator: | ||
| e.online_store_creator.teardown() | ||
|
|
||
|
|
||
| _config_cache = {} | ||
|
|
||
|
|
||
| def pytest_generate_tests(metafunc): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add some docs to how this
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment. |
||
| if "environment" in metafunc.fixturenames: | ||
| markers = {m.name: m for m in metafunc.definition.own_markers} | ||
|
|
||
| if "universal_offline_stores" in markers: | ||
| offline_stores = AVAILABLE_OFFLINE_STORES | ||
| else: | ||
| # default offline store for testing online store dimension | ||
| offline_stores = [("local", FileDataSourceCreator)] | ||
|
|
||
| online_stores = None | ||
| if "universal_online_stores" in markers: | ||
|
Comment on lines
192
to
199
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we get rid of the
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I need a name here that would represent that specific tests works with all offline stores (
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant the
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Removed |
||
| # Online stores are explicitly requested | ||
| if "only" in markers["universal_online_stores"].kwargs: | ||
| online_stores = [ | ||
| AVAILABLE_ONLINE_STORES.get(store_name) | ||
| for store_name in markers["universal_online_stores"].kwargs["only"] | ||
| if store_name in AVAILABLE_ONLINE_STORES | ||
| ] | ||
| else: | ||
| online_stores = AVAILABLE_ONLINE_STORES.values() | ||
|
|
||
| if online_stores is None: | ||
| # No online stores requested -> setting the default or first available | ||
| online_stores = [ | ||
| AVAILABLE_ONLINE_STORES.get( | ||
| "redis", | ||
| AVAILABLE_ONLINE_STORES.get( | ||
| "sqlite", next(iter(AVAILABLE_ONLINE_STORES.values())) | ||
| ), | ||
| ) | ||
| ] | ||
|
|
||
| extra_dimensions = [{}] | ||
|
|
||
| if "python_server" in metafunc.fixturenames: | ||
| extra_dimensions.extend( | ||
| [ | ||
| {"python_feature_server": True}, | ||
| # {"python_feature_server": True, "provider": "aws"}, | ||
| ] | ||
| ) | ||
|
|
||
| if "goserver" in markers: | ||
| extra_dimensions.append({"go_feature_retrieval": True}) | ||
|
|
||
| configs = [] | ||
| for provider, offline_store_creator in offline_stores: | ||
| for online_store, online_store_creator in online_stores: | ||
| for dim in extra_dimensions: | ||
| config = { | ||
| "provider": provider, | ||
| "offline_store_creator": offline_store_creator, | ||
| "online_store": online_store, | ||
| "online_store_creator": online_store_creator, | ||
| **dim, | ||
| } | ||
| # temporary Go works only with redis | ||
| if config.get("go_feature_retrieval") and ( | ||
| not isinstance(online_store, dict) | ||
| or online_store["type"] != "redis" | ||
| ): | ||
| continue | ||
|
|
||
| # aws lambda works only with dynamo | ||
| if ( | ||
| config.get("python_feature_server") | ||
| and config.get("provider") == "aws" | ||
| and ( | ||
| not isinstance(online_store, dict) | ||
| or online_store["type"] != "dynamodb" | ||
| ) | ||
| ): | ||
| continue | ||
|
|
||
| c = IntegrationTestRepoConfig(**config) | ||
|
|
||
| if c not in _config_cache: | ||
| _config_cache[c] = c | ||
|
|
||
| configs.append(_config_cache[c]) | ||
|
|
||
| metafunc.parametrize( | ||
| "environment", configs, indirect=True, ids=[str(c) for c in configs] | ||
| ) | ||
|
|
||
|
|
||
| @pytest.fixture(scope="session") | ||
| def python_server(environment): | ||
| proc = Process( | ||
| target=start_test_local_server, | ||
| args=(e.feature_store.repo_path, e.get_local_server_port()), | ||
| args=(environment.feature_store.repo_path, environment.get_local_server_port()), | ||
| daemon=True, | ||
| ) | ||
| if e.python_feature_server and e.test_repo_config.provider == "local": | ||
| if ( | ||
| environment.python_feature_server | ||
| and environment.test_repo_config.provider == "local" | ||
| ): | ||
| proc.start() | ||
| # Wait for server to start | ||
| time.sleep(3) | ||
|
|
||
| def cleanup(): | ||
| e.feature_store.teardown() | ||
| if proc.is_alive(): | ||
| proc.kill() | ||
| if e.online_store_creator: | ||
| e.online_store_creator.teardown() | ||
|
|
||
| request.addfinalizer(cleanup) | ||
|
|
||
| return e | ||
| wait_retry_backoff( | ||
| lambda: ( | ||
| None, | ||
| _check_port_open("localhost", environment.get_local_server_port()), | ||
| ), | ||
| timeout_secs=10, | ||
| ) | ||
|
|
||
| yield | ||
|
|
||
| @pytest.fixture( | ||
| params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG], | ||
| scope="session", | ||
| ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]], | ||
| ) | ||
| def local_redis_environment(request, worker_id): | ||
| e = construct_test_environment( | ||
| IntegrationTestRepoConfig(online_store=request.param), | ||
| worker_id=worker_id, | ||
| fixture_request=request, | ||
| ) | ||
| if proc.is_alive(): | ||
| proc.kill() | ||
|
|
||
| def cleanup(): | ||
| e.feature_store.teardown() | ||
|
|
||
| request.addfinalizer(cleanup) | ||
| return e | ||
| def _check_port_open(host, port) -> bool: | ||
| with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: | ||
| return sock.connect_ex((host, port)) == 0 | ||
|
|
||
|
|
||
| @pytest.fixture(scope="session") | ||
| def universal_data_sources(request, environment) -> TestData: | ||
| def cleanup(): | ||
| # logger.info("Running cleanup in %s, Request: %s", worker_id, request.param) | ||
| environment.data_source_creator.teardown() | ||
|
|
||
| request.addfinalizer(cleanup) | ||
| def universal_data_sources(environment) -> TestData: | ||
| return construct_universal_test_data(environment) | ||
|
|
||
|
|
||
| @pytest.fixture(scope="session") | ||
| def redis_universal_data_sources(request, local_redis_environment): | ||
| def cleanup(): | ||
| # logger.info("Running cleanup in %s, Request: %s", worker_id, request.param) | ||
| local_redis_environment.data_source_creator.teardown() | ||
|
|
||
| request.addfinalizer(cleanup) | ||
| return construct_universal_test_data(local_redis_environment) | ||
|
|
||
|
|
||
| @pytest.fixture(scope="session") | ||
| def e2e_data_sources(environment: Environment, request): | ||
| def e2e_data_sources(environment: Environment): | ||
| df = create_dataset() | ||
| data_source = environment.data_source_creator.create_data_source( | ||
| df, environment.feature_store.project, field_mapping={"ts_1": "ts"}, | ||
| ) | ||
|
|
||
| def cleanup(): | ||
| environment.data_source_creator.teardown() | ||
| if environment.online_store_creator: | ||
| environment.online_store_creator.teardown() | ||
|
|
||
| request.addfinalizer(cleanup) | ||
|
|
||
| return df, data_source | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
infinite looping here might hang the whole testing flow