From 4c5fb2cd30fd109681e3ebcac577fbae5ecbfc62 Mon Sep 17 00:00:00 2001 From: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:27:41 +0530 Subject: [PATCH 1/5] feat: Operational metrics for offline store and SOX metrics for both Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> --- sdk/python/feast/feature_server.py | 79 ++- .../infra/feature_servers/base_config.py | 11 + .../infra/offline_stores/offline_store.py | 65 +- sdk/python/feast/metrics.py | 103 ++- sdk/python/tests/unit/test_metrics.py | 661 +++++++++++++++++- 5 files changed, 899 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 81359222797..6d566a10c5d 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -152,28 +152,72 @@ class ChatRequest(BaseModel): messages: List[ChatMessage] -def _resolve_feature_counts( +def _parse_feature_info( features: Union[List[str], "feast.FeatureService"], ) -> tuple: - """Return (feature_count, feature_view_count) from the resolved features. + """Return ``(feature_view_names, feature_count)`` from resolved features. ``features`` is either a list of ``"feature_view:feature"`` strings or a ``FeatureService`` with ``feature_view_projections``. + + Returns: + (fv_names, feat_count) where fv_names is a list of unique feature + view name strings and feat_count is the total number of features. """ from feast.feature_service import FeatureService if isinstance(features, FeatureService): projections = features.feature_view_projections - fv_count = len(projections) + fv_names = [p.name for p in projections] feat_count = sum(len(p.features) for p in projections) elif isinstance(features, list): feat_count = len(features) - fv_names = {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref} - fv_count = len(fv_names) + fv_names = list( + {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref} + ) else: + fv_names = [] feat_count = 0 - fv_count = 0 - return str(feat_count), str(fv_count) + return fv_names, feat_count + + +def _resolve_feature_counts( + features: Union[List[str], "feast.FeatureService"], +) -> tuple: + """Return ``(feature_count_str, feature_view_count_str)`` for Prometheus labels.""" + fv_names, feat_count = _parse_feature_info(features) + return str(feat_count), str(len(fv_names)) + + +def _emit_online_audit( + request: GetOnlineFeaturesRequest, + features: Union[List[str], "feast.FeatureService"], + entity_count: int, + status: str, + latency_ms: float, +): + """Best-effort audit log emission for online feature requests.""" + try: + from feast.permissions.security_manager import get_security_manager + + requestor_id = "anonymous" + sm = get_security_manager() + if sm and sm.current_user: + requestor_id = sm.current_user.username or "anonymous" + + fv_names, feat_count = _parse_feature_info(features) + + feast_metrics.emit_online_audit_log( + requestor_id=requestor_id, + entity_keys=list(request.entities.keys()), + entity_count=entity_count, + feature_views=fv_names, + feature_count=feat_count, + status=status, + latency_ms=latency_ms, + ) + except Exception: + logger.warning("Failed to emit online audit log", exc_info=True) async def _get_features( @@ -390,11 +434,22 @@ async def get_online_features(request: GetOnlineFeaturesRequest) -> Any: include_feature_view_version_metadata=request.include_feature_view_version_metadata, ) - if store._get_provider().async_supported.online.read: - response = await store.get_online_features_async(**read_params) # type: ignore - else: - response = await run_in_threadpool( - lambda: store.get_online_features(**read_params) # type: ignore + audit_start_ms = time.monotonic() * 1000 + audit_status = "success" + try: + if store._get_provider().async_supported.online.read: + response = await store.get_online_features_async(**read_params) # type: ignore + else: + response = await run_in_threadpool( + lambda: store.get_online_features(**read_params) # type: ignore + ) + except Exception: + audit_status = "error" + raise + finally: + audit_latency_ms = time.monotonic() * 1000 - audit_start_ms + _emit_online_audit( + request, features, entity_count, audit_status, audit_latency_ms ) response_dict = await run_in_threadpool( diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index df324dc57d3..14ad2fe505e 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -82,6 +82,17 @@ class MetricsConfig(FeastConfigBaseModel): """Emit per-feature-view freshness gauges (feast_feature_freshness_seconds).""" + offline_features: StrictBool = True + """Emit offline store retrieval metrics + (feast_offline_store_request_total, + feast_offline_store_request_latency_seconds, + feast_offline_store_row_count).""" + + audit_logging: StrictBool = False + """Emit structured JSON audit log entries for online and offline + feature requests via the ``feast.audit`` logger. Captures requestor + identity, entity keys, feature views, row counts, and latency.""" + class BaseFeatureServerConfig(FeastConfigBaseModel): """Base Feature Server config that should be extended""" diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 4ae0c680c3b..9d9fee22623 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -11,9 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import time import warnings from abc import ABC -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import ( TYPE_CHECKING, @@ -70,6 +72,21 @@ def __init__( self.max_event_timestamp = max_event_timestamp +def _extract_retrieval_metadata(job: "RetrievalJob") -> tuple: + """Return ``(feature_view_names, feature_count)`` from a RetrievalJob's metadata.""" + try: + meta = job.metadata + if meta: + feature_count = len(meta.features) + feature_views = list( + {ref.split(":")[0] for ref in meta.features if ":" in ref} + ) + return feature_views, feature_count + except (NotImplementedError, AttributeError): + pass + return [], 0 + + class RetrievalJob(ABC): """A RetrievalJob manages the execution of a query to retrieve data from the offline store.""" @@ -152,7 +169,51 @@ def to_arrow( validation_reference (optional): The validation to apply against the retrieved dataframe. timeout (optional): The query timeout if applicable. """ - features_table = self._to_arrow_internal(timeout=timeout) + start_wall = time.monotonic() + status_label = "success" + row_count = 0 + try: + features_table = self._to_arrow_internal(timeout=timeout) + row_count = features_table.num_rows + except Exception: + status_label = "error" + raise + finally: + try: + from feast import metrics as feast_metrics + + elapsed = time.monotonic() - start_wall + + if feast_metrics._config.offline_features: + feast_metrics.offline_store_request_total.labels( + method="to_arrow", status=status_label + ).inc() + feast_metrics.offline_store_request_latency_seconds.labels( + method="to_arrow" + ).observe(elapsed) + if row_count > 0: + feast_metrics.offline_store_row_count.labels( + method="to_arrow" + ).observe(row_count) + + if feast_metrics._config.audit_logging: + feature_views, feature_count = _extract_retrieval_metadata(self) + now_iso = datetime.now(tz=timezone.utc).isoformat() + feast_metrics.emit_offline_audit_log( + method="to_arrow", + feature_views=feature_views, + feature_count=feature_count, + row_count=row_count, + status=status_label, + start_time=now_iso, + end_time=now_iso, + duration_ms=elapsed * 1000, + ) + except Exception: + logging.getLogger(__name__).debug( + "Failed to record offline store metrics", exc_info=True + ) + if self.on_demand_feature_views: # Build a mapping of ODFV name to requested feature names # This ensures we only return the features that were explicitly requested diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index 694f25a687e..f827f6e31ee 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -42,6 +42,7 @@ """ import atexit +import json import logging import os import shutil @@ -51,7 +52,7 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, List, Optional import psutil @@ -123,6 +124,8 @@ class _MetricsFlags: push: bool = False materialization: bool = False freshness: bool = False + offline_features: bool = False + audit_logging: bool = False _config = _MetricsFlags() @@ -144,6 +147,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=False, ) return _MetricsFlags( enabled=True, @@ -153,6 +158,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag push=getattr(metrics_config, "push", True), materialization=getattr(metrics_config, "materialization", True), freshness=getattr(metrics_config, "freshness", True), + offline_features=getattr(metrics_config, "offline_features", True), + audit_logging=getattr(metrics_config, "audit_logging", False), ) @@ -260,6 +267,33 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag multiprocess_mode="max", ) +# --------------------------------------------------------------------------- +# Offline store retrieval metrics +# --------------------------------------------------------------------------- +offline_store_request_total = Counter( + "feast_offline_store_request_total", + "Total offline store retrieval requests", + ["method", "status"], +) +offline_store_request_latency_seconds = Histogram( + "feast_offline_store_request_latency_seconds", + "Latency of offline store retrieval operations in seconds", + ["method"], + buckets=(0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), +) +offline_store_row_count = Histogram( + "feast_offline_store_row_count", + "Number of rows returned by offline store retrieval", + ["method"], + buckets=(100, 1000, 10000, 100000, 500000, 1000000, 5000000), +) + +# --------------------------------------------------------------------------- +# Audit logger — separate from the main feast logger so operators can +# route SOX-style audit entries to a dedicated sink. +# --------------------------------------------------------------------------- +audit_logger = logging.getLogger("feast.audit") + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -388,6 +422,71 @@ def track_materialization( ) +def emit_online_audit_log( + *, + requestor_id: str, + entity_keys: List[str], + entity_count: int, + feature_views: List[str], + feature_count: int, + status: str, + latency_ms: float, +): + """Emit a structured JSON audit log entry for an online feature request.""" + if not _config.audit_logging: + return + audit_logger.info( + _json_dumps( + { + "event": "online_feature_request", + "timestamp": datetime.now(tz=timezone.utc).isoformat(), + "requestor_id": requestor_id, + "entity_keys": entity_keys, + "entity_count": entity_count, + "feature_views": feature_views, + "feature_count": feature_count, + "status": status, + "latency_ms": round(latency_ms, 2), + } + ) + ) + + +def emit_offline_audit_log( + *, + method: str, + feature_views: List[str], + feature_count: int, + row_count: int, + status: str, + start_time: str, + end_time: str, + duration_ms: float, +): + """Emit a structured JSON audit log entry for an offline feature retrieval.""" + if not _config.audit_logging: + return + audit_logger.info( + _json_dumps( + { + "event": "offline_feature_retrieval", + "method": method, + "start_time": start_time, + "end_time": end_time, + "feature_views": feature_views, + "feature_count": feature_count, + "row_count": row_count, + "status": status, + "duration_ms": round(duration_ms, 2), + } + ) + ) + + +def _json_dumps(obj: dict) -> str: + return json.dumps(obj, separators=(",", ":")) + + def update_feature_freshness( store: "FeatureStore", ) -> None: @@ -507,6 +606,8 @@ def start_metrics_server( push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=False, ) from prometheus_client import CollectorRegistry, make_wsgi_app diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py index bffde73dd91..2750757f67a 100644 --- a/sdk/python/tests/unit/test_metrics.py +++ b/sdk/python/tests/unit/test_metrics.py @@ -18,9 +18,14 @@ import pytest from feast.metrics import ( + emit_offline_audit_log, + emit_online_audit_log, feature_freshness_seconds, materialization_duration_seconds, materialization_result_total, + offline_store_request_latency_seconds, + offline_store_request_total, + offline_store_row_count, online_features_entity_count, online_features_request_count, online_features_status_total, @@ -42,13 +47,11 @@ ) -@pytest.fixture(autouse=True) -def _enable_metrics(): - """Enable all metric categories for each test, then restore.""" +def _all_enabled_flags(): + """Return a _MetricsFlags with every category enabled.""" import feast.metrics as m - original = m._config - m._config = m._MetricsFlags( + return m._MetricsFlags( enabled=True, resource=True, request=True, @@ -56,7 +59,18 @@ def _enable_metrics(): push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=True, ) + + +@pytest.fixture(autouse=True) +def _enable_metrics(): + """Enable all metric categories for each test, then restore.""" + import feast.metrics as m + + original = m._config + m._config = _all_enabled_flags() yield m._config = original @@ -1081,3 +1095,640 @@ def test_separate_from_read_transform_metric(self): assert abs(read_delta - 0.01) < 0.001 assert abs(write_delta - 0.05) < 0.001 + + +class TestOfflineStoreMetrics: + """Tests for the offline store Prometheus metrics (RED pattern).""" + + def test_request_total_increments_on_success(self): + before = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + + offline_store_request_total.labels(method="to_arrow", status="success").inc() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before + 1 + ) + + def test_request_total_increments_on_error(self): + before = offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + + offline_store_request_total.labels(method="to_arrow", status="error").inc() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + == before + 1 + ) + + def test_latency_histogram_records(self): + before_sum = offline_store_request_latency_seconds.labels( + method="to_arrow" + )._sum.get() + + offline_store_request_latency_seconds.labels(method="to_arrow").observe(2.5) + + after_sum = offline_store_request_latency_seconds.labels( + method="to_arrow" + )._sum.get() + assert pytest.approx(after_sum - before_sum, abs=0.01) == 2.5 + + def test_row_count_histogram_records(self): + before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + + offline_store_row_count.labels(method="to_arrow").observe(1000) + + after_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + assert pytest.approx(after_sum - before_sum, abs=1) == 1000 + + def test_different_methods_tracked_independently(self): + before_a = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + before_b = offline_store_request_total.labels( + method="other", status="success" + )._value.get() + + offline_store_request_total.labels(method="to_arrow", status="success").inc() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_a + 1 + ) + assert ( + offline_store_request_total.labels( + method="other", status="success" + )._value.get() + == before_b + ) + + +class TestEmitAuditLogs: + """Tests for structured JSON audit log emission.""" + + def test_emit_online_audit_log_writes_json(self): + import json + import logging + + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_online_audit_log( + requestor_id="user@example.com", + entity_keys=["driver_id", "customer_id"], + entity_count=10, + feature_views=["driver_fv", "order_fv"], + feature_count=5, + status="success", + latency_ms=42.0, + ) + + mock_info.assert_called_once() + logged_json = mock_info.call_args[0][0] + record = json.loads(logged_json) + + assert record["event"] == "online_feature_request" + assert record["requestor_id"] == "user@example.com" + assert record["entity_keys"] == ["driver_id", "customer_id"] + assert record["entity_count"] == 10 + assert record["feature_views"] == ["driver_fv", "order_fv"] + assert record["feature_count"] == 5 + assert record["status"] == "success" + assert record["latency_ms"] == pytest.approx(42.0) + assert "timestamp" in record + + def test_emit_online_audit_log_noop_when_disabled(self): + import logging + + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, audit_logging=False) + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_online_audit_log( + requestor_id="user@example.com", + entity_keys=["driver_id"], + entity_count=1, + feature_views=["driver_fv"], + feature_count=1, + status="success", + latency_ms=10.0, + ) + mock_info.assert_not_called() + + def test_emit_offline_audit_log_writes_json(self): + import json + import logging + + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_offline_audit_log( + method="to_arrow", + feature_views=["driver_fv"], + feature_count=3, + row_count=500, + status="success", + start_time="2026-04-27T12:00:00+00:00", + end_time="2026-04-27T12:00:01+00:00", + duration_ms=1230.0, + ) + + mock_info.assert_called_once() + logged_json = mock_info.call_args[0][0] + record = json.loads(logged_json) + + assert record["event"] == "offline_feature_retrieval" + assert record["method"] == "to_arrow" + assert record["feature_views"] == ["driver_fv"] + assert record["feature_count"] == 3 + assert record["row_count"] == 500 + assert record["status"] == "success" + assert record["duration_ms"] == pytest.approx(1230.0) + assert record["start_time"] == "2026-04-27T12:00:00+00:00" + assert record["end_time"] == "2026-04-27T12:00:01+00:00" + + def test_emit_offline_audit_log_noop_when_disabled(self): + import logging + + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, audit_logging=False) + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_offline_audit_log( + method="to_arrow", + feature_views=["fv"], + feature_count=1, + row_count=10, + status="success", + start_time="t0", + end_time="t1", + duration_ms=500.0, + ) + mock_info.assert_not_called() + + def test_emit_online_audit_log_with_error_status(self): + import json + import logging + + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_online_audit_log( + requestor_id="unknown", + entity_keys=[], + entity_count=0, + feature_views=[], + feature_count=0, + status="error", + latency_ms=1.0, + ) + + record = json.loads(mock_info.call_args[0][0]) + assert record["status"] == "error" + + +class TestBuildMetricsFlagsOfflineAndAudit: + """Tests for the new offline_features and audit_logging flags.""" + + def test_no_config_defaults_for_new_flags(self): + from feast.metrics import build_metrics_flags + + flags = build_metrics_flags(None) + assert flags.offline_features is True + assert flags.audit_logging is False + + def test_explicit_enable(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + offline_features=True, + audit_logging=True, + ) + flags = build_metrics_flags(mc) + assert flags.offline_features is True + assert flags.audit_logging is True + + def test_explicit_disable(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + offline_features=False, + audit_logging=False, + ) + flags = build_metrics_flags(mc) + assert flags.offline_features is False + assert flags.audit_logging is False + + def test_missing_new_attrs_fall_back_to_defaults(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + flags = build_metrics_flags(mc) + assert flags.offline_features is True + assert flags.audit_logging is False + + +class TestExtractRetrievalMetadata: + """Tests for _extract_retrieval_metadata helper.""" + + def test_extracts_feature_views_and_count(self): + from feast.infra.offline_stores.offline_store import ( + RetrievalMetadata, + _extract_retrieval_metadata, + ) + + job = MagicMock() + job.metadata = RetrievalMetadata( + features=[ + "driver_fv:conv_rate", + "driver_fv:acc_rate", + "vehicle_fv:mileage", + ], + keys=["driver_id"], + ) + + fv_names, feat_count = _extract_retrieval_metadata(job) + assert feat_count == 3 + assert set(fv_names) == {"driver_fv", "vehicle_fv"} + + def test_returns_empty_when_no_metadata(self): + from feast.infra.offline_stores.offline_store import ( + _extract_retrieval_metadata, + ) + + job = MagicMock() + job.metadata = None + + fv_names, feat_count = _extract_retrieval_metadata(job) + assert fv_names == [] + assert feat_count == 0 + + def test_handles_not_implemented_metadata(self): + from feast.infra.offline_stores.offline_store import ( + _extract_retrieval_metadata, + ) + + job = MagicMock() + type(job).metadata = property( + lambda self: (_ for _ in ()).throw(NotImplementedError()) + ) + + fv_names, feat_count = _extract_retrieval_metadata(job) + assert fv_names == [] + assert feat_count == 0 + + +class TestRetrievalJobToArrowInstrumentation: + """Tests for the metrics/audit instrumentation in RetrievalJob.to_arrow().""" + + def _make_job( + self, table, on_demand_fvs=None, metadata=None, raise_on_internal=None + ): + """Create a concrete RetrievalJob subclass for testing.""" + from feast.infra.offline_stores.offline_store import RetrievalJob + + class _TestJob(RetrievalJob): + def __init__(self): + self._table = table + self._odfvs = on_demand_fvs or [] + self._metadata = metadata + self._raise = raise_on_internal + + def _to_arrow_internal(self, timeout=None): + if self._raise: + raise self._raise + return self._table + + @property + def full_feature_names(self): + return False + + @property + def on_demand_feature_views(self): + return self._odfvs + + @property + def metadata(self): + return self._metadata + + return _TestJob() + + def test_success_increments_counter_and_records_latency(self): + import pyarrow as pa + + table = pa.table({"col": [1, 2, 3]}) + job = self._make_job(table) + + before_count = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + before_latency = offline_store_request_latency_seconds.labels( + method="to_arrow" + )._sum.get() + + result = job.to_arrow() + + assert result.num_rows == 3 + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_count + 1 + ) + assert ( + offline_store_request_latency_seconds.labels(method="to_arrow")._sum.get() + > before_latency + ) + + def test_error_increments_error_counter(self): + job = self._make_job(None, raise_on_internal=RuntimeError("query failed")) + + before_error = offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + + with pytest.raises(RuntimeError, match="query failed"): + job.to_arrow() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + == before_error + 1 + ) + + def test_row_count_recorded_on_success(self): + import pyarrow as pa + + table = pa.table({"a": list(range(500))}) + job = self._make_job(table) + + before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + + job.to_arrow() + + assert ( + offline_store_row_count.labels(method="to_arrow")._sum.get() + >= before_sum + 500 + ) + + def test_row_count_not_recorded_when_zero(self): + import pyarrow as pa + + table = pa.table({"a": pa.array([], type=pa.int64())}) + job = self._make_job(table) + + before_count = offline_store_row_count.labels(method="to_arrow")._sum.get() + + job.to_arrow() + + assert ( + offline_store_row_count.labels(method="to_arrow")._sum.get() == before_count + ) + + def test_metrics_skipped_when_offline_features_disabled(self): + import pyarrow as pa + + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, offline_features=False, audit_logging=False + ) + + table = pa.table({"col": [1, 2]}) + job = self._make_job(table) + + before_count = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + + job.to_arrow() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_count + ) + + def test_audit_log_emitted_on_success(self): + import pyarrow as pa + + from feast.infra.offline_stores.offline_store import RetrievalMetadata + + meta = RetrievalMetadata( + features=["driver_fv:conv_rate", "driver_fv:acc_rate"], + keys=["driver_id"], + ) + table = pa.table({"col": [1, 2, 3]}) + job = self._make_job(table, metadata=meta) + + with patch("feast.metrics.emit_offline_audit_log") as mock_audit: + job.to_arrow() + + mock_audit.assert_called_once() + call_kwargs = mock_audit.call_args[1] + assert call_kwargs["method"] == "to_arrow" + assert call_kwargs["status"] == "success" + assert call_kwargs["row_count"] == 3 + assert call_kwargs["feature_count"] == 2 + assert set(call_kwargs["feature_views"]) == {"driver_fv"} + + def test_audit_log_skipped_when_disabled(self): + import pyarrow as pa + + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, offline_features=True, audit_logging=False + ) + + table = pa.table({"col": [1]}) + job = self._make_job(table) + + with patch("feast.metrics.emit_offline_audit_log") as mock_audit: + job.to_arrow() + mock_audit.assert_not_called() + + def test_instrumentation_failure_does_not_mask_query_error(self): + """If metrics code itself throws, the original query error still propagates.""" + import pyarrow as pa + + table = pa.table({"col": [1]}) + job = self._make_job(table) + + with patch( + "feast.metrics._config", + new_callable=lambda: property( + lambda self: (_ for _ in ()).throw(RuntimeError("metrics broken")) + ), + ): + result = job.to_arrow() + assert result.num_rows == 1 + + +class TestParseFeatureInfo: + """Tests for _parse_feature_info in feature_server.""" + + def test_feature_ref_list(self): + from feast.feature_server import _parse_feature_info + + refs = ["driver_fv:conv_rate", "driver_fv:acc_rate", "vehicle_fv:mileage"] + fv_names, feat_count = _parse_feature_info(refs) + assert feat_count == 3 + assert set(fv_names) == {"driver_fv", "vehicle_fv"} + + def test_empty_list(self): + from feast.feature_server import _parse_feature_info + + fv_names, feat_count = _parse_feature_info([]) + assert fv_names == [] + assert feat_count == 0 + + def test_feature_service(self): + from feast.feature_server import _parse_feature_info + + proj1 = MagicMock() + proj1.name = "driver_fv" + proj1.features = [MagicMock(), MagicMock()] + proj2 = MagicMock() + proj2.name = "order_fv" + proj2.features = [MagicMock()] + + fs_svc = MagicMock() + fs_svc.feature_view_projections = [proj1, proj2] + + from feast.feature_service import FeatureService + + fs_svc.__class__ = FeatureService + + fv_names, feat_count = _parse_feature_info(fs_svc) + assert feat_count == 3 + assert fv_names == ["driver_fv", "order_fv"] + + def test_strips_version_suffix(self): + from feast.feature_server import _parse_feature_info + + refs = ["driver_fv@v2:conv_rate"] + fv_names, feat_count = _parse_feature_info(refs) + assert feat_count == 1 + assert fv_names == ["driver_fv"] + + +class TestEmitOnlineAudit: + """Tests for the _emit_online_audit helper in feature_server.""" + + def test_emits_audit_log_with_anonymous_user(self): + from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit + + request = GetOnlineFeaturesRequest( + entities={"driver_id": [1, 2]}, + features=["driver_fv:conv_rate"], + ) + + with ( + patch("feast.feature_server.feast_metrics") as mock_metrics, + patch( + "feast.permissions.security_manager.get_security_manager", + return_value=None, + ), + ): + _emit_online_audit( + request=request, + features=request.features, + entity_count=2, + status="success", + latency_ms=15.0, + ) + + mock_metrics.emit_online_audit_log.assert_called_once() + kwargs = mock_metrics.emit_online_audit_log.call_args[1] + assert kwargs["requestor_id"] == "anonymous" + assert kwargs["entity_keys"] == ["driver_id"] + assert kwargs["entity_count"] == 2 + assert kwargs["status"] == "success" + + def test_emits_audit_log_with_authenticated_user(self): + from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit + + request = GetOnlineFeaturesRequest( + entities={"driver_id": [1]}, + features=["driver_fv:conv_rate"], + ) + + mock_sm = MagicMock() + mock_sm.current_user.username = "jdoe" + + with ( + patch("feast.feature_server.feast_metrics") as mock_metrics, + patch( + "feast.permissions.security_manager.get_security_manager", + return_value=mock_sm, + ), + ): + _emit_online_audit( + request=request, + features=request.features, + entity_count=1, + status="success", + latency_ms=10.0, + ) + + kwargs = mock_metrics.emit_online_audit_log.call_args[1] + assert kwargs["requestor_id"] == "jdoe" + + def test_does_not_raise_on_failure(self): + from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit + + request = GetOnlineFeaturesRequest( + entities={"driver_id": [1]}, + features=["driver_fv:conv_rate"], + ) + + with patch( + "feast.permissions.security_manager.get_security_manager", + side_effect=RuntimeError("auth broken"), + ): + _emit_online_audit( + request=request, + features=request.features, + entity_count=1, + status="error", + latency_ms=5.0, + ) From b5d1fd0e91597d9499ee86928e62162c3c85a5cc Mon Sep 17 00:00:00 2001 From: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> Date: Mon, 11 May 2026 14:29:18 +0530 Subject: [PATCH 2/5] fix: Resolve comments from review Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> --- .../feature-servers/python-feature-server.md | 72 ++++++++++++++++++- .../samples/v1_featurestore_serving.yaml | 4 +- sdk/python/feast/feature_server.py | 5 +- .../infra/offline_stores/offline_store.py | 16 ++--- sdk/python/feast/metrics.py | 1 + sdk/python/tests/unit/test_metrics.py | 10 +-- 6 files changed, 89 insertions(+), 19 deletions(-) diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 654c4b9f938..4802599866d 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -352,11 +352,14 @@ feature_server: push: true # push request counters materialization: true # materialization counters & duration freshness: true # feature freshness gauges + offline_features: true # offline store retrieval counters & latency + audit_logging: false # structured JSON audit logs (see below) ``` Any category set to `false` will emit no metrics and start no background threads (e.g., setting `freshness: false` prevents the registry polling -thread from starting). All categories default to `true`. +thread from starting). All categories default to `true` except +`audit_logging`, which defaults to `false`. ### Available metrics @@ -375,6 +378,9 @@ thread from starting). All categories default to `true`. | `feast_materialization_result_total` | Counter | `feature_view`, `status` | `materialization` | Materialization runs (success/failure) | | `feast_materialization_duration_seconds` | Histogram | `feature_view` | `materialization` | Materialization duration per feature view | | `feast_feature_freshness_seconds` | Gauge | `feature_view`, `project` | `freshness` | Seconds since last materialization | +| `feast_offline_store_request_total` | Counter | `method`, `status` | `offline_features` | Total offline store retrieval requests | +| `feast_offline_store_request_latency_seconds` | Histogram | `method` | `offline_features` | Latency of offline store retrieval operations | +| `feast_offline_store_row_count` | Histogram | `method` | `offline_features` | Rows returned by offline store retrieval | ### Per-ODFV transformation metrics @@ -405,6 +411,70 @@ The `odfv_name` label lets you filter or group by individual ODFV, and the `mode` label (`python`, `pandas`, `substrait`) lets you compare transformation engines. +### Audit logging + +Feast can emit structured JSON audit log entries for every online and offline +feature retrieval. These are written via the standard `feast.audit` Python +logger, so you can route them to a dedicated file, SIEM, or log aggregator +independently of application logs. + +Audit logging is **disabled by default**. Enable it in `feature_store.yaml`: + +```yaml +feature_server: + type: local + metrics: + enabled: true + audit_logging: true +``` + +**Online audit log** (emitted per `/get-online-features` call): + +```json +{ + "event": "online_feature_request", + "timestamp": "2026-05-11T08:30:00.123456+00:00", + "requestor_id": "user@example.com", + "entity_keys": ["driver_id"], + "entity_count": 3, + "feature_views": ["driver_hourly_stats"], + "feature_count": 3, + "status": "success", + "latency_ms": 12.34 +} +``` + +**Offline audit log** (emitted per `RetrievalJob.to_arrow()` call): + +```json +{ + "event": "offline_feature_retrieval", + "timestamp": "2026-05-11T08:31:00.456789+00:00", + "method": "to_arrow", + "start_time": "2026-05-11T08:30:59.226789+00:00", + "end_time": "2026-05-11T08:31:00.456789+00:00", + "feature_views": ["driver_hourly_stats"], + "feature_count": 3, + "row_count": 500, + "status": "success", + "duration_ms": 1230.0 +} +``` + +The `requestor_id` field in online audit logs is populated from the +security manager's current user when authentication is configured, and +falls back to `"anonymous"` otherwise. + +To route audit logs to a separate file: + +```python +import logging + +handler = logging.FileHandler("/var/log/feast/audit.log") +handler.setFormatter(logging.Formatter("%(message)s")) +logging.getLogger("feast.audit").addHandler(handler) +``` + ### Scraping with Prometheus ```yaml diff --git a/infra/feast-operator/config/samples/v1_featurestore_serving.yaml b/infra/feast-operator/config/samples/v1_featurestore_serving.yaml index f60640624c9..412499412e6 100644 --- a/infra/feast-operator/config/samples/v1_featurestore_serving.yaml +++ b/infra/feast-operator/config/samples/v1_featurestore_serving.yaml @@ -26,8 +26,8 @@ spec: push: true # push/write request counters materialization: true # materialization counters and duration histograms freshness: false # feature freshness gauges (can be expensive at scale) - # Example: when a future SDK adds "registry_sync", enable it here - # registry_sync: false + offline_features: true # offline store retrieval counters, latency, row count + audit_logging: false # structured JSON audit logs via the feast.audit logger offlinePushBatching: enabled: true batchSize: 1000 # max rows per offline write batch diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 6d566a10c5d..bba91130db3 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -165,6 +165,7 @@ def _parse_feature_info( view name strings and feat_count is the total number of features. """ from feast.feature_service import FeatureService + from feast.utils import _parse_feature_ref if isinstance(features, FeatureService): projections = features.feature_view_projections @@ -172,9 +173,7 @@ def _parse_feature_info( feat_count = sum(len(p.features) for p in projections) elif isinstance(features, list): feat_count = len(features) - fv_names = list( - {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref} - ) + fv_names = list({_parse_feature_ref(ref)[0] for ref in features if ":" in ref}) else: fv_names = [] feat_count = 0 diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 9d9fee22623..2803b15526e 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -15,7 +15,7 @@ import time import warnings from abc import ABC -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import ( TYPE_CHECKING, @@ -191,22 +191,22 @@ def to_arrow( feast_metrics.offline_store_request_latency_seconds.labels( method="to_arrow" ).observe(elapsed) - if row_count > 0: - feast_metrics.offline_store_row_count.labels( - method="to_arrow" - ).observe(row_count) + feast_metrics.offline_store_row_count.labels( + method="to_arrow" + ).observe(row_count) if feast_metrics._config.audit_logging: feature_views, feature_count = _extract_retrieval_metadata(self) - now_iso = datetime.now(tz=timezone.utc).isoformat() + end_dt = datetime.now(tz=timezone.utc) + start_dt = end_dt - timedelta(seconds=elapsed) feast_metrics.emit_offline_audit_log( method="to_arrow", feature_views=feature_views, feature_count=feature_count, row_count=row_count, status=status_label, - start_time=now_iso, - end_time=now_iso, + start_time=start_dt.isoformat(), + end_time=end_dt.isoformat(), duration_ms=elapsed * 1000, ) except Exception: diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index f827f6e31ee..13a855d587b 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -470,6 +470,7 @@ def emit_offline_audit_log( _json_dumps( { "event": "offline_feature_retrieval", + "timestamp": datetime.now(tz=timezone.utc).isoformat(), "method": method, "start_time": start_time, "end_time": end_time, diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py index 2750757f67a..abf2a35e389 100644 --- a/sdk/python/tests/unit/test_metrics.py +++ b/sdk/python/tests/unit/test_metrics.py @@ -1246,6 +1246,7 @@ def test_emit_offline_audit_log_writes_json(self): record = json.loads(logged_json) assert record["event"] == "offline_feature_retrieval" + assert "timestamp" in record assert record["method"] == "to_arrow" assert record["feature_views"] == ["driver_fv"] assert record["feature_count"] == 3 @@ -1508,19 +1509,18 @@ def test_row_count_recorded_on_success(self): >= before_sum + 500 ) - def test_row_count_not_recorded_when_zero(self): + def test_row_count_recorded_when_zero(self): import pyarrow as pa table = pa.table({"a": pa.array([], type=pa.int64())}) job = self._make_job(table) - before_count = offline_store_row_count.labels(method="to_arrow")._sum.get() + hist = offline_store_row_count.labels(method="to_arrow") + before_bucket = hist._buckets[0].get() job.to_arrow() - assert ( - offline_store_row_count.labels(method="to_arrow")._sum.get() == before_count - ) + assert hist._buckets[0].get() == before_bucket + 1 def test_metrics_skipped_when_offline_features_disabled(self): import pyarrow as pa From fbcef1c06e07239a6a8a949b9e80d5ac61e80943 Mon Sep 17 00:00:00 2001 From: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> Date: Fri, 29 May 2026 00:21:26 +0530 Subject: [PATCH 3/5] feat: System metrics API requests to prometheus Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> --- .../feast/api/registry/rest/__init__.py | 7 +- .../feast/api/registry/rest/system_metrics.py | 173 ++++++++++++++++++ .../infra/feature_servers/base_config.py | 6 + 3 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 sdk/python/feast/api/registry/rest/system_metrics.py diff --git a/sdk/python/feast/api/registry/rest/__init__.py b/sdk/python/feast/api/registry/rest/__init__.py index 14db40d7af6..87f863c1cf2 100644 --- a/sdk/python/feast/api/registry/rest/__init__.py +++ b/sdk/python/feast/api/registry/rest/__init__.py @@ -11,9 +11,10 @@ from feast.api.registry.rest.projects import get_project_router from feast.api.registry.rest.saved_datasets import get_saved_dataset_router from feast.api.registry.rest.search import get_search_router +from feast.api.registry.rest.system_metrics import get_system_metrics_router -def register_all_routes(app: FastAPI, grpc_handler, server=None): +def register_all_routes(app: FastAPI, grpc_handler, server=None, store=None): app.include_router(get_entity_router(grpc_handler)) app.include_router(get_data_source_router(grpc_handler)) app.include_router(get_feature_service_router(grpc_handler)) @@ -25,3 +26,7 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None): app.include_router(get_saved_dataset_router(grpc_handler)) app.include_router(get_search_router(grpc_handler)) app.include_router(get_metrics_router(grpc_handler, server)) + resolved_store = store or ( + server.store if server and hasattr(server, "store") else None + ) + app.include_router(get_system_metrics_router(grpc_handler, store=resolved_store)) diff --git a/sdk/python/feast/api/registry/rest/system_metrics.py b/sdk/python/feast/api/registry/rest/system_metrics.py new file mode 100644 index 00000000000..56b1f5604e0 --- /dev/null +++ b/sdk/python/feast/api/registry/rest/system_metrics.py @@ -0,0 +1,173 @@ +import logging +import os +import re +from typing import Optional + +import requests as http_requests +from fastapi import APIRouter, HTTPException, Query + +logger = logging.getLogger(__name__) + +_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" +_CA_CERT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt" +_CLUSTER_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" +_DEFAULT_THANOS_URL = "https://thanos-querier.openshift-monitoring.svc:9091" +_METRICS_PORT = 8000 + + +def _read_sa_token() -> Optional[str]: + try: + with open(_SA_TOKEN_PATH) as f: + return f.read().strip() + except FileNotFoundError: + return None + + +def _get_ca_bundle() -> str: + for path in (_CA_CERT_PATH, _CLUSTER_CA_PATH): + if os.path.exists(path): + return path + return "" + + +def _parse_prometheus_text(text: str) -> dict: + """Parse Prometheus exposition format into structured metric families.""" + metrics: dict = {} + current_type = "" + + for line in text.splitlines(): + line = line.strip() + if not line: + continue + if line.startswith("# HELP"): + parts = line.split(None, 3) + elif line.startswith("# TYPE"): + parts = line.split(None, 3) + current_type = parts[3] if len(parts) > 3 else "untyped" + elif not line.startswith("#"): + match = re.match( + r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+([^\s]+)(\s+\d+)?$", + line, + ) + if match: + name = match.group(1) + labels_str = match.group(2) or "" + value = match.group(3) + base_name = name + for suffix in ("_total", "_bucket", "_sum", "_count", "_created"): + if base_name.endswith(suffix): + base_name = base_name[: -len(suffix)] + break + + if base_name not in metrics: + metrics[base_name] = {"type": current_type, "samples": []} + try: + val = float(value) + except ValueError: + val = value + metrics[base_name]["samples"].append( + { + "name": name, + "labels": labels_str, + "value": val, + } + ) + return metrics + + +def get_system_metrics_router(grpc_handler, store=None): + router = APIRouter() + + def _get_prometheus_url() -> str: + if store: + fs_cfg = getattr(store.config, "feature_server", None) + metrics_cfg = getattr(fs_cfg, "metrics", None) + prom_url = getattr(metrics_cfg, "prometheus_url", None) + if prom_url: + return prom_url + env_url = os.environ.get("FEAST_PROMETHEUS_URL") + if env_url: + return env_url + return _DEFAULT_THANOS_URL + + def _query_prometheus(path: str, params: dict) -> dict: + prom_url = _get_prometheus_url() + url = f"{prom_url}{path}" + + headers = {} + token = _read_sa_token() + if token: + headers["Authorization"] = f"Bearer {token}" + + ca_bundle = _get_ca_bundle() + verify = ca_bundle if ca_bundle else False + + try: + resp = http_requests.get( + url, params=params, headers=headers, verify=verify, timeout=15 + ) + resp.raise_for_status() + return resp.json() + except http_requests.exceptions.ConnectionError: + raise HTTPException( + status_code=503, + detail=f"Failed to connect to Prometheus at {prom_url}", + ) + except http_requests.exceptions.Timeout: + raise HTTPException( + status_code=504, + detail="Failed to query Prometheus: request timed out", + ) + except http_requests.exceptions.HTTPError as e: + raise HTTPException( + status_code=e.response.status_code if e.response else 502, + detail=f"Failed to query Prometheus: {e}", + ) + + @router.get("/system-metrics/query", tags=["System Metrics"]) + async def promql_instant( + query: str = Query(..., description="PromQL expression"), + time: Optional[str] = Query( + None, description="Evaluation timestamp (RFC3339 or Unix)" + ), + ): + """Proxy a PromQL instant query to Prometheus/Thanos.""" + params: dict = {"query": query} + if time: + params["time"] = time + return _query_prometheus("/api/v1/query", params) + + @router.get("/system-metrics/query_range", tags=["System Metrics"]) + async def promql_range( + query: str = Query(..., description="PromQL expression"), + start: str = Query(..., description="Start timestamp"), + end: str = Query(..., description="End timestamp"), + step: str = Query("60s", description="Query resolution step"), + ): + """Proxy a PromQL range query to Prometheus/Thanos.""" + return _query_prometheus( + "/api/v1/query_range", + { + "query": query, + "start": start, + "end": end, + "step": step, + }, + ) + + @router.get("/system-metrics/scrape", tags=["System Metrics"]) + async def scrape_metrics(): + """Fallback: scrape the local Prometheus metrics endpoint directly.""" + try: + resp = http_requests.get( + f"http://localhost:{_METRICS_PORT}/metrics", timeout=5 + ) + resp.raise_for_status() + except http_requests.exceptions.RequestException: + raise HTTPException( + status_code=503, + detail=f"Failed to scrape local metrics endpoint on port {_METRICS_PORT}", + ) + return _parse_prometheus_text(resp.text) + + return router diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index 14ad2fe505e..24c2386daa8 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -93,6 +93,12 @@ class MetricsConfig(FeastConfigBaseModel): feature requests via the ``feast.audit`` logger. Captures requestor identity, entity keys, feature views, row counts, and latency.""" + prometheus_url: Optional[str] = None + """URL of the Prometheus or Thanos Querier API for the System Health + dashboard. On OpenShift defaults to + ``https://thanos-querier.openshift-monitoring.svc:9091``. + The registry REST API proxies PromQL queries through this URL.""" + class BaseFeatureServerConfig(FeastConfigBaseModel): """Base Feature Server config that should be extended""" From 8ba3d5c7457a1987d46b9636701db454d3c0c171 Mon Sep 17 00:00:00 2001 From: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> Date: Fri, 29 May 2026 18:32:10 +0530 Subject: [PATCH 4/5] fix: Reviewers comment fixed Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> --- .../feast/api/registry/rest/__init__.py | 10 +- .../api/registry/rest/rest_registry_server.py | 2 +- .../feast/api/registry/rest/system_metrics.py | 38 ++-- .../infra/feature_servers/base_config.py | 3 +- .../infra/offline_stores/offline_store.py | 4 +- sdk/python/feast/ui_server.py | 2 +- sdk/python/tests/unit/test_metrics.py | 181 ++++++++++++++++++ 7 files changed, 219 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/api/registry/rest/__init__.py b/sdk/python/feast/api/registry/rest/__init__.py index 87f863c1cf2..53ed55cc437 100644 --- a/sdk/python/feast/api/registry/rest/__init__.py +++ b/sdk/python/feast/api/registry/rest/__init__.py @@ -12,9 +12,10 @@ from feast.api.registry.rest.saved_datasets import get_saved_dataset_router from feast.api.registry.rest.search import get_search_router from feast.api.registry.rest.system_metrics import get_system_metrics_router +from feast.feature_store import FeatureStore -def register_all_routes(app: FastAPI, grpc_handler, server=None, store=None): +def register_all_routes(app: FastAPI, grpc_handler, server, store: FeatureStore): app.include_router(get_entity_router(grpc_handler)) app.include_router(get_data_source_router(grpc_handler)) app.include_router(get_feature_service_router(grpc_handler)) @@ -25,8 +26,5 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None, store=None): app.include_router(get_project_router(grpc_handler)) app.include_router(get_saved_dataset_router(grpc_handler)) app.include_router(get_search_router(grpc_handler)) - app.include_router(get_metrics_router(grpc_handler, server)) - resolved_store = store or ( - server.store if server and hasattr(server, "store") else None - ) - app.include_router(get_system_metrics_router(grpc_handler, store=resolved_store)) + app.include_router(get_metrics_router(grpc_handler, server=server)) + app.include_router(get_system_metrics_router(grpc_handler, store=store)) diff --git a/sdk/python/feast/api/registry/rest/rest_registry_server.py b/sdk/python/feast/api/registry/rest/rest_registry_server.py index 02454b263ff..e97fce1667b 100644 --- a/sdk/python/feast/api/registry/rest/rest_registry_server.py +++ b/sdk/python/feast/api/registry/rest/rest_registry_server.py @@ -276,7 +276,7 @@ async def dispatch(self, request: Request, call_next): ) def _register_routes(self): - register_all_routes(self.app, self.grpc_handler, self) + register_all_routes(self.app, self.grpc_handler, self, self.store) def _add_openapi_security(self): if self.app.openapi_schema: diff --git a/sdk/python/feast/api/registry/rest/system_metrics.py b/sdk/python/feast/api/registry/rest/system_metrics.py index 56b1f5604e0..5ec086ab3f6 100644 --- a/sdk/python/feast/api/registry/rest/system_metrics.py +++ b/sdk/python/feast/api/registry/rest/system_metrics.py @@ -14,6 +14,11 @@ _DEFAULT_THANOS_URL = "https://thanos-querier.openshift-monitoring.svc:9091" _METRICS_PORT = 8000 +# Histogram suffixes that should be folded into the base metric family. +# Counter _total / _created are intentionally NOT stripped so the +# canonical metric name (e.g. feast_offline_store_request_total) is preserved. +_HISTOGRAM_SUFFIXES = ("_bucket", "_sum", "_count") + def _read_sa_token() -> Optional[str]: try: @@ -31,17 +36,24 @@ def _get_ca_bundle() -> str: def _parse_prometheus_text(text: str) -> dict: - """Parse Prometheus exposition format into structured metric families.""" + """Parse Prometheus exposition format into a structured dict of metric families. + + Used by the ``/system-metrics/scrape`` endpoint to convert the raw + text exposition from the local ``/metrics`` endpoint into JSON so the + UI dashboard can consume it without a Prometheus server. + + Returns a dict keyed by metric family name, each containing: + - ``type``: histogram, counter, gauge, etc. + - ``samples``: list of {name, labels, value} dicts. + """ metrics: dict = {} current_type = "" for line in text.splitlines(): line = line.strip() - if not line: + if not line or line.startswith("# HELP"): continue - if line.startswith("# HELP"): - parts = line.split(None, 3) - elif line.startswith("# TYPE"): + if line.startswith("# TYPE"): parts = line.split(None, 3) current_type = parts[3] if len(parts) > 3 else "untyped" elif not line.startswith("#"): @@ -54,7 +66,7 @@ def _parse_prometheus_text(text: str) -> dict: labels_str = match.group(2) or "" value = match.group(3) base_name = name - for suffix in ("_total", "_bucket", "_sum", "_count", "_created"): + for suffix in _HISTOGRAM_SUFFIXES: if base_name.endswith(suffix): base_name = base_name[: -len(suffix)] break @@ -62,7 +74,7 @@ def _parse_prometheus_text(text: str) -> dict: if base_name not in metrics: metrics[base_name] = {"type": current_type, "samples": []} try: - val = float(value) + val: float | str = float(value) except ValueError: val = value metrics[base_name]["samples"].append( @@ -76,6 +88,10 @@ def _parse_prometheus_text(text: str) -> dict: def get_system_metrics_router(grpc_handler, store=None): + # Authentication is enforced at the FastAPI application level via + # inject_user_details (see rest_registry_server.py). These endpoints + # expose operational infrastructure metrics, not feature-level data, + # so no additional RBAC (assert_permissions) checks are required. router = APIRouter() def _get_prometheus_url() -> str: @@ -100,7 +116,7 @@ def _query_prometheus(path: str, params: dict) -> dict: headers["Authorization"] = f"Bearer {token}" ca_bundle = _get_ca_bundle() - verify = ca_bundle if ca_bundle else False + verify: bool | str = ca_bundle if ca_bundle else False try: resp = http_requests.get( @@ -125,7 +141,7 @@ def _query_prometheus(path: str, params: dict) -> dict: ) @router.get("/system-metrics/query", tags=["System Metrics"]) - async def promql_instant( + def promql_instant( query: str = Query(..., description="PromQL expression"), time: Optional[str] = Query( None, description="Evaluation timestamp (RFC3339 or Unix)" @@ -138,7 +154,7 @@ async def promql_instant( return _query_prometheus("/api/v1/query", params) @router.get("/system-metrics/query_range", tags=["System Metrics"]) - async def promql_range( + def promql_range( query: str = Query(..., description="PromQL expression"), start: str = Query(..., description="Start timestamp"), end: str = Query(..., description="End timestamp"), @@ -156,7 +172,7 @@ async def promql_range( ) @router.get("/system-metrics/scrape", tags=["System Metrics"]) - async def scrape_metrics(): + def scrape_metrics(): """Fallback: scrape the local Prometheus metrics endpoint directly.""" try: resp = http_requests.get( diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index 24c2386daa8..d9c11ea7423 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -95,8 +95,9 @@ class MetricsConfig(FeastConfigBaseModel): prometheus_url: Optional[str] = None """URL of the Prometheus or Thanos Querier API for the System Health - dashboard. On OpenShift defaults to + dashboard. Defaults to ``https://thanos-querier.openshift-monitoring.svc:9091``. + Override via this field or the ``FEAST_PROMETHEUS_URL`` env var. The registry REST API proxies PromQL queries through this URL.""" diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 2803b15526e..24e1e743953 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -74,12 +74,14 @@ def __init__( def _extract_retrieval_metadata(job: "RetrievalJob") -> tuple: """Return ``(feature_view_names, feature_count)`` from a RetrievalJob's metadata.""" + from feast.utils import _parse_feature_ref + try: meta = job.metadata if meta: feature_count = len(meta.features) feature_views = list( - {ref.split(":")[0] for ref in meta.features if ":" in ref} + {_parse_feature_ref(ref)[0] for ref in meta.features if ":" in ref} ) return feature_views, feature_count except (NotImplementedError, AttributeError): diff --git a/sdk/python/feast/ui_server.py b/sdk/python/feast/ui_server.py index 883f995b82d..a1fd0f8d36e 100644 --- a/sdk/python/feast/ui_server.py +++ b/sdk/python/feast/ui_server.py @@ -66,7 +66,7 @@ def _setup_rest_mode(app: FastAPI, store: "feast.FeatureStore"): grpc_handler = RegistryServer(store.registry) rest_app = FastAPI(root_path="/api/v1") - register_all_routes(rest_app, grpc_handler) + register_all_routes(rest_app, grpc_handler, server=None, store=store) app.mount("/api/v1", rest_app) @app.get("/health") diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py index abf2a35e389..798e28759a9 100644 --- a/sdk/python/tests/unit/test_metrics.py +++ b/sdk/python/tests/unit/test_metrics.py @@ -1732,3 +1732,184 @@ def test_does_not_raise_on_failure(self): status="error", latency_ms=5.0, ) + + +# --------------------------------------------------------------------------- +# System Metrics endpoint tests +# --------------------------------------------------------------------------- + + +class TestParsePrometheusText: + """Tests for _parse_prometheus_text in system_metrics.py.""" + + def test_parses_counter(self): + from feast.api.registry.rest.system_metrics import _parse_prometheus_text + + text = ( + "# HELP feast_offline_store_request_total Total requests\n" + "# TYPE feast_offline_store_request_total counter\n" + 'feast_offline_store_request_total{method="to_arrow",status="success"} 42\n' + ) + result = _parse_prometheus_text(text) + assert "feast_offline_store_request_total" in result + entry = result["feast_offline_store_request_total"] + assert entry["type"] == "counter" + assert len(entry["samples"]) == 1 + assert entry["samples"][0]["value"] == 42.0 + + def test_preserves_total_suffix(self): + """Counter _total suffix must NOT be stripped (was a reported bug).""" + from feast.api.registry.rest.system_metrics import _parse_prometheus_text + + text = ( + "# TYPE feast_offline_store_request_total counter\n" + "feast_offline_store_request_total 10\n" + ) + result = _parse_prometheus_text(text) + assert "feast_offline_store_request_total" in result + assert "feast_offline_store_request" not in result + + def test_groups_histogram_samples(self): + from feast.api.registry.rest.system_metrics import _parse_prometheus_text + + text = ( + "# TYPE feast_offline_store_request_latency_seconds histogram\n" + 'feast_offline_store_request_latency_seconds_bucket{le="0.1"} 5\n' + 'feast_offline_store_request_latency_seconds_bucket{le="1.0"} 15\n' + "feast_offline_store_request_latency_seconds_sum 12.5\n" + "feast_offline_store_request_latency_seconds_count 15\n" + ) + result = _parse_prometheus_text(text) + assert "feast_offline_store_request_latency_seconds" in result + entry = result["feast_offline_store_request_latency_seconds"] + assert entry["type"] == "histogram" + assert len(entry["samples"]) == 4 + + def test_empty_input(self): + from feast.api.registry.rest.system_metrics import _parse_prometheus_text + + assert _parse_prometheus_text("") == {} + + def test_skips_comment_and_help_lines(self): + from feast.api.registry.rest.system_metrics import _parse_prometheus_text + + text = ( + "# HELP some_metric A help string\n" + "# This is a random comment\n" + "# TYPE some_metric gauge\n" + "some_metric 3.14\n" + ) + result = _parse_prometheus_text(text) + assert "some_metric" in result + assert result["some_metric"]["samples"][0]["value"] == 3.14 + + def test_non_numeric_value(self): + from feast.api.registry.rest.system_metrics import _parse_prometheus_text + + text = "# TYPE info_metric gauge\ninfo_metric NaN\n" + result = _parse_prometheus_text(text) + assert len(result["info_metric"]["samples"]) == 1 + + +class TestSystemMetricsRouter: + """Tests for the FastAPI system-metrics router endpoints.""" + + def _make_test_client(self, store=None): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + from feast.api.registry.rest.system_metrics import get_system_metrics_router + + app = FastAPI() + app.include_router(get_system_metrics_router(grpc_handler=None, store=store)) + return TestClient(app) + + @patch("feast.api.registry.rest.system_metrics.http_requests.get") + def test_promql_instant_success(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "status": "success", + "data": {"resultType": "vector"}, + } + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + client = self._make_test_client() + resp = client.get("/system-metrics/query", params={"query": "up"}) + assert resp.status_code == 200 + assert resp.json()["status"] == "success" + + @patch("feast.api.registry.rest.system_metrics.http_requests.get") + def test_promql_instant_connection_error(self, mock_get): + import requests + + mock_get.side_effect = requests.exceptions.ConnectionError("refused") + + client = self._make_test_client() + resp = client.get("/system-metrics/query", params={"query": "up"}) + assert resp.status_code == 503 + assert "Failed to connect" in resp.json()["detail"] + + @patch("feast.api.registry.rest.system_metrics.http_requests.get") + def test_promql_instant_timeout(self, mock_get): + import requests + + mock_get.side_effect = requests.exceptions.Timeout("timed out") + + client = self._make_test_client() + resp = client.get("/system-metrics/query", params={"query": "up"}) + assert resp.status_code == 504 + assert "timed out" in resp.json()["detail"] + + @patch("feast.api.registry.rest.system_metrics.http_requests.get") + def test_promql_range_success(self, mock_get): + mock_resp = MagicMock() + mock_resp.json.return_value = {"status": "success"} + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + client = self._make_test_client() + resp = client.get( + "/system-metrics/query_range", + params={ + "query": "up", + "start": "2026-01-01T00:00:00Z", + "end": "2026-01-02T00:00:00Z", + }, + ) + assert resp.status_code == 200 + + @patch("feast.api.registry.rest.system_metrics.http_requests.get") + def test_scrape_success(self, mock_get): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.text = "# TYPE up gauge\nup 1\n" + mock_resp.raise_for_status = MagicMock() + mock_get.return_value = mock_resp + + client = self._make_test_client() + resp = client.get("/system-metrics/scrape") + assert resp.status_code == 200 + body = resp.json() + assert "up" in body + + @patch("feast.api.registry.rest.system_metrics.http_requests.get") + def test_scrape_failure(self, mock_get): + import requests + + mock_get.side_effect = requests.exceptions.ConnectionError("refused") + + client = self._make_test_client() + resp = client.get("/system-metrics/scrape") + assert resp.status_code == 503 + assert "Failed to scrape" in resp.json()["detail"] + + def test_prometheus_url_from_store(self): + from feast.api.registry.rest.system_metrics import get_system_metrics_router + + store = MagicMock() + store.config.feature_server.metrics.prometheus_url = "http://custom:9090" + + router = get_system_metrics_router(grpc_handler=None, store=store) + assert router is not None From 76a31180ac0fb77134905d9872f31b5d6485cf28 Mon Sep 17 00:00:00 2001 From: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> Date: Tue, 2 Jun 2026 20:13:58 +0530 Subject: [PATCH 5/5] chore: Removed operational metrics GET request from prometheus Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com> --- .../feast/api/registry/rest/__init__.py | 7 +- .../api/registry/rest/rest_registry_server.py | 2 +- .../feast/api/registry/rest/system_metrics.py | 189 --- .../infra/feature_servers/base_config.py | 7 - sdk/python/feast/repo_operations.py | 1188 ++++++++--------- sdk/python/feast/ui_server.py | 2 +- sdk/python/tests/unit/test_metrics.py | 181 --- 7 files changed, 598 insertions(+), 978 deletions(-) delete mode 100644 sdk/python/feast/api/registry/rest/system_metrics.py diff --git a/sdk/python/feast/api/registry/rest/__init__.py b/sdk/python/feast/api/registry/rest/__init__.py index 53ed55cc437..14db40d7af6 100644 --- a/sdk/python/feast/api/registry/rest/__init__.py +++ b/sdk/python/feast/api/registry/rest/__init__.py @@ -11,11 +11,9 @@ from feast.api.registry.rest.projects import get_project_router from feast.api.registry.rest.saved_datasets import get_saved_dataset_router from feast.api.registry.rest.search import get_search_router -from feast.api.registry.rest.system_metrics import get_system_metrics_router -from feast.feature_store import FeatureStore -def register_all_routes(app: FastAPI, grpc_handler, server, store: FeatureStore): +def register_all_routes(app: FastAPI, grpc_handler, server=None): app.include_router(get_entity_router(grpc_handler)) app.include_router(get_data_source_router(grpc_handler)) app.include_router(get_feature_service_router(grpc_handler)) @@ -26,5 +24,4 @@ def register_all_routes(app: FastAPI, grpc_handler, server, store: FeatureStore) app.include_router(get_project_router(grpc_handler)) app.include_router(get_saved_dataset_router(grpc_handler)) app.include_router(get_search_router(grpc_handler)) - app.include_router(get_metrics_router(grpc_handler, server=server)) - app.include_router(get_system_metrics_router(grpc_handler, store=store)) + app.include_router(get_metrics_router(grpc_handler, server)) diff --git a/sdk/python/feast/api/registry/rest/rest_registry_server.py b/sdk/python/feast/api/registry/rest/rest_registry_server.py index e97fce1667b..02454b263ff 100644 --- a/sdk/python/feast/api/registry/rest/rest_registry_server.py +++ b/sdk/python/feast/api/registry/rest/rest_registry_server.py @@ -276,7 +276,7 @@ async def dispatch(self, request: Request, call_next): ) def _register_routes(self): - register_all_routes(self.app, self.grpc_handler, self, self.store) + register_all_routes(self.app, self.grpc_handler, self) def _add_openapi_security(self): if self.app.openapi_schema: diff --git a/sdk/python/feast/api/registry/rest/system_metrics.py b/sdk/python/feast/api/registry/rest/system_metrics.py deleted file mode 100644 index 5ec086ab3f6..00000000000 --- a/sdk/python/feast/api/registry/rest/system_metrics.py +++ /dev/null @@ -1,189 +0,0 @@ -import logging -import os -import re -from typing import Optional - -import requests as http_requests -from fastapi import APIRouter, HTTPException, Query - -logger = logging.getLogger(__name__) - -_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" -_CA_CERT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/service-ca.crt" -_CLUSTER_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" -_DEFAULT_THANOS_URL = "https://thanos-querier.openshift-monitoring.svc:9091" -_METRICS_PORT = 8000 - -# Histogram suffixes that should be folded into the base metric family. -# Counter _total / _created are intentionally NOT stripped so the -# canonical metric name (e.g. feast_offline_store_request_total) is preserved. -_HISTOGRAM_SUFFIXES = ("_bucket", "_sum", "_count") - - -def _read_sa_token() -> Optional[str]: - try: - with open(_SA_TOKEN_PATH) as f: - return f.read().strip() - except FileNotFoundError: - return None - - -def _get_ca_bundle() -> str: - for path in (_CA_CERT_PATH, _CLUSTER_CA_PATH): - if os.path.exists(path): - return path - return "" - - -def _parse_prometheus_text(text: str) -> dict: - """Parse Prometheus exposition format into a structured dict of metric families. - - Used by the ``/system-metrics/scrape`` endpoint to convert the raw - text exposition from the local ``/metrics`` endpoint into JSON so the - UI dashboard can consume it without a Prometheus server. - - Returns a dict keyed by metric family name, each containing: - - ``type``: histogram, counter, gauge, etc. - - ``samples``: list of {name, labels, value} dicts. - """ - metrics: dict = {} - current_type = "" - - for line in text.splitlines(): - line = line.strip() - if not line or line.startswith("# HELP"): - continue - if line.startswith("# TYPE"): - parts = line.split(None, 3) - current_type = parts[3] if len(parts) > 3 else "untyped" - elif not line.startswith("#"): - match = re.match( - r"^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+([^\s]+)(\s+\d+)?$", - line, - ) - if match: - name = match.group(1) - labels_str = match.group(2) or "" - value = match.group(3) - base_name = name - for suffix in _HISTOGRAM_SUFFIXES: - if base_name.endswith(suffix): - base_name = base_name[: -len(suffix)] - break - - if base_name not in metrics: - metrics[base_name] = {"type": current_type, "samples": []} - try: - val: float | str = float(value) - except ValueError: - val = value - metrics[base_name]["samples"].append( - { - "name": name, - "labels": labels_str, - "value": val, - } - ) - return metrics - - -def get_system_metrics_router(grpc_handler, store=None): - # Authentication is enforced at the FastAPI application level via - # inject_user_details (see rest_registry_server.py). These endpoints - # expose operational infrastructure metrics, not feature-level data, - # so no additional RBAC (assert_permissions) checks are required. - router = APIRouter() - - def _get_prometheus_url() -> str: - if store: - fs_cfg = getattr(store.config, "feature_server", None) - metrics_cfg = getattr(fs_cfg, "metrics", None) - prom_url = getattr(metrics_cfg, "prometheus_url", None) - if prom_url: - return prom_url - env_url = os.environ.get("FEAST_PROMETHEUS_URL") - if env_url: - return env_url - return _DEFAULT_THANOS_URL - - def _query_prometheus(path: str, params: dict) -> dict: - prom_url = _get_prometheus_url() - url = f"{prom_url}{path}" - - headers = {} - token = _read_sa_token() - if token: - headers["Authorization"] = f"Bearer {token}" - - ca_bundle = _get_ca_bundle() - verify: bool | str = ca_bundle if ca_bundle else False - - try: - resp = http_requests.get( - url, params=params, headers=headers, verify=verify, timeout=15 - ) - resp.raise_for_status() - return resp.json() - except http_requests.exceptions.ConnectionError: - raise HTTPException( - status_code=503, - detail=f"Failed to connect to Prometheus at {prom_url}", - ) - except http_requests.exceptions.Timeout: - raise HTTPException( - status_code=504, - detail="Failed to query Prometheus: request timed out", - ) - except http_requests.exceptions.HTTPError as e: - raise HTTPException( - status_code=e.response.status_code if e.response else 502, - detail=f"Failed to query Prometheus: {e}", - ) - - @router.get("/system-metrics/query", tags=["System Metrics"]) - def promql_instant( - query: str = Query(..., description="PromQL expression"), - time: Optional[str] = Query( - None, description="Evaluation timestamp (RFC3339 or Unix)" - ), - ): - """Proxy a PromQL instant query to Prometheus/Thanos.""" - params: dict = {"query": query} - if time: - params["time"] = time - return _query_prometheus("/api/v1/query", params) - - @router.get("/system-metrics/query_range", tags=["System Metrics"]) - def promql_range( - query: str = Query(..., description="PromQL expression"), - start: str = Query(..., description="Start timestamp"), - end: str = Query(..., description="End timestamp"), - step: str = Query("60s", description="Query resolution step"), - ): - """Proxy a PromQL range query to Prometheus/Thanos.""" - return _query_prometheus( - "/api/v1/query_range", - { - "query": query, - "start": start, - "end": end, - "step": step, - }, - ) - - @router.get("/system-metrics/scrape", tags=["System Metrics"]) - def scrape_metrics(): - """Fallback: scrape the local Prometheus metrics endpoint directly.""" - try: - resp = http_requests.get( - f"http://localhost:{_METRICS_PORT}/metrics", timeout=5 - ) - resp.raise_for_status() - except http_requests.exceptions.RequestException: - raise HTTPException( - status_code=503, - detail=f"Failed to scrape local metrics endpoint on port {_METRICS_PORT}", - ) - return _parse_prometheus_text(resp.text) - - return router diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index d9c11ea7423..14ad2fe505e 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -93,13 +93,6 @@ class MetricsConfig(FeastConfigBaseModel): feature requests via the ``feast.audit`` logger. Captures requestor identity, entity keys, feature views, row counts, and latency.""" - prometheus_url: Optional[str] = None - """URL of the Prometheus or Thanos Querier API for the System Health - dashboard. Defaults to - ``https://thanos-querier.openshift-monitoring.svc:9091``. - Override via this field or the ``FEAST_PROMETHEUS_URL`` env var. - The registry REST API proxies PromQL queries through this URL.""" - class BaseFeatureServerConfig(FeastConfigBaseModel): """Base Feature Server config that should be extended""" diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 28fe86602ad..0ba74a23d37 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -1,594 +1,594 @@ -import base64 -import importlib -import json -import logging -import os -import random -import re -import sys -import tempfile -from importlib.abc import Loader -from importlib.machinery import ModuleSpec -from pathlib import Path -from typing import List, Optional, Set, Union - -import click -from click.exceptions import BadParameter - -from feast import PushSource -from feast.batch_feature_view import BatchFeatureView -from feast.constants import FEATURE_STORE_YAML_ENV_NAME -from feast.data_source import DataSource, KafkaSource, KinesisSource -from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add -from feast.entity import Entity -from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY, FeatureView -from feast.file_utils import replace_str_in_file -from feast.infra.registry.base_registry import BaseRegistry -from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry -from feast.names import adjectives, animals -from feast.on_demand_feature_view import OnDemandFeatureView -from feast.permissions.permission import Permission -from feast.project import Project -from feast.repo_config import RepoConfig -from feast.repo_contents import RepoContents -from feast.stream_feature_view import StreamFeatureView - -logger = logging.getLogger(__name__) - - -def py_path_to_module(path: Path) -> str: - return ( - str(path.relative_to(os.getcwd()))[: -len(".py")] - .replace("./", "") - .replace("/", ".") - .replace("\\", ".") - ) - - -def read_feastignore(repo_root: Path) -> List[str]: - """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" - feast_ignore = repo_root / ".feastignore" - if not feast_ignore.is_file(): - return [] - lines = feast_ignore.read_text().strip().split("\n") - ignore_paths = [] - for line in lines: - # Remove everything after the first occurance of "#" symbol (comments) - if line.find("#") >= 0: - line = line[: line.find("#")] - # Strip leading or ending whitespaces - line = line.strip() - # Add this processed line to ignore_paths if it's not empty - if len(line) > 0: - ignore_paths.append(line) - return ignore_paths - - -def get_ignore_files(repo_root: Path, ignore_paths: List[str]) -> Set[Path]: - """Get all ignore files that match any of the user-defined ignore paths""" - ignore_files = set() - for ignore_path in set(ignore_paths): - # ignore_path may contains matchers (* or **). Use glob() to match user-defined path to actual paths - for matched_path in repo_root.glob(ignore_path): - if matched_path.is_file(): - # If the matched path is a file, add that to ignore_files set - ignore_files.add(matched_path.resolve()) - else: - # Otherwise, list all Python files in that directory and add all of them to ignore_files set - ignore_files |= { - sub_path.resolve() - for sub_path in matched_path.glob("**/*.py") - if sub_path.is_file() - } - return ignore_files - - -def get_repo_files(repo_root: Path) -> List[Path]: - """Get the list of all repo files, ignoring undesired files & directories specified in .feastignore""" - # Read ignore paths from .feastignore and create a set of all files that match any of these paths - ignore_paths = read_feastignore(repo_root) + [ - ".git", - ".feastignore", - ".venv", - "**/.ipynb_checkpoints", - "**/.pytest_cache", - "**/__pycache__", - ] - ignore_files = get_ignore_files(repo_root, ignore_paths) - - # List all Python files in the root directory (recursively) - repo_files = { - p.resolve() - for p in repo_root.glob("**/*.py") - if p.is_file() and "__init__.py" != p.name - } - # Ignore all files that match any of the ignore paths in .feastignore - repo_files -= ignore_files - - # Sort repo_files to read them in the same order every time - return sorted(repo_files) - - -def parse_repo(repo_root: Path) -> RepoContents: - """ - Collects unique Feast object definitions from the given feature repo. - - Specifically, if an object foo has already been added, bar will still be added if - (bar == foo), but not if (bar is foo). This ensures that import statements will - not result in duplicates, but defining two equal objects will. - """ - res = RepoContents( - projects=[], - data_sources=[], - entities=[], - feature_views=[], - feature_services=[], - on_demand_feature_views=[], - stream_feature_views=[], - permissions=[], - ) - - for repo_file in get_repo_files(repo_root): - module_path = py_path_to_module(repo_file) - module = importlib.import_module(module_path) - - for attr_name in dir(module): - obj = getattr(module, attr_name) - - if isinstance(obj, DataSource) and not any( - (obj is ds) for ds in res.data_sources - ): - res.data_sources.append(obj) - - # Handle batch sources defined within stream sources. - if ( - isinstance(obj, PushSource) - or isinstance(obj, KafkaSource) - or isinstance(obj, KinesisSource) - ): - batch_source = obj.batch_source - - if batch_source and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - if ( - isinstance(obj, FeatureView) - and not any((obj is fv) for fv in res.feature_views) - and not isinstance(obj, StreamFeatureView) - and not isinstance(obj, BatchFeatureView) - ): - res.feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if batch_source is not None and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - - # Handle stream sources defined with feature views. - if obj.stream_source: - stream_source = obj.stream_source - if not any((stream_source is ds) for ds in res.data_sources): - res.data_sources.append(stream_source) - elif isinstance(obj, StreamFeatureView) and not any( - (obj is sfv) for sfv in res.stream_feature_views - ): - res.stream_feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if batch_source is not None and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - assert obj.stream_source - stream_source = obj.stream_source - if not any((stream_source is ds) for ds in res.data_sources): - res.data_sources.append(stream_source) - elif isinstance(obj, BatchFeatureView) and not any( - (obj is bfv) for bfv in res.feature_views - ): - res.feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if batch_source is not None and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - elif isinstance(obj, Entity) and not any( - (obj is entity) for entity in res.entities - ): - res.entities.append(obj) - elif isinstance(obj, FeatureService) and not any( - (obj is fs) for fs in res.feature_services - ): - res.feature_services.append(obj) - elif isinstance(obj, OnDemandFeatureView) and not any( - (obj is odfv) for odfv in res.on_demand_feature_views - ): - res.on_demand_feature_views.append(obj) - elif isinstance(obj, Permission) and not any( - (obj is p) for p in res.permissions - ): - res.permissions.append(obj) - elif isinstance(obj, Project) and not any((obj is p) for p in res.projects): - res.projects.append(obj) - - res.entities.append(DUMMY_ENTITY) - return res - - -def plan( - repo_config: RepoConfig, - repo_path: Path, - skip_source_validation: bool, - skip_feature_view_validation: bool = False, -): - os.chdir(repo_path) - repo = _get_repo_contents(repo_path, repo_config.project, repo_config) - for project in repo.projects: - repo_config.project = project.name - store, registry = _get_store_and_registry(repo_config) - # TODO: When we support multiple projects in a single repo, we should filter repo contents by project - if not skip_source_validation: - provider = store._get_provider() - data_sources = [ - t.batch_source for t in repo.feature_views if t.batch_source is not None - ] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - provider.validate_data_source(store.config, data_source) - - registry_diff, infra_diff, _ = store.plan( - repo, skip_feature_view_validation=skip_feature_view_validation - ) - click.echo(registry_diff.to_string()) - click.echo(infra_diff.to_string()) - - -def _get_repo_contents( - repo_path, - project_name: Optional[str] = None, - repo_config: Optional[RepoConfig] = None, -): - sys.dont_write_bytecode = True - repo = parse_repo(repo_path) - - if len(repo.projects) < 1: - if project_name: - print( - f"No project found in the repository. Using project name {project_name} defined in feature_store.yaml" - ) - project_description = ( - repo_config.project_description if repo_config else None - ) - repo.projects.append( - Project(name=project_name, description=project_description or "") - ) - else: - print( - "No project found in the repository. Either define Project in repository or define a project in feature_store.yaml" - ) - sys.exit(1) - elif len(repo.projects) == 1: - if repo.projects[0].name != project_name: - print( - "Project object name should match with the project name defined in feature_store.yaml" - ) - sys.exit(1) - else: - print( - "Multiple projects found in the repository. Currently no support for multiple projects" - ) - sys.exit(1) - - return repo - - -def _get_store_and_registry(repo_config): - store = FeatureStore(config=repo_config) - registry = store.registry - return store, registry - - -def extract_objects_for_apply_delete(project, registry, repo): - # TODO(achals): This code path should be refactored to handle added & kept entities separately. - ( - _, - objs_to_delete, - objs_to_update, - objs_to_add, - ) = extract_objects_for_keep_delete_update_add(registry, project, repo) - - all_to_apply: List[ - Union[ - Entity, - FeatureView, - OnDemandFeatureView, - StreamFeatureView, - FeatureService, - ] - ] = [] - for object_type in FEAST_OBJECT_TYPES: - to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) - all_to_apply.extend(to_apply) - - all_to_delete: List[ - Union[ - Entity, - FeatureView, - OnDemandFeatureView, - StreamFeatureView, - FeatureService, - ] - ] = [] - for object_type in FEAST_OBJECT_TYPES: - all_to_delete.extend(objs_to_delete[object_type]) - - return ( - all_to_apply, - all_to_delete, - set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union( - set(objs_to_update[FeastObjectType.FEATURE_VIEW]) - ), - objs_to_delete[FeastObjectType.FEATURE_VIEW], - ) - - -def apply_total_with_repo_instance( - store: FeatureStore, - project_name: str, - registry: BaseRegistry, - repo: RepoContents, - skip_source_validation: bool, - skip_feature_view_validation: bool = False, - no_promote: bool = False, -): - if not skip_source_validation: - provider = store._get_provider() - data_sources = [ - t.batch_source for t in repo.feature_views if t.batch_source is not None - ] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - provider.validate_data_source(store.config, data_source) - - # For each object in the registry, determine whether it should be kept or deleted. - ( - all_to_apply, - all_to_delete, - views_to_keep, - views_to_delete, - ) = extract_objects_for_apply_delete(project_name, registry, repo) - - try: - if store._should_use_plan(): - # Planning phase - compute diffs first without progress bars - registry_diff, infra_diff, new_infra = store.plan( - repo, - skip_feature_view_validation=skip_feature_view_validation, - ) - click.echo(registry_diff.to_string()) - - # Only show progress bars if there are actual infrastructure changes - progress_ctx = None - if len(infra_diff.infra_object_diffs) > 0: - from feast.diff.apply_progress import ApplyProgressContext - - progress_ctx = ApplyProgressContext() - progress_ctx.start_overall_progress() - - # Apply phase - store._apply_diffs( - registry_diff, - infra_diff, - new_infra, - progress_ctx=progress_ctx, - no_promote=no_promote, - ) - click.echo(infra_diff.to_string()) - else: - # Legacy apply path - no progress bars for legacy path - store.apply( - all_to_apply, - objects_to_delete=all_to_delete, - partial=False, - skip_feature_view_validation=skip_feature_view_validation, - no_promote=no_promote, - ) - log_infra_changes(views_to_keep, views_to_delete) - finally: - # Cleanup is handled in the new _apply_diffs method - pass - - -def log_infra_changes( - views_to_keep: Set[FeatureView], views_to_delete: Set[FeatureView] -): - from colorama import Fore, Style - - for view in views_to_keep: - click.echo( - f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" - ) - for view in views_to_delete: - click.echo( - f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{view.name}{Style.RESET_ALL}" - ) - - -def create_feature_store( - ctx: click.Context, -) -> FeatureStore: - repo = ctx.obj["CHDIR"] - # If we received a base64 encoded version of feature_store.yaml, use that - config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) - if config_base64: - print("Received base64 encoded feature_store.yaml") - config_bytes = base64.b64decode(config_base64) - # Create a new unique directory for writing feature_store.yaml - repo_path = Path(tempfile.mkdtemp()) - with open(repo_path / "feature_store.yaml", "wb") as f: - f.write(config_bytes) - return FeatureStore(repo_path=str(repo_path.resolve())) - else: - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) - - -def apply_total( - repo_config: RepoConfig, - repo_path: Path, - skip_source_validation: bool, - skip_feature_view_validation: bool = False, - no_promote: bool = False, -): - os.chdir(repo_path) - repo = _get_repo_contents(repo_path, repo_config.project, repo_config) - for project in repo.projects: - repo_config.project = project.name - store, registry = _get_store_and_registry(repo_config) - if not is_valid_name(project.name): - print( - f"{project.name} is not valid. Project name should only have " - f"alphanumerical values, underscores, and hyphens but not start with an underscore or hyphen." - ) - sys.exit(1) - # TODO: When we support multiple projects in a single repo, we should filter repo contents by project. Currently there is no way to associate Feast objects to project. - print(f"Applying changes for project {project.name}") - apply_total_with_repo_instance( - store, - project.name, - registry, - repo, - skip_source_validation, - skip_feature_view_validation, - no_promote=no_promote, - ) - - -def teardown(repo_config: RepoConfig, repo_path: Optional[str]): - # Cannot pass in both repo_path and repo_config to FeatureStore. - feature_store = FeatureStore(repo_path=repo_path, config=repo_config) - feature_store.teardown() - - -def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: - """For debugging only: output contents of the metadata registry""" - registry_config = repo_config.registry - project = repo_config.project - registry = Registry( - project, - registry_config=registry_config, - repo_path=repo_path, - auth_config=repo_config.auth_config, - ) - registry_dict = registry.to_dict(project=project) - return json.dumps(registry_dict, indent=2, sort_keys=True) - - -def cli_check_repo(repo_path: Path, fs_yaml_file: Path): - sys.path.append(str(repo_path)) - if not fs_yaml_file.exists(): - print( - f"Can't find feature repo configuration file at {fs_yaml_file}. " - "Make sure you're running feast from an initialized feast repository." - ) - sys.exit(1) - - -def init_repo(repo_name: str, template: str, repo_path: Optional[str] = None): - import os - from pathlib import Path - from shutil import copytree - - from colorama import Fore, Style - - # Validate project name - if not is_valid_name(repo_name): - raise BadParameter( - message="Name should be alphanumeric values, underscores, and hyphens but not start with an underscore or hyphen", - param_hint="PROJECT_DIRECTORY", - ) - - # Determine where to create the repository - if repo_path: - # User specified a custom path - target_path = Path(repo_path).resolve() - target_path.mkdir(parents=True, exist_ok=True) - display_path = repo_path - else: - # Default behavior: create subdirectory with project name - target_path = Path(os.path.join(Path.cwd(), repo_name)) - target_path.mkdir(exist_ok=True) - display_path = repo_name - - repo_config_path = target_path / "feature_store.yaml" - - if repo_config_path.exists(): - print( - f"The directory {Style.BRIGHT + Fore.GREEN}{display_path}{Style.RESET_ALL} contains an existing feature " - f"store repository that may cause a conflict" - ) - print() - sys.exit(1) - - # Copy template directory - template_path = str(Path(Path(__file__).parent / "templates" / template).absolute()) - if not os.path.exists(template_path): - raise IOError(f"Could not find template {template}") - copytree(template_path, str(target_path), dirs_exist_ok=True) - - # Rename gitignore files back to .gitignore - for gitignore_path in target_path.rglob("gitignore"): - gitignore_path.rename(gitignore_path.with_name(".gitignore")) - - # Seed the repository - bootstrap_path = target_path / "bootstrap.py" - if os.path.exists(bootstrap_path): - import importlib.util - - spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path)) - assert isinstance(spec, ModuleSpec) - bootstrap = importlib.util.module_from_spec(spec) - assert isinstance(spec.loader, Loader) - spec.loader.exec_module(bootstrap) - bootstrap.bootstrap() # type: ignore - os.remove(bootstrap_path) - - # Template the feature_store.yaml file - feature_store_yaml_path = target_path / "feature_repo" / "feature_store.yaml" - replace_str_in_file( - feature_store_yaml_path, "project: my_project", f"project: {repo_name}" - ) - - # Remove the __pycache__ folder if it exists - import shutil - - shutil.rmtree(target_path / "__pycache__", ignore_errors=True) - - import click - - click.echo() - click.echo( - f"Creating a new Feast repository in {Style.BRIGHT + Fore.GREEN}{target_path}{Style.RESET_ALL}." - ) - click.echo() - - -def is_valid_name(name: str) -> bool: - """A name should be alphanumeric values, underscores, and hyphens but not start with an underscore""" - return ( - not name.startswith(("_", "-")) and re.compile(r"[^\w-]+").search(name) is None - ) - - -def generate_project_name() -> str: - """Generates a unique project name""" - return f"{random.choice(adjectives)}_{random.choice(animals)}" +import base64 +import importlib +import json +import logging +import os +import random +import re +import sys +import tempfile +from importlib.abc import Loader +from importlib.machinery import ModuleSpec +from pathlib import Path +from typing import List, Optional, Set, Union + +import click +from click.exceptions import BadParameter + +from feast import PushSource +from feast.batch_feature_view import BatchFeatureView +from feast.constants import FEATURE_STORE_YAML_ENV_NAME +from feast.data_source import DataSource, KafkaSource, KinesisSource +from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_store import FeatureStore +from feast.feature_view import DUMMY_ENTITY, FeatureView +from feast.file_utils import replace_str_in_file +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry +from feast.names import adjectives, animals +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.permissions.permission import Permission +from feast.project import Project +from feast.repo_config import RepoConfig +from feast.repo_contents import RepoContents +from feast.stream_feature_view import StreamFeatureView + +logger = logging.getLogger(__name__) + + +def py_path_to_module(path: Path) -> str: + return ( + str(path.relative_to(os.getcwd()))[: -len(".py")] + .replace("./", "") + .replace("/", ".") + .replace("\\", ".") + ) + + +def read_feastignore(repo_root: Path) -> List[str]: + """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" + feast_ignore = repo_root / ".feastignore" + if not feast_ignore.is_file(): + return [] + lines = feast_ignore.read_text().strip().split("\n") + ignore_paths = [] + for line in lines: + # Remove everything after the first occurance of "#" symbol (comments) + if line.find("#") >= 0: + line = line[: line.find("#")] + # Strip leading or ending whitespaces + line = line.strip() + # Add this processed line to ignore_paths if it's not empty + if len(line) > 0: + ignore_paths.append(line) + return ignore_paths + + +def get_ignore_files(repo_root: Path, ignore_paths: List[str]) -> Set[Path]: + """Get all ignore files that match any of the user-defined ignore paths""" + ignore_files = set() + for ignore_path in set(ignore_paths): + # ignore_path may contains matchers (* or **). Use glob() to match user-defined path to actual paths + for matched_path in repo_root.glob(ignore_path): + if matched_path.is_file(): + # If the matched path is a file, add that to ignore_files set + ignore_files.add(matched_path.resolve()) + else: + # Otherwise, list all Python files in that directory and add all of them to ignore_files set + ignore_files |= { + sub_path.resolve() + for sub_path in matched_path.glob("**/*.py") + if sub_path.is_file() + } + return ignore_files + + +def get_repo_files(repo_root: Path) -> List[Path]: + """Get the list of all repo files, ignoring undesired files & directories specified in .feastignore""" + # Read ignore paths from .feastignore and create a set of all files that match any of these paths + ignore_paths = read_feastignore(repo_root) + [ + ".git", + ".feastignore", + ".venv", + "**/.ipynb_checkpoints", + "**/.pytest_cache", + "**/__pycache__", + ] + ignore_files = get_ignore_files(repo_root, ignore_paths) + + # List all Python files in the root directory (recursively) + repo_files = { + p.resolve() + for p in repo_root.glob("**/*.py") + if p.is_file() and "__init__.py" != p.name + } + # Ignore all files that match any of the ignore paths in .feastignore + repo_files -= ignore_files + + # Sort repo_files to read them in the same order every time + return sorted(repo_files) + + +def parse_repo(repo_root: Path) -> RepoContents: + """ + Collects unique Feast object definitions from the given feature repo. + + Specifically, if an object foo has already been added, bar will still be added if + (bar == foo), but not if (bar is foo). This ensures that import statements will + not result in duplicates, but defining two equal objects will. + """ + res = RepoContents( + projects=[], + data_sources=[], + entities=[], + feature_views=[], + feature_services=[], + on_demand_feature_views=[], + stream_feature_views=[], + permissions=[], + ) + + for repo_file in get_repo_files(repo_root): + module_path = py_path_to_module(repo_file) + module = importlib.import_module(module_path) + + for attr_name in dir(module): + obj = getattr(module, attr_name) + + if isinstance(obj, DataSource) and not any( + (obj is ds) for ds in res.data_sources + ): + res.data_sources.append(obj) + + # Handle batch sources defined within stream sources. + if ( + isinstance(obj, PushSource) + or isinstance(obj, KafkaSource) + or isinstance(obj, KinesisSource) + ): + batch_source = obj.batch_source + + if batch_source and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + if ( + isinstance(obj, FeatureView) + and not any((obj is fv) for fv in res.feature_views) + and not isinstance(obj, StreamFeatureView) + and not isinstance(obj, BatchFeatureView) + ): + res.feature_views.append(obj) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + + # Handle stream sources defined with feature views. + if obj.stream_source: + stream_source = obj.stream_source + if not any((stream_source is ds) for ds in res.data_sources): + res.data_sources.append(stream_source) + elif isinstance(obj, StreamFeatureView) and not any( + (obj is sfv) for sfv in res.stream_feature_views + ): + res.stream_feature_views.append(obj) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + assert obj.stream_source + stream_source = obj.stream_source + if not any((stream_source is ds) for ds in res.data_sources): + res.data_sources.append(stream_source) + elif isinstance(obj, BatchFeatureView) and not any( + (obj is bfv) for bfv in res.feature_views + ): + res.feature_views.append(obj) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + elif isinstance(obj, Entity) and not any( + (obj is entity) for entity in res.entities + ): + res.entities.append(obj) + elif isinstance(obj, FeatureService) and not any( + (obj is fs) for fs in res.feature_services + ): + res.feature_services.append(obj) + elif isinstance(obj, OnDemandFeatureView) and not any( + (obj is odfv) for odfv in res.on_demand_feature_views + ): + res.on_demand_feature_views.append(obj) + elif isinstance(obj, Permission) and not any( + (obj is p) for p in res.permissions + ): + res.permissions.append(obj) + elif isinstance(obj, Project) and not any((obj is p) for p in res.projects): + res.projects.append(obj) + + res.entities.append(DUMMY_ENTITY) + return res + + +def plan( + repo_config: RepoConfig, + repo_path: Path, + skip_source_validation: bool, + skip_feature_view_validation: bool = False, +): + os.chdir(repo_path) + repo = _get_repo_contents(repo_path, repo_config.project, repo_config) + for project in repo.projects: + repo_config.project = project.name + store, registry = _get_store_and_registry(repo_config) + # TODO: When we support multiple projects in a single repo, we should filter repo contents by project + if not skip_source_validation: + provider = store._get_provider() + data_sources = [ + t.batch_source for t in repo.feature_views if t.batch_source is not None + ] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + provider.validate_data_source(store.config, data_source) + + registry_diff, infra_diff, _ = store.plan( + repo, skip_feature_view_validation=skip_feature_view_validation + ) + click.echo(registry_diff.to_string()) + click.echo(infra_diff.to_string()) + + +def _get_repo_contents( + repo_path, + project_name: Optional[str] = None, + repo_config: Optional[RepoConfig] = None, +): + sys.dont_write_bytecode = True + repo = parse_repo(repo_path) + + if len(repo.projects) < 1: + if project_name: + print( + f"No project found in the repository. Using project name {project_name} defined in feature_store.yaml" + ) + project_description = ( + repo_config.project_description if repo_config else None + ) + repo.projects.append( + Project(name=project_name, description=project_description or "") + ) + else: + print( + "No project found in the repository. Either define Project in repository or define a project in feature_store.yaml" + ) + sys.exit(1) + elif len(repo.projects) == 1: + if repo.projects[0].name != project_name: + print( + "Project object name should match with the project name defined in feature_store.yaml" + ) + sys.exit(1) + else: + print( + "Multiple projects found in the repository. Currently no support for multiple projects" + ) + sys.exit(1) + + return repo + + +def _get_store_and_registry(repo_config): + store = FeatureStore(config=repo_config) + registry = store.registry + return store, registry + + +def extract_objects_for_apply_delete(project, registry, repo): + # TODO(achals): This code path should be refactored to handle added & kept entities separately. + ( + _, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add(registry, project, repo) + + all_to_apply: List[ + Union[ + Entity, + FeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, + ] + ] = [] + for object_type in FEAST_OBJECT_TYPES: + to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) + all_to_apply.extend(to_apply) + + all_to_delete: List[ + Union[ + Entity, + FeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, + ] + ] = [] + for object_type in FEAST_OBJECT_TYPES: + all_to_delete.extend(objs_to_delete[object_type]) + + return ( + all_to_apply, + all_to_delete, + set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union( + set(objs_to_update[FeastObjectType.FEATURE_VIEW]) + ), + objs_to_delete[FeastObjectType.FEATURE_VIEW], + ) + + +def apply_total_with_repo_instance( + store: FeatureStore, + project_name: str, + registry: BaseRegistry, + repo: RepoContents, + skip_source_validation: bool, + skip_feature_view_validation: bool = False, + no_promote: bool = False, +): + if not skip_source_validation: + provider = store._get_provider() + data_sources = [ + t.batch_source for t in repo.feature_views if t.batch_source is not None + ] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + provider.validate_data_source(store.config, data_source) + + # For each object in the registry, determine whether it should be kept or deleted. + ( + all_to_apply, + all_to_delete, + views_to_keep, + views_to_delete, + ) = extract_objects_for_apply_delete(project_name, registry, repo) + + try: + if store._should_use_plan(): + # Planning phase - compute diffs first without progress bars + registry_diff, infra_diff, new_infra = store.plan( + repo, + skip_feature_view_validation=skip_feature_view_validation, + ) + click.echo(registry_diff.to_string()) + + # Only show progress bars if there are actual infrastructure changes + progress_ctx = None + if len(infra_diff.infra_object_diffs) > 0: + from feast.diff.apply_progress import ApplyProgressContext + + progress_ctx = ApplyProgressContext() + progress_ctx.start_overall_progress() + + # Apply phase + store._apply_diffs( + registry_diff, + infra_diff, + new_infra, + progress_ctx=progress_ctx, + no_promote=no_promote, + ) + click.echo(infra_diff.to_string()) + else: + # Legacy apply path - no progress bars for legacy path + store.apply( + all_to_apply, + objects_to_delete=all_to_delete, + partial=False, + skip_feature_view_validation=skip_feature_view_validation, + no_promote=no_promote, + ) + log_infra_changes(views_to_keep, views_to_delete) + finally: + # Cleanup is handled in the new _apply_diffs method + pass + + +def log_infra_changes( + views_to_keep: Set[FeatureView], views_to_delete: Set[FeatureView] +): + from colorama import Fore, Style + + for view in views_to_keep: + click.echo( + f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" + ) + for view in views_to_delete: + click.echo( + f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{view.name}{Style.RESET_ALL}" + ) + + +def create_feature_store( + ctx: click.Context, +) -> FeatureStore: + repo = ctx.obj["CHDIR"] + # If we received a base64 encoded version of feature_store.yaml, use that + config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) + if config_base64: + print("Received base64 encoded feature_store.yaml") + config_bytes = base64.b64decode(config_base64) + # Create a new unique directory for writing feature_store.yaml + repo_path = Path(tempfile.mkdtemp()) + with open(repo_path / "feature_store.yaml", "wb") as f: + f.write(config_bytes) + return FeatureStore(repo_path=str(repo_path.resolve())) + else: + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + + +def apply_total( + repo_config: RepoConfig, + repo_path: Path, + skip_source_validation: bool, + skip_feature_view_validation: bool = False, + no_promote: bool = False, +): + os.chdir(repo_path) + repo = _get_repo_contents(repo_path, repo_config.project, repo_config) + for project in repo.projects: + repo_config.project = project.name + store, registry = _get_store_and_registry(repo_config) + if not is_valid_name(project.name): + print( + f"{project.name} is not valid. Project name should only have " + f"alphanumerical values, underscores, and hyphens but not start with an underscore or hyphen." + ) + sys.exit(1) + # TODO: When we support multiple projects in a single repo, we should filter repo contents by project. Currently there is no way to associate Feast objects to project. + print(f"Applying changes for project {project.name}") + apply_total_with_repo_instance( + store, + project.name, + registry, + repo, + skip_source_validation, + skip_feature_view_validation, + no_promote=no_promote, + ) + + +def teardown(repo_config: RepoConfig, repo_path: Optional[str]): + # Cannot pass in both repo_path and repo_config to FeatureStore. + feature_store = FeatureStore(repo_path=repo_path, config=repo_config) + feature_store.teardown() + + +def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: + """For debugging only: output contents of the metadata registry""" + registry_config = repo_config.registry + project = repo_config.project + registry = Registry( + project, + registry_config=registry_config, + repo_path=repo_path, + auth_config=repo_config.auth_config, + ) + registry_dict = registry.to_dict(project=project) + return json.dumps(registry_dict, indent=2, sort_keys=True) + + +def cli_check_repo(repo_path: Path, fs_yaml_file: Path): + sys.path.append(str(repo_path)) + if not fs_yaml_file.exists(): + print( + f"Can't find feature repo configuration file at {fs_yaml_file}. " + "Make sure you're running feast from an initialized feast repository." + ) + sys.exit(1) + + +def init_repo(repo_name: str, template: str, repo_path: Optional[str] = None): + import os + from pathlib import Path + from shutil import copytree + + from colorama import Fore, Style + + # Validate project name + if not is_valid_name(repo_name): + raise BadParameter( + message="Name should be alphanumeric values, underscores, and hyphens but not start with an underscore or hyphen", + param_hint="PROJECT_DIRECTORY", + ) + + # Determine where to create the repository + if repo_path: + # User specified a custom path + target_path = Path(repo_path).resolve() + target_path.mkdir(parents=True, exist_ok=True) + display_path = repo_path + else: + # Default behavior: create subdirectory with project name + target_path = Path(os.path.join(Path.cwd(), repo_name)) + target_path.mkdir(exist_ok=True) + display_path = repo_name + + repo_config_path = target_path / "feature_store.yaml" + + if repo_config_path.exists(): + print( + f"The directory {Style.BRIGHT + Fore.GREEN}{display_path}{Style.RESET_ALL} contains an existing feature " + f"store repository that may cause a conflict" + ) + print() + sys.exit(1) + + # Copy template directory + template_path = str(Path(Path(__file__).parent / "templates" / template).absolute()) + if not os.path.exists(template_path): + raise IOError(f"Could not find template {template}") + copytree(template_path, str(target_path), dirs_exist_ok=True) + + # Rename gitignore files back to .gitignore + for gitignore_path in target_path.rglob("gitignore"): + gitignore_path.rename(gitignore_path.with_name(".gitignore")) + + # Seed the repository + bootstrap_path = target_path / "bootstrap.py" + if os.path.exists(bootstrap_path): + import importlib.util + + spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path)) + assert isinstance(spec, ModuleSpec) + bootstrap = importlib.util.module_from_spec(spec) + assert isinstance(spec.loader, Loader) + spec.loader.exec_module(bootstrap) + bootstrap.bootstrap() # type: ignore + os.remove(bootstrap_path) + + # Template the feature_store.yaml file + feature_store_yaml_path = target_path / "feature_repo" / "feature_store.yaml" + replace_str_in_file( + feature_store_yaml_path, "project: my_project", f"project: {repo_name}" + ) + + # Remove the __pycache__ folder if it exists + import shutil + + shutil.rmtree(target_path / "__pycache__", ignore_errors=True) + + import click + + click.echo() + click.echo( + f"Creating a new Feast repository in {Style.BRIGHT + Fore.GREEN}{target_path}{Style.RESET_ALL}." + ) + click.echo() + + +def is_valid_name(name: str) -> bool: + """A name should be alphanumeric values, underscores, and hyphens but not start with an underscore""" + return ( + not name.startswith(("_", "-")) and re.compile(r"[^\w-]+").search(name) is None + ) + + +def generate_project_name() -> str: + """Generates a unique project name""" + return f"{random.choice(adjectives)}_{random.choice(animals)}" diff --git a/sdk/python/feast/ui_server.py b/sdk/python/feast/ui_server.py index a1fd0f8d36e..883f995b82d 100644 --- a/sdk/python/feast/ui_server.py +++ b/sdk/python/feast/ui_server.py @@ -66,7 +66,7 @@ def _setup_rest_mode(app: FastAPI, store: "feast.FeatureStore"): grpc_handler = RegistryServer(store.registry) rest_app = FastAPI(root_path="/api/v1") - register_all_routes(rest_app, grpc_handler, server=None, store=store) + register_all_routes(rest_app, grpc_handler) app.mount("/api/v1", rest_app) @app.get("/health") diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py index 798e28759a9..abf2a35e389 100644 --- a/sdk/python/tests/unit/test_metrics.py +++ b/sdk/python/tests/unit/test_metrics.py @@ -1732,184 +1732,3 @@ def test_does_not_raise_on_failure(self): status="error", latency_ms=5.0, ) - - -# --------------------------------------------------------------------------- -# System Metrics endpoint tests -# --------------------------------------------------------------------------- - - -class TestParsePrometheusText: - """Tests for _parse_prometheus_text in system_metrics.py.""" - - def test_parses_counter(self): - from feast.api.registry.rest.system_metrics import _parse_prometheus_text - - text = ( - "# HELP feast_offline_store_request_total Total requests\n" - "# TYPE feast_offline_store_request_total counter\n" - 'feast_offline_store_request_total{method="to_arrow",status="success"} 42\n' - ) - result = _parse_prometheus_text(text) - assert "feast_offline_store_request_total" in result - entry = result["feast_offline_store_request_total"] - assert entry["type"] == "counter" - assert len(entry["samples"]) == 1 - assert entry["samples"][0]["value"] == 42.0 - - def test_preserves_total_suffix(self): - """Counter _total suffix must NOT be stripped (was a reported bug).""" - from feast.api.registry.rest.system_metrics import _parse_prometheus_text - - text = ( - "# TYPE feast_offline_store_request_total counter\n" - "feast_offline_store_request_total 10\n" - ) - result = _parse_prometheus_text(text) - assert "feast_offline_store_request_total" in result - assert "feast_offline_store_request" not in result - - def test_groups_histogram_samples(self): - from feast.api.registry.rest.system_metrics import _parse_prometheus_text - - text = ( - "# TYPE feast_offline_store_request_latency_seconds histogram\n" - 'feast_offline_store_request_latency_seconds_bucket{le="0.1"} 5\n' - 'feast_offline_store_request_latency_seconds_bucket{le="1.0"} 15\n' - "feast_offline_store_request_latency_seconds_sum 12.5\n" - "feast_offline_store_request_latency_seconds_count 15\n" - ) - result = _parse_prometheus_text(text) - assert "feast_offline_store_request_latency_seconds" in result - entry = result["feast_offline_store_request_latency_seconds"] - assert entry["type"] == "histogram" - assert len(entry["samples"]) == 4 - - def test_empty_input(self): - from feast.api.registry.rest.system_metrics import _parse_prometheus_text - - assert _parse_prometheus_text("") == {} - - def test_skips_comment_and_help_lines(self): - from feast.api.registry.rest.system_metrics import _parse_prometheus_text - - text = ( - "# HELP some_metric A help string\n" - "# This is a random comment\n" - "# TYPE some_metric gauge\n" - "some_metric 3.14\n" - ) - result = _parse_prometheus_text(text) - assert "some_metric" in result - assert result["some_metric"]["samples"][0]["value"] == 3.14 - - def test_non_numeric_value(self): - from feast.api.registry.rest.system_metrics import _parse_prometheus_text - - text = "# TYPE info_metric gauge\ninfo_metric NaN\n" - result = _parse_prometheus_text(text) - assert len(result["info_metric"]["samples"]) == 1 - - -class TestSystemMetricsRouter: - """Tests for the FastAPI system-metrics router endpoints.""" - - def _make_test_client(self, store=None): - from fastapi import FastAPI - from fastapi.testclient import TestClient - - from feast.api.registry.rest.system_metrics import get_system_metrics_router - - app = FastAPI() - app.include_router(get_system_metrics_router(grpc_handler=None, store=store)) - return TestClient(app) - - @patch("feast.api.registry.rest.system_metrics.http_requests.get") - def test_promql_instant_success(self, mock_get): - mock_resp = MagicMock() - mock_resp.status_code = 200 - mock_resp.json.return_value = { - "status": "success", - "data": {"resultType": "vector"}, - } - mock_resp.raise_for_status = MagicMock() - mock_get.return_value = mock_resp - - client = self._make_test_client() - resp = client.get("/system-metrics/query", params={"query": "up"}) - assert resp.status_code == 200 - assert resp.json()["status"] == "success" - - @patch("feast.api.registry.rest.system_metrics.http_requests.get") - def test_promql_instant_connection_error(self, mock_get): - import requests - - mock_get.side_effect = requests.exceptions.ConnectionError("refused") - - client = self._make_test_client() - resp = client.get("/system-metrics/query", params={"query": "up"}) - assert resp.status_code == 503 - assert "Failed to connect" in resp.json()["detail"] - - @patch("feast.api.registry.rest.system_metrics.http_requests.get") - def test_promql_instant_timeout(self, mock_get): - import requests - - mock_get.side_effect = requests.exceptions.Timeout("timed out") - - client = self._make_test_client() - resp = client.get("/system-metrics/query", params={"query": "up"}) - assert resp.status_code == 504 - assert "timed out" in resp.json()["detail"] - - @patch("feast.api.registry.rest.system_metrics.http_requests.get") - def test_promql_range_success(self, mock_get): - mock_resp = MagicMock() - mock_resp.json.return_value = {"status": "success"} - mock_resp.raise_for_status = MagicMock() - mock_get.return_value = mock_resp - - client = self._make_test_client() - resp = client.get( - "/system-metrics/query_range", - params={ - "query": "up", - "start": "2026-01-01T00:00:00Z", - "end": "2026-01-02T00:00:00Z", - }, - ) - assert resp.status_code == 200 - - @patch("feast.api.registry.rest.system_metrics.http_requests.get") - def test_scrape_success(self, mock_get): - mock_resp = MagicMock() - mock_resp.status_code = 200 - mock_resp.text = "# TYPE up gauge\nup 1\n" - mock_resp.raise_for_status = MagicMock() - mock_get.return_value = mock_resp - - client = self._make_test_client() - resp = client.get("/system-metrics/scrape") - assert resp.status_code == 200 - body = resp.json() - assert "up" in body - - @patch("feast.api.registry.rest.system_metrics.http_requests.get") - def test_scrape_failure(self, mock_get): - import requests - - mock_get.side_effect = requests.exceptions.ConnectionError("refused") - - client = self._make_test_client() - resp = client.get("/system-metrics/scrape") - assert resp.status_code == 503 - assert "Failed to scrape" in resp.json()["detail"] - - def test_prometheus_url_from_store(self): - from feast.api.registry.rest.system_metrics import get_system_metrics_router - - store = MagicMock() - store.config.feature_server.metrics.prometheus_url = "http://custom:9090" - - router = get_system_metrics_router(grpc_handler=None, store=store) - assert router is not None