diff --git a/Makefile b/Makefile index 799bc9c42fc..42cdac3dc25 100644 --- a/Makefile +++ b/Makefile @@ -168,9 +168,7 @@ benchmark-python-local: ## Run integration + benchmark tests for Python (local d test-python-unit: ## Run Python unit tests (use pattern= to filter tests, e.g., pattern=milvus, pattern=test_online_retrieval.py, pattern=test_online_retrieval.py::test_get_online_features_milvus) uv run python -m pytest -n 8 --color=yes $(if $(pattern),-k "$(pattern)") \ - --ignore=sdk/python/tests/component/ray \ - --ignore=sdk/python/tests/component/spark \ - sdk/python/tests + sdk/python/tests/unit # Fast unit tests only test-python-unit-fast: ## Run fast unit tests only (no external dependencies) diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/integration/doctest/test_all.py similarity index 99% rename from sdk/python/tests/doctest/test_all.py rename to sdk/python/tests/integration/doctest/test_all.py index bf9e63be6e2..9620aabf55b 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/integration/doctest/test_all.py @@ -5,9 +5,13 @@ import traceback import unittest +import pytest + import feast from feast.utils import _utc_now +pytestmark = pytest.mark.integration + FILES_TO_IGNORE = {"app"} diff --git a/sdk/python/tests/unit/local_feast_tests/test_e2e_local.py b/sdk/python/tests/integration/local_feast_tests/test_e2e_local.py similarity index 99% rename from sdk/python/tests/unit/local_feast_tests/test_e2e_local.py rename to sdk/python/tests/integration/local_feast_tests/test_e2e_local.py index ef11c8cccfa..cee295775b1 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_e2e_local.py +++ b/sdk/python/tests/integration/local_feast_tests/test_e2e_local.py @@ -18,6 +18,8 @@ from tests.utils.cli_repo_creator import CliRunner, get_example_repo from tests.utils.feature_records import validate_online_features +pytestmark = pytest.mark.integration + @pytest.mark.skipif( platform.system() == "Darwin" and os.environ.get("CI") 
== "true", diff --git a/sdk/python/tests/integration/offline_server/test_offline_server.py b/sdk/python/tests/integration/offline_server/test_offline_server.py new file mode 100644 index 00000000000..b213c64ccb2 --- /dev/null +++ b/sdk/python/tests/integration/offline_server/test_offline_server.py @@ -0,0 +1,375 @@ +import os +import tempfile +from datetime import datetime, timedelta +from unittest.mock import patch + +import assertpy +import pandas as pd +import pyarrow as pa +import pyarrow.flight as flight +import pytest + +from feast import FeatureStore, FeatureView, FileSource +from feast.errors import FeatureViewNotFoundException +from feast.feature_logging import FeatureServiceLoggingSource +from feast.infra.offline_stores.remote import ( + RemoteOfflineStore, + RemoteOfflineStoreConfig, +) +from feast.offline_server import OfflineServer, _init_auth_manager +from feast.repo_config import RepoConfig +from feast.torch_wrapper import get_torch +from tests.utils.cli_repo_creator import CliRunner + +pytestmark = pytest.mark.integration + +PROJECT_NAME = "test_remote_offline" + + +@pytest.fixture +def empty_offline_server(environment): + store = environment.feature_store + + location = "grpc+tcp://localhost:0" + _init_auth_manager(store=store) + return OfflineServer(store=store, location=location) + + +@pytest.fixture +def arrow_client(empty_offline_server): + return flight.FlightClient(f"grpc://localhost:{empty_offline_server.port}") + + +def test_offline_server_is_alive(environment, empty_offline_server, arrow_client): + server = empty_offline_server + client = arrow_client + + assertpy.assert_that(server).is_not_none() + assertpy.assert_that(server.port).is_not_equal_to(0) + + actions = list(client.list_actions()) + flights = list(client.list_flights()) + + assertpy.assert_that(actions).is_equal_to( + [ + ( + "offline_write_batch", + "Writes the specified arrow table to the data source underlying the specified feature view.", + ), + ( + "write_logged_features", + 
"Writes logged features to a specified destination in the offline store.", + ), + ( + "persist", + "Synchronously executes the underlying query and persists the result in the same offline store at the " + "specified destination.", + ), + ] + ) + assertpy.assert_that(flights).is_empty() + + +def default_store(temp_dir): + runner = CliRunner() + result = runner.run(["init", PROJECT_NAME], cwd=temp_dir) + repo_path = os.path.join(temp_dir, PROJECT_NAME, "feature_repo") + assert result.returncode == 0 + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + fs = FeatureStore(repo_path=repo_path) + return fs + + +def remote_feature_store(offline_server): + offline_config = RemoteOfflineStoreConfig( + type="remote", host="0.0.0.0", port=offline_server.port + ) + + registry_path = os.path.join( + str(offline_server.store.repo_path), + offline_server.store.config.registry.path, + ) + store = FeatureStore( + config=RepoConfig( + project=PROJECT_NAME, + registry=registry_path, + provider="local", + offline_store=offline_config, + entity_key_serialization_version=3, + # repo_config = + ) + ) + return store + + +def test_remote_offline_store_apis(): + with tempfile.TemporaryDirectory() as temp_dir: + store = default_store(str(temp_dir)) + location = "grpc+tcp://localhost:0" + + _init_auth_manager(store=store) + server = OfflineServer(store=store, location=location) + + assertpy.assert_that(server).is_not_none() + assertpy.assert_that(server.port).is_not_equal_to(0) + + fs = remote_feature_store(server) + + _test_get_historical_features_returns_data(fs) + _test_get_historical_features_to_tensor(fs) + _test_get_historical_features_returns_nan(fs) + _test_get_historical_features_to_tensor_with_nan(fs) + _test_offline_write_batch(str(temp_dir), fs) + _test_write_logged_features(str(temp_dir), fs) + _test_pull_latest_from_table_or_query(str(temp_dir), fs) + _test_pull_all_from_table_or_query(str(temp_dir), fs) + + +def 
test_remote_offline_store_exception_handling(): + with tempfile.TemporaryDirectory() as temp_dir: + store = default_store(str(temp_dir)) + location = "grpc+tcp://localhost:0" + + _init_auth_manager(store=store) + server = OfflineServer(store=store, location=location) + + assertpy.assert_that(server).is_not_none() + assertpy.assert_that(server.port).is_not_equal_to(0) + + fs = remote_feature_store(server) + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + + with pytest.raises( + FeatureViewNotFoundException, + match="Feature view test does not exist in project test_remote_offline", + ): + RemoteOfflineStore.offline_write_batch( + fs.config, + FeatureView(name="test", source=FileSource(path="test")), + pa.Table.from_pandas(data_df), + progress=None, + ) + + +def _test_get_historical_features_returns_data(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + training_df = fs.get_historical_features(entity_df, features).to_df() + + assertpy.assert_that(training_df).is_not_none() + assertpy.assert_that(len(training_df)).is_equal_to(3) + + for index, driver_id in enumerate(entity_df["driver_id"]): + assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) + for feature in features: + column_id = feature.split(":")[1] + value = training_df[column_id][index] + assertpy.assert_that(value).is_not_nan() + + +def _test_get_historical_features_to_tensor(fs: 
FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + job = fs.get_historical_features(entity_df, features) + tensor_data = job.to_tensor() + + assertpy.assert_that(tensor_data).is_not_none() + assertpy.assert_that(tensor_data["driver_id"].shape[0]).is_equal_to(3) + torch = get_torch() + for key, values in tensor_data.items(): + if isinstance(values, torch.Tensor): + assertpy.assert_that(values.shape[0]).is_equal_to(3) + for val in values: + val_float = val.item() + assertpy.assert_that(val_float).is_instance_of((float, int)) + assertpy.assert_that(val_float).is_not_nan() + + +def _test_get_historical_features_returns_nan(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1, 2, 3], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + training_df = fs.get_historical_features(entity_df, features).to_df() + + assertpy.assert_that(training_df).is_not_none() + assertpy.assert_that(len(training_df)).is_equal_to(3) + + for index, driver_id in enumerate(entity_df["driver_id"]): + 
assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) + for feature in features: + column_id = feature.split(":")[1] + value = training_df[column_id][index] + assertpy.assert_that(value).is_nan() + + +def _test_get_historical_features_to_tensor_with_nan(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [9991, 9992], # IDs with no matching features + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 10, 59, 42), + ], + } + ) + features = ["driver_hourly_stats:conv_rate"] + job = fs.get_historical_features(entity_df, features) + tensor_data = job.to_tensor() + assert "conv_rate" in tensor_data + values = tensor_data["conv_rate"] + # conv_rate is a float feature, missing values should be NaN + torch = get_torch() + for val in values: + assert isinstance(val, torch.Tensor) or torch.is_tensor(val) + assertpy.assert_that(torch.isnan(val).item()).is_true() + + +def _test_offline_write_batch(temp_dir, fs: FeatureStore): + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + feature_view = fs.get_feature_view("driver_hourly_stats") + + RemoteOfflineStore.offline_write_batch( + fs.config, feature_view, pa.Table.from_pandas(data_df), progress=None + ) + + +def _test_write_logged_features(temp_dir, fs: FeatureStore): + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + feature_service = fs.get_feature_service("driver_activity_v1") + + RemoteOfflineStore.write_logged_features( + config=fs.config, + data=pa.Table.from_pandas(data_df), + source=FeatureServiceLoggingSource(feature_service, fs.config.project), + logging_config=feature_service.logging_config, + registry=fs.registry, + ) + + +def _test_pull_latest_from_table_or_query(temp_dir, fs: FeatureStore): + data_source = fs.get_data_source("driver_hourly_stats_source") + + end_date 
= datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + RemoteOfflineStore.pull_latest_from_table_or_query( + config=fs.config, + data_source=data_source, + join_key_columns=[], + feature_name_columns=[], + timestamp_field="event_timestamp", + created_timestamp_column="created", + start_date=start_date, + end_date=end_date, + ).to_df() + + +def _test_pull_all_from_table_or_query(temp_dir, fs: FeatureStore): + data_source = fs.get_data_source("driver_hourly_stats_source") + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + RemoteOfflineStore.pull_all_from_table_or_query( + config=fs.config, + data_source=data_source, + join_key_columns=[], + feature_name_columns=[], + timestamp_field="event_timestamp", + start_date=start_date, + end_date=end_date, + ).to_df() + + +def test_get_feature_view_by_name_propagates_transient_errors(): + """Transient registry errors must not be swallowed and misreported as + FeatureViewNotFoundException.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = default_store(str(temp_dir)) + location = "grpc+tcp://localhost:0" + + _init_auth_manager(store=store) + server = OfflineServer(store=store, location=location) + + transient_error = ConnectionError("registry temporarily unavailable") + + with patch.object( + server.store.registry, + "get_feature_view", + side_effect=transient_error, + ): + with pytest.raises(ConnectionError, match="registry temporarily"): + server.get_feature_view_by_name( + fv_name="driver_hourly_stats", + name_alias=None, + project=PROJECT_NAME, + ) diff --git a/sdk/python/tests/integration/online_store/test_hybrid_online_store.py b/sdk/python/tests/integration/online_store/test_hybrid_online_store.py index 4b9dad05ff8..26f07c54b14 100644 --- a/sdk/python/tests/integration/online_store/test_hybrid_online_store.py +++ b/sdk/python/tests/integration/online_store/test_hybrid_online_store.py @@ 
-12,6 +12,8 @@ from feast.protos.feast.types.Value_pb2 import Value from feast.types import PrimitiveFeastType +pytestmark = pytest.mark.integration + @pytest.fixture def sample_entity(): diff --git a/sdk/python/tests/integration/online_store/test_mongodb_online_retrieval.py b/sdk/python/tests/integration/online_store/test_mongodb_online_retrieval.py new file mode 100644 index 00000000000..9c198d9a617 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_mongodb_online_retrieval.py @@ -0,0 +1,165 @@ +"""MongoDB online store integration tests.""" + +import pytest + +from feast.protos.feast.types.EntityKey_pb2 import ( + EntityKey as EntityKeyProto, +) +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.utils import _utc_now +from tests.universal.feature_repos.universal.feature_views import TAGS +from tests.utils.cli_repo_creator import CliRunner, get_example_repo + +pytestmark = pytest.mark.integration +pytest.importorskip("pymongo") + +docker_available = False +try: + import docker + from testcontainers.mongodb import MongoDbContainer + + try: + client = docker.from_env() + client.ping() + docker_available = True + except Exception: + pass +except ImportError: + pass + +_requires_docker = pytest.mark.skipif( + not docker_available, + reason="Docker is not available or not running. 
Start Docker daemon to run these tests.", +) + + +@pytest.fixture(scope="module") +def mongodb_container(): + container = MongoDbContainer( + "mongo:latest", + username="test", + password="test", # pragma: allowlist secret + ).with_exposed_ports(27017) + container.start() + yield container + container.stop() + + +@pytest.fixture +def mongodb_connection_string(mongodb_container): + exposed_port = mongodb_container.get_exposed_port(27017) + return f"mongodb://test:test@localhost:{exposed_port}" # pragma: allowlist secret + + +@_requires_docker +def test_mongodb_online_features(mongodb_connection_string): + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_1.py"), + offline_store="file", + online_store="mongodb", + teardown=False, + ) as store: + store.config.online_store.connection_string = mongodb_connection_string + + driver_locations_fv = store.get_feature_view(name="driver_locations") + customer_profile_fv = store.get_feature_view(name="customer_profile") + customer_driver_combined_fv = store.get_feature_view( + name="customer_driver_combined" + ) + + provider = store._get_provider() + + driver_key = EntityKeyProto( + join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1)] + ) + provider.online_write_batch( + config=store.config, + table=driver_locations_fv, + data=[ + ( + driver_key, + { + "lat": ValueProto(double_val=0.1), + "lon": ValueProto(string_val="1.0"), + }, + _utc_now(), + _utc_now(), + ) + ], + progress=None, + ) + + customer_key = EntityKeyProto( + join_keys=["customer_id"], entity_values=[ValueProto(string_val="5")] + ) + provider.online_write_batch( + config=store.config, + table=customer_profile_fv, + data=[ + ( + customer_key, + { + "avg_orders_day": ValueProto(float_val=1.0), + "name": ValueProto(string_val="John"), + "age": ValueProto(int64_val=3), + }, + _utc_now(), + _utc_now(), + ) + ], + progress=None, + ) + + customer_key = EntityKeyProto( + join_keys=["customer_id", "driver_id"], + 
entity_values=[ValueProto(string_val="5"), ValueProto(int64_val=1)], + ) + provider.online_write_batch( + config=store.config, + table=customer_driver_combined_fv, + data=[ + ( + customer_key, + {"trips": ValueProto(int64_val=7)}, + _utc_now(), + _utc_now(), + ) + ], + progress=None, + ) + + assert len(store.list_entities()) == 3 + assert len(store.list_entities(tags=TAGS)) == 2 + + result = store.get_online_features( + features=[ + "driver_locations:lon", + "customer_profile:avg_orders_day", + "customer_profile:name", + "customer_driver_combined:trips", + ], + entity_rows=[ + {"driver_id": 1, "customer_id": "5"}, + {"driver_id": 1, "customer_id": 5}, + ], + full_feature_names=False, + ).to_dict() + + assert "lon" in result + assert "avg_orders_day" in result + assert "name" in result + assert result["driver_id"] == [1, 1] + assert result["customer_id"] == ["5", "5"] + assert result["lon"] == ["1.0", "1.0"] + assert result["avg_orders_day"] == [1.0, 1.0] + assert result["name"] == ["John", "John"] + assert result["trips"] == [7, 7] + + result = store.get_online_features( + features=["customer_driver_combined:trips"], + entity_rows=[{"driver_id": 0, "customer_id": 0}], + full_feature_names=False, + ).to_dict() + + assert result["trips"] == [None] diff --git a/sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py b/sdk/python/tests/integration/permissions/auth/server/test_auth_registry_server.py similarity index 99% rename from sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py rename to sdk/python/tests/integration/permissions/auth/server/test_auth_registry_server.py index e0f75d1d3d8..abcb968ac15 100644 --- a/sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py +++ b/sdk/python/tests/integration/permissions/auth/server/test_auth_registry_server.py @@ -27,6 +27,8 @@ from tests.utils.auth_permissions_util import get_remote_registry_store from tests.utils.http_server import check_port_open # noqa: E402 
+pytestmark = pytest.mark.integration + @pytest.fixture def start_registry_server( diff --git a/sdk/python/tests/integration/test_mcp_feature_server.py b/sdk/python/tests/integration/test_mcp_feature_server.py index 0e59a71dfae..61f45e37314 100644 --- a/sdk/python/tests/integration/test_mcp_feature_server.py +++ b/sdk/python/tests/integration/test_mcp_feature_server.py @@ -9,6 +9,8 @@ from feast.feature_store import FeatureStore from feast.infra.mcp_servers.mcp_config import McpFeatureServerConfig +pytestmark = pytest.mark.integration + class TestMCPFeatureServerIntegration(unittest.TestCase): """Integration tests for MCP feature server functionality.""" diff --git a/sdk/python/tests/unit/online_store/test_mongodb_online_retrieval.py b/sdk/python/tests/unit/online_store/test_mongodb_online_retrieval.py index 029746cf442..485ab2dce08 100644 --- a/sdk/python/tests/unit/online_store/test_mongodb_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_mongodb_online_retrieval.py @@ -1,9 +1,6 @@ -""" -Unit tests for MongoDB online store. +"""Pure unit tests for MongoDB online store helpers.""" -Docker-dependent tests are marked with ``@_requires_docker`` and are skipped when -Docker is unavailable. Pure Python tests (no container needed) run in all environments. 
-""" +# ruff: noqa: E402 from datetime import datetime, timedelta, timezone @@ -15,182 +12,8 @@ from feast.infra.online_stores.mongodb_online_store.mongodb import ( # noqa: E402 MongoDBOnlineStore, ) -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.types import Array, Float32, Int64, String -from feast.utils import _utc_now -from tests.universal.feature_repos.universal.feature_views import TAGS -from tests.utils.cli_repo_creator import CliRunner, get_example_repo - -# Check if Docker is available -docker_available = False -try: - import docker - from testcontainers.mongodb import MongoDbContainer - - # Try to connect to Docker daemon - try: - client = docker.from_env() - client.ping() - docker_available = True - except Exception: - pass -except ImportError: - pass - -# Applied per-test so that pure Python tests still run without Docker. -_requires_docker = pytest.mark.skipif( - not docker_available, - reason="Docker is not available or not running. Start Docker daemon to run these tests.", -) - - -@pytest.fixture(scope="module") -def mongodb_container(): - """Start a MongoDB container for testing.""" - container = MongoDbContainer( - "mongo:latest", - username="test", - password="test", # pragma: allowlist secret - ).with_exposed_ports(27017) - container.start() - yield container - container.stop() - - -@pytest.fixture -def mongodb_connection_string(mongodb_container): - """Get MongoDB connection string from the container.""" - exposed_port = mongodb_container.get_exposed_port(27017) - return f"mongodb://test:test@localhost:{exposed_port}" # pragma: allowlist secret - - -@_requires_docker -def test_mongodb_online_features(mongodb_connection_string): - """ - Test reading from MongoDB online store using testcontainers. 
- """ - runner = CliRunner() - with ( - runner.local_repo( - get_example_repo("example_feature_repo_1.py"), - offline_store="file", - online_store="mongodb", - teardown=False, # Disable CLI teardown since container will be stopped by fixture - ) as store - ): - # Update the connection string to use the test container - store.config.online_store.connection_string = mongodb_connection_string - - # Write some data to two tables - driver_locations_fv = store.get_feature_view(name="driver_locations") - customer_profile_fv = store.get_feature_view(name="customer_profile") - customer_driver_combined_fv = store.get_feature_view( - name="customer_driver_combined" - ) - - provider = store._get_provider() - - driver_key = EntityKeyProto( - join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1)] - ) - provider.online_write_batch( - config=store.config, - table=driver_locations_fv, - data=[ - ( - driver_key, - { - "lat": ValueProto(double_val=0.1), - "lon": ValueProto(string_val="1.0"), - }, - _utc_now(), - _utc_now(), - ) - ], - progress=None, - ) - - customer_key = EntityKeyProto( - join_keys=["customer_id"], entity_values=[ValueProto(string_val="5")] - ) - provider.online_write_batch( - config=store.config, - table=customer_profile_fv, - data=[ - ( - customer_key, - { - "avg_orders_day": ValueProto(float_val=1.0), - "name": ValueProto(string_val="John"), - "age": ValueProto(int64_val=3), - }, - _utc_now(), - _utc_now(), - ) - ], - progress=None, - ) - - customer_key = EntityKeyProto( - join_keys=["customer_id", "driver_id"], - entity_values=[ValueProto(string_val="5"), ValueProto(int64_val=1)], - ) - provider.online_write_batch( - config=store.config, - table=customer_driver_combined_fv, - data=[ - ( - customer_key, - {"trips": ValueProto(int64_val=7)}, - _utc_now(), - _utc_now(), - ) - ], - progress=None, - ) - - assert len(store.list_entities()) == 3 - assert len(store.list_entities(tags=TAGS)) == 2 - - # Retrieve features using two keys - result = 
store.get_online_features( - features=[ - "driver_locations:lon", - "customer_profile:avg_orders_day", - "customer_profile:name", - "customer_driver_combined:trips", - ], - entity_rows=[ - {"driver_id": 1, "customer_id": "5"}, - {"driver_id": 1, "customer_id": 5}, - ], - full_feature_names=False, - ).to_dict() - - assert "lon" in result - assert "avg_orders_day" in result - assert "name" in result - assert result["driver_id"] == [1, 1] - assert result["customer_id"] == ["5", "5"] - assert result["lon"] == ["1.0", "1.0"] - assert result["avg_orders_day"] == [1.0, 1.0] - assert result["name"] == ["John", "John"] - assert result["trips"] == [7, 7] - - # Ensure features are still in result when keys not found - result = store.get_online_features( - features=["customer_driver_combined:trips"], - entity_rows=[{"driver_id": 0, "customer_id": 0}], - full_feature_names=False, - ).to_dict() - - assert result["trips"] == [None] - - -# --------------------------------------------------------------------------- -# Pure Python tests — no Docker required -# --------------------------------------------------------------------------- def _make_fv(*field_names: str) -> FeatureView: diff --git a/sdk/python/tests/permissions/test_groups_namespaces_auth.py b/sdk/python/tests/unit/permissions/test_groups_namespaces_auth.py similarity index 100% rename from sdk/python/tests/permissions/test_groups_namespaces_auth.py rename to sdk/python/tests/unit/permissions/test_groups_namespaces_auth.py diff --git a/sdk/python/tests/unit/test_offline_server.py b/sdk/python/tests/unit/test_offline_server.py index efa2aaa6af4..3e25e5c2061 100644 --- a/sdk/python/tests/unit/test_offline_server.py +++ b/sdk/python/tests/unit/test_offline_server.py @@ -1,352 +1,13 @@ -import os -import tempfile -from datetime import datetime, timedelta from unittest.mock import MagicMock, patch import assertpy -import pandas as pd -import pyarrow as pa -import pyarrow.flight as flight -import pytest -from feast import 
FeatureStore, FeatureView, FileSource -from feast.errors import FeatureViewNotFoundException -from feast.feature_logging import FeatureServiceLoggingSource from feast.infra.offline_stores.remote import ( RemoteOfflineStore, RemoteOfflineStoreConfig, _create_retrieval_metadata, ) -from feast.offline_server import OfflineServer, _init_auth_manager -from feast.repo_config import RepoConfig -from feast.torch_wrapper import get_torch -from tests.utils.cli_repo_creator import CliRunner - -PROJECT_NAME = "test_remote_offline" - - -@pytest.fixture -def empty_offline_server(environment): - store = environment.feature_store - - location = "grpc+tcp://localhost:0" - _init_auth_manager(store=store) - return OfflineServer(store=store, location=location) - - -@pytest.fixture -def arrow_client(empty_offline_server): - return flight.FlightClient(f"grpc://localhost:{empty_offline_server.port}") - - -def test_offline_server_is_alive(environment, empty_offline_server, arrow_client): - server = empty_offline_server - client = arrow_client - - assertpy.assert_that(server).is_not_none - assertpy.assert_that(server.port).is_not_equal_to(0) - - actions = list(client.list_actions()) - flights = list(client.list_flights()) - - assertpy.assert_that(actions).is_equal_to( - [ - ( - "offline_write_batch", - "Writes the specified arrow table to the data source underlying the specified feature view.", - ), - ( - "write_logged_features", - "Writes logged features to a specified destination in the offline store.", - ), - ( - "persist", - "Synchronously executes the underlying query and persists the result in the same offline store at the " - "specified destination.", - ), - ] - ) - assertpy.assert_that(flights).is_empty() - - -def default_store(temp_dir): - runner = CliRunner() - result = runner.run(["init", PROJECT_NAME], cwd=temp_dir) - repo_path = os.path.join(temp_dir, PROJECT_NAME, "feature_repo") - assert result.returncode == 0 - - result = runner.run(["--chdir", repo_path, "apply"], 
cwd=temp_dir) - assert result.returncode == 0 - - fs = FeatureStore(repo_path=repo_path) - return fs - - -def remote_feature_store(offline_server): - offline_config = RemoteOfflineStoreConfig( - type="remote", host="0.0.0.0", port=offline_server.port - ) - - registry_path = os.path.join( - str(offline_server.store.repo_path), - offline_server.store.config.registry.path, - ) - store = FeatureStore( - config=RepoConfig( - project=PROJECT_NAME, - registry=registry_path, - provider="local", - offline_store=offline_config, - entity_key_serialization_version=3, - # repo_config = - ) - ) - return store - - -def test_remote_offline_store_apis(): - with tempfile.TemporaryDirectory() as temp_dir: - store = default_store(str(temp_dir)) - location = "grpc+tcp://localhost:0" - - _init_auth_manager(store=store) - server = OfflineServer(store=store, location=location) - - assertpy.assert_that(server).is_not_none - assertpy.assert_that(server.port).is_not_equal_to(0) - - fs = remote_feature_store(server) - - _test_get_historical_features_returns_data(fs) - _test_get_historical_features_to_tensor(fs) - _test_get_historical_features_returns_nan(fs) - _test_get_historical_features_to_tensor_with_nan(fs) - _test_offline_write_batch(str(temp_dir), fs) - _test_write_logged_features(str(temp_dir), fs) - _test_pull_latest_from_table_or_query(str(temp_dir), fs) - _test_pull_all_from_table_or_query(str(temp_dir), fs) - - -def test_remote_offline_store_exception_handling(): - with tempfile.TemporaryDirectory() as temp_dir: - store = default_store(str(temp_dir)) - location = "grpc+tcp://localhost:0" - - _init_auth_manager(store=store) - server = OfflineServer(store=store, location=location) - - assertpy.assert_that(server).is_not_none - assertpy.assert_that(server.port).is_not_equal_to(0) - - fs = remote_feature_store(server) - data_file = os.path.join( - temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" - ) - data_df = pd.read_parquet(data_file) - - with pytest.raises( - 
FeatureViewNotFoundException, - match="Feature view test does not exist in project test_remote_offline", - ): - RemoteOfflineStore.offline_write_batch( - fs.config, - FeatureView(name="test", source=FileSource(path="test")), - pa.Table.from_pandas(data_df), - progress=None, - ) - - -def _test_get_historical_features_returns_data(fs: FeatureStore): - entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1002, 1003], - "event_timestamp": [ - datetime(2021, 4, 12, 10, 59, 42), - datetime(2021, 4, 12, 8, 12, 10), - datetime(2021, 4, 12, 16, 40, 26), - ], - "label_driver_reported_satisfaction": [1, 5, 3], - "val_to_add": [1, 2, 3], - "val_to_add_2": [10, 20, 30], - } - ) - - features = [ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "driver_hourly_stats:avg_daily_trips", - "transformed_conv_rate:conv_rate_plus_val1", - "transformed_conv_rate:conv_rate_plus_val2", - ] - - training_df = fs.get_historical_features(entity_df, features).to_df() - - assertpy.assert_that(training_df).is_not_none() - assertpy.assert_that(len(training_df)).is_equal_to(3) - - for index, driver_id in enumerate(entity_df["driver_id"]): - assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) - for feature in features: - column_id = feature.split(":")[1] - value = training_df[column_id][index] - assertpy.assert_that(value).is_not_nan() - - -def _test_get_historical_features_to_tensor(fs: FeatureStore): - entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1002, 1003], - "event_timestamp": [ - datetime(2021, 4, 12, 10, 59, 42), - datetime(2021, 4, 12, 8, 12, 10), - datetime(2021, 4, 12, 16, 40, 26), - ], - "label_driver_reported_satisfaction": [1, 5, 3], - "val_to_add": [1, 2, 3], - "val_to_add_2": [10, 20, 30], - } - ) - - features = [ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "driver_hourly_stats:avg_daily_trips", - "transformed_conv_rate:conv_rate_plus_val1", - "transformed_conv_rate:conv_rate_plus_val2", 
- ] - - job = fs.get_historical_features(entity_df, features) - tensor_data = job.to_tensor() - - assertpy.assert_that(tensor_data).is_not_none() - assertpy.assert_that(tensor_data["driver_id"].shape[0]).is_equal_to(3) - torch = get_torch() - for key, values in tensor_data.items(): - if isinstance(values, torch.Tensor): - assertpy.assert_that(values.shape[0]).is_equal_to(3) - for val in values: - val_float = val.item() - assertpy.assert_that(val_float).is_instance_of((float, int)) - assertpy.assert_that(val_float).is_not_nan() - - -def _test_get_historical_features_returns_nan(fs: FeatureStore): - entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1, 2, 3], - "event_timestamp": [ - datetime(2021, 4, 12, 10, 59, 42), - datetime(2021, 4, 12, 8, 12, 10), - datetime(2021, 4, 12, 16, 40, 26), - ], - "label_driver_reported_satisfaction": [1, 5, 3], - "val_to_add": [1, 2, 3], - "val_to_add_2": [10, 20, 30], - } - ) - - features = [ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "driver_hourly_stats:avg_daily_trips", - "transformed_conv_rate:conv_rate_plus_val1", - "transformed_conv_rate:conv_rate_plus_val2", - ] - - training_df = fs.get_historical_features(entity_df, features).to_df() - - assertpy.assert_that(training_df).is_not_none() - assertpy.assert_that(len(training_df)).is_equal_to(3) - - for index, driver_id in enumerate(entity_df["driver_id"]): - assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) - for feature in features: - column_id = feature.split(":")[1] - value = training_df[column_id][index] - assertpy.assert_that(value).is_nan() - - -def _test_get_historical_features_to_tensor_with_nan(fs: FeatureStore): - entity_df = pd.DataFrame.from_dict( - { - "driver_id": [9991, 9992], # IDs with no matching features - "event_timestamp": [ - datetime(2021, 4, 12, 10, 59, 42), - datetime(2021, 4, 12, 10, 59, 42), - ], - } - ) - features = ["driver_hourly_stats:conv_rate"] - job = 
fs.get_historical_features(entity_df, features) - tensor_data = job.to_tensor() - assert "conv_rate" in tensor_data - values = tensor_data["conv_rate"] - # conv_rate is a float feature, missing values should be NaN - torch = get_torch() - for val in values: - assert isinstance(val, torch.Tensor) or torch.is_tensor(val) - assertpy.assert_that(torch.isnan(val).item()).is_true() - - -def _test_offline_write_batch(temp_dir, fs: FeatureStore): - data_file = os.path.join( - temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" - ) - data_df = pd.read_parquet(data_file) - feature_view = fs.get_feature_view("driver_hourly_stats") - - RemoteOfflineStore.offline_write_batch( - fs.config, feature_view, pa.Table.from_pandas(data_df), progress=None - ) - - -def _test_write_logged_features(temp_dir, fs: FeatureStore): - data_file = os.path.join( - temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" - ) - data_df = pd.read_parquet(data_file) - feature_service = fs.get_feature_service("driver_activity_v1") - - RemoteOfflineStore.write_logged_features( - config=fs.config, - data=pa.Table.from_pandas(data_df), - source=FeatureServiceLoggingSource(feature_service, fs.config.project), - logging_config=feature_service.logging_config, - registry=fs.registry, - ) - - -def _test_pull_latest_from_table_or_query(temp_dir, fs: FeatureStore): - data_source = fs.get_data_source("driver_hourly_stats_source") - - end_date = datetime.now().replace(microsecond=0, second=0, minute=0) - start_date = end_date - timedelta(days=15) - RemoteOfflineStore.pull_latest_from_table_or_query( - config=fs.config, - data_source=data_source, - join_key_columns=[], - feature_name_columns=[], - timestamp_field="event_timestamp", - created_timestamp_column="created", - start_date=start_date, - end_date=end_date, - ).to_df() - - -def _test_pull_all_from_table_or_query(temp_dir, fs: FeatureStore): - data_source = fs.get_data_source("driver_hourly_stats_source") - - end_date = 
datetime.now().replace(microsecond=0, second=0, minute=0) - start_date = end_date - timedelta(days=15) - RemoteOfflineStore.pull_all_from_table_or_query( - config=fs.config, - data_source=data_source, - join_key_columns=[], - feature_name_columns=[], - timestamp_field="event_timestamp", - start_date=start_date, - end_date=end_date, - ).to_df() +from feast.offline_server import OfflineServer def test_create_retrieval_metadata_with_sql_string(): @@ -364,8 +25,7 @@ def test_create_retrieval_metadata_with_sql_string(): def test_remote_offline_store_sql_entity_df_routing(): - """RemoteOfflineStore.get_historical_features must move a SQL string into - api_parameters['entity_df_sql'] and pass entity_df=None to RemoteRetrievalJob.""" + """RemoteOfflineStore.get_historical_features moves SQL into api_parameters.""" sql = "SELECT driver_id, event_timestamp FROM driver_stats" mock_client = MagicMock() @@ -394,8 +54,7 @@ def test_remote_offline_store_sql_entity_df_routing(): def test_offline_server_get_historical_features_passes_sql_to_store(): - """OfflineServer.get_historical_features must forward entity_df_sql from the - command dict as a SQL string to the backing offline store.""" + """OfflineServer forwards entity_df_sql to the backing offline store.""" sql = "SELECT driver_id, event_timestamp FROM driver_stats" mock_job = MagicMock() @@ -422,34 +81,8 @@ def test_offline_server_get_historical_features_passes_sql_to_store(): "entity_df_sql": sql, } - # Call the real method with the mock server as self result = OfflineServer.get_historical_features(server, command, key=None) assertpy.assert_that(result).is_equal_to(mock_job) _, kwargs = mock_offline_store.get_historical_features.call_args assertpy.assert_that(kwargs["entity_df"]).is_equal_to(sql) - - -def test_get_feature_view_by_name_propagates_transient_errors(): - """Transient registry errors must not be swallowed and misreported as - FeatureViewNotFoundException.""" - with tempfile.TemporaryDirectory() as temp_dir: - 
store = default_store(str(temp_dir)) - location = "grpc+tcp://localhost:0" - - _init_auth_manager(store=store) - server = OfflineServer(store=store, location=location) - - transient_error = ConnectionError("registry temporarily unavailable") - - with patch.object( - server.store.registry, - "get_feature_view", - side_effect=transient_error, - ): - with pytest.raises(ConnectionError, match="registry temporarily"): - server.get_feature_view_by_name( - fv_name="driver_hourly_stats", - name_alias=None, - project=PROJECT_NAME, - )