|
13 | 13 | # limitations under the License. |
14 | 14 | import logging |
15 | 15 | import multiprocessing |
16 | | -import time |
| 16 | +import socket |
| 17 | +from contextlib import closing |
17 | 18 | from datetime import datetime, timedelta |
18 | 19 | from multiprocessing import Process |
19 | 20 | from sys import platform |
|
24 | 25 | from _pytest.nodes import Item |
25 | 26 |
|
26 | 27 | from feast import FeatureStore |
| 28 | +from feast.wait import wait_retry_backoff |
27 | 29 | from tests.data.data_creator import create_dataset |
28 | 30 | from tests.integration.feature_repos.integration_test_repo_config import ( |
29 | 31 | IntegrationTestRepoConfig, |
30 | 32 | ) |
31 | 33 | from tests.integration.feature_repos.repo_configuration import ( |
32 | | - FULL_REPO_CONFIGS, |
33 | | - REDIS_CLUSTER_CONFIG, |
34 | | - REDIS_CONFIG, |
| 34 | + AVAILABLE_OFFLINE_STORES, |
| 35 | + AVAILABLE_ONLINE_STORES, |
35 | 36 | Environment, |
36 | 37 | TestData, |
37 | 38 | construct_test_environment, |
38 | 39 | construct_universal_test_data, |
39 | 40 | ) |
| 41 | +from tests.integration.feature_repos.universal.data_sources.file import ( |
| 42 | + FileDataSourceCreator, |
| 43 | +) |
40 | 44 |
|
41 | 45 | logger = logging.getLogger(__name__) |
42 | 46 |
|
@@ -161,86 +165,152 @@ def start_test_local_server(repo_path: str, port: int): |
161 | 165 | fs.serve("localhost", port, no_access_log=True) |
162 | 166 |
|
163 | 167 |
|
164 | | -@pytest.fixture( |
165 | | - params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS] |
166 | | -) |
167 | | -def environment(request, worker_id: str): |
| 168 | +@pytest.fixture(scope="session") |
| 169 | +def environment(request, worker_id): |
168 | 170 | e = construct_test_environment( |
169 | 171 | request.param, worker_id=worker_id, fixture_request=request |
170 | 172 | ) |
| 173 | + |
| 174 | + yield e |
| 175 | + |
| 176 | + e.feature_store.teardown() |
| 177 | + e.data_source_creator.teardown() |
| 178 | + if e.online_store_creator: |
| 179 | + e.online_store_creator.teardown() |
| 180 | + |
| 181 | + |
| 182 | +_config_cache = {} |
| 183 | + |
| 184 | + |
| 185 | +def pytest_generate_tests(metafunc): |
| 186 | + if "environment" in metafunc.fixturenames: |
| 187 | + markers = {m.name: m for m in metafunc.definition.own_markers} |
| 188 | + |
| 189 | + if "universal_offline_stores" in markers: |
| 190 | + offline_stores = AVAILABLE_OFFLINE_STORES |
| 191 | + else: |
| 192 | + # default offline store for testing online store dimension |
| 193 | + offline_stores = [("local", FileDataSourceCreator)] |
| 194 | + |
| 195 | + online_stores = None |
| 196 | + if "universal_online_stores" in markers: |
| 197 | + # Online stores are explicitly requested |
| 198 | + if "only" in markers["universal_online_stores"].kwargs: |
| 199 | + online_stores = [ |
| 200 | + AVAILABLE_ONLINE_STORES.get(store_name) |
| 201 | + for store_name in markers["universal_online_stores"].kwargs["only"] |
| 202 | + if store_name in AVAILABLE_ONLINE_STORES |
| 203 | + ] |
| 204 | + else: |
| 205 | + online_stores = AVAILABLE_ONLINE_STORES.values() |
| 206 | + |
| 207 | + if online_stores is None: |
| 208 | + # No online stores requested -> setting the default or first available |
| 209 | + online_stores = [ |
| 210 | + AVAILABLE_ONLINE_STORES.get( |
| 211 | + "redis", |
| 212 | + AVAILABLE_ONLINE_STORES.get( |
| 213 | + "sqlite", next(iter(AVAILABLE_ONLINE_STORES.values())) |
| 214 | + ), |
| 215 | + ) |
| 216 | + ] |
| 217 | + |
| 218 | + extra_dimensions = [{}] |
| 219 | + |
| 220 | + if "python_server" in metafunc.fixturenames: |
| 221 | + extra_dimensions.extend( |
| 222 | + [ |
| 223 | + {"python_feature_server": True}, |
| 224 | + # {"python_feature_server": True, "provider": "aws"}, |
| 225 | + ] |
| 226 | + ) |
| 227 | + |
| 228 | + if "goserver" in markers: |
| 229 | + extra_dimensions.append({"go_feature_retrieval": True}) |
| 230 | + |
| 231 | + configs = [] |
| 232 | + for provider, offline_store_creator in offline_stores: |
| 233 | + for online_store, online_store_creator in online_stores: |
| 234 | + for dim in extra_dimensions: |
| 235 | + config = { |
| 236 | + "provider": provider, |
| 237 | + "offline_store_creator": offline_store_creator, |
| 238 | + "online_store": online_store, |
| 239 | + "online_store_creator": online_store_creator, |
| 240 | + **dim, |
| 241 | + } |
| 242 | + # temporary Go works only with redis |
| 243 | + if config.get("go_feature_retrieval") and ( |
| 244 | + not isinstance(online_store, dict) |
| 245 | + or online_store["type"] != "redis" |
| 246 | + ): |
| 247 | + continue |
| 248 | + |
| 249 | + # aws lambda works only with dynamo |
| 250 | + if ( |
| 251 | + config.get("python_feature_server") |
| 252 | + and config.get("provider") == "aws" |
| 253 | + and ( |
| 254 | + not isinstance(online_store, dict) |
| 255 | + or online_store["type"] != "dynamodb" |
| 256 | + ) |
| 257 | + ): |
| 258 | + continue |
| 259 | + |
| 260 | + c = IntegrationTestRepoConfig(**config) |
| 261 | + |
| 262 | + if c not in _config_cache: |
| 263 | + _config_cache[c] = c |
| 264 | + |
| 265 | + configs.append(_config_cache[c]) |
| 266 | + |
| 267 | + metafunc.parametrize( |
| 268 | + "environment", configs, indirect=True, ids=[str(c) for c in configs] |
| 269 | + ) |
| 270 | + |
| 271 | + |
| 272 | +@pytest.fixture(scope="session") |
| 273 | +def python_server(environment): |
171 | 274 | proc = Process( |
172 | 275 | target=start_test_local_server, |
173 | | - args=(e.feature_store.repo_path, e.get_local_server_port()), |
| 276 | + args=(environment.feature_store.repo_path, environment.get_local_server_port()), |
174 | 277 | daemon=True, |
175 | 278 | ) |
176 | | - if e.python_feature_server and e.test_repo_config.provider == "local": |
| 279 | + if ( |
| 280 | + environment.python_feature_server |
| 281 | + and environment.test_repo_config.provider == "local" |
| 282 | + ): |
177 | 283 | proc.start() |
178 | 284 | # Wait for server to start |
179 | | - time.sleep(3) |
180 | | - |
181 | | - def cleanup(): |
182 | | - e.feature_store.teardown() |
183 | | - if proc.is_alive(): |
184 | | - proc.kill() |
185 | | - if e.online_store_creator: |
186 | | - e.online_store_creator.teardown() |
187 | | - |
188 | | - request.addfinalizer(cleanup) |
189 | | - |
190 | | - return e |
| 285 | + wait_retry_backoff( |
| 286 | + lambda: ( |
| 287 | + None, |
| 288 | + _check_port_open("localhost", environment.get_local_server_port()), |
| 289 | + ), |
| 290 | + timeout_secs=10, |
| 291 | + ) |
191 | 292 |
|
| 293 | + yield |
192 | 294 |
|
193 | | -@pytest.fixture( |
194 | | - params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG], |
195 | | - scope="session", |
196 | | - ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]], |
197 | | -) |
198 | | -def local_redis_environment(request, worker_id): |
199 | | - e = construct_test_environment( |
200 | | - IntegrationTestRepoConfig(online_store=request.param), |
201 | | - worker_id=worker_id, |
202 | | - fixture_request=request, |
203 | | - ) |
| 295 | + if proc.is_alive(): |
| 296 | + proc.kill() |
204 | 297 |
|
205 | | - def cleanup(): |
206 | | - e.feature_store.teardown() |
207 | 298 |
|
208 | | - request.addfinalizer(cleanup) |
209 | | - return e |
| 299 | +def _check_port_open(host, port) -> bool: |
| 300 | + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: |
| 301 | + return sock.connect_ex((host, port)) == 0 |
210 | 302 |
|
211 | 303 |
|
212 | 304 | @pytest.fixture(scope="session") |
213 | | -def universal_data_sources(request, environment) -> TestData: |
214 | | - def cleanup(): |
215 | | - # logger.info("Running cleanup in %s, Request: %s", worker_id, request.param) |
216 | | - environment.data_source_creator.teardown() |
217 | | - |
218 | | - request.addfinalizer(cleanup) |
| 305 | +def universal_data_sources(environment) -> TestData: |
219 | 306 | return construct_universal_test_data(environment) |
220 | 307 |
|
221 | 308 |
|
222 | 309 | @pytest.fixture(scope="session") |
223 | | -def redis_universal_data_sources(request, local_redis_environment): |
224 | | - def cleanup(): |
225 | | - # logger.info("Running cleanup in %s, Request: %s", worker_id, request.param) |
226 | | - local_redis_environment.data_source_creator.teardown() |
227 | | - |
228 | | - request.addfinalizer(cleanup) |
229 | | - return construct_universal_test_data(local_redis_environment) |
230 | | - |
231 | | - |
232 | | -@pytest.fixture(scope="session") |
233 | | -def e2e_data_sources(environment: Environment, request): |
| 310 | +def e2e_data_sources(environment: Environment): |
234 | 311 | df = create_dataset() |
235 | 312 | data_source = environment.data_source_creator.create_data_source( |
236 | 313 | df, environment.feature_store.project, field_mapping={"ts_1": "ts"}, |
237 | 314 | ) |
238 | 315 |
|
239 | | - def cleanup(): |
240 | | - environment.data_source_creator.teardown() |
241 | | - if environment.online_store_creator: |
242 | | - environment.online_store_creator.teardown() |
243 | | - |
244 | | - request.addfinalizer(cleanup) |
245 | | - |
246 | 316 | return df, data_source |
0 commit comments