from datetime import date
from typing import List, Optional

import click

from feast.infra.offline_stores.offline_store import OfflineStore
from feast.repo_operations import create_feature_store

VALID_GRANULARITIES = OfflineStore.MONITORING_VALID_GRANULARITIES


@click.group(name="monitor")
def monitor_cmd():
    """Feature monitoring commands."""


@monitor_cmd.command("run")
@click.option(
    "--project",
    "-p",
    default=None,
    help="Feast project name. Defaults to the project in feature_store.yaml.",
)
@click.option(
    "--feature-view",
    "-v",
    default=None,
    help="Feature view name. If omitted, all feature views are computed.",
)
@click.option(
    "--feature-name",
    "-f",
    multiple=True,
    help="Feature name(s) to compute. Can be specified multiple times.",
)
@click.option(
    "--start-date",
    default=None,
    help="Start date (YYYY-MM-DD). If omitted, auto-detected from source data.",
)
@click.option(
    "--end-date",
    default=None,
    help="End date (YYYY-MM-DD). If omitted, auto-detected from source data.",
)
@click.option(
    "--granularity",
    "-g",
    default=None,
    type=click.Choice(list(VALID_GRANULARITIES)),
    help="Metric granularity. If omitted, all granularities are computed (auto mode).",
)
@click.option(
    "--set-baseline",
    is_flag=True,
    default=False,
    help="Mark this computation as the baseline for drift detection.",
)
@click.option(
    "--feature-service",
    "-s",
    default=None,
    help="Feature service name (required for --source-type log with explicit dates).",
)
@click.option(
    "--source-type",
    type=click.Choice(["batch", "log", "all"]),
    default="batch",
    help="Data source type: 'batch' (offline store), 'log' (serving logs), or 'all'.",
)
@click.pass_context
def monitor_run(
    ctx: click.Context,
    project: Optional[str],
    feature_view: Optional[str],
    feature_name: tuple,
    start_date: Optional[str],
    end_date: Optional[str],
    granularity: Optional[str],
    set_baseline: bool,
    feature_service: Optional[str],
    source_type: str,
):
    """Compute feature quality metrics.

    Without --start-date/--end-date/--granularity, runs in auto mode:
    detects date ranges from source data and computes all granularities.

    Use --source-type log to compute metrics from feature serving logs
    (requires feature services with logging configured).
    """
    # NOTE: the docstring above doubles as the command's --help text, so
    # implementation notes live in comments instead.
    store = create_feature_store(ctx)
    if project is None:
        project = store.project

    # Kept as a function-scope import, as in the original.
    from feast.monitoring.monitoring_service import MonitoringService

    svc = MonitoringService(store)

    # Auto mode means the user pinned neither a date bound nor a granularity.
    auto_mode = start_date is None and end_date is None and granularity is None
    # click delivers a `multiple=True` option as a tuple; empty means "no filter".
    feat_names: Optional[List[str]] = list(feature_name) if feature_name else None

    # "all" runs the batch pass first, then the log pass.
    if source_type in ("batch", "all"):
        _run_batch_monitoring(
            svc,
            project,
            feature_view,
            feat_names,
            start_date,
            end_date,
            granularity,
            set_baseline,
            auto_mode,
        )

    if source_type in ("log", "all"):
        _run_log_monitoring(
            svc,
            project,
            feature_service,
            start_date,
            end_date,
            granularity,
            auto_mode,
        )


def _parse_iso_date(value: Optional[str], option: str) -> Optional[date]:
    """Convert a CLI date string into a ``date``.

    Args:
        value: Raw option value; ``None`` or an empty string means "not given".
        option: Option name (e.g. ``"--start-date"``) used in the error message.

    Returns:
        The parsed date, or ``None`` when ``value`` is empty.

    Raises:
        click.BadParameter: If ``value`` cannot be parsed by
            ``date.fromisoformat``. Raised instead of the bare ``ValueError``
            so the user gets a usage error rather than a traceback.
    """
    if not value:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError as err:
        raise click.BadParameter(
            f"{value!r} is not a valid date (expected YYYY-MM-DD).",
            param_hint=option,
        ) from err


def _run_batch_monitoring(
    svc,
    project: str,
    feature_view: Optional[str],
    feat_names: Optional[List[str]],
    start_date: Optional[str],
    end_date: Optional[str],
    granularity: Optional[str],
    set_baseline: bool,
    auto_mode: bool,
) -> None:
    """Run batch-source monitoring and echo a summary of the result.

    Three paths, chosen from the flags:

    * auto mode with ``set_baseline``: ``svc.compute_baseline``;
    * auto mode without ``set_baseline``: ``svc.auto_compute``;
    * otherwise: ``svc.compute_metrics`` for the given dates, with the
      granularity defaulting to ``"daily"``.

    Raises:
        click.BadParameter: If ``start_date`` or ``end_date`` is not a valid
            ISO date (explicit path only).
    """
    if auto_mode and set_baseline and not start_date and not end_date:
        click.echo("Computing baseline from all available source data...")
        result = svc.compute_baseline(
            project=project,
            feature_view_name=feature_view,
            feature_names=feat_names,
        )
        click.echo(f"Status: {result['status']}")
        click.echo(f"Features computed: {result['computed_features']}")
        click.echo(f"Feature views computed: {result['computed_feature_views']}")
        click.echo(f"Duration: {result['duration_ms']}ms")
        click.echo("Baseline: SET")
    elif auto_mode and not set_baseline:
        click.echo("Auto-computing batch metrics for all granularities...")
        # NOTE(review): feat_names (--feature-name) is not forwarded here, so
        # the filter has no effect in auto mode. Confirm whether auto_compute
        # supports feature filtering before changing this.
        result = svc.auto_compute(
            project=project,
            feature_view_name=feature_view,
        )
        click.echo(f"Status: {result['status']}")
        click.echo(f"Feature views computed: {result['computed_feature_views']}")
        click.echo(f"Features computed: {result['computed_features']}")
        click.echo(f"Granularities: {', '.join(result['granularities'])}")
        click.echo(f"Duration: {result['duration_ms']}ms")
    else:
        # Parse both dates before any computation so bad input fails early.
        start_d = _parse_iso_date(start_date, "--start-date")
        end_d = _parse_iso_date(end_date, "--end-date")
        result = svc.compute_metrics(
            project=project,
            feature_view_name=feature_view,
            feature_names=feat_names,
            start_date=start_d,
            end_date=end_d,
            granularity=granularity or "daily",
            set_baseline=set_baseline,
        )
        click.echo(f"Status: {result['status']}")
        click.echo(f"Granularity: {result['granularity']}")
        click.echo(f"Features computed: {result['computed_features']}")
        click.echo(f"Feature views computed: {result['computed_feature_views']}")
        click.echo(
            f"Feature services computed: {result['computed_feature_services']}"
        )
        click.echo(f"Metric dates: {', '.join(result['metric_dates'])}")
        click.echo(f"Duration: {result['duration_ms']}ms")
        if set_baseline:
            click.echo("Baseline: SET")


def _run_log_monitoring(
    svc,
    project: str,
    feature_service_name: Optional[str],
    start_date: Optional[str],
    end_date: Optional[str],
    granularity: Optional[str],
    auto_mode: bool,
) -> None:
    """Run serving-log monitoring and echo a summary of the result.

    In auto mode this calls ``svc.auto_compute_log_metrics``. Otherwise it
    calls ``svc.compute_log_metrics`` for the given dates, with the
    granularity defaulting to ``"daily"``; that path needs a feature service.

    Raises:
        click.UsageError: If explicit dates/granularity were given without a
            feature service name.
        click.BadParameter: If ``start_date`` or ``end_date`` is not a valid
            ISO date.
    """
    if auto_mode:
        click.echo("Auto-computing log metrics for all granularities...")
        result = svc.auto_compute_log_metrics(
            project=project,
            feature_service_name=feature_service_name,
        )
        click.echo(f"Status: {result['status']}")
        click.echo(
            f"Feature services computed: {result['computed_feature_services']}"
        )
        click.echo(f"Features computed: {result['computed_features']}")
        click.echo(f"Granularities: {', '.join(result['granularities'])}")
        click.echo(f"Duration: {result['duration_ms']}ms")
        return

    if not feature_service_name:
        # Raise rather than echo-and-return: the command must exit non-zero
        # so scripts and CI can detect the failure. click prefixes the
        # message with "Error: " and writes it to stderr.
        raise click.UsageError(
            "--feature-service is required for log source with explicit dates."
        )

    start_d = _parse_iso_date(start_date, "--start-date")
    end_d = _parse_iso_date(end_date, "--end-date")
    result = svc.compute_log_metrics(
        project=project,
        feature_service_name=feature_service_name,
        start_date=start_d,
        end_date=end_d,
        granularity=granularity or "daily",
    )
    click.echo(f"Status: {result['status']}")
    click.echo("Source: log")
    click.echo(f"Features computed: {result.get('computed_features', 0)}")
    click.echo(f"Duration: {result['duration_ms']}ms")