Skip to content

Commit 155feee

Browse files
committed
generating test environments based on test markers and fixtures
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 1fd9c2d commit 155feee

19 files changed

+346
-324
lines changed

sdk/python/feast/infra/offline_stores/contrib/contrib_repo_configuration.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
from feast.infra.offline_stores.contrib.trino_offline_store.tests.data_source import (
55
TrinoSourceCreator,
66
)
7-
from tests.integration.feature_repos.integration_test_repo_config import (
8-
IntegrationTestRepoConfig,
7+
from tests.integration.feature_repos.repo_configuration import REDIS_CONFIG
8+
from tests.integration.feature_repos.universal.online_store.redis import (
9+
RedisOnlineStoreCreator,
910
)
1011

11-
FULL_REPO_CONFIGS = [
12-
IntegrationTestRepoConfig(offline_store_creator=SparkDataSourceCreator),
13-
IntegrationTestRepoConfig(offline_store_creator=TrinoSourceCreator),
12+
AVAILABLE_OFFLINE_STORES = [
13+
("local", SparkDataSourceCreator),
14+
("local", TrinoSourceCreator),
1415
]
16+
17+
AVAILABLE_ONLINE_STORES = {"redis": (REDIS_CONFIG, RedisOnlineStoreCreator)}
Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
22
PostgreSQLDataSourceCreator,
33
)
4-
from tests.integration.feature_repos.integration_test_repo_config import (
5-
IntegrationTestRepoConfig,
6-
)
74

8-
FULL_REPO_CONFIGS = [
9-
IntegrationTestRepoConfig(
10-
provider="local",
11-
offline_store_creator=PostgreSQLDataSourceCreator,
12-
online_store_creator=PostgreSQLDataSourceCreator,
13-
),
14-
]
5+
AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)]
6+
7+
AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)}

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import logging
1616
from datetime import datetime
1717
from multiprocessing.pool import ThreadPool
18-
from queue import Queue
18+
from queue import Empty, Queue
1919
from threading import Lock, Thread
2020
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple
2121

@@ -292,22 +292,24 @@ def increment(self):
292292

293293
def worker(shared_counter):
294294
while True:
295-
client.delete_multi(deletion_queue.get())
295+
try:
296+
job = deletion_queue.get(block=False)
297+
except Empty:
298+
return
299+
300+
client.delete_multi(job)
296301
shared_counter.increment()
297302
LOGGER.debug(
298303
f"batch deletions completed: {shared_counter.value} ({shared_counter.value * BATCH_SIZE} total entries) & outstanding queue size: {deletion_queue.qsize()}"
299304
)
300305
deletion_queue.task_done()
301306

302-
for _ in range(NUM_THREADS):
303-
Thread(target=worker, args=(status_info_counter,), daemon=True).start()
304-
305307
query = client.query(kind="Row", ancestor=key)
306-
while True:
307-
entities = list(query.fetch(limit=BATCH_SIZE))
308-
if not entities:
309-
break
310-
deletion_queue.put([entity.key for entity in entities])
308+
for page in query.fetch().pages:
309+
deletion_queue.put([entity.key for entity in page])
310+
311+
for _ in range(NUM_THREADS):
312+
Thread(target=worker, args=(status_info_counter,)).start()
311313

312314
deletion_queue.join()
313315

sdk/python/tests/benchmarks/test_benchmark_universal_online_retrieval.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
@pytest.mark.benchmark
1919
@pytest.mark.integration
20+
@pytest.mark.universal_online_stores
2021
def test_online_retrieval(environment, universal_data_sources, benchmark):
2122
fs = environment.feature_store
2223
entities, datasets, data_sources = universal_data_sources

sdk/python/tests/conftest.py

Lines changed: 131 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
# limitations under the License.
1414
import logging
1515
import multiprocessing
16-
import time
16+
import socket
17+
from contextlib import closing
1718
from datetime import datetime, timedelta
1819
from multiprocessing import Process
1920
from sys import platform
@@ -24,19 +25,22 @@
2425
from _pytest.nodes import Item
2526

2627
from feast import FeatureStore
28+
from feast.wait import wait_retry_backoff
2729
from tests.data.data_creator import create_dataset
2830
from tests.integration.feature_repos.integration_test_repo_config import (
2931
IntegrationTestRepoConfig,
3032
)
3133
from tests.integration.feature_repos.repo_configuration import (
32-
FULL_REPO_CONFIGS,
33-
REDIS_CLUSTER_CONFIG,
34-
REDIS_CONFIG,
34+
AVAILABLE_OFFLINE_STORES,
35+
AVAILABLE_ONLINE_STORES,
3536
Environment,
3637
TestData,
3738
construct_test_environment,
3839
construct_universal_test_data,
3940
)
41+
from tests.integration.feature_repos.universal.data_sources.file import (
42+
FileDataSourceCreator,
43+
)
4044

4145
logger = logging.getLogger(__name__)
4246

@@ -161,86 +165,152 @@ def start_test_local_server(repo_path: str, port: int):
161165
fs.serve("localhost", port, no_access_log=True)
162166

163167

164-
@pytest.fixture(
165-
params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
166-
)
167-
def environment(request, worker_id: str):
168+
@pytest.fixture(scope="session")
169+
def environment(request, worker_id):
168170
e = construct_test_environment(
169171
request.param, worker_id=worker_id, fixture_request=request
170172
)
173+
174+
yield e
175+
176+
e.feature_store.teardown()
177+
e.data_source_creator.teardown()
178+
if e.online_store_creator:
179+
e.online_store_creator.teardown()
180+
181+
182+
_config_cache = {}
183+
184+
185+
def pytest_generate_tests(metafunc):
186+
if "environment" in metafunc.fixturenames:
187+
markers = {m.name: m for m in metafunc.definition.own_markers}
188+
189+
if "universal_offline_stores" in markers:
190+
offline_stores = AVAILABLE_OFFLINE_STORES
191+
else:
192+
# default offline store for testing online store dimension
193+
offline_stores = [("local", FileDataSourceCreator)]
194+
195+
online_stores = None
196+
if "universal_online_stores" in markers:
197+
# Online stores are explicitly requested
198+
if "only" in markers["universal_online_stores"].kwargs:
199+
online_stores = [
200+
AVAILABLE_ONLINE_STORES.get(store_name)
201+
for store_name in markers["universal_online_stores"].kwargs["only"]
202+
if store_name in AVAILABLE_ONLINE_STORES
203+
]
204+
else:
205+
online_stores = AVAILABLE_ONLINE_STORES.values()
206+
207+
if online_stores is None:
208+
# No online stores requested -> setting the default or first available
209+
online_stores = [
210+
AVAILABLE_ONLINE_STORES.get(
211+
"redis",
212+
AVAILABLE_ONLINE_STORES.get(
213+
"sqlite", next(iter(AVAILABLE_ONLINE_STORES.values()))
214+
),
215+
)
216+
]
217+
218+
extra_dimensions = [{}]
219+
220+
if "python_server" in metafunc.fixturenames:
221+
extra_dimensions.extend(
222+
[
223+
{"python_feature_server": True},
224+
# {"python_feature_server": True, "provider": "aws"},
225+
]
226+
)
227+
228+
if "goserver" in markers:
229+
extra_dimensions.append({"go_feature_retrieval": True})
230+
231+
configs = []
232+
for provider, offline_store_creator in offline_stores:
233+
for online_store, online_store_creator in online_stores:
234+
for dim in extra_dimensions:
235+
config = {
236+
"provider": provider,
237+
"offline_store_creator": offline_store_creator,
238+
"online_store": online_store,
239+
"online_store_creator": online_store_creator,
240+
**dim,
241+
}
242+
# temporary Go works only with redis
243+
if config.get("go_feature_retrieval") and (
244+
not isinstance(online_store, dict)
245+
or online_store["type"] != "redis"
246+
):
247+
continue
248+
249+
# aws lambda works only with dynamo
250+
if (
251+
config.get("python_feature_server")
252+
and config.get("provider") == "aws"
253+
and (
254+
not isinstance(online_store, dict)
255+
or online_store["type"] != "dynamodb"
256+
)
257+
):
258+
continue
259+
260+
c = IntegrationTestRepoConfig(**config)
261+
262+
if c not in _config_cache:
263+
_config_cache[c] = c
264+
265+
configs.append(_config_cache[c])
266+
267+
metafunc.parametrize(
268+
"environment", configs, indirect=True, ids=[str(c) for c in configs]
269+
)
270+
271+
272+
@pytest.fixture(scope="session")
273+
def python_server(environment):
171274
proc = Process(
172275
target=start_test_local_server,
173-
args=(e.feature_store.repo_path, e.get_local_server_port()),
276+
args=(environment.feature_store.repo_path, environment.get_local_server_port()),
174277
daemon=True,
175278
)
176-
if e.python_feature_server and e.test_repo_config.provider == "local":
279+
if (
280+
environment.python_feature_server
281+
and environment.test_repo_config.provider == "local"
282+
):
177283
proc.start()
178284
# Wait for server to start
179-
time.sleep(3)
180-
181-
def cleanup():
182-
e.feature_store.teardown()
183-
if proc.is_alive():
184-
proc.kill()
185-
if e.online_store_creator:
186-
e.online_store_creator.teardown()
187-
188-
request.addfinalizer(cleanup)
189-
190-
return e
285+
wait_retry_backoff(
286+
lambda: (
287+
None,
288+
_check_port_open("localhost", environment.get_local_server_port()),
289+
),
290+
timeout_secs=10,
291+
)
191292

293+
yield
192294

193-
@pytest.fixture(
194-
params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG],
195-
scope="session",
196-
ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]],
197-
)
198-
def local_redis_environment(request, worker_id):
199-
e = construct_test_environment(
200-
IntegrationTestRepoConfig(online_store=request.param),
201-
worker_id=worker_id,
202-
fixture_request=request,
203-
)
295+
if proc.is_alive():
296+
proc.kill()
204297

205-
def cleanup():
206-
e.feature_store.teardown()
207298

208-
request.addfinalizer(cleanup)
209-
return e
299+
def _check_port_open(host, port) -> bool:
300+
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
301+
return sock.connect_ex((host, port)) == 0
210302

211303

212304
@pytest.fixture(scope="session")
213-
def universal_data_sources(request, environment) -> TestData:
214-
def cleanup():
215-
# logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
216-
environment.data_source_creator.teardown()
217-
218-
request.addfinalizer(cleanup)
305+
def universal_data_sources(environment) -> TestData:
219306
return construct_universal_test_data(environment)
220307

221308

222309
@pytest.fixture(scope="session")
223-
def redis_universal_data_sources(request, local_redis_environment):
224-
def cleanup():
225-
# logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
226-
local_redis_environment.data_source_creator.teardown()
227-
228-
request.addfinalizer(cleanup)
229-
return construct_universal_test_data(local_redis_environment)
230-
231-
232-
@pytest.fixture(scope="session")
233-
def e2e_data_sources(environment: Environment, request):
310+
def e2e_data_sources(environment: Environment):
234311
df = create_dataset()
235312
data_source = environment.data_source_creator.create_data_source(
236313
df, environment.feature_store.project, field_mapping={"ts_1": "ts"},
237314
)
238315

239-
def cleanup():
240-
environment.data_source_creator.teardown()
241-
if environment.online_store_creator:
242-
environment.online_store_creator.teardown()
243-
244-
request.addfinalizer(cleanup)
245-
246316
return df, data_source

0 commit comments

Comments
 (0)