From f1364a3665b5f1b101d4795350eb11faa8985001 Mon Sep 17 00:00:00 2001 From: ntkathole Date: Sun, 29 Jun 2025 17:01:34 +0530 Subject: [PATCH 1/2] fix: Fix remote rbac integration tests Signed-off-by: ntkathole --- .../feature_repos/universal/data_sources/file.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 7e6334b1b88..a592dfc54a5 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -417,6 +417,20 @@ def setup(self, registry: RegistryConfig): ) return "grpc+tcp://{}:{}".format(host, self.server_port) + def teardown(self): + super().teardown() + if self.proc is not None: + self.proc.kill() + + # wait server to free the port + wait_retry_backoff( + lambda: ( + None, + not check_port_open("localhost", self.server_port), + ), + timeout_secs=30, + ) + class RemoteOfflineTlsStoreDataSourceCreator(FileDataSourceCreator): def __init__(self, project_name: str, *args, **kwargs): From cad2b106778d96f226b0863ae69a0f8d401e60ef Mon Sep 17 00:00:00 2001 From: ntkathole Date: Sun, 29 Jun 2025 20:37:07 +0530 Subject: [PATCH 2/2] fix: Spin up the only one instance of the keycloak Signed-off-by: ntkathole --- Makefile | 2 +- sdk/python/tests/integration/conftest.py | 132 +++++++++++++++++++---- 2 files changed, 114 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index a8900c140d5..b8a34855fbf 100644 --- a/Makefile +++ b/Makefile @@ -155,7 +155,7 @@ test-python-integration-local: ## Run Python integration tests (local dev mode) test-python-integration-rbac-remote: ## Run Python remote RBAC integration tests FEAST_IS_LOCAL_TEST=True \ FEAST_LOCAL_ONLINE_CONTAINER=True \ - python -m pytest --tb=short -v -n 4 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \ + python -m pytest --tb=short -v -n 8 --color=yes --integration --durations=10 --timeout=1200 --timeout_method=thread --dist loadgroup \ -k "not test_lambda_materialization and not test_snowflake_materialization" \ -m "rbac_remote_integration_test" \ --log-cli-level=INFO -s \ diff --git a/sdk/python/tests/integration/conftest.py b/sdk/python/tests/integration/conftest.py index 21c9051d0d7..5784ad30292 100644 --- a/sdk/python/tests/integration/conftest.py +++ b/sdk/python/tests/integration/conftest.py @@ -1,7 +1,11 @@ +import atexit +import json import logging import random +import tempfile import time -from multiprocessing import Manager +from pathlib import Path +from typing import Optional import pytest from testcontainers.keycloak import KeycloakContainer @@ -14,28 +18,118 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -shared_state = Manager().dict() +# Shared Keycloak state +_keycloak_container: Optional[KeycloakContainer] = None +_keycloak_info_file = Path(tempfile.gettempdir()) / "feast_keycloak_info.json" + + +def _is_keycloak_healthy(url: str) -> bool: + """Health check for Keycloak.""" + try: + import requests + + response = requests.get(f"{url}/health/ready", timeout=3) + return response.status_code == 200 + except Exception: + try: + import requests + + response = requests.get(f"{url}/auth/realms/master", timeout=3) + return response.status_code in [200, 404] + except Exception: + return False + + +def _get_shared_keycloak_url() -> Optional[str]: + """Get URL of existing Keycloak instance if available.""" + try: + if _keycloak_info_file.exists(): + with open(_keycloak_info_file, "r") as f: + info = json.load(f) + + url = info.get("url") + if url and _is_keycloak_healthy(url): + return url + else: + _keycloak_info_file.unlink() + except Exception as e: + logger.debug(f"Error reading Keycloak info: {e}") + try: + _keycloak_info_file.unlink() + except Exception: + pass + return None + + +def _save_keycloak_info(url: str): + """Save Keycloak info to shared file.""" + try: + info = {"url": url, "timestamp": time.time()} + with open(_keycloak_info_file, "w") as f: + json.dump(info, f) + except Exception as e: + logger.warning(f"Failed to save Keycloak info: {e}") + + +def _cleanup_keycloak(): + """Cleanup Keycloak container on exit.""" + global _keycloak_container + if _keycloak_container: + try: + logger.info("Stopping Keycloak container") + _keycloak_container.stop() + except Exception as e: + logger.warning(f"Error stopping Keycloak: {e}") + finally: + _keycloak_container = None + try: + _keycloak_info_file.unlink() + except Exception: + pass @pytest.fixture(scope="session") def start_keycloak_server(): - # Add random sleep between 0 and 2 before checking the state to avoid concurrency issues. - random_sleep_time = random.uniform(0, 2) - time.sleep(random_sleep_time) - - # If the Keycloak instance is already started (in any worker), reuse it - if shared_state.get("keycloak_started", False): - return shared_state["keycloak_url"] - logger.info("Starting keycloak instance") - with KeycloakContainer("quay.io/keycloak/keycloak:24.0.1") as keycloak_container: - setup_permissions_on_keycloak(keycloak_container.get_client()) - shared_state["keycloak_started"] = True - shared_state["keycloak_url"] = keycloak_container.get_url() - yield shared_state["keycloak_url"] - - # After the fixture is done, cleanup the shared state - del shared_state["keycloak_started"] - del shared_state["keycloak_url"] + global _keycloak_container + + existing_url = _get_shared_keycloak_url() + if existing_url: + logger.info(f"Reusing existing Keycloak at {existing_url}") + yield existing_url + return + + time.sleep(random.uniform(0, 0.5)) + + existing_url = _get_shared_keycloak_url() + if existing_url: + logger.info(f"Found Keycloak started by another process: {existing_url}") + yield existing_url + return + + try: + logger.info("Starting new Keycloak instance") + _keycloak_container = KeycloakContainer("quay.io/keycloak/keycloak:24.0.1") + _keycloak_container.start() + + setup_permissions_on_keycloak(_keycloak_container.get_client()) + + keycloak_url = _keycloak_container.get_url() + + _save_keycloak_info(keycloak_url) + atexit.register(_cleanup_keycloak) + + logger.info(f"Keycloak ready at {keycloak_url}") + yield keycloak_url + + except Exception as e: + logger.error(f"Failed to start Keycloak: {e}") + if _keycloak_container: + try: + _keycloak_container.stop() + except Exception: + pass + _keycloak_container = None + raise @pytest.fixture(scope="session")