Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Operational metrics for offline store and SOX metrics for both
Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com>
  • Loading branch information
jyejare committed Jun 2, 2026
commit 4c5fb2cd30fd109681e3ebcac577fbae5ecbfc62
79 changes: 67 additions & 12 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,28 +152,72 @@ class ChatRequest(BaseModel):
messages: List[ChatMessage]


def _resolve_feature_counts(
def _parse_feature_info(
features: Union[List[str], "feast.FeatureService"],
) -> tuple:
"""Return (feature_count, feature_view_count) from the resolved features.
"""Return ``(feature_view_names, feature_count)`` from resolved features.

``features`` is either a list of ``"feature_view:feature"`` strings or
a ``FeatureService`` with ``feature_view_projections``.

Returns:
(fv_names, feat_count) where fv_names is a list of unique feature
view name strings and feat_count is the total number of features.
"""
from feast.feature_service import FeatureService

if isinstance(features, FeatureService):
projections = features.feature_view_projections
fv_count = len(projections)
fv_names = [p.name for p in projections]
feat_count = sum(len(p.features) for p in projections)
elif isinstance(features, list):
feat_count = len(features)
fv_names = {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref}
fv_count = len(fv_names)
fv_names = list(
{ref.split(":")[0].split("@")[0] for ref in features if ":" in ref}
Comment thread
jyejare marked this conversation as resolved.
Outdated
)
else:
fv_names = []
feat_count = 0
fv_count = 0
return str(feat_count), str(fv_count)
return fv_names, feat_count


def _resolve_feature_counts(
features: Union[List[str], "feast.FeatureService"],
) -> tuple:
"""Return ``(feature_count_str, feature_view_count_str)`` for Prometheus labels."""
fv_names, feat_count = _parse_feature_info(features)
return str(feat_count), str(len(fv_names))


def _emit_online_audit(
request: GetOnlineFeaturesRequest,
features: Union[List[str], "feast.FeatureService"],
entity_count: int,
status: str,
latency_ms: float,
):
"""Best-effort audit log emission for online feature requests."""
try:
from feast.permissions.security_manager import get_security_manager

requestor_id = "anonymous"
sm = get_security_manager()
if sm and sm.current_user:
requestor_id = sm.current_user.username or "anonymous"

fv_names, feat_count = _parse_feature_info(features)

feast_metrics.emit_online_audit_log(
requestor_id=requestor_id,
entity_keys=list(request.entities.keys()),
entity_count=entity_count,
feature_views=fv_names,
feature_count=feat_count,
status=status,
latency_ms=latency_ms,
)
except Exception:
logger.warning("Failed to emit online audit log", exc_info=True)


async def _get_features(
Expand Down Expand Up @@ -390,11 +434,22 @@ async def get_online_features(request: GetOnlineFeaturesRequest) -> Any:
include_feature_view_version_metadata=request.include_feature_view_version_metadata,
)

if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params) # type: ignore
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params) # type: ignore
audit_start_ms = time.monotonic() * 1000
audit_status = "success"
try:
if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params) # type: ignore
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params) # type: ignore
)
except Exception:
audit_status = "error"
raise
finally:
audit_latency_ms = time.monotonic() * 1000 - audit_start_ms
_emit_online_audit(
request, features, entity_count, audit_status, audit_latency_ms
)

response_dict = await run_in_threadpool(
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/infra/feature_servers/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ class MetricsConfig(FeastConfigBaseModel):
"""Emit per-feature-view freshness gauges
(feast_feature_freshness_seconds)."""

offline_features: StrictBool = True
"""Emit offline store retrieval metrics
(feast_offline_store_request_total,
feast_offline_store_request_latency_seconds,
feast_offline_store_row_count)."""

audit_logging: StrictBool = False
Comment thread
jyejare marked this conversation as resolved.
"""Emit structured JSON audit log entries for online and offline
feature requests via the ``feast.audit`` logger. Captures requestor
identity, entity keys, feature views, row counts, and latency."""


class BaseFeatureServerConfig(FeastConfigBaseModel):
"""Base Feature Server config that should be extended"""
Expand Down
65 changes: 63 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import warnings
from abc import ABC
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -70,6 +72,21 @@ def __init__(
self.max_event_timestamp = max_event_timestamp


def _extract_retrieval_metadata(job: "RetrievalJob") -> tuple:
"""Return ``(feature_view_names, feature_count)`` from a RetrievalJob's metadata."""
try:
meta = job.metadata
if meta:
feature_count = len(meta.features)
feature_views = list(
{ref.split(":")[0] for ref in meta.features if ":" in ref}
Comment thread
jyejare marked this conversation as resolved.
Outdated
)
return feature_views, feature_count
except (NotImplementedError, AttributeError):
pass
return [], 0


class RetrievalJob(ABC):
"""A RetrievalJob manages the execution of a query to retrieve data from the offline store."""

Expand Down Expand Up @@ -152,7 +169,51 @@ def to_arrow(
validation_reference (optional): The validation to apply against the retrieved dataframe.
timeout (optional): The query timeout if applicable.
"""
features_table = self._to_arrow_internal(timeout=timeout)
start_wall = time.monotonic()
status_label = "success"
row_count = 0
try:
features_table = self._to_arrow_internal(timeout=timeout)
row_count = features_table.num_rows
except Exception:
status_label = "error"
raise
finally:
try:
from feast import metrics as feast_metrics

elapsed = time.monotonic() - start_wall

if feast_metrics._config.offline_features:
feast_metrics.offline_store_request_total.labels(
method="to_arrow", status=status_label
).inc()
feast_metrics.offline_store_request_latency_seconds.labels(
method="to_arrow"
).observe(elapsed)
if row_count > 0:
feast_metrics.offline_store_row_count.labels(
method="to_arrow"
).observe(row_count)

if feast_metrics._config.audit_logging:
feature_views, feature_count = _extract_retrieval_metadata(self)
now_iso = datetime.now(tz=timezone.utc).isoformat()
feast_metrics.emit_offline_audit_log(
method="to_arrow",
feature_views=feature_views,
feature_count=feature_count,
row_count=row_count,
status=status_label,
start_time=now_iso,
end_time=now_iso,
Comment thread
jyejare marked this conversation as resolved.
Outdated
duration_ms=elapsed * 1000,
)
except Exception:
logging.getLogger(__name__).debug(
"Failed to record offline store metrics", exc_info=True
)

if self.on_demand_feature_views:
# Build a mapping of ODFV name to requested feature names
# This ensures we only return the features that were explicitly requested
Expand Down
103 changes: 102 additions & 1 deletion sdk/python/feast/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"""

import atexit
import json
import logging
import os
import shutil
Expand All @@ -51,7 +52,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, List, Optional

import psutil

Expand Down Expand Up @@ -123,6 +124,8 @@ class _MetricsFlags:
push: bool = False
materialization: bool = False
freshness: bool = False
offline_features: bool = False
audit_logging: bool = False


_config = _MetricsFlags()
Expand All @@ -144,6 +147,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
push=True,
materialization=True,
freshness=True,
offline_features=True,
audit_logging=False,
)
return _MetricsFlags(
enabled=True,
Expand All @@ -153,6 +158,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
push=getattr(metrics_config, "push", True),
materialization=getattr(metrics_config, "materialization", True),
freshness=getattr(metrics_config, "freshness", True),
offline_features=getattr(metrics_config, "offline_features", True),
audit_logging=getattr(metrics_config, "audit_logging", False),
)


Expand Down Expand Up @@ -260,6 +267,33 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
multiprocess_mode="max",
)

# ---------------------------------------------------------------------------
# Offline store retrieval metrics
# ---------------------------------------------------------------------------
offline_store_request_total = Counter(
"feast_offline_store_request_total",
"Total offline store retrieval requests",
["method", "status"],
)
offline_store_request_latency_seconds = Histogram(
"feast_offline_store_request_latency_seconds",
"Latency of offline store retrieval operations in seconds",
["method"],
buckets=(0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
)
offline_store_row_count = Histogram(
"feast_offline_store_row_count",
"Number of rows returned by offline store retrieval",
["method"],
buckets=(100, 1000, 10000, 100000, 500000, 1000000, 5000000),
)

# ---------------------------------------------------------------------------
# Audit logger — separate from the main feast logger so operators can
# route SOX-style audit entries to a dedicated sink.
# ---------------------------------------------------------------------------
audit_logger = logging.getLogger("feast.audit")

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -388,6 +422,71 @@ def track_materialization(
)


def emit_online_audit_log(
*,
requestor_id: str,
entity_keys: List[str],
entity_count: int,
feature_views: List[str],
feature_count: int,
status: str,
latency_ms: float,
):
"""Emit a structured JSON audit log entry for an online feature request."""
if not _config.audit_logging:
return
audit_logger.info(
_json_dumps(
{
"event": "online_feature_request",
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
"requestor_id": requestor_id,
"entity_keys": entity_keys,
"entity_count": entity_count,
"feature_views": feature_views,
"feature_count": feature_count,
"status": status,
"latency_ms": round(latency_ms, 2),
}
)
)


def emit_offline_audit_log(
*,
method: str,
feature_views: List[str],
feature_count: int,
row_count: int,
status: str,
start_time: str,
end_time: str,
duration_ms: float,
):
"""Emit a structured JSON audit log entry for an offline feature retrieval."""
if not _config.audit_logging:
return
audit_logger.info(
_json_dumps(
{
"event": "offline_feature_retrieval",
Comment thread
jyejare marked this conversation as resolved.
"method": method,
"start_time": start_time,
"end_time": end_time,
"feature_views": feature_views,
"feature_count": feature_count,
"row_count": row_count,
"status": status,
"duration_ms": round(duration_ms, 2),
}
)
)


def _json_dumps(obj: dict) -> str:
return json.dumps(obj, separators=(",", ":"))


def update_feature_freshness(
store: "FeatureStore",
) -> None:
Expand Down Expand Up @@ -507,6 +606,8 @@ def start_metrics_server(
push=True,
materialization=True,
freshness=True,
offline_features=True,
audit_logging=False,
)

from prometheus_client import CollectorRegistry, make_wsgi_app
Expand Down
Loading