diff --git a/sdk/python/feast/cli/ui.py b/sdk/python/feast/cli/ui.py index 9fd7b24b7cd..bcac7cf2c3c 100644 --- a/sdk/python/feast/cli/ui.py +++ b/sdk/python/feast/cli/ui.py @@ -2,6 +2,8 @@ from feast.repo_operations import create_feature_store, registry_dump +VALID_MODES = ("proto", "rest", "rest-external") + @click.command() @click.option( @@ -52,6 +54,25 @@ show_default=False, help="path to TLS(SSL) certificate public key. You need to pass --key arg as well to start server in TLS mode", ) +@click.option( + "--mode", + "-m", + type=click.Choice(VALID_MODES, case_sensitive=False), + default="proto", + show_default=True, + help=( + "Data serving mode for the UI. " + "'proto' serves the registry as a protobuf blob (current default). " + "'rest' mounts the REST registry API alongside the UI. " + "'rest-external' proxies to an external REST registry API." + ), +) +@click.option( + "--rest-api-url", + type=click.STRING, + default="", + help="Base URL of an external REST registry API (required when --mode=rest-external). Example: http://registry-host:6570/api/v1", +) @click.pass_context def ui( ctx: click.Context, @@ -61,6 +82,8 @@ def ui( root_path: str = "", tls_key_path: str = "", tls_cert_path: str = "", + mode: str = "proto", + rest_api_url: str = "", ): """ Shows the Feast UI over the current directory @@ -69,8 +92,11 @@ def ui( raise click.BadParameter( "Please configure --key and --cert args to start the feature server in SSL mode." ) + if mode == "rest-external" and not rest_api_url: + raise click.BadParameter( + "--rest-api-url is required when using --mode=rest-external." + ) store = create_feature_store(ctx) - # Pass in the registry_dump method to get around a circular dependency store.serve_ui( host=host, port=port, @@ -79,4 +105,6 @@ def ui( root_path=root_path, tls_key_path=tls_key_path, tls_cert_path=tls_cert_path, + mode=mode, + rest_api_url=rest_api_url, ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f95bbf10c03..dad5835d68c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -3153,8 +3153,15 @@ def serve_ui( root_path: str = "", tls_key_path: str = "", tls_cert_path: str = "", + mode: str = "proto", + rest_api_url: str = "", ) -> None: - """Start the UI server locally""" + """Start the UI server locally + + Args: + mode: Data serving mode - 'proto' (default), 'rest', or 'rest-external'. + rest_api_url: Base URL for external REST API (required for 'rest-external' mode). + """ if flags_helper.is_test(): warnings.warn( "The Feast UI is an experimental feature. " @@ -3171,6 +3178,8 @@ def serve_ui( root_path=root_path, tls_key_path=tls_key_path, tls_cert_path=tls_cert_path, + mode=mode, + rest_api_url=rest_api_url, ) def serve_registry( diff --git a/sdk/python/feast/ui_server.py b/sdk/python/feast/ui_server.py index 99a4abc9c81..c6b89ada28d 100644 --- a/sdk/python/feast/ui_server.py +++ b/sdk/python/feast/ui_server.py @@ -1,33 +1,72 @@ import json +import logging import threading from importlib import resources as importlib_resources from typing import Callable, Optional import uvicorn -from fastapi import FastAPI, Response, status +from fastapi import FastAPI, Request, Response, status from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles import feast +logger = logging.getLogger(__name__) -def get_app( + +def _build_projects_list( store: "feast.FeatureStore", project_id: str, - registry_ttl_secs: int, - root_path: str = "", + root_path: str, + mode: str, ): - app = FastAPI() + """Build the projects list for the UI, with mode-aware registry paths.""" + discovered_projects = [] + registry = store.registry.proto() - app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) + if mode == "proto": + registry_path_template = f"{root_path}/registry" + else: + registry_path_template = f"{root_path}/api/v1" + + if registry and registry.projects and len(registry.projects) > 0: + for proj in registry.projects: + if proj.spec and proj.spec.name: + discovered_projects.append( + { + "name": proj.spec.name.replace("_", " ").title(), + "description": proj.spec.description + or f"Project: {proj.spec.name}", + "id": proj.spec.name, + "registryPath": registry_path_template, + } + ) + else: + discovered_projects.append( + { + "name": "Project", + "description": "Test project", + "id": project_id, + "registryPath": registry_path_template, + } + ) + + if len(discovered_projects) > 1: + all_projects_entry = { + "name": "All Projects", + "description": "View data across all projects", + "id": "all", + "registryPath": registry_path_template, + } + discovered_projects.insert(0, all_projects_entry) + + return {"projects": discovered_projects, "mode": mode} - # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down + +def _setup_proto_mode( + app: FastAPI, store: "feast.FeatureStore", registry_ttl_secs: int +): + """Set up the legacy proto-blob serving mode (GET /registry).""" registry_proto = None shutting_down = False active_timer: Optional[threading.Timer] = None @@ -51,57 +90,155 @@ def shutdown_event(): async_refresh() - ui_dir_ref = importlib_resources.files(__spec__.parent) / "ui/build/" # type: ignore[name-defined, arg-type] - with importlib_resources.as_file(ui_dir_ref) as ui_dir: - # Initialize with the projects-list.json file - with ui_dir.joinpath("projects-list.json").open(mode="w") as f: - # Get all projects from the registry - discovered_projects = [] - registry = store.registry.proto() - - # Use the projects list from the registry - if registry and registry.projects and len(registry.projects) > 0: - for proj in registry.projects: - if proj.spec and proj.spec.name: - discovered_projects.append( - { - "name": proj.spec.name.replace("_", " ").title(), - "description": proj.spec.description - or f"Project: {proj.spec.name}", - "id": proj.spec.name, - "registryPath": f"{root_path}/registry", - } - ) - else: - # If no projects in registry, use the current project from feature_store.yaml - discovered_projects.append( - { - "name": "Project", - "description": "Test project", - "id": project_id, - "registryPath": f"{root_path}/registry", - } - ) + @app.get("/registry") + def read_registry(): + if registry_proto is None: + return Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) + return Response( + content=registry_proto.SerializeToString(), + media_type="application/octet-stream", + ) - # Add "All Projects" option at the beginning if there are multiple projects - if len(discovered_projects) > 1: - all_projects_entry = { - "name": "All Projects", - "description": "View data across all projects", - "id": "all", - "registryPath": f"{root_path}/registry", - } - discovered_projects.insert(0, all_projects_entry) - - projects_dict = {"projects": discovered_projects} - f.write(json.dumps(projects_dict)) + @app.get("/health") + def health(): + return ( + Response(status_code=status.HTTP_200_OK) + if registry_proto + else Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) + ) + + +def _setup_rest_mode(app: FastAPI, store: "feast.FeatureStore", registry_ttl_secs: int): + """Mount the REST registry API routes on the UI server under /api/v1.""" + from feast.api.registry.rest import register_all_routes + from feast.registry_server import RegistryServer + + registry_proto = None + shutting_down = False + active_timer: Optional[threading.Timer] = None + + def async_refresh(): + store.refresh_registry() + nonlocal registry_proto + registry_proto = store.registry.proto() + if shutting_down: + return + nonlocal active_timer + active_timer = threading.Timer(registry_ttl_secs, async_refresh) + active_timer.start() + + @app.on_event("shutdown") + def shutdown_event(): + nonlocal shutting_down + shutting_down = True + if active_timer: + active_timer.cancel() + + async_refresh() + + grpc_handler = RegistryServer(store.registry) + + rest_app = FastAPI(root_path="/api/v1") + register_all_routes(rest_app, grpc_handler) + app.mount("/api/v1", rest_app) @app.get("/registry") def read_registry(): if registry_proto is None: + return Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) + return Response( + content=registry_proto.SerializeToString(), + media_type="application/octet-stream", + ) + + @app.get("/health") + def health(): + return ( + Response(status_code=status.HTTP_200_OK) + if registry_proto + else Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) + ) + + logger.info("REST registry API mounted at /api/v1") + + +def _setup_rest_external_mode( + app: FastAPI, + store: "feast.FeatureStore", + rest_api_url: str, + registry_ttl_secs: int, +): + """Reverse-proxy REST API calls to an external registry server.""" + import httpx + + rest_api_url = rest_api_url.rstrip("/") + client = httpx.AsyncClient(timeout=60.0) + + registry_proto = None + shutting_down = False + active_timer: Optional[threading.Timer] = None + + def async_refresh(): + store.refresh_registry() + nonlocal registry_proto + registry_proto = store.registry.proto() + if shutting_down: + return + nonlocal active_timer + active_timer = threading.Timer(registry_ttl_secs, async_refresh) + active_timer.start() + + @app.on_event("shutdown") + async def shutdown_event(): + nonlocal shutting_down + shutting_down = True + if active_timer: + active_timer.cancel() + await client.aclose() + + async_refresh() + + @app.api_route("/api/v1/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) + async def proxy_to_external(request: Request, path: str): + target_url = f"{rest_api_url}/{path}" + query_string = str(request.url.query) + if query_string: + target_url = f"{target_url}?{query_string}" + + headers = { + k: v + for k, v in request.headers.items() + if k.lower() not in ("host", "content-length", "transfer-encoding") + } + + body = await request.body() + + try: + resp = await client.request( + method=request.method, + url=target_url, + headers=headers, + content=body if body else None, + ) + return Response( + content=resp.content, + status_code=resp.status_code, + media_type=resp.headers.get("content-type", "application/json"), + ) + except httpx.RequestError as e: + logger.error(f"Error proxying to {target_url}: {e}") return Response( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE - ) # Service Unavailable + content=json.dumps( + {"detail": "Failed to reach the upstream registry API"} + ), + status_code=status.HTTP_502_BAD_GATEWAY, + media_type="application/json", + ) + + @app.get("/registry") + def read_registry(): + if registry_proto is None: + return Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) return Response( content=registry_proto.SerializeToString(), media_type="application/octet-stream", @@ -115,14 +252,45 @@ def health(): else Response(status_code=status.HTTP_503_SERVICE_UNAVAILABLE) ) - # For all other paths (such as paths that would otherwise be handled by react router), pass to React + logger.info(f"REST external proxy configured → {rest_api_url}") + + +def get_app( + store: "feast.FeatureStore", + project_id: str, + registry_ttl_secs: int, + root_path: str = "", + mode: str = "proto", + rest_api_url: str = "", +): + app = FastAPI() + + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + if mode == "rest": + _setup_rest_mode(app, store, registry_ttl_secs) + elif mode == "rest-external": + _setup_rest_external_mode(app, store, rest_api_url, registry_ttl_secs) + else: + _setup_proto_mode(app, store, registry_ttl_secs) + + ui_dir_ref = importlib_resources.files(__spec__.parent) / "ui/build/" # type: ignore[name-defined, arg-type] + with importlib_resources.as_file(ui_dir_ref) as ui_dir: + projects_dict = _build_projects_list(store, project_id, root_path, mode) + with ui_dir.joinpath("projects-list.json").open(mode="w") as f: + f.write(json.dumps(projects_dict)) + @app.api_route("/p/{path_name:path}", methods=["GET"]) def catch_all(): filename = ui_dir.joinpath("index.html") - with open(filename) as f: content = f.read() - return Response(content, media_type="text/html") app.mount( @@ -144,13 +312,20 @@ def start_server( root_path: str = "", tls_key_path: str = "", tls_cert_path: str = "", + mode: str = "proto", + rest_api_url: str = "", ): app = get_app( store, project_id, registry_ttl_sec, root_path, + mode=mode, + rest_api_url=rest_api_url, ) + + logger.info(f"Starting Feast UI server in '{mode}' mode on {host}:{port}") + if tls_key_path and tls_cert_path: uvicorn.run( app, diff --git a/sdk/python/tests/unit/test_ui_server.py b/sdk/python/tests/unit/test_ui_server.py index 36389f7b860..c2436c777cc 100644 --- a/sdk/python/tests/unit/test_ui_server.py +++ b/sdk/python/tests/unit/test_ui_server.py @@ -207,3 +207,236 @@ def test_catch_all_route(ui_app_with_registry): # The route will fail due to the scope issue with ui_dir with pytest.raises(Exception): # Expecting NameError or FileNotFoundError client.get("/p/some/react/path") + + +# ---------- Mode-aware projects-list.json tests ---------- + + +def _read_projects_list(temp_dir): + """Read the projects-list.json written by get_app via the mock (ui_dir = temp_dir).""" + projects_file = os.path.join(temp_dir, "projects-list.json") + with open(projects_file) as f: + return json.load(f) + + +def test_projects_list_proto_mode(mock_feature_store): + """projects-list.json uses /registry paths and mode='proto' by default.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + get_app(mock_feature_store, TEST_PROJECT_NAME, REGISTRY_TTL_SECS) + + data = _read_projects_list(temp_dir) + assertpy.assert_that(data["mode"]).is_equal_to("proto") + assertpy.assert_that(data["projects"][0]["registryPath"]).is_equal_to( + "/registry" + ) + + +def test_projects_list_rest_mode(mock_feature_store): + """projects-list.json uses /api/v1 paths and mode='rest' when REST mode is set.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + mode="rest", + ) + + data = _read_projects_list(temp_dir) + assertpy.assert_that(data["mode"]).is_equal_to("rest") + assertpy.assert_that(data["projects"][0]["registryPath"]).is_equal_to("/api/v1") + + +def test_projects_list_rest_mode_with_root_path(mock_feature_store): + """REST mode respects root_path prefix in registryPath.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + root_path="/feast", + mode="rest", + ) + + data = _read_projects_list(temp_dir) + assertpy.assert_that(data["projects"][0]["registryPath"]).is_equal_to( + "/feast/api/v1" + ) + + +# ---------- REST mode backward-compat: /registry and /health still work ---------- + + +def test_rest_mode_health_endpoint(mock_feature_store): + """Health endpoint works in REST mode.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + app = get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + mode="rest", + ) + client = TestClient(app) + response = client.get("/health") + assertpy.assert_that(response.status_code).is_equal_to( + EXPECTED_SUCCESS_STATUS + ) + + +def test_rest_mode_registry_endpoint_backward_compat(mock_feature_store): + """/registry proto blob endpoint is still available in REST mode.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"proto_blob" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + app = get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + mode="rest", + ) + client = TestClient(app) + response = client.get("/registry") + assertpy.assert_that(response.status_code).is_equal_to( + EXPECTED_SUCCESS_STATUS + ) + assertpy.assert_that(response.headers["content-type"]).is_equal_to( + "application/octet-stream" + ) + + +# ---------- rest-external proxy tests ---------- + + +def test_rest_external_mode_health_endpoint(mock_feature_store): + """Health endpoint works in rest-external mode.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + app = get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + mode="rest-external", + rest_api_url="http://fake-registry:6570/api/v1", + ) + client = TestClient(app) + response = client.get("/health") + assertpy.assert_that(response.status_code).is_equal_to( + EXPECTED_SUCCESS_STATUS + ) + + +def test_rest_external_mode_proxy_unreachable(mock_feature_store): + """rest-external returns 502 when external API is unreachable.""" + from unittest.mock import AsyncMock + + import httpx + + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + mock_httpx_client = AsyncMock() + mock_httpx_client.request.side_effect = httpx.ConnectError("Connection refused") + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with ( + _setup_importlib_mocks(temp_dir), + patch("httpx.AsyncClient", return_value=mock_httpx_client), + ): + app = get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + mode="rest-external", + rest_api_url="http://fake-registry:6570/api/v1", + ) + client = TestClient(app) + response = client.get("/api/v1/projects") + assertpy.assert_that(response.status_code).is_equal_to(502) + + +def test_rest_external_mode_projects_list(mock_feature_store): + """projects-list.json mode is 'rest-external' with /api/v1 paths.""" + mock_registry = MagicMock() + mock_proto = MagicMock() + mock_proto.SerializeToString.return_value = b"data" + mock_proto.projects = [] + mock_registry.proto.return_value = mock_proto + mock_feature_store.registry = mock_registry + + with tempfile.TemporaryDirectory() as temp_dir: + _create_mock_ui_files(temp_dir) + + with _setup_importlib_mocks(temp_dir): + get_app( + mock_feature_store, + TEST_PROJECT_NAME, + REGISTRY_TTL_SECS, + mode="rest-external", + rest_api_url="http://fake:6570/api/v1", + ) + + data = _read_projects_list(temp_dir) + assertpy.assert_that(data["mode"]).is_equal_to("rest-external") + assertpy.assert_that(data["projects"][0]["registryPath"]).is_equal_to("/api/v1")