Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
generating test environments bases on test markers and fixtures
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed May 10, 2022
commit 155feee8b03196879419085003dda892c3efa6ae
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
from feast.infra.offline_stores.contrib.trino_offline_store.tests.data_source import (
TrinoSourceCreator,
)
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
from tests.integration.feature_repos.repo_configuration import REDIS_CONFIG
from tests.integration.feature_repos.universal.online_store.redis import (
RedisOnlineStoreCreator,
)

FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(offline_store_creator=SparkDataSourceCreator),
IntegrationTestRepoConfig(offline_store_creator=TrinoSourceCreator),
AVAILABLE_OFFLINE_STORES = [
("local", SparkDataSourceCreator),
("local", TrinoSourceCreator),
]

AVAILABLE_ONLINE_STORES = {"redis": (REDIS_CONFIG, RedisOnlineStoreCreator)}
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)

FULL_REPO_CONFIGS = [
IntegrationTestRepoConfig(
provider="local",
offline_store_creator=PostgreSQLDataSourceCreator,
online_store_creator=PostgreSQLDataSourceCreator,
),
]
AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)]

AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)}
22 changes: 12 additions & 10 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
from datetime import datetime
from multiprocessing.pool import ThreadPool
from queue import Queue
from queue import Empty, Queue
from threading import Lock, Thread
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple

Expand Down Expand Up @@ -292,22 +292,24 @@ def increment(self):

def worker(shared_counter):
while True:
client.delete_multi(deletion_queue.get())
try:
job = deletion_queue.get(block=False)
except Empty:
return

client.delete_multi(job)
shared_counter.increment()
LOGGER.debug(
f"batch deletions completed: {shared_counter.value} ({shared_counter.value * BATCH_SIZE} total entries) & outstanding queue size: {deletion_queue.qsize()}"
)
deletion_queue.task_done()

for _ in range(NUM_THREADS):
Thread(target=worker, args=(status_info_counter,), daemon=True).start()

query = client.query(kind="Row", ancestor=key)
while True:
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

infinite looping here might hang the whole testing flow

entities = list(query.fetch(limit=BATCH_SIZE))
if not entities:
break
deletion_queue.put([entity.key for entity in entities])
for page in query.fetch().pages:
deletion_queue.put([entity.key for entity in page])

for _ in range(NUM_THREADS):
Thread(target=worker, args=(status_info_counter,)).start()

deletion_queue.join()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

@pytest.mark.benchmark
@pytest.mark.integration
@pytest.mark.universal_online_stores
def test_online_retrieval(environment, universal_data_sources, benchmark):
fs = environment.feature_store
entities, datasets, data_sources = universal_data_sources
Expand Down
192 changes: 131 additions & 61 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.
import logging
import multiprocessing
import time
import socket
from contextlib import closing
from datetime import datetime, timedelta
from multiprocessing import Process
from sys import platform
Expand All @@ -24,19 +25,22 @@
from _pytest.nodes import Item

from feast import FeatureStore
from feast.wait import wait_retry_backoff
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import (
FULL_REPO_CONFIGS,
REDIS_CLUSTER_CONFIG,
REDIS_CONFIG,
AVAILABLE_OFFLINE_STORES,
AVAILABLE_ONLINE_STORES,
Environment,
TestData,
construct_test_environment,
construct_universal_test_data,
)
from tests.integration.feature_repos.universal.data_sources.file import (
FileDataSourceCreator,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -161,86 +165,152 @@ def start_test_local_server(repo_path: str, port: int):
fs.serve("localhost", port, no_access_log=True)


@pytest.fixture(
params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
)
def environment(request, worker_id: str):
@pytest.fixture(scope="session")
def environment(request, worker_id):
e = construct_test_environment(
request.param, worker_id=worker_id, fixture_request=request
)

yield e

e.feature_store.teardown()
e.data_source_creator.teardown()
if e.online_store_creator:
e.online_store_creator.teardown()


_config_cache = {}


def pytest_generate_tests(metafunc):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some docs to how this pytest_generate_tests thingy works? It's the first time I've seen it used and seems extremely powerful

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.

if "environment" in metafunc.fixturenames:
markers = {m.name: m for m in metafunc.definition.own_markers}

if "universal_offline_stores" in markers:
offline_stores = AVAILABLE_OFFLINE_STORES
else:
# default offline store for testing online store dimension
offline_stores = [("local", FileDataSourceCreator)]

online_stores = None
if "universal_online_stores" in markers:
Comment on lines 192 to 199
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we get rid of the universal flag? since it doesn't seem like the online and offline versions are more versatile?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I need a name here that would represent that specific tests works with all offline stores (universal_offline_stores mark currently) or all online stores (universal_online_stores mark). Any ideas for a better name?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the pytest.mark.universal flag (since it seems like this the universal_online_stores and universal_offline_stores together would represent the same thing that current flag is supposed to represent).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Removed universal.

# Online stores are explicitly requested
if "only" in markers["universal_online_stores"].kwargs:
online_stores = [
AVAILABLE_ONLINE_STORES.get(store_name)
for store_name in markers["universal_online_stores"].kwargs["only"]
if store_name in AVAILABLE_ONLINE_STORES
]
else:
online_stores = AVAILABLE_ONLINE_STORES.values()

if online_stores is None:
# No online stores requested -> setting the default or first available
online_stores = [
AVAILABLE_ONLINE_STORES.get(
"redis",
AVAILABLE_ONLINE_STORES.get(
"sqlite", next(iter(AVAILABLE_ONLINE_STORES.values()))
),
)
]

extra_dimensions = [{}]

if "python_server" in metafunc.fixturenames:
extra_dimensions.extend(
[
{"python_feature_server": True},
# {"python_feature_server": True, "provider": "aws"},
]
)

if "goserver" in markers:
extra_dimensions.append({"go_feature_retrieval": True})

configs = []
for provider, offline_store_creator in offline_stores:
for online_store, online_store_creator in online_stores:
for dim in extra_dimensions:
config = {
"provider": provider,
"offline_store_creator": offline_store_creator,
"online_store": online_store,
"online_store_creator": online_store_creator,
**dim,
}
# temporary Go works only with redis
if config.get("go_feature_retrieval") and (
not isinstance(online_store, dict)
or online_store["type"] != "redis"
):
continue

# aws lambda works only with dynamo
if (
config.get("python_feature_server")
and config.get("provider") == "aws"
and (
not isinstance(online_store, dict)
or online_store["type"] != "dynamodb"
)
):
continue

c = IntegrationTestRepoConfig(**config)

if c not in _config_cache:
_config_cache[c] = c

configs.append(_config_cache[c])

metafunc.parametrize(
"environment", configs, indirect=True, ids=[str(c) for c in configs]
)


@pytest.fixture(scope="session")
def python_server(environment):
proc = Process(
target=start_test_local_server,
args=(e.feature_store.repo_path, e.get_local_server_port()),
args=(environment.feature_store.repo_path, environment.get_local_server_port()),
daemon=True,
)
if e.python_feature_server and e.test_repo_config.provider == "local":
if (
environment.python_feature_server
and environment.test_repo_config.provider == "local"
):
proc.start()
# Wait for server to start
time.sleep(3)

def cleanup():
e.feature_store.teardown()
if proc.is_alive():
proc.kill()
if e.online_store_creator:
e.online_store_creator.teardown()

request.addfinalizer(cleanup)

return e
wait_retry_backoff(
lambda: (
None,
_check_port_open("localhost", environment.get_local_server_port()),
),
timeout_secs=10,
)

yield

@pytest.fixture(
params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG],
scope="session",
ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]],
)
def local_redis_environment(request, worker_id):
e = construct_test_environment(
IntegrationTestRepoConfig(online_store=request.param),
worker_id=worker_id,
fixture_request=request,
)
if proc.is_alive():
proc.kill()

def cleanup():
e.feature_store.teardown()

request.addfinalizer(cleanup)
return e
def _check_port_open(host, port) -> bool:
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
return sock.connect_ex((host, port)) == 0


@pytest.fixture(scope="session")
def universal_data_sources(request, environment) -> TestData:
def cleanup():
# logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
environment.data_source_creator.teardown()

request.addfinalizer(cleanup)
def universal_data_sources(environment) -> TestData:
return construct_universal_test_data(environment)


@pytest.fixture(scope="session")
def redis_universal_data_sources(request, local_redis_environment):
def cleanup():
# logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
local_redis_environment.data_source_creator.teardown()

request.addfinalizer(cleanup)
return construct_universal_test_data(local_redis_environment)


@pytest.fixture(scope="session")
def e2e_data_sources(environment: Environment, request):
def e2e_data_sources(environment: Environment):
df = create_dataset()
data_source = environment.data_source_creator.create_data_source(
df, environment.feature_store.project, field_mapping={"ts_1": "ts"},
)

def cleanup():
environment.data_source_creator.teardown()
if environment.online_store_creator:
environment.online_store_creator.teardown()

request.addfinalizer(cleanup)

return df, data_source
Loading