Skip to content

Commit 940a4af

Browse files
committed
Statistical/Distribution metrics in Feast
Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com>
1 parent 73805d3 commit 940a4af

File tree

14 files changed

+2924
-623
lines changed

14 files changed

+2924
-623
lines changed

sdk/python/feast/api/registry/rest/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from feast.api.registry.rest.features import get_feature_router
88
from feast.api.registry.rest.lineage import get_lineage_router
99
from feast.api.registry.rest.metrics import get_metrics_router
10+
from feast.api.registry.rest.monitoring import get_monitoring_router
1011
from feast.api.registry.rest.permissions import get_permission_router
1112
from feast.api.registry.rest.projects import get_project_router
1213
from feast.api.registry.rest.saved_datasets import get_saved_dataset_router
@@ -25,3 +26,4 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None):
2526
app.include_router(get_saved_dataset_router(grpc_handler))
2627
app.include_router(get_search_router(grpc_handler))
2728
app.include_router(get_metrics_router(grpc_handler, server))
29+
app.include_router(get_monitoring_router(grpc_handler, server))
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import logging
2+
from datetime import date
3+
from typing import List, Optional
4+
5+
from fastapi import APIRouter, HTTPException, Query
6+
from pydantic import BaseModel, Field
7+
8+
from feast.monitoring.monitoring_service import MonitoringService
9+
from feast.permissions.action import AuthzedAction
10+
from feast.permissions.security_manager import assert_permissions
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class ComputeMetricsRequest(BaseModel):
16+
project: str = Field(..., description="Feast project name")
17+
feature_view_name: Optional[str] = Field(
18+
None, description="Feature view name (null = all)"
19+
)
20+
feature_names: Optional[List[str]] = Field(
21+
None, description="Feature names to compute (null = all)"
22+
)
23+
start_date: Optional[str] = Field(
24+
None, description="Start date (YYYY-MM-DD), defaults to yesterday"
25+
)
26+
end_date: Optional[str] = Field(
27+
None, description="End date (YYYY-MM-DD), defaults to today"
28+
)
29+
data_source_type: str = Field(..., description="Data source: 'batch' or 'log'")
30+
set_baseline: bool = Field(
31+
False, description="Mark this computation as the baseline"
32+
)
33+
34+
35+
class ComputeMetricsResponse(BaseModel):
36+
status: str
37+
data_source_type: str
38+
computed_features: int
39+
computed_feature_views: int
40+
computed_feature_services: int
41+
metric_dates: List[str]
42+
duration_ms: int
43+
44+
45+
def get_monitoring_router(grpc_handler, server=None) -> APIRouter:
46+
router = APIRouter()
47+
48+
def _get_monitoring_service() -> MonitoringService:
49+
if server is None or not hasattr(server, "store"):
50+
raise HTTPException(
51+
status_code=500,
52+
detail="Failed to access monitoring service: server store not available",
53+
)
54+
return MonitoringService(server.store)
55+
56+
def _assert_fv_permission(
57+
project: str, feature_view_name: str, action: AuthzedAction
58+
):
59+
try:
60+
fv = server.store.registry.get_feature_view(
61+
name=feature_view_name, project=project
62+
)
63+
assert_permissions(fv, actions=[action])
64+
except Exception:
65+
pass
66+
67+
@router.post(
68+
"/monitoring/compute",
69+
tags=["Monitoring"],
70+
response_model=ComputeMetricsResponse,
71+
)
72+
async def compute_metrics(request: ComputeMetricsRequest):
73+
if request.data_source_type not in ("batch", "log"):
74+
raise HTTPException(
75+
status_code=422,
76+
detail="data_source_type must be 'batch' or 'log'",
77+
)
78+
79+
if request.feature_view_name:
80+
_assert_fv_permission(
81+
request.project, request.feature_view_name, AuthzedAction.UPDATE
82+
)
83+
84+
svc = _get_monitoring_service()
85+
86+
start_d = _parse_date(request.start_date) if request.start_date else None
87+
end_d = _parse_date(request.end_date) if request.end_date else None
88+
89+
try:
90+
result = svc.compute_metrics(
91+
project=request.project,
92+
feature_view_name=request.feature_view_name,
93+
feature_names=request.feature_names,
94+
start_date=start_d,
95+
end_date=end_d,
96+
data_source_type=request.data_source_type,
97+
set_baseline=request.set_baseline,
98+
)
99+
return ComputeMetricsResponse(**result)
100+
except NotImplementedError as e:
101+
raise HTTPException(status_code=501, detail=str(e))
102+
except Exception as e:
103+
logger.exception("Failed to compute monitoring metrics")
104+
raise HTTPException(
105+
status_code=500,
106+
detail=f"Failed to compute monitoring metrics: {str(e)}",
107+
)
108+
109+
@router.get("/monitoring/metrics/features", tags=["Monitoring"])
110+
async def get_feature_metrics(
111+
project: str = Query(...),
112+
feature_view_name: Optional[str] = Query(None),
113+
feature_name: Optional[str] = Query(None),
114+
feature_service_name: Optional[str] = Query(None),
115+
data_source_type: Optional[str] = Query(None),
116+
start_date: Optional[str] = Query(None),
117+
end_date: Optional[str] = Query(None),
118+
):
119+
if feature_view_name:
120+
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)
121+
122+
svc = _get_monitoring_service()
123+
return {
124+
"metrics": svc.get_feature_metrics(
125+
project=project,
126+
feature_service_name=feature_service_name,
127+
feature_view_name=feature_view_name,
128+
feature_name=feature_name,
129+
data_source_type=data_source_type,
130+
start_date=_parse_date(start_date) if start_date else None,
131+
end_date=_parse_date(end_date) if end_date else None,
132+
)
133+
}
134+
135+
@router.get("/monitoring/metrics/feature_views", tags=["Monitoring"])
136+
async def get_feature_view_metrics(
137+
project: str = Query(...),
138+
feature_view_name: Optional[str] = Query(None),
139+
feature_service_name: Optional[str] = Query(None),
140+
data_source_type: Optional[str] = Query(None),
141+
start_date: Optional[str] = Query(None),
142+
end_date: Optional[str] = Query(None),
143+
):
144+
if feature_view_name:
145+
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)
146+
147+
svc = _get_monitoring_service()
148+
return {
149+
"metrics": svc.get_feature_view_metrics(
150+
project=project,
151+
feature_service_name=feature_service_name,
152+
feature_view_name=feature_view_name,
153+
data_source_type=data_source_type,
154+
start_date=_parse_date(start_date) if start_date else None,
155+
end_date=_parse_date(end_date) if end_date else None,
156+
)
157+
}
158+
159+
@router.get("/monitoring/metrics/feature_services", tags=["Monitoring"])
160+
async def get_feature_service_metrics(
161+
project: str = Query(...),
162+
feature_service_name: Optional[str] = Query(None),
163+
data_source_type: Optional[str] = Query(None),
164+
start_date: Optional[str] = Query(None),
165+
end_date: Optional[str] = Query(None),
166+
):
167+
svc = _get_monitoring_service()
168+
return {
169+
"metrics": svc.get_feature_service_metrics(
170+
project=project,
171+
feature_service_name=feature_service_name,
172+
data_source_type=data_source_type,
173+
start_date=_parse_date(start_date) if start_date else None,
174+
end_date=_parse_date(end_date) if end_date else None,
175+
)
176+
}
177+
178+
@router.get("/monitoring/metrics/baseline", tags=["Monitoring"])
179+
async def get_baseline(
180+
project: str = Query(...),
181+
feature_view_name: Optional[str] = Query(None),
182+
feature_name: Optional[str] = Query(None),
183+
data_source_type: Optional[str] = Query(None),
184+
):
185+
if feature_view_name:
186+
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)
187+
188+
svc = _get_monitoring_service()
189+
return {
190+
"metrics": svc.get_baseline(
191+
project=project,
192+
feature_view_name=feature_view_name,
193+
feature_name=feature_name,
194+
data_source_type=data_source_type,
195+
)
196+
}
197+
198+
@router.get("/monitoring/metrics/timeseries", tags=["Monitoring"])
199+
async def get_timeseries(
200+
project: str = Query(...),
201+
feature_view_name: Optional[str] = Query(None),
202+
feature_name: Optional[str] = Query(None),
203+
feature_service_name: Optional[str] = Query(None),
204+
data_source_type: Optional[str] = Query(None),
205+
start_date: Optional[str] = Query(None),
206+
end_date: Optional[str] = Query(None),
207+
granularity: str = Query("daily"),
208+
):
209+
if feature_view_name:
210+
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE)
211+
212+
svc = _get_monitoring_service()
213+
metrics = svc.get_timeseries(
214+
project=project,
215+
feature_view_name=feature_view_name,
216+
feature_name=feature_name,
217+
feature_service_name=feature_service_name,
218+
data_source_type=data_source_type,
219+
start_date=_parse_date(start_date) if start_date else None,
220+
end_date=_parse_date(end_date) if end_date else None,
221+
)
222+
223+
return {
224+
"granularity": granularity,
225+
"timeseries": metrics,
226+
}
227+
228+
return router
229+
230+
231+
def _parse_date(date_str: str) -> date:
232+
return date.fromisoformat(date_str)

0 commit comments

Comments
 (0)