Skip to content

Commit 3664e38

Browse files
committed
chore: Update the monitoring backend based on frontent needs
Signed-off-by: Jitendra Yejare <11752425+jyejare@users.noreply.github.com>
1 parent 7ca9824 commit 3664e38

7 files changed

Lines changed: 71 additions & 22 deletions

File tree

sdk/python/feast/api/registry/rest/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from feast.api.registry.rest.search import get_search_router
1515

1616

17-
def register_all_routes(app: FastAPI, grpc_handler, server=None):
17+
def register_all_routes(app: FastAPI, grpc_handler, server=None, store=None):
1818
app.include_router(get_entity_router(grpc_handler))
1919
app.include_router(get_data_source_router(grpc_handler))
2020
app.include_router(get_feature_service_router(grpc_handler))
@@ -26,4 +26,7 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None):
2626
app.include_router(get_saved_dataset_router(grpc_handler))
2727
app.include_router(get_search_router(grpc_handler))
2828
app.include_router(get_metrics_router(grpc_handler, server))
29-
app.include_router(get_monitoring_router(grpc_handler, server))
29+
resolved_store = store or (
30+
server.store if server and hasattr(server, "store") else None
31+
)
32+
app.include_router(get_monitoring_router(grpc_handler, store=resolved_store))

sdk/python/feast/api/registry/rest/monitoring.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,37 +51,43 @@ class ComputeTransientRequest(BaseModel):
5151
end_date: Optional[str] = None
5252

5353

54-
def get_monitoring_router(grpc_handler, server=None):
54+
def get_monitoring_router(grpc_handler, store=None):
5555
router = APIRouter()
5656

5757
_monitoring_service = None
5858

5959
def _get_monitoring_service():
6060
nonlocal _monitoring_service
6161
if _monitoring_service is None:
62+
if store is None:
63+
raise HTTPException(
64+
status_code=503,
65+
detail="Monitoring service is not available: no FeatureStore configured",
66+
)
6267
from feast.monitoring.monitoring_service import MonitoringService
6368

64-
store = server.store if server else grpc_handler.store
6569
_monitoring_service = MonitoringService(store)
6670
return _monitoring_service
6771

6872
def _get_store():
69-
return server.store if server else grpc_handler.store
73+
if store is None:
74+
raise HTTPException(
75+
status_code=503,
76+
detail="Monitoring service is not available: no FeatureStore configured",
77+
)
78+
return store
7079

7180
# ------------------------------------------------------------------ #
7281
# DQM Job: submit and track
7382
# ------------------------------------------------------------------ #
7483

7584
@router.post("/monitoring/compute", tags=["Monitoring"])
7685
async def compute_metrics(request: ComputeMetricsRequest):
77-
"""Submit a DQM job to compute and store metrics. Returns job_id."""
78-
if request.granularity not in VALID_GRANULARITIES:
79-
raise HTTPException(
80-
status_code=400,
81-
detail=f"Invalid granularity '{request.granularity}'. "
82-
f"Must be one of {VALID_GRANULARITIES}",
83-
)
86+
"""Submit a DQM job to compute and store metrics. Returns job_id.
8487
88+
When set_baseline is True and no date range is provided, computes
89+
baseline from all available source data.
90+
"""
8591
store = _get_store()
8692
if request.feature_view_name:
8793
fv = store.registry.get_feature_view(
@@ -91,6 +97,24 @@ async def compute_metrics(request: ComputeMetricsRequest):
9197

9298
svc = _get_monitoring_service()
9399

100+
if request.set_baseline and not request.start_date and not request.end_date:
101+
try:
102+
result = svc.compute_baseline(
103+
project=request.project,
104+
feature_view_name=request.feature_view_name,
105+
feature_names=request.feature_names,
106+
)
107+
return result
108+
except Exception as e:
109+
raise HTTPException(status_code=500, detail=str(e))
110+
111+
if request.granularity not in VALID_GRANULARITIES:
112+
raise HTTPException(
113+
status_code=400,
114+
detail=f"Invalid granularity '{request.granularity}'. "
115+
f"Must be one of {VALID_GRANULARITIES}",
116+
)
117+
94118
params: Dict[str, Any] = {}
95119
if request.start_date:
96120
params["start_date"] = request.start_date
@@ -108,7 +132,6 @@ async def compute_metrics(request: ComputeMetricsRequest):
108132
parameters=params,
109133
)
110134

111-
# Execute synchronously for now; async worker is a future enhancement
112135
try:
113136
result = svc.execute_job(job_id)
114137
return {"job_id": job_id, **result}

sdk/python/feast/api/registry/rest/rest_registry_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ async def dispatch(self, request: Request, call_next):
269269
)
270270

271271
def _register_routes(self):
272-
register_all_routes(self.app, self.grpc_handler, self)
272+
register_all_routes(self.app, self.grpc_handler, self, store=self.store)
273273

274274
def _add_openapi_security(self):
275275
if self.app.openapi_schema:

sdk/python/feast/cli/monitor.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,19 @@ def _run_batch_monitoring(
131131
set_baseline,
132132
auto_mode,
133133
):
134-
if auto_mode and not set_baseline:
134+
if auto_mode and set_baseline and not start_date and not end_date:
135+
click.echo("Computing baseline from all available source data...")
136+
result = svc.compute_baseline(
137+
project=project,
138+
feature_view_name=feature_view,
139+
feature_names=feat_names,
140+
)
141+
click.echo(f"Status: {result['status']}")
142+
click.echo(f"Features computed: {result['computed_features']}")
143+
click.echo(f"Feature views computed: {result['computed_feature_views']}")
144+
click.echo(f"Duration: {result['duration_ms']}ms")
145+
click.echo("Baseline: SET")
146+
elif auto_mode and not set_baseline:
135147
click.echo("Auto-computing batch metrics for all granularities...")
136148
result = svc.auto_compute(
137149
project=project,

sdk/python/feast/repo_operations.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,15 @@ def _submit_baseline_jobs_if_needed(store, project_name, repo):
428428
)
429429
for job_id in job_ids:
430430
click.echo(f" → Queued baseline metrics computation (DQM job: {job_id})")
431+
try:
432+
svc.execute_job(job_id)
433+
click.echo(f" ✓ Baseline computed (job: {job_id})")
434+
except Exception:
435+
logging.getLogger(__name__).debug(
436+
"Baseline job %s execution failed (non-critical)",
437+
job_id,
438+
exc_info=True,
439+
)
431440
except Exception:
432441
logging.getLogger(__name__).debug(
433442
"Monitoring baseline submission skipped (non-critical)", exc_info=True

sdk/python/tests/integration/monitoring/test_monitoring_integration.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -482,17 +482,18 @@ def app(self):
482482
from feast.api.registry.rest.monitoring import get_monitoring_router
483483

484484
mock_handler = MagicMock()
485-
mock_server = MagicMock()
486485

487486
fv = _make_feature_view(
488487
"driver_stats",
489488
[_make_feature_field("conv_rate", PrimitiveFeastType.FLOAT64)],
490489
)
491-
mock_server.store = _make_mock_store([fv])
490+
mock_store = _make_mock_store([fv])
492491

493492
app = FastAPI()
494-
app.include_router(get_monitoring_router(mock_handler, mock_server))
493+
app.include_router(get_monitoring_router(mock_handler, store=mock_store))
495494

495+
mock_server = MagicMock()
496+
mock_server.store = mock_store
496497
return TestClient(app), mock_server
497498

498499
@patch("feast.api.registry.rest.monitoring.assert_permissions")
@@ -569,17 +570,18 @@ def app(self):
569570
from feast.api.registry.rest.monitoring import get_monitoring_router
570571

571572
mock_handler = MagicMock()
572-
mock_server = MagicMock()
573573

574574
fv = _make_feature_view(
575575
"driver_stats",
576576
[_make_feature_field("conv_rate", PrimitiveFeastType.FLOAT64)],
577577
)
578-
mock_server.store = _make_mock_store([fv])
578+
mock_store = _make_mock_store([fv])
579579

580580
app = FastAPI()
581-
app.include_router(get_monitoring_router(mock_handler, mock_server))
581+
app.include_router(get_monitoring_router(mock_handler, store=mock_store))
582582

583+
mock_server = MagicMock()
584+
mock_server.store = mock_store
583585
return TestClient(app), mock_server
584586

585587
@patch("feast.api.registry.rest.monitoring.assert_permissions")

sdk/python/tests/unit/api/test_api_rest_registry_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def test_rest_registry_server_initializes_correctly(
4141

4242
# Validate route registration and auth init
4343
mock_register_all_routes.assert_called_once_with(
44-
server.app, mock_grpc_handler, server
44+
server.app, mock_grpc_handler, server, store=store
4545
)
4646
mock_init_security_manager.assert_called_once()
4747
mock_init_auth_manager.assert_called_once()

0 commit comments

Comments
 (0)