Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/python/feast/api/registry/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from feast.api.registry.rest.features import get_feature_router
from feast.api.registry.rest.lineage import get_lineage_router
from feast.api.registry.rest.metrics import get_metrics_router
from feast.api.registry.rest.monitoring import get_monitoring_router
from feast.api.registry.rest.permissions import get_permission_router
from feast.api.registry.rest.projects import get_project_router
from feast.api.registry.rest.saved_datasets import get_saved_dataset_router
Expand All @@ -25,3 +26,4 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None):
app.include_router(get_saved_dataset_router(grpc_handler))
app.include_router(get_search_router(grpc_handler))
app.include_router(get_metrics_router(grpc_handler, server))
app.include_router(get_monitoring_router(grpc_handler, server))
232 changes: 232 additions & 0 deletions sdk/python/feast/api/registry/rest/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
import logging
from datetime import date
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field

from feast.monitoring.monitoring_service import MonitoringService
from feast.permissions.action import AuthzedAction
from feast.permissions.security_manager import assert_permissions

logger = logging.getLogger(__name__)


class ComputeMetricsRequest(BaseModel):
project: str = Field(..., description="Feast project name")
feature_view_name: Optional[str] = Field(
None, description="Feature view name (null = all)"
)
feature_names: Optional[List[str]] = Field(
None, description="Feature names to compute (null = all)"
)
start_date: Optional[str] = Field(
None, description="Start date (YYYY-MM-DD), defaults to yesterday"
)
end_date: Optional[str] = Field(
None, description="End date (YYYY-MM-DD), defaults to today"
)
data_source_type: str = Field(..., description="Data source: 'batch' or 'log'")
set_baseline: bool = Field(
False, description="Mark this computation as the baseline"
)


class ComputeMetricsResponse(BaseModel):
status: str
data_source_type: str
computed_features: int
computed_feature_views: int
computed_feature_services: int
metric_dates: List[str]
duration_ms: int


def get_monitoring_router(grpc_handler, server=None) -> APIRouter:
router = APIRouter()

def _get_monitoring_service() -> MonitoringService:
if server is None or not hasattr(server, "store"):
raise HTTPException(
status_code=500,
detail="Failed to access monitoring service: server store not available",
)
return MonitoringService(server.store)

def _assert_fv_permission(
project: str, feature_view_name: str, action: AuthzedAction
):
try:
fv = server.store.registry.get_feature_view(
name=feature_view_name, project=project
)
assert_permissions(fv, actions=[action])
except Exception:
pass
Comment on lines +59 to +65
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Permission check silently swallows all exceptions, completely bypassing RBAC

_assert_fv_permission wraps the entire permission check in except Exception: pass, which catches and ignores FeastPermissionError raised by assert_permissions when the user is unauthorized. As confirmed in feast/permissions/enforcer.py, FeastPermissionError inherits from Exception (via feast/errors.py:568). This means every call to _assert_fv_permission is a no-op — unauthorized users can compute metrics (UPDATE action) and read monitoring data (DESCRIBE action) for any feature view without restriction.

Suggested change
try:
fv = server.store.registry.get_feature_view(
name=feature_view_name, project=project
)
assert_permissions(fv, actions=[action])
except Exception:
pass
try:
fv = server.store.registry.get_feature_view(
name=feature_view_name, project=project
)
assert_permissions(fv, actions=[action])
except FeastObjectNotFoundException:
pass
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


@router.post(
"/monitoring/compute",
tags=["Monitoring"],
response_model=ComputeMetricsResponse,
)
async def compute_metrics(request: ComputeMetricsRequest):
if request.data_source_type not in ("batch", "log"):
raise HTTPException(
status_code=422,
detail="data_source_type must be 'batch' or 'log'",
)

if request.feature_view_name:
_assert_fv_permission(
request.project, request.feature_view_name, AuthzedAction.UPDATE
)

svc = _get_monitoring_service()

start_d = _parse_date(request.start_date) if request.start_date else None
end_d = _parse_date(request.end_date) if request.end_date else None

try:
result = svc.compute_metrics(
project=request.project,
feature_view_name=request.feature_view_name,
feature_names=request.feature_names,
start_date=start_d,
end_date=end_d,
data_source_type=request.data_source_type,
set_baseline=request.set_baseline,
)
return ComputeMetricsResponse(**result)
except NotImplementedError as e:
raise HTTPException(status_code=501, detail=str(e))
except Exception as e:
logger.exception("Failed to compute monitoring metrics")
raise HTTPException(
status_code=500,
detail=f"Failed to compute monitoring metrics: {str(e)}",
)

@router.get("/monitoring/metrics/features", tags=["Monitoring"])
async def get_feature_metrics(
project: str = Query(...),
feature_view_name: Optional[str] = Query(None),
feature_name: Optional[str] = Query(None),
feature_service_name: Optional[str] = Query(None),
data_source_type: Optional[str] = Query(None),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
):
if feature_view_name:
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)

svc = _get_monitoring_service()
return {
"metrics": svc.get_feature_metrics(
project=project,
feature_service_name=feature_service_name,
feature_view_name=feature_view_name,
feature_name=feature_name,
data_source_type=data_source_type,
start_date=_parse_date(start_date) if start_date else None,
end_date=_parse_date(end_date) if end_date else None,
)
}

@router.get("/monitoring/metrics/feature_views", tags=["Monitoring"])
async def get_feature_view_metrics(
project: str = Query(...),
feature_view_name: Optional[str] = Query(None),
feature_service_name: Optional[str] = Query(None),
data_source_type: Optional[str] = Query(None),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
):
if feature_view_name:
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)

svc = _get_monitoring_service()
return {
"metrics": svc.get_feature_view_metrics(
project=project,
feature_service_name=feature_service_name,
feature_view_name=feature_view_name,
data_source_type=data_source_type,
start_date=_parse_date(start_date) if start_date else None,
end_date=_parse_date(end_date) if end_date else None,
)
}

@router.get("/monitoring/metrics/feature_services", tags=["Monitoring"])
async def get_feature_service_metrics(
project: str = Query(...),
feature_service_name: Optional[str] = Query(None),
data_source_type: Optional[str] = Query(None),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
):
svc = _get_monitoring_service()
return {
"metrics": svc.get_feature_service_metrics(
project=project,
feature_service_name=feature_service_name,
data_source_type=data_source_type,
start_date=_parse_date(start_date) if start_date else None,
end_date=_parse_date(end_date) if end_date else None,
)
}

@router.get("/monitoring/metrics/baseline", tags=["Monitoring"])
async def get_baseline(
project: str = Query(...),
feature_view_name: Optional[str] = Query(None),
feature_name: Optional[str] = Query(None),
data_source_type: Optional[str] = Query(None),
):
if feature_view_name:
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)

svc = _get_monitoring_service()
return {
"metrics": svc.get_baseline(
project=project,
feature_view_name=feature_view_name,
feature_name=feature_name,
data_source_type=data_source_type,
)
}

@router.get("/monitoring/metrics/timeseries", tags=["Monitoring"])
async def get_timeseries(
project: str = Query(...),
feature_view_name: Optional[str] = Query(None),
feature_name: Optional[str] = Query(None),
feature_service_name: Optional[str] = Query(None),
data_source_type: Optional[str] = Query(None),
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
granularity: str = Query("daily"),
):
if feature_view_name:
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)

svc = _get_monitoring_service()
metrics = svc.get_timeseries(
project=project,
feature_view_name=feature_view_name,
feature_name=feature_name,
feature_service_name=feature_service_name,
data_source_type=data_source_type,
start_date=_parse_date(start_date) if start_date else None,
end_date=_parse_date(end_date) if end_date else None,
)

return {
"granularity": granularity,
"timeseries": metrics,
}

return router


def _parse_date(date_str: str) -> date:
return date.fromisoformat(date_str)
Loading
Loading