diff --git a/docs/reference/online-stores/remote.md b/docs/reference/online-stores/remote.md index b6734ccc1ec..9630e1e4e62 100644 --- a/docs/reference/online-stores/remote.md +++ b/docs/reference/online-stores/remote.md @@ -2,7 +2,7 @@ ## Description -This remote online store will let you interact with remote feature server. At this moment this only supports the read operation. You can use this online store and able retrieve online features `store.get_online_features` from remote feature server. +This remote online store lets you interact with a remote feature server. You can use this online store to retrieve online features via `store.get_online_features()` from a remote feature server. ## Examples @@ -25,6 +25,128 @@ auth: `cert` is an optional configuration to the public certificate path when the online server starts in TLS(SSL) mode. This may be needed if the online server is started with a self-signed certificate, typically this file ends with `*.crt`, `*.cer`, or `*.pem`. +## Connection Pooling Configuration + +The remote online store uses HTTP connection pooling to improve performance by reusing TCP/TLS connections across multiple requests. This significantly reduces latency by avoiding the overhead of establishing new connections for each request. + +### Configuration Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `connection_pool_size` | int | 50 | Maximum number of connections to keep in the pool. Increase for high-concurrency workloads. | +| `connection_idle_timeout` | int | 300 | Maximum time in seconds a session can be idle before being closed. Set to `0` to disable idle timeout. | +| `connection_retries` | int | 3 | Number of retries for failed requests with exponential backoff. | + +### Example with Connection Pooling + +{% code title="feature_store.yaml" %} +```yaml +project: my-local-project +registry: /remote/data/registry.db +provider: local +online_store: + type: remote + path: http://feast-feature-server:80 + + # Connection pooling configuration (optional) + connection_pool_size: 50 # Max connections in pool + connection_idle_timeout: 300 # Idle timeout in seconds (0 to disable) + connection_retries: 3 # Retry count with exponential backoff + +entity_key_serialization_version: 3 +auth: + type: no_auth +``` +{% endcode %} + +### Use Cases + +**High-throughput workloads:** +```yaml +online_store: + type: remote + path: http://feast-server:80 + connection_pool_size: 100 # More connections for high concurrency + connection_idle_timeout: 600 # 10 minutes idle timeout + connection_retries: 5 # More retries for resilience +``` + +**Long-running services:** +```yaml +online_store: + type: remote + path: http://feast-server:80 + connection_idle_timeout: 0 # Never auto-close session +``` + +**Resource-constrained environments:** +```yaml +online_store: + type: remote + path: http://feast-server:80 + connection_pool_size: 10 # Fewer connections to reduce memory + connection_idle_timeout: 60 # 1 minute timeout +``` + +### Performance Benefits + +Connection pooling provides significant latency improvements: + +- **Without pooling**: Each request requires a new TCP connection (~10-50ms) and TLS handshake (~30-100ms) +- **With pooling**: Subsequent requests reuse existing connections, eliminating connection overhead + +This is especially beneficial for: +- High-frequency feature retrieval in real-time inference pipelines +- Batch processing with many sequential `get_online_features()` calls +- Services with authentication enabled (reduces token refresh overhead) + +### Session Cleanup + +The HTTP session is automatically managed with idle timeout, but you can also explicitly close it when your application is shutting down or when you want to release resources. + +#### Using FeatureStore context manager (recommended) + +The recommended way to ensure proper cleanup is to use the `FeatureStore` as a context manager: + +```python +from feast import FeatureStore + +# Session is automatically closed when exiting the context +with FeatureStore(repo_path=".") as store: + features = store.get_online_features( + features=["driver_hourly_stats:conv_rate"], + entity_rows=[{"driver_id": 1001}] + ) +``` + +#### Explicit cleanup + +You can also explicitly close the session by calling `close()` on the `FeatureStore`: + +```python +from feast import FeatureStore + +store = FeatureStore(repo_path=".") +try: + features = store.get_online_features( + features=["driver_hourly_stats:conv_rate"], + entity_rows=[{"driver_id": 1001}] + ) +finally: + store.close() # Closes HTTP session and releases resources +``` + +#### Direct session management + +For advanced use cases, you can directly manage the HTTP session via `HttpSessionManager`: + +```python +from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager + +# Close the cached HTTP session +HttpSessionManager.close_session() +``` + ## How to configure Authentication and Authorization Please refer the [page](./../../../docs/getting-started/concepts/permission.md) for more details on how to configure authentication and authorization. diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index ec2b05759ba..1111e0e6c62 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -23,6 +23,7 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.online_stores.helpers import _to_naive_utc from feast.infra.online_stores.online_store import OnlineStore +from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel @@ -51,6 +52,18 @@ class RemoteOnlineStoreConfig(FeastConfigBaseModel): """ str: Path to the public certificate when the online server starts in TLS(SSL) mode. This may be needed if the online server started with a self-signed certificate, typically this file ends with `*.crt`, `*.cer`, or `*.pem`. If type is 'remote', then this configuration is needed to connect to remote online server in TLS mode. """ + # Connection pooling configuration + connection_pool_size: int = 50 + """ int: Maximum number of connections to keep in the pool (default 50). + Increase for high-concurrency workloads. """ + + connection_idle_timeout: int = 300 + """ int: Maximum time in seconds a session can be idle before being closed (default 300 = 5 minutes). + Set to 0 to disable idle timeout. """ + + connection_retries: int = 3 + """ int: Number of retries for failed requests with exponential backoff (default 3). """ + class RemoteOnlineStore(OnlineStore): """ @@ -544,6 +557,20 @@ def teardown( ): pass + async def close(self) -> None: + """ + Close the HTTP session and release connection pool resources. + + This method is called automatically when FeatureStore.close() is invoked. + It cleans up the cached HTTP session used for connection pooling. + + Note: Since the session is shared globally, calling close() will affect + all RemoteOnlineStore instances in the same process. This is typically + fine for SDK usage where there's usually one FeatureStore per process. + """ + HttpSessionManager.close_session() + logger.debug("RemoteOnlineStore HTTP session closed") + @rest_error_handling_decorator def get_remote_online_features( diff --git a/sdk/python/feast/permissions/client/http_auth_requests_wrapper.py b/sdk/python/feast/permissions/client/http_auth_requests_wrapper.py index ba02fab8d8d..5b8eb0ffb2f 100644 --- a/sdk/python/feast/permissions/client/http_auth_requests_wrapper.py +++ b/sdk/python/feast/permissions/client/http_auth_requests_wrapper.py @@ -1,5 +1,12 @@ +import logging +import threading +import time +from typing import Optional + import requests from requests import Session +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from feast.permissions.auth.auth_type import AuthType from feast.permissions.auth_model import ( @@ -7,6 +14,8 @@ ) from feast.permissions.client.client_auth_token import get_auth_token +logger = logging.getLogger(__name__) + class AuthenticatedRequestsSession(Session): def __init__(self, auth_token: str): @@ -14,9 +23,234 @@ def __init__(self, auth_token: str): self.headers.update({"Authorization": f"Bearer {auth_token}"}) -def get_http_auth_requests_session(auth_config: AuthConfig) -> Session: - if auth_config.type == AuthType.NONE.value: - request_session = requests.session() - else: - request_session = AuthenticatedRequestsSession(get_auth_token(auth_config)) - return request_session +class HttpSessionManager: + """ + Manages HTTP sessions with connection pooling for improved performance. + + This class provides: + - Session caching based on auth configuration + - Connection pooling via HTTPAdapter + - Automatic retry with exponential backoff + - Thread-safe session management + - Automatic idle timeout (closes stale sessions) + + Configuration can be customized via feature_store.yaml: + ```yaml + online_store: + type: remote + path: http://localhost:6566 + connection_pool_size: 50 # Max connections in pool + connection_idle_timeout: 300 # Seconds before idle session closes + connection_retries: 3 # Retry count with backoff + ``` + """ + + _session: Optional[Session] = None + _session_auth_type: Optional[str] = None + _session_last_used: Optional[float] = None + _session_config_hash: Optional[int] = None + _lock = threading.Lock() + + # Default configuration (can be overridden via feature_store.yaml) + DEFAULT_POOL_CONNECTIONS = 10 # Number of connection pools to cache + DEFAULT_POOL_MAXSIZE = 50 # Max connections per pool + DEFAULT_MAX_RETRIES = 3 # Number of retries + DEFAULT_BACKOFF_FACTOR = 0.5 # Backoff factor for retries + DEFAULT_MAX_IDLE_SECONDS = 300 # 5 minutes + + # Current active configuration (updated when session is created) + _pool_maxsize: int = DEFAULT_POOL_MAXSIZE + _max_retries: int = DEFAULT_MAX_RETRIES + _max_idle_seconds: int = DEFAULT_MAX_IDLE_SECONDS + + @classmethod + def get_session( + cls, + auth_config: AuthConfig, + pool_maxsize: Optional[int] = None, + max_idle_seconds: Optional[int] = None, + max_retries: Optional[int] = None, + ) -> Session: + """ + Get or create a cached HTTP session with connection pooling. + + The session is cached and reused across requests. A new session + is created if: + - No session exists + - Auth type changes + - Configuration changes + - Session has been idle longer than max_idle_seconds + + Args: + auth_config: Authentication configuration + pool_maxsize: Max connections in pool (default: 50) + max_idle_seconds: Idle timeout in seconds (default: 300, 0 to disable) + max_retries: Number of retries (default: 3) + + Returns: + A requests Session configured with connection pooling + """ + auth_type = auth_config.type if auth_config else AuthType.NONE.value + current_time = time.time() + + # Use provided values or defaults + pool_maxsize = ( + pool_maxsize if pool_maxsize is not None else cls.DEFAULT_POOL_MAXSIZE + ) + max_idle_seconds = ( + max_idle_seconds + if max_idle_seconds is not None + else cls.DEFAULT_MAX_IDLE_SECONDS + ) + max_retries = ( + max_retries if max_retries is not None else cls.DEFAULT_MAX_RETRIES + ) + + # Create config hash to detect configuration changes + config_hash = hash((auth_type, pool_maxsize, max_idle_seconds, max_retries)) + + with cls._lock: + # Check if session has been idle too long (if timeout is enabled) + if ( + cls._session is not None + and cls._session_last_used is not None + and cls._max_idle_seconds > 0 + ): + idle_time = current_time - cls._session_last_used + if idle_time > cls._max_idle_seconds: + logger.debug( + f"Session idle for {idle_time:.1f}s (max: {cls._max_idle_seconds}s), " + "closing stale session" + ) + cls._close_session_internal() + + # Check if we can reuse the cached session (same auth type and config) + if ( + cls._session is not None + and cls._session_auth_type == auth_type + and cls._session_config_hash == config_hash + ): + # For authenticated sessions, update the token in case it expired + if auth_type != AuthType.NONE.value: + try: + auth_token = get_auth_token(auth_config) + cls._session.headers.update( + {"Authorization": f"Bearer {auth_token}"} + ) + except Exception as e: + logger.warning(f"Failed to refresh auth token: {e}") + raise + + # Update last used time + cls._session_last_used = current_time + return cls._session + + # Close existing session if auth type or config changed + if cls._session is not None: + cls._close_session_internal() + + # Create new session with connection pooling + if auth_type == AuthType.NONE.value: + session = requests.Session() + else: + auth_token = get_auth_token(auth_config) + session = AuthenticatedRequestsSession(auth_token) + + # Configure retry strategy with exponential backoff + retry_strategy = Retry( + total=max_retries, + backoff_factor=cls.DEFAULT_BACKOFF_FACTOR, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["GET", "POST", "PUT", "DELETE"], + ) + + # Create HTTP adapter with connection pooling + adapter = HTTPAdapter( + pool_connections=cls.DEFAULT_POOL_CONNECTIONS, + pool_maxsize=pool_maxsize, + max_retries=retry_strategy, + ) + + # Mount adapter for both HTTP and HTTPS + session.mount("http://", adapter) + session.mount("https://", adapter) + + # Set keep-alive header + session.headers.update({"Connection": "keep-alive"}) + + # Cache the session and track configuration + cls._session = session + cls._session_auth_type = auth_type + cls._session_last_used = current_time + cls._session_config_hash = config_hash + cls._pool_maxsize = pool_maxsize + cls._max_retries = max_retries + cls._max_idle_seconds = max_idle_seconds + + idle_timeout_str = ( + f"{max_idle_seconds}s" if max_idle_seconds > 0 else "disabled" + ) + logger.debug( + f"Created new HTTP session with connection pooling: " + f"pool_maxsize={pool_maxsize}, retries={max_retries}, " + f"idle_timeout={idle_timeout_str}" + ) + + return session + + @classmethod + def _close_session_internal(cls) -> None: + """ + Internal method to close session without acquiring lock. + Must be called while holding cls._lock. + """ + if cls._session is not None: + try: + cls._session.close() + except Exception: + pass + cls._session = None + cls._session_auth_type = None + cls._session_last_used = None + cls._session_config_hash = None + + @classmethod + def close_session(cls) -> None: + """ + Close the cached HTTP session and release resources. + + Call this method during application shutdown to clean up. + """ + with cls._lock: + cls._close_session_internal() + logger.debug("HTTP session closed") + + +def get_http_auth_requests_session( + auth_config: AuthConfig, + pool_maxsize: Optional[int] = None, + max_idle_seconds: Optional[int] = None, + max_retries: Optional[int] = None, +) -> Session: + """ + Get an HTTP session with connection pooling and optional authentication. + + This function returns a cached session that reuses TCP/TLS connections + for improved performance. Configuration can be customized via parameters + or defaults are used. + + Args: + auth_config: Authentication configuration + pool_maxsize: Max connections in pool (default: 50) + max_idle_seconds: Idle timeout in seconds (default: 300, 0 to disable) + max_retries: Number of retries (default: 3) + + Returns: + A requests Session configured for the given auth config + """ + return HttpSessionManager.get_session( + auth_config, + pool_maxsize=pool_maxsize, + max_idle_seconds=max_idle_seconds, + max_retries=max_retries, + ) diff --git a/sdk/python/feast/rest_error_handler.py b/sdk/python/feast/rest_error_handler.py index fc802866f94..c8586c826e9 100644 --- a/sdk/python/feast/rest_error_handler.py +++ b/sdk/python/feast/rest_error_handler.py @@ -13,45 +13,92 @@ def rest_error_handling_decorator(func): + """ + Decorator that provides HTTP session management and error handling for REST API calls. + + This decorator: + - Provides a cached HTTP session with connection pooling for improved performance + - Wraps session methods to add logging and error handling + - Maps Feast-specific errors from API responses + + The session is reused across requests (connection pooling), which saves + TCP/TLS handshake overhead on subsequent calls. + + Connection pool settings can be configured via feature_store.yaml: + ```yaml + online_store: + type: remote + path: http://localhost:6566 + connection_pool_size: 50 # Max connections in pool + connection_idle_timeout: 300 # Seconds before idle session closes (0 to disable) + connection_retries: 3 # Retry count with backoff + ``` + """ + @wraps(func) def wrapper(config: RepoConfig, *args, **kwargs): assert isinstance(config, RepoConfig) - # Get a Session object - with get_http_auth_requests_session(config.auth_config) as session: - # Define a wrapper for session methods - def method_wrapper(method_name): - original_method = getattr(session, method_name) - - @wraps(original_method) - def wrapped_method(*args, **kwargs): - logger.debug( - f"Calling {method_name} with args: {args}, kwargs: {kwargs}" - ) - response = original_method(*args, **kwargs) - logger.debug( - f"{method_name} response status code: {response.status_code}" - ) - - try: - response.raise_for_status() - except requests.RequestException: - logger.debug(f"response.json() = {response.json()}") - mapped_error = FeastError.from_error_detail(response.json()) - logger.debug(f"mapped_error = {str(mapped_error)}") - if mapped_error is not None: - raise mapped_error - return response - - return wrapped_method - - # Enhance session methods - session.get = method_wrapper("get") # type: ignore[method-assign] - session.post = method_wrapper("post") # type: ignore[method-assign] - session.put = method_wrapper("put") # type: ignore[method-assign] - session.delete = method_wrapper("delete") # type: ignore[method-assign] - - # Pass the enhanced session object to the decorated function - return func(session, config, *args, **kwargs) + # Extract connection pool configuration from online_store if available + pool_maxsize = None + max_idle_seconds = None + max_retries = None + + if config.online_store is not None: + attr_map = { + "pool_maxsize": "connection_pool_size", + "max_idle_seconds": "connection_idle_timeout", + "max_retries": "connection_retries", + } + conn_config = { + key: getattr(config.online_store, attr_name, None) + for key, attr_name in attr_map.items() + } + pool_maxsize = conn_config["pool_maxsize"] + max_idle_seconds = conn_config["max_idle_seconds"] + max_retries = conn_config["max_retries"] + + # Get a cached session with connection pooling + session = get_http_auth_requests_session( + config.auth_config, + pool_maxsize=pool_maxsize, + max_idle_seconds=max_idle_seconds, + max_retries=max_retries, + ) + + # Define a wrapper for session methods to add logging and error handling + def method_wrapper(method_name): + original_method = getattr(session, method_name) + + @wraps(original_method) + def wrapped_method(*args, **kwargs): + logger.debug( + f"Calling {method_name} with args: {args}, kwargs: {kwargs}" + ) + response = original_method(*args, **kwargs) + logger.debug( + f"{method_name} response status code: {response.status_code}" + ) + + try: + response.raise_for_status() + except requests.RequestException: + logger.debug(f"response.json() = {response.json()}") + mapped_error = FeastError.from_error_detail(response.json()) + logger.debug(f"mapped_error = {str(mapped_error)}") + if mapped_error is not None: + raise mapped_error + return response + + return wrapped_method + + # Enhance session methods with logging and error handling + session.get = method_wrapper("get") # type: ignore[method-assign] + session.post = method_wrapper("post") # type: ignore[method-assign] + session.put = method_wrapper("put") # type: ignore[method-assign] + session.delete = method_wrapper("delete") # type: ignore[method-assign] + + # Pass the enhanced session object to the decorated function + return func(session, config, *args, **kwargs) return wrapper diff --git a/sdk/python/tests/unit/permissions/auth/client/test_http_session_manager.py b/sdk/python/tests/unit/permissions/auth/client/test_http_session_manager.py new file mode 100644 index 00000000000..ab2efe53cdd --- /dev/null +++ b/sdk/python/tests/unit/permissions/auth/client/test_http_session_manager.py @@ -0,0 +1,182 @@ +import pytest + +from feast.permissions.auth_model import NoAuthConfig +from feast.permissions.client.http_auth_requests_wrapper import ( + HttpSessionManager, + get_http_auth_requests_session, +) + + +class TestHttpSessionManager: + """Test suite for HTTP session manager with connection pooling.""" + + @pytest.fixture(autouse=True) + def reset_session(self): + """Reset the session before and after each test.""" + HttpSessionManager.close_session() + yield + HttpSessionManager.close_session() + + def test_session_creation(self): + """Test that session is created with correct configuration.""" + auth_config = NoAuthConfig() + session = get_http_auth_requests_session(auth_config) + + # Session should be created + assert session is not None + + # Session should have HTTP and HTTPS adapters + assert "http://" in session.adapters + assert "https://" in session.adapters + + # Session should have keep-alive header + assert session.headers.get("Connection") == "keep-alive" + + def test_session_reuse(self): + """Test that session is reused across multiple calls.""" + auth_config = NoAuthConfig() + + # First call creates session + session1 = get_http_auth_requests_session(auth_config) + + # Second call should return the same session + session2 = get_http_auth_requests_session(auth_config) + + # Should be the exact same object + assert session1 is session2 + + def test_session_close(self): + """Test that session can be closed and recreated.""" + auth_config = NoAuthConfig() + + # Create session + session1 = get_http_auth_requests_session(auth_config) + assert HttpSessionManager._session is not None + + # Close session + HttpSessionManager.close_session() + assert HttpSessionManager._session is None + assert HttpSessionManager._session_auth_type is None + + # New session should be created + session2 = get_http_auth_requests_session(auth_config) + assert session2 is not None + assert session2 is not session1 + + def test_thread_safety(self): + """Test that session management is thread-safe.""" + import threading + + auth_config = NoAuthConfig() + sessions = [] + errors = [] + + def get_session(): + try: + session = get_http_auth_requests_session(auth_config) + sessions.append(session) + except Exception as e: + errors.append(e) + + # Create multiple threads + threads = [threading.Thread(target=get_session) for _ in range(10)] + + # Start all threads + for t in threads: + t.start() + + # Wait for all threads to complete + for t in threads: + t.join() + + # No errors should have occurred + assert len(errors) == 0 + + # All threads should have gotten the same session (after initial creation) + assert len(sessions) == 10 + # All sessions should be the same object (due to caching) + assert all(s is sessions[0] for s in sessions) + + def test_connection_pool_configuration(self): + """Test that connection pool default configuration is correct.""" + assert HttpSessionManager.DEFAULT_POOL_CONNECTIONS == 10 + assert HttpSessionManager.DEFAULT_POOL_MAXSIZE == 50 + assert HttpSessionManager.DEFAULT_MAX_RETRIES == 3 + assert HttpSessionManager.DEFAULT_BACKOFF_FACTOR == 0.5 + assert HttpSessionManager.DEFAULT_MAX_IDLE_SECONDS == 300 # 5 minutes + + def test_custom_configuration(self): + """Test that custom configuration is applied.""" + auth_config = NoAuthConfig() + + # Create session with custom config + _session = get_http_auth_requests_session( # noqa: F841 + auth_config, + pool_maxsize=100, + max_idle_seconds=600, + max_retries=5, + ) + + # Verify custom config was applied + assert HttpSessionManager._pool_maxsize == 100 + assert HttpSessionManager._max_idle_seconds == 600 + assert HttpSessionManager._max_retries == 5 + + def test_disable_idle_timeout(self): + """Test that idle timeout can be disabled by setting to 0.""" + auth_config = NoAuthConfig() + + # Create session with idle timeout disabled + _session = get_http_auth_requests_session( # noqa: F841 + auth_config, + max_idle_seconds=0, + ) + + assert HttpSessionManager._max_idle_seconds == 0 + + def test_idle_timeout(self): + """Test that session is closed after idle timeout.""" + import time + + auth_config = NoAuthConfig() + + # Create session with very short timeout for testing + session1 = get_http_auth_requests_session( + auth_config, + max_idle_seconds=1, # 1 second + ) + + # Wait longer than timeout + time.sleep(1.5) + + # Get session again - should create new one due to idle timeout + session2 = get_http_auth_requests_session( + auth_config, + max_idle_seconds=1, + ) + + # Should be different sessions + assert session1 is not session2 + + def test_session_last_used_tracking(self): + """Test that last used time is tracked correctly.""" + import time + + auth_config = NoAuthConfig() + + # Create session + _session = get_http_auth_requests_session(auth_config) # noqa: F841 + + # Last used should be set + assert HttpSessionManager._session_last_used is not None + + first_used = HttpSessionManager._session_last_used + + # Small delay + time.sleep(0.1) + + # Get session again + _session2 = get_http_auth_requests_session(auth_config) # noqa: F841 + + # Last used should be updated + assert HttpSessionManager._session_last_used >= first_used diff --git a/sdk/python/tests/unit/test_rest_error_decorator.py b/sdk/python/tests/unit/test_rest_error_decorator.py index 147ae767bdb..cdde3d208e6 100644 --- a/sdk/python/tests/unit/test_rest_error_decorator.py +++ b/sdk/python/tests/unit/test_rest_error_decorator.py @@ -10,6 +10,7 @@ RemoteOnlineStoreConfig, get_remote_online_features, ) +from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager @pytest.fixture @@ -26,6 +27,9 @@ def none_feast_exception() -> RuntimeError: def test_rest_error_handling_with_feast_exception( mock_post, environment, feast_exception ): + # Close any cached session to ensure mock is applied to fresh session + HttpSessionManager.close_session() + # Create a mock response object mock_response = Mock() mock_response.status_code = feast_exception.http_status_code() @@ -54,6 +58,9 @@ def test_rest_error_handling_with_feast_exception( def test_rest_error_handling_with_none_feast_exception( mock_post, environment, none_feast_exception ): + # Close any cached session to ensure mock is applied to fresh session + HttpSessionManager.close_session() + # Create a mock response object mock_response = Mock() mock_response.status_code = 500