diff --git a/sdk/python/feast/api/registry/rest/__init__.py b/sdk/python/feast/api/registry/rest/__init__.py index 14db40d7af6..6cc5a99934a 100644 --- a/sdk/python/feast/api/registry/rest/__init__.py +++ b/sdk/python/feast/api/registry/rest/__init__.py @@ -7,6 +7,7 @@ from feast.api.registry.rest.features import get_feature_router from feast.api.registry.rest.lineage import get_lineage_router from feast.api.registry.rest.metrics import get_metrics_router +from feast.api.registry.rest.monitoring import get_monitoring_router from feast.api.registry.rest.permissions import get_permission_router from feast.api.registry.rest.projects import get_project_router from feast.api.registry.rest.saved_datasets import get_saved_dataset_router @@ -25,3 +26,4 @@ def register_all_routes(app: FastAPI, grpc_handler, server=None): app.include_router(get_saved_dataset_router(grpc_handler)) app.include_router(get_search_router(grpc_handler)) app.include_router(get_metrics_router(grpc_handler, server)) + app.include_router(get_monitoring_router(grpc_handler, server)) diff --git a/sdk/python/feast/api/registry/rest/monitoring.py b/sdk/python/feast/api/registry/rest/monitoring.py new file mode 100644 index 00000000000..bc2c16261de --- /dev/null +++ b/sdk/python/feast/api/registry/rest/monitoring.py @@ -0,0 +1,232 @@ +import logging +from datetime import date +from typing import List, Optional + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel, Field + +from feast.monitoring.monitoring_service import MonitoringService +from feast.permissions.action import AuthzedAction +from feast.permissions.security_manager import assert_permissions + +logger = logging.getLogger(__name__) + + +class ComputeMetricsRequest(BaseModel): + project: str = Field(..., description="Feast project name") + feature_view_name: Optional[str] = Field( + None, description="Feature view name (null = all)" + ) + feature_names: Optional[List[str]] = Field( + None, description="Feature 
names to compute (null = all)" + ) + start_date: Optional[str] = Field( + None, description="Start date (YYYY-MM-DD), defaults to yesterday" + ) + end_date: Optional[str] = Field( + None, description="End date (YYYY-MM-DD), defaults to today" + ) + data_source_type: str = Field(..., description="Data source: 'batch' or 'log'") + set_baseline: bool = Field( + False, description="Mark this computation as the baseline" + ) + + +class ComputeMetricsResponse(BaseModel): + status: str + data_source_type: str + computed_features: int + computed_feature_views: int + computed_feature_services: int + metric_dates: List[str] + duration_ms: int + + +def get_monitoring_router(grpc_handler, server=None) -> APIRouter: + router = APIRouter() + + def _get_monitoring_service() -> MonitoringService: + if server is None or not hasattr(server, "store"): + raise HTTPException( + status_code=500, + detail="Failed to access monitoring service: server store not available", + ) + return MonitoringService(server.store) + + def _assert_fv_permission( + project: str, feature_view_name: str, action: AuthzedAction + ): + try: + fv = server.store.registry.get_feature_view( + name=feature_view_name, project=project + ) + assert_permissions(fv, actions=[action]) + except Exception: + pass + + @router.post( + "/monitoring/compute", + tags=["Monitoring"], + response_model=ComputeMetricsResponse, + ) + async def compute_metrics(request: ComputeMetricsRequest): + if request.data_source_type not in ("batch", "log"): + raise HTTPException( + status_code=422, + detail="data_source_type must be 'batch' or 'log'", + ) + + if request.feature_view_name: + _assert_fv_permission( + request.project, request.feature_view_name, AuthzedAction.UPDATE + ) + + svc = _get_monitoring_service() + + start_d = _parse_date(request.start_date) if request.start_date else None + end_d = _parse_date(request.end_date) if request.end_date else None + + try: + result = svc.compute_metrics( + project=request.project, + 
feature_view_name=request.feature_view_name, + feature_names=request.feature_names, + start_date=start_d, + end_date=end_d, + data_source_type=request.data_source_type, + set_baseline=request.set_baseline, + ) + return ComputeMetricsResponse(**result) + except NotImplementedError as e: + raise HTTPException(status_code=501, detail=str(e)) + except Exception as e: + logger.exception("Failed to compute monitoring metrics") + raise HTTPException( + status_code=500, + detail=f"Failed to compute monitoring metrics: {str(e)}", + ) + + @router.get("/monitoring/metrics/features", tags=["Monitoring"]) + async def get_feature_metrics( + project: str = Query(...), + feature_view_name: Optional[str] = Query(None), + feature_name: Optional[str] = Query(None), + feature_service_name: Optional[str] = Query(None), + data_source_type: Optional[str] = Query(None), + start_date: Optional[str] = Query(None), + end_date: Optional[str] = Query(None), + ): + if feature_view_name: + _assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE) + + svc = _get_monitoring_service() + return { + "metrics": svc.get_feature_metrics( + project=project, + feature_service_name=feature_service_name, + feature_view_name=feature_view_name, + feature_name=feature_name, + data_source_type=data_source_type, + start_date=_parse_date(start_date) if start_date else None, + end_date=_parse_date(end_date) if end_date else None, + ) + } + + @router.get("/monitoring/metrics/feature_views", tags=["Monitoring"]) + async def get_feature_view_metrics( + project: str = Query(...), + feature_view_name: Optional[str] = Query(None), + feature_service_name: Optional[str] = Query(None), + data_source_type: Optional[str] = Query(None), + start_date: Optional[str] = Query(None), + end_date: Optional[str] = Query(None), + ): + if feature_view_name: + _assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE) + + svc = _get_monitoring_service() + return { + "metrics": svc.get_feature_view_metrics( 
+ project=project, + feature_service_name=feature_service_name, + feature_view_name=feature_view_name, + data_source_type=data_source_type, + start_date=_parse_date(start_date) if start_date else None, + end_date=_parse_date(end_date) if end_date else None, + ) + } + + @router.get("/monitoring/metrics/feature_services", tags=["Monitoring"]) + async def get_feature_service_metrics( + project: str = Query(...), + feature_service_name: Optional[str] = Query(None), + data_source_type: Optional[str] = Query(None), + start_date: Optional[str] = Query(None), + end_date: Optional[str] = Query(None), + ): + svc = _get_monitoring_service() + return { + "metrics": svc.get_feature_service_metrics( + project=project, + feature_service_name=feature_service_name, + data_source_type=data_source_type, + start_date=_parse_date(start_date) if start_date else None, + end_date=_parse_date(end_date) if end_date else None, + ) + } + + @router.get("/monitoring/metrics/baseline", tags=["Monitoring"]) + async def get_baseline( + project: str = Query(...), + feature_view_name: Optional[str] = Query(None), + feature_name: Optional[str] = Query(None), + data_source_type: Optional[str] = Query(None), + ): + if feature_view_name: + _assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE) + + svc = _get_monitoring_service() + return { + "metrics": svc.get_baseline( + project=project, + feature_view_name=feature_view_name, + feature_name=feature_name, + data_source_type=data_source_type, + ) + } + + @router.get("/monitoring/metrics/timeseries", tags=["Monitoring"]) + async def get_timeseries( + project: str = Query(...), + feature_view_name: Optional[str] = Query(None), + feature_name: Optional[str] = Query(None), + feature_service_name: Optional[str] = Query(None), + data_source_type: Optional[str] = Query(None), + start_date: Optional[str] = Query(None), + end_date: Optional[str] = Query(None), + granularity: str = Query("daily"), + ): + if feature_view_name: + 
_assert_fv_permission(project, feature_view_name, AuthzedAction.DESCRIBE) + + svc = _get_monitoring_service() + metrics = svc.get_timeseries( + project=project, + feature_view_name=feature_view_name, + feature_name=feature_name, + feature_service_name=feature_service_name, + data_source_type=data_source_type, + start_date=_parse_date(start_date) if start_date else None, + end_date=_parse_date(end_date) if end_date else None, + ) + + return { + "granularity": granularity, + "timeseries": metrics, + } + + return router + + +def _parse_date(date_str: str) -> date: + return date.fromisoformat(date_str) diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py index 1e461af4a28..d0def0b629b 100644 --- a/sdk/python/feast/cli/cli.py +++ b/sdk/python/feast/cli/cli.py @@ -1,622 +1,624 @@ -# Copyright 2019 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import json -import logging -from datetime import datetime -from importlib.metadata import version as importlib_version -from pathlib import Path -from typing import List, Optional - -import click -import yaml -from colorama import Fore, Style -from dateutil import parser -from pygments import formatters, highlight, lexers - -from feast import utils -from feast.cli.data_sources import data_sources_cmd -from feast.cli.dbt_import import dbt_cmd -from feast.cli.entities import entities_cmd -from feast.cli.feature_services import feature_services_cmd -from feast.cli.feature_views import feature_views_cmd -from feast.cli.features import ( - features_cmd, - get_historical_features, - get_online_features, -) -from feast.cli.on_demand_feature_views import on_demand_feature_views_cmd -from feast.cli.permissions import feast_permissions_cmd -from feast.cli.projects import projects_cmd -from feast.cli.saved_datasets import saved_datasets_cmd -from feast.cli.serve import ( - serve_command, - serve_offline_command, - serve_registry_command, - serve_transformations_command, -) -from feast.cli.stream_feature_views import stream_feature_views_cmd -from feast.cli.ui import ui -from feast.cli.validation_references import validation_references_cmd -from feast.constants import FEAST_FS_YAML_FILE_PATH_ENV_NAME -from feast.errors import FeastProviderLoginError -from feast.repo_config import load_repo_config -from feast.repo_operations import ( - apply_total, - cli_check_repo, - create_feature_store, - generate_project_name, - init_repo, - plan, - registry_dump, - teardown, -) -from feast.utils import maybe_local_tz - -_logger = logging.getLogger(__name__) - - -class NoOptionDefaultFormat(click.Command): - def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): - """Writes all the options into the formatter if they exist.""" - opts = [] - for param in self.get_params(ctx): - rv = param.get_help_record(ctx) - if rv is not None: - opts.append(rv) - if opts: - with 
formatter.section("Options(No current command options)"): - formatter.write_dl(opts) - - -@click.group() -@click.option( - "--chdir", - "-c", - envvar="FEATURE_REPO_DIR_ENV_VAR", - help="Switch to a different feature repository directory before executing the given subcommand. Can also be set via the FEATURE_REPO_DIR_ENV_VAR environment variable.", -) -@click.option( - "--log-level", - default="warning", - help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", -) -@click.option( - "--feature-store-yaml", - "-f", - help=f"Override the directory where the CLI should look for the feature_store.yaml file. Can also be set via the {FEAST_FS_YAML_FILE_PATH_ENV_NAME} environment variable.", -) -@click.pass_context -def cli( - ctx: click.Context, - chdir: Optional[str], - log_level: str, - feature_store_yaml: Optional[str], -): - """ - Feast CLI - - For more information, see our public docs at https://docs.feast.dev/ - """ - ctx.ensure_object(dict) - ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() - ctx.obj["FS_YAML_FILE"] = ( - Path(feature_store_yaml).absolute() - if feature_store_yaml - else utils.get_default_yaml_file_path(ctx.obj["CHDIR"]) - ) - try: - level = getattr(logging, log_level.upper()) - logging.basicConfig( - format="%(asctime)s %(name)s %(levelname)s: %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - level=level, - ) - # Override the logging level for already created loggers (due to loggers being created at the import time) - # Note, that format & datefmt does not need to be set, because by default child loggers don't override them - - # Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. - # So we have to put a type ignore hint for mypy. 
- for logger_name in logging.root.manager.loggerDict: # type: ignore - if "feast" in logger_name: - logger = logging.getLogger(logger_name) - logger.setLevel(level) - except Exception as e: - raise e - pass - - -@cli.command() -def version(): - """ - Display Feast SDK version - """ - print( - f'{Style.BRIGHT + Fore.BLUE}Feast SDK Version: {Style.BRIGHT + Fore.GREEN}"{importlib_version("feast")}"' - ) - - -@cli.command() -@click.argument("object_id") -@click.pass_context -def delete(ctx: click.Context, object_id: str): - """ - Delete Feast Object - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - store = create_feature_store(ctx) - - e = None - object_type = None - - # Order matters if names can overlap between types, - # though typically they shouldn't in a well-structured feature store. - object_getters_and_types = [ - (store.get_entity, "Entity"), - (store.get_feature_view, "FeatureView"), - (store.get_feature_service, "FeatureService"), - (store.get_data_source, "DataSource"), - (store.get_saved_dataset, "SavedDataset"), - (store.get_validation_reference, "ValidationReference"), - (store.get_stream_feature_view, "StreamFeatureView"), - (store.get_on_demand_feature_view, "OnDemandFeatureView"), - # Add other get_* methods here if needed - ] - - for getter, obj_type_str in object_getters_and_types: - try: - potential_e = getter(object_id) # type: ignore[operator] - if potential_e: - e = potential_e - object_type = obj_type_str - break - except Exception: - pass - - if isinstance(e, list): - e = e[0] - if e: - store.apply([e], objects_to_delete=[e], partial=False) - print( - f"{Style.BRIGHT + Fore.RED}Deleted {Style.BRIGHT + Fore.GREEN}{object_type} {Fore.YELLOW}{object_id} from {Fore.GREEN}{store.project}.{Style.RESET_ALL}" - ) - else: - print( - f"{Style.BRIGHT + Fore.GREEN}Object not found. 
Deletion skipped.{Style.RESET_ALL}" - ) - - -@cli.command() -@click.pass_context -def configuration(ctx: click.Context): - """ - Display Feast configuration - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - if repo_config: - config_dict = repo_config.model_dump(by_alias=True, exclude_unset=True) - config_dict.pop("repo_path", None) - print(yaml.dump(config_dict, default_flow_style=False, sort_keys=False)) - else: - print("No configuration found.") - - -@cli.command() -@click.pass_context -def endpoint(ctx: click.Context): - """ - Display feature server endpoints - """ - store = create_feature_store(ctx) - endpoint = store.get_feature_server_endpoint() - if endpoint is not None: - _logger.info( - f"Feature server endpoint: {Style.BRIGHT + Fore.GREEN}{endpoint}{Style.RESET_ALL}" - ) - else: - _logger.info("There is no active feature server.") - - -@cli.command("plan", cls=NoOptionDefaultFormat) -@click.option( - "--skip-source-validation", - is_flag=True, - help="Don't validate the data sources by checking for that the tables exist.", -) -@click.option( - "--skip-feature-view-validation", - is_flag=True, - help="Don't validate feature views. 
Use with caution as this skips important checks.", -) -@click.pass_context -def plan_command( - ctx: click.Context, skip_source_validation: bool, skip_feature_view_validation: bool -): - """ - Create or update a feature store deployment - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - try: - plan(repo_config, repo, skip_source_validation, skip_feature_view_validation) - except FeastProviderLoginError as e: - print(str(e)) - - -@cli.command("apply", cls=NoOptionDefaultFormat) -@click.option( - "--skip-source-validation", - is_flag=True, - help="Don't validate the data sources by checking for that the tables exist.", -) -@click.option( - "--skip-feature-view-validation", - is_flag=True, - help="Don't validate feature views. Use with caution as this skips important checks.", -) -@click.option( - "--no-progress", - is_flag=True, - help="Disable progress bars during apply operation.", -) -@click.option( - "--no-promote", - is_flag=True, - default=False, - help="Save new versions without promoting them to active. 
" - "New versions are accessible via @v reads and --version materialization.", -) -@click.pass_context -def apply_total_command( - ctx: click.Context, - skip_source_validation: bool, - skip_feature_view_validation: bool, - no_progress: bool, - no_promote: bool, -): - """ - Create or update a feature store deployment - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - - repo_config = load_repo_config(repo, fs_yaml_file) - - # Set environment variable to disable progress if requested - if no_progress: - import os - - os.environ["FEAST_NO_PROGRESS"] = "1" - - try: - apply_total( - repo_config, - repo, - skip_source_validation, - skip_feature_view_validation, - no_promote=no_promote, - ) - except FeastProviderLoginError as e: - print(str(e)) - - -@cli.command("teardown", cls=NoOptionDefaultFormat) -@click.pass_context -def teardown_command(ctx: click.Context): - """ - Tear down deployed feature store infrastructure - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - - teardown(repo_config, repo) - - -@cli.command("registry-dump") -@click.pass_context -def registry_dump_command(ctx: click.Context): - """ - Print contents of the metadata registry - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - - click.echo(registry_dump(repo_config, repo_path=repo)) - - -@cli.command("materialize") -@click.argument("start_ts", required=False) -@click.argument("end_ts", required=False) -@click.option( - "--views", - "-v", - help="Feature views to materialize", - multiple=True, -) -@click.option( - "--disable-event-timestamp", - is_flag=True, - help="Materialize all available data using current datetime as event timestamp (useful when source data lacks event timestamps)", -) -@click.option( - "--version", 
- "feature_view_version", - default=None, - help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.", -) -@click.pass_context -def materialize_command( - ctx: click.Context, - start_ts: Optional[str], - end_ts: Optional[str], - views: List[str], - disable_event_timestamp: bool, - feature_view_version: Optional[str], -): - """ - Run a (non-incremental) materialization job to ingest data into the online store. Feast - will read all data between START_TS and END_TS from the offline store and write it to the - online store. If you don't specify feature view names using --views, all registered Feature - Views will be materialized. - - START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' - - If --disable-event-timestamp is used, timestamps are not required and all available data will be materialized using the current datetime as the event timestamp. - """ - store = create_feature_store(ctx) - - if disable_event_timestamp: - if start_ts or end_ts: - raise click.UsageError( - "Cannot specify START_TS or END_TS when --disable-event-timestamp is used" - ) - now = datetime.now() - # Query all available data and use current datetime as event timestamp - start_date = datetime( - 1970, 1, 1 - ) # Beginning of time to capture all historical data - end_date = now - else: - if not start_ts or not end_ts: - raise click.UsageError( - "START_TS and END_TS are required unless --disable-event-timestamp is used" - ) - start_date = utils.make_tzaware(parser.parse(start_ts)) - end_date = utils.make_tzaware(parser.parse(end_ts)) - - store.materialize( - feature_views=None if not views else views, - start_date=start_date, - end_date=end_date, - disable_event_timestamp=disable_event_timestamp, - version=feature_view_version, - ) - - -@cli.command("materialize-incremental") -@click.argument("end_ts") -@click.option( - "--views", - "-v", - help="Feature views to incrementally materialize", - multiple=True, -) -@click.option( - 
"--version", - "feature_view_version", - default=None, - help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.", -) -@click.pass_context -def materialize_incremental_command( - ctx: click.Context, - end_ts: str, - views: List[str], - feature_view_version: Optional[str], -): - """ - Run an incremental materialization job to ingest new data into the online store. Feast will read - all data from the previously ingested point to END_TS from the offline store and write it to the - online store. If you don't specify feature view names using --views, all registered Feature - Views will be incrementally materialized. - - END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - store.materialize_incremental( - feature_views=None if not views else views, - end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), - version=feature_view_version, - ) - - -@cli.command("init") -@click.argument("PROJECT_DIRECTORY", required=False) -@click.option( - "--minimal", "-m", is_flag=True, help="Create an empty project repository" -) -@click.option( - "--template", - "-t", - type=click.Choice( - [ - "local", - "gcp", - "aws", - "snowflake", - "spark", - "postgres", - "hbase", - "cassandra", - "hazelcast", - "couchbase", - "milvus", - "ray", - "ray_rag", - "pytorch_nlp", - ], - case_sensitive=False, - ), - help="Specify a template for the created project", - default="local", -) -@click.option( - "--repo-path", - help="Directory path where the repository will be created (default: create subdirectory with project name)", -) -def init_command(project_directory, minimal: bool, template: str, repo_path: str): - """Create a new Feast repository""" - if not project_directory: - project_directory = generate_project_name() - - if minimal: - template = "minimal" - - init_repo(project_directory, template, repo_path) - - -@cli.command("listen") -@click.option( - "--address", - "-a", - type=click.STRING, - 
default="localhost:50051", - show_default=True, - help="Address of the gRPC server", -) -@click.option( - "--max_workers", - "-w", - type=click.INT, - default=10, - show_default=False, - help="The maximum number of threads that can be used to execute the gRPC calls", -) -@click.option( - "--registry_ttl_sec", - "-r", - help="Number of seconds after which the registry is refreshed", - type=click.INT, - default=5, - show_default=True, -) -@click.pass_context -def listen_command( - ctx: click.Context, - address: str, - max_workers: int, - registry_ttl_sec: int, -): - """Start a gRPC feature server to ingest streaming features on given address""" - from feast.infra.contrib.grpc_server import get_grpc_server - - store = create_feature_store(ctx) - server = get_grpc_server(address, store, max_workers, registry_ttl_sec) - server.start() - server.wait_for_termination() - - -@cli.command("validate") -@click.option( - "--feature-service", - "-f", - help="Specify a feature service name", -) -@click.option( - "--reference", - "-r", - help="Specify a validation reference name", -) -@click.option( - "--no-profile-cache", - is_flag=True, - help="Do not store cached profile in registry", -) -@click.argument("start_ts") -@click.argument("end_ts") -@click.pass_context -def validate( - ctx: click.Context, - feature_service: str, - reference: str, - start_ts: str, - end_ts: str, - no_profile_cache, -): - """ - Perform validation of logged features (produced by a given feature service) against provided reference. - - START_TS and END_TS should be in ISO 8601 format, e.g. 
'2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - - _feature_service = store.get_feature_service(name=feature_service) - _reference = store.get_validation_reference(reference) - - result = store.validate_logged_features( - source=_feature_service, - reference=_reference, - start=maybe_local_tz(datetime.fromisoformat(start_ts)), - end=maybe_local_tz(datetime.fromisoformat(end_ts)), - throw_exception=False, - cache_profile=not no_profile_cache, - ) - - if not result: - print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}") - return - - errors = [e.to_dict() for e in result.report.errors] - formatted_json = json.dumps(errors, indent=4) - colorful_json = highlight( - formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter() - ) - print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}") - print(colorful_json) - exit(1) - - -cli.add_command(data_sources_cmd) -cli.add_command(entities_cmd) -cli.add_command(feature_services_cmd) -cli.add_command(feature_views_cmd) -cli.add_command(features_cmd) -cli.add_command(get_historical_features) -cli.add_command(get_online_features) -cli.add_command(on_demand_feature_views_cmd) -cli.add_command(feast_permissions_cmd) -cli.add_command(projects_cmd) -cli.add_command(saved_datasets_cmd) -cli.add_command(stream_feature_views_cmd) -cli.add_command(validation_references_cmd) -cli.add_command(ui) -cli.add_command(serve_command) -cli.add_command(serve_offline_command) -cli.add_command(serve_registry_command) -cli.add_command(serve_transformations_command) -cli.add_command(dbt_cmd) - -if __name__ == "__main__": - cli() +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +from datetime import datetime +from importlib.metadata import version as importlib_version +from pathlib import Path +from typing import List, Optional + +import click +import yaml +from colorama import Fore, Style +from dateutil import parser +from pygments import formatters, highlight, lexers + +from feast import utils +from feast.cli.data_sources import data_sources_cmd +from feast.cli.dbt_import import dbt_cmd +from feast.cli.entities import entities_cmd +from feast.cli.feature_services import feature_services_cmd +from feast.cli.feature_views import feature_views_cmd +from feast.cli.features import ( + features_cmd, + get_historical_features, + get_online_features, +) +from feast.cli.monitor import monitor_cmd +from feast.cli.on_demand_feature_views import on_demand_feature_views_cmd +from feast.cli.permissions import feast_permissions_cmd +from feast.cli.projects import projects_cmd +from feast.cli.saved_datasets import saved_datasets_cmd +from feast.cli.serve import ( + serve_command, + serve_offline_command, + serve_registry_command, + serve_transformations_command, +) +from feast.cli.stream_feature_views import stream_feature_views_cmd +from feast.cli.ui import ui +from feast.cli.validation_references import validation_references_cmd +from feast.constants import FEAST_FS_YAML_FILE_PATH_ENV_NAME +from feast.errors import FeastProviderLoginError +from feast.repo_config import load_repo_config +from feast.repo_operations import ( + apply_total, + cli_check_repo, + create_feature_store, + generate_project_name, + 
init_repo, + plan, + registry_dump, + teardown, +) +from feast.utils import maybe_local_tz + +_logger = logging.getLogger(__name__) + + +class NoOptionDefaultFormat(click.Command): + def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): + """Writes all the options into the formatter if they exist.""" + opts = [] + for param in self.get_params(ctx): + rv = param.get_help_record(ctx) + if rv is not None: + opts.append(rv) + if opts: + with formatter.section("Options(No current command options)"): + formatter.write_dl(opts) + + +@click.group() +@click.option( + "--chdir", + "-c", + envvar="FEATURE_REPO_DIR_ENV_VAR", + help="Switch to a different feature repository directory before executing the given subcommand. Can also be set via the FEATURE_REPO_DIR_ENV_VAR environment variable.", +) +@click.option( + "--log-level", + default="warning", + help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", +) +@click.option( + "--feature-store-yaml", + "-f", + help=f"Override the directory where the CLI should look for the feature_store.yaml file. 
Can also be set via the {FEAST_FS_YAML_FILE_PATH_ENV_NAME} environment variable.", +) +@click.pass_context +def cli( + ctx: click.Context, + chdir: Optional[str], + log_level: str, + feature_store_yaml: Optional[str], +): + """ + Feast CLI + + For more information, see our public docs at https://docs.feast.dev/ + """ + ctx.ensure_object(dict) + ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() + ctx.obj["FS_YAML_FILE"] = ( + Path(feature_store_yaml).absolute() + if feature_store_yaml + else utils.get_default_yaml_file_path(ctx.obj["CHDIR"]) + ) + try: + level = getattr(logging, log_level.upper()) + logging.basicConfig( + format="%(asctime)s %(name)s %(levelname)s: %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + level=level, + ) + # Override the logging level for already created loggers (due to loggers being created at the import time) + # Note, that format & datefmt does not need to be set, because by default child loggers don't override them + + # Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. + # So we have to put a type ignore hint for mypy. 
+ for logger_name in logging.root.manager.loggerDict: # type: ignore + if "feast" in logger_name: + logger = logging.getLogger(logger_name) + logger.setLevel(level) + except Exception as e: + raise e + pass + + +@cli.command() +def version(): + """ + Display Feast SDK version + """ + print( + f'{Style.BRIGHT + Fore.BLUE}Feast SDK Version: {Style.BRIGHT + Fore.GREEN}"{importlib_version("feast")}"' + ) + + +@cli.command() +@click.argument("object_id") +@click.pass_context +def delete(ctx: click.Context, object_id: str): + """ + Delete Feast Object + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = create_feature_store(ctx) + + e = None + object_type = None + + # Order matters if names can overlap between types, + # though typically they shouldn't in a well-structured feature store. + object_getters_and_types = [ + (store.get_entity, "Entity"), + (store.get_feature_view, "FeatureView"), + (store.get_feature_service, "FeatureService"), + (store.get_data_source, "DataSource"), + (store.get_saved_dataset, "SavedDataset"), + (store.get_validation_reference, "ValidationReference"), + (store.get_stream_feature_view, "StreamFeatureView"), + (store.get_on_demand_feature_view, "OnDemandFeatureView"), + # Add other get_* methods here if needed + ] + + for getter, obj_type_str in object_getters_and_types: + try: + potential_e = getter(object_id) # type: ignore[operator] + if potential_e: + e = potential_e + object_type = obj_type_str + break + except Exception: + pass + + if isinstance(e, list): + e = e[0] + if e: + store.apply([e], objects_to_delete=[e], partial=False) + print( + f"{Style.BRIGHT + Fore.RED}Deleted {Style.BRIGHT + Fore.GREEN}{object_type} {Fore.YELLOW}{object_id} from {Fore.GREEN}{store.project}.{Style.RESET_ALL}" + ) + else: + print( + f"{Style.BRIGHT + Fore.GREEN}Object not found. 
Deletion skipped.{Style.RESET_ALL}"
        )


@cli.command()
@click.pass_context
def configuration(ctx: click.Context):
    """
    Display Feast configuration
    """
    repo = ctx.obj["CHDIR"]
    fs_yaml_file = ctx.obj["FS_YAML_FILE"]
    cli_check_repo(repo, fs_yaml_file)
    repo_config = load_repo_config(repo, fs_yaml_file)
    if repo_config:
        # Dump with aliases and only explicitly-set keys so the output mirrors
        # what the user actually wrote in feature_store.yaml.
        config_dict = repo_config.model_dump(by_alias=True, exclude_unset=True)
        # repo_path is machine-specific and derived at load time; hide it.
        config_dict.pop("repo_path", None)
        print(yaml.dump(config_dict, default_flow_style=False, sort_keys=False))
    else:
        print("No configuration found.")


@cli.command()
@click.pass_context
def endpoint(ctx: click.Context):
    """
    Display feature server endpoints
    """
    store = create_feature_store(ctx)
    # NOTE: local variable intentionally shadows the command name `endpoint`;
    # harmless here since the command function is never re-entered.
    endpoint = store.get_feature_server_endpoint()
    if endpoint is not None:
        _logger.info(
            f"Feature server endpoint: {Style.BRIGHT + Fore.GREEN}{endpoint}{Style.RESET_ALL}"
        )
    else:
        _logger.info("There is no active feature server.")


@cli.command("plan", cls=NoOptionDefaultFormat)
@click.option(
    "--skip-source-validation",
    is_flag=True,
    help="Don't validate the data sources by checking for that the tables exist.",
)
@click.option(
    "--skip-feature-view-validation",
    is_flag=True,
    help="Don't validate feature views. 
Use with caution as this skips important checks.", +) +@click.pass_context +def plan_command( + ctx: click.Context, skip_source_validation: bool, skip_feature_view_validation: bool +): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + try: + plan(repo_config, repo, skip_source_validation, skip_feature_view_validation) + except FeastProviderLoginError as e: + print(str(e)) + + +@cli.command("apply", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.option( + "--skip-feature-view-validation", + is_flag=True, + help="Don't validate feature views. Use with caution as this skips important checks.", +) +@click.option( + "--no-progress", + is_flag=True, + help="Disable progress bars during apply operation.", +) +@click.option( + "--no-promote", + is_flag=True, + default=False, + help="Save new versions without promoting them to active. 
" + "New versions are accessible via @v reads and --version materialization.", +) +@click.pass_context +def apply_total_command( + ctx: click.Context, + skip_source_validation: bool, + skip_feature_view_validation: bool, + no_progress: bool, + no_promote: bool, +): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + + repo_config = load_repo_config(repo, fs_yaml_file) + + # Set environment variable to disable progress if requested + if no_progress: + import os + + os.environ["FEAST_NO_PROGRESS"] = "1" + + try: + apply_total( + repo_config, + repo, + skip_source_validation, + skip_feature_view_validation, + no_promote=no_promote, + ) + except FeastProviderLoginError as e: + print(str(e)) + + +@cli.command("teardown", cls=NoOptionDefaultFormat) +@click.pass_context +def teardown_command(ctx: click.Context): + """ + Tear down deployed feature store infrastructure + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + + teardown(repo_config, repo) + + +@cli.command("registry-dump") +@click.pass_context +def registry_dump_command(ctx: click.Context): + """ + Print contents of the metadata registry + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + + click.echo(registry_dump(repo_config, repo_path=repo)) + + +@cli.command("materialize") +@click.argument("start_ts", required=False) +@click.argument("end_ts", required=False) +@click.option( + "--views", + "-v", + help="Feature views to materialize", + multiple=True, +) +@click.option( + "--disable-event-timestamp", + is_flag=True, + help="Materialize all available data using current datetime as event timestamp (useful when source data lacks event timestamps)", +) +@click.option( + "--version", 
+ "feature_view_version", + default=None, + help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.", +) +@click.pass_context +def materialize_command( + ctx: click.Context, + start_ts: Optional[str], + end_ts: Optional[str], + views: List[str], + disable_event_timestamp: bool, + feature_view_version: Optional[str], +): + """ + Run a (non-incremental) materialization job to ingest data into the online store. Feast + will read all data between START_TS and END_TS from the offline store and write it to the + online store. If you don't specify feature view names using --views, all registered Feature + Views will be materialized. + + START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + + If --disable-event-timestamp is used, timestamps are not required and all available data will be materialized using the current datetime as the event timestamp. + """ + store = create_feature_store(ctx) + + if disable_event_timestamp: + if start_ts or end_ts: + raise click.UsageError( + "Cannot specify START_TS or END_TS when --disable-event-timestamp is used" + ) + now = datetime.now() + # Query all available data and use current datetime as event timestamp + start_date = datetime( + 1970, 1, 1 + ) # Beginning of time to capture all historical data + end_date = now + else: + if not start_ts or not end_ts: + raise click.UsageError( + "START_TS and END_TS are required unless --disable-event-timestamp is used" + ) + start_date = utils.make_tzaware(parser.parse(start_ts)) + end_date = utils.make_tzaware(parser.parse(end_ts)) + + store.materialize( + feature_views=None if not views else views, + start_date=start_date, + end_date=end_date, + disable_event_timestamp=disable_event_timestamp, + version=feature_view_version, + ) + + +@cli.command("materialize-incremental") +@click.argument("end_ts") +@click.option( + "--views", + "-v", + help="Feature views to incrementally materialize", + multiple=True, +) +@click.option( + 
"--version", + "feature_view_version", + default=None, + help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.", +) +@click.pass_context +def materialize_incremental_command( + ctx: click.Context, + end_ts: str, + views: List[str], + feature_view_version: Optional[str], +): + """ + Run an incremental materialization job to ingest new data into the online store. Feast will read + all data from the previously ingested point to END_TS from the offline store and write it to the + online store. If you don't specify feature view names using --views, all registered Feature + Views will be incrementally materialized. + + END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + store.materialize_incremental( + feature_views=None if not views else views, + end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), + version=feature_view_version, + ) + + +@cli.command("init") +@click.argument("PROJECT_DIRECTORY", required=False) +@click.option( + "--minimal", "-m", is_flag=True, help="Create an empty project repository" +) +@click.option( + "--template", + "-t", + type=click.Choice( + [ + "local", + "gcp", + "aws", + "snowflake", + "spark", + "postgres", + "hbase", + "cassandra", + "hazelcast", + "couchbase", + "milvus", + "ray", + "ray_rag", + "pytorch_nlp", + ], + case_sensitive=False, + ), + help="Specify a template for the created project", + default="local", +) +@click.option( + "--repo-path", + help="Directory path where the repository will be created (default: create subdirectory with project name)", +) +def init_command(project_directory, minimal: bool, template: str, repo_path: str): + """Create a new Feast repository""" + if not project_directory: + project_directory = generate_project_name() + + if minimal: + template = "minimal" + + init_repo(project_directory, template, repo_path) + + +@cli.command("listen") +@click.option( + "--address", + "-a", + type=click.STRING, + 
default="localhost:50051", + show_default=True, + help="Address of the gRPC server", +) +@click.option( + "--max_workers", + "-w", + type=click.INT, + default=10, + show_default=False, + help="The maximum number of threads that can be used to execute the gRPC calls", +) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) +@click.pass_context +def listen_command( + ctx: click.Context, + address: str, + max_workers: int, + registry_ttl_sec: int, +): + """Start a gRPC feature server to ingest streaming features on given address""" + from feast.infra.contrib.grpc_server import get_grpc_server + + store = create_feature_store(ctx) + server = get_grpc_server(address, store, max_workers, registry_ttl_sec) + server.start() + server.wait_for_termination() + + +@cli.command("validate") +@click.option( + "--feature-service", + "-f", + help="Specify a feature service name", +) +@click.option( + "--reference", + "-r", + help="Specify a validation reference name", +) +@click.option( + "--no-profile-cache", + is_flag=True, + help="Do not store cached profile in registry", +) +@click.argument("start_ts") +@click.argument("end_ts") +@click.pass_context +def validate( + ctx: click.Context, + feature_service: str, + reference: str, + start_ts: str, + end_ts: str, + no_profile_cache, +): + """ + Perform validation of logged features (produced by a given feature service) against provided reference. + + START_TS and END_TS should be in ISO 8601 format, e.g. 
'2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + + _feature_service = store.get_feature_service(name=feature_service) + _reference = store.get_validation_reference(reference) + + result = store.validate_logged_features( + source=_feature_service, + reference=_reference, + start=maybe_local_tz(datetime.fromisoformat(start_ts)), + end=maybe_local_tz(datetime.fromisoformat(end_ts)), + throw_exception=False, + cache_profile=not no_profile_cache, + ) + + if not result: + print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}") + return + + errors = [e.to_dict() for e in result.report.errors] + formatted_json = json.dumps(errors, indent=4) + colorful_json = highlight( + formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter() + ) + print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}") + print(colorful_json) + exit(1) + + +cli.add_command(data_sources_cmd) +cli.add_command(entities_cmd) +cli.add_command(feature_services_cmd) +cli.add_command(feature_views_cmd) +cli.add_command(features_cmd) +cli.add_command(get_historical_features) +cli.add_command(get_online_features) +cli.add_command(on_demand_feature_views_cmd) +cli.add_command(feast_permissions_cmd) +cli.add_command(projects_cmd) +cli.add_command(saved_datasets_cmd) +cli.add_command(stream_feature_views_cmd) +cli.add_command(validation_references_cmd) +cli.add_command(ui) +cli.add_command(serve_command) +cli.add_command(serve_offline_command) +cli.add_command(serve_registry_command) +cli.add_command(serve_transformations_command) +cli.add_command(dbt_cmd) +cli.add_command(monitor_cmd) + +if __name__ == "__main__": + cli() diff --git a/sdk/python/feast/cli/monitor.py b/sdk/python/feast/cli/monitor.py new file mode 100644 index 00000000000..3b37babcd63 --- /dev/null +++ b/sdk/python/feast/cli/monitor.py @@ -0,0 +1,100 @@ +from datetime import date +from typing import List, Optional + +import click + +from feast.repo_operations import create_feature_store + 
@click.group(name="monitor")
def monitor_cmd():
    """Feature monitoring commands."""
    pass


@monitor_cmd.command("run")
@click.option(
    "--project",
    "-p",
    default=None,
    help="Feast project name. Defaults to the project in feature_store.yaml.",
)
@click.option(
    "--feature-view",
    "-v",
    default=None,
    help="Feature view name. If omitted, all feature views are computed.",
)
@click.option(
    "--feature-name",
    "-f",
    multiple=True,
    help="Feature name(s) to compute. Can be specified multiple times. If omitted, all features are computed.",
)
@click.option(
    "--start-date",
    default=None,
    help="Start date (YYYY-MM-DD). Defaults to yesterday.",
)
@click.option(
    "--end-date",
    default=None,
    help="End date (YYYY-MM-DD). Defaults to today.",
)
@click.option(
    "--data-source",
    required=True,
    type=click.Choice(["batch", "log"]),
    help="Data source type: 'batch' (offline store) or 'log' (feature logs).",
)
@click.option(
    "--set-baseline",
    is_flag=True,
    default=False,
    help="Mark this computation as the baseline for drift detection.",
)
@click.pass_context
def monitor_run(
    ctx: click.Context,
    project: Optional[str],
    feature_view: Optional[str],
    feature_name: tuple,
    start_date: Optional[str],
    end_date: Optional[str],
    data_source: str,
    set_baseline: bool,
):
    """Compute feature quality metrics from batch source or feature logs."""
    store = create_feature_store(ctx)

    # Fall back to the project configured in feature_store.yaml.
    if project is None:
        project = store.project

    # Imported lazily so the CLI does not pay the monitoring import cost
    # (and its optional dependencies) unless this command is actually run.
    from feast.monitoring.monitoring_service import MonitoringService

    svc = MonitoringService(store)

    # date.fromisoformat raises ValueError on malformed input; click surfaces it.
    start_d = date.fromisoformat(start_date) if start_date else None
    end_d = date.fromisoformat(end_date) if end_date else None
    # click gives repeated options as a tuple; the service expects a list or None.
    feat_names: Optional[List[str]] = list(feature_name) if feature_name else None

    result = svc.compute_metrics(
        project=project,
        feature_view_name=feature_view,
        feature_names=feat_names,
        start_date=start_d,
        end_date=end_d,
        data_source_type=data_source,
        set_baseline=set_baseline,
    )

    # Human-readable summary of the computation result.
    click.echo(f"Status: {result['status']}")
    click.echo(f"Data source: {result['data_source_type']}")
    click.echo(f"Features computed: {result['computed_features']}")
    click.echo(f"Feature views computed: {result['computed_feature_views']}")
    click.echo(f"Feature services computed: {result['computed_feature_services']}")
    click.echo(f"Metric dates: {', '.join(result['metric_dates'])}")
    click.echo(f"Duration: {result['duration_ms']}ms")

    if set_baseline:
        click.echo("Baseline: SET")


# --- File: sdk/python/feast/monitoring/__init__.py ---
from feast.monitoring.metrics_calculator import MetricsCalculator
from feast.monitoring.monitoring_store import MonitoringStore

__all__ = [
    "MetricsCalculator",
    "MonitoringStore",
]

# --- File: sdk/python/feast/monitoring/metrics_calculator.py (module header) ---
import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

from feast.types import PrimitiveFeastType
from feast.value_type import ValueType

logger = logging.getLogger(__name__)

# Feast types treated as numeric: summary statistics + binned histogram.
_NUMERIC_PRIMITIVE_TYPES = {
    PrimitiveFeastType.INT32,
    PrimitiveFeastType.INT64,
    PrimitiveFeastType.FLOAT32,
    PrimitiveFeastType.FLOAT64,
    PrimitiveFeastType.UNIX_TIMESTAMP,
}

_NUMERIC_VALUE_TYPES = {
    ValueType.INT32,
    ValueType.INT64,
    ValueType.FLOAT,
    ValueType.DOUBLE,
    ValueType.UNIX_TIMESTAMP,
}

# Feast types treated as categorical: top-N value-count histogram.
_CATEGORICAL_PRIMITIVE_TYPES = {
    PrimitiveFeastType.STRING,
    PrimitiveFeastType.BOOL,
    PrimitiveFeastType.BYTES,
}

_CATEGORICAL_VALUE_TYPES = {
    ValueType.STRING,
    ValueType.BOOL,
    ValueType.BYTES,
}
class MetricsCalculator:
    """Computes per-feature quality metrics over PyArrow data.

    Numeric features get summary statistics (mean, sample stddev, min/max,
    p50/p75/p90/p95/p99) plus a fixed-bin histogram; categorical features get
    a top-N value-count histogram. Both metric dicts share the same key
    schema so they can be stored in one table.
    """

    def __init__(self, histogram_bins: int = 20, top_n: int = 10):
        # Number of equal-width buckets for numeric histograms.
        self.histogram_bins = histogram_bins
        # Number of most-frequent values retained for categorical histograms.
        self.top_n = top_n

    @staticmethod
    def classify_feature(dtype) -> Optional[str]:
        """Classify a Feast type as 'numeric', 'categorical', or None if unsupported."""
        if isinstance(dtype, PrimitiveFeastType):
            if dtype in _NUMERIC_PRIMITIVE_TYPES:
                return "numeric"
            if dtype in _CATEGORICAL_PRIMITIVE_TYPES:
                return "categorical"
        elif isinstance(dtype, ValueType):
            if dtype in _NUMERIC_VALUE_TYPES:
                return "numeric"
            if dtype in _CATEGORICAL_VALUE_TYPES:
                return "categorical"
        return None

    def compute_numeric(self, array: pa.Array) -> Dict[str, Any]:
        """Compute summary statistics and a histogram for a numeric column.

        Returns a dict with the full metric schema; statistic fields remain
        None when the column has no non-null values.
        """
        row_count = len(array)
        null_count = array.null_count

        result: Dict[str, Any] = {
            "feature_type": "numeric",
            "row_count": row_count,
            "null_count": null_count,
            "null_rate": null_count / row_count if row_count > 0 else None,
            "mean": None,
            "stddev": None,
            "min_val": None,
            "max_val": None,
            "p50": None,
            "p75": None,
            "p90": None,
            "p95": None,
            "p99": None,
            "histogram": None,
        }

        valid = pc.drop_null(array)
        if len(valid) == 0:
            return result

        # Widen everything to float64 so ints and timestamps share one code path.
        float_array = pc.cast(valid, pa.float64())

        # NOTE(review): drop_null removes nulls but not NaN; NaN values would
        # propagate into mean/stddev/min/max — confirm whether upstream sources
        # can emit NaN. The histogram below filters non-finite values itself.
        result["mean"] = pc.mean(float_array).as_py()
        # Sample standard deviation (ddof=1); undefined for a single element.
        result["stddev"] = pc.stddev(float_array, ddof=1).as_py()

        min_max = pc.min_max(float_array)
        result["min_val"] = min_max["min"].as_py()
        result["max_val"] = min_max["max"].as_py()

        quantiles = pc.quantile(float_array, q=[0.50, 0.75, 0.90, 0.95, 0.99])
        q_values = quantiles.to_pylist()
        result["p50"] = q_values[0]
        result["p75"] = q_values[1]
        result["p90"] = q_values[2]
        result["p95"] = q_values[3]
        result["p99"] = q_values[4]

        result["histogram"] = self._compute_numeric_histogram(float_array)

        return result

    def compute_categorical(self, array: pa.Array) -> Dict[str, Any]:
        """Compute a top-N value-count histogram for a categorical column.

        Returns the same key schema as compute_numeric; numeric statistic
        fields are always None for categorical features.
        """
        row_count = len(array)
        null_count = array.null_count

        result: Dict[str, Any] = {
            "feature_type": "categorical",
            "row_count": row_count,
            "null_count": null_count,
            "null_rate": null_count / row_count if row_count > 0 else None,
            "mean": None,
            "stddev": None,
            "min_val": None,
            "max_val": None,
            "p50": None,
            "p75": None,
            "p90": None,
            "p95": None,
            "p99": None,
            "histogram": None,
        }

        valid = pc.drop_null(array)
        if len(valid) == 0:
            return result

        # Normalize bools/bytes to strings so counts share one representation.
        str_array = pc.cast(valid, pa.string())
        value_counts = pc.value_counts(str_array)

        values_list = value_counts.field("values").to_pylist()
        counts_list = value_counts.field("counts").to_pylist()

        # Most frequent first; ties keep Arrow's value_counts order (stable sort).
        pairs = sorted(zip(values_list, counts_list), key=lambda x: x[1], reverse=True)
        unique_count = len(pairs)

        top_n_pairs = pairs[: self.top_n]
        # Everything outside the top N is rolled into a single bucket.
        other_count = sum(c for _, c in pairs[self.top_n :])

        result["histogram"] = {
            "values": [{"value": v, "count": c} for v, c in top_n_pairs],
            "other_count": other_count,
            "unique_count": unique_count,
        }

        return result

    def compute_all(
        self, table: pa.Table, feature_fields: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Compute metrics for all features in the table.

        Args:
            table: PyArrow table with feature data.
            feature_fields: List of (feature_name, feature_type) where
                feature_type is 'numeric' or 'categorical'.

        Returns:
            List of metric dicts, one per feature. Features missing from the
            table or with an unsupported type are skipped with a warning.
        """
        results = []
        for feature_name, feature_type in feature_fields:
            if feature_name not in table.column_names:
                logger.warning(
                    "Feature '%s' not found in table columns, skipping", feature_name
                )
                continue

            column = table.column(feature_name)
            if feature_type == "numeric":
                metrics = self.compute_numeric(column)
            elif feature_type == "categorical":
                metrics = self.compute_categorical(column)
            else:
                logger.warning(
                    "Unsupported feature type '%s' for '%s', skipping",
                    feature_type,
                    feature_name,
                )
                continue

            metrics["feature_name"] = feature_name
            results.append(metrics)

        return results

    def _compute_numeric_histogram(self, float_array: pa.Array) -> Dict[str, Any]:
        """Build an equal-width histogram over the finite values of a float column.

        Returns {"bins": edges, "counts": per-bin counts, "bin_width": width};
        degenerate cases (empty input, constant column) get a single bucket.
        """
        np_array = float_array.to_numpy()
        # FIX: Arrow's drop_null does not remove NaN/inf, and np.histogram raises
        # ValueError when the implied range is non-finite — one NaN previously
        # aborted metric computation for the whole feature view. Bin only the
        # finite values.
        np_array = np_array[np.isfinite(np_array)]
        if len(np_array) == 0:
            return {"bins": [], "counts": [], "bin_width": 0.0}

        min_val = float(np.min(np_array))
        max_val = float(np.max(np_array))

        # Constant column: np.histogram would spread identical values across
        # bins; represent it as one zero-width bucket instead.
        if min_val == max_val:
            return {
                "bins": [min_val, min_val],
                "counts": [int(len(np_array))],
                "bin_width": 0.0,
            }

        counts, bin_edges = np.histogram(np_array, bins=self.histogram_bins)
        bin_width = float(bin_edges[1] - bin_edges[0]) if len(bin_edges) > 1 else 0.0

        return {
            "bins": [float(b) for b in bin_edges],
            "counts": [int(c) for c in counts],
            "bin_width": bin_width,
        }
class MonitoringService:
    """Orchestrates feature-quality metric computation and retrieval.

    Reads feature data through the configured FeatureStore, computes metrics
    with MetricsCalculator, and persists/reads them via MonitoringStore.
    """

    def __init__(self, store: "FeatureStore"):  # noqa: F821
        self._store = store
        # Lazily constructed on first access (see the monitoring_store property)
        # so instantiating the service never touches the metrics database.
        self._monitoring_store: Optional[MonitoringStore] = None
        self._calculator = MetricsCalculator()

    @property
    def monitoring_store(self) -> MonitoringStore:
        """Lazily build the metrics store and make sure its tables exist."""
        if self._monitoring_store is None:
            # NOTE(review): reuses the offline store's connection config for the
            # metrics tables — confirm this is intended for all offline stores.
            offline_store_config = self._store.config.offline_store
            self._monitoring_store = MonitoringStore(offline_store_config)
            self._monitoring_store.ensure_tables()
        return self._monitoring_store

    def compute_metrics(
        self,
        project: str,
        feature_view_name: Optional[str] = None,
        feature_names: Optional[List[str]] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
        data_source_type: str = "batch",
        set_baseline: bool = False,
    ) -> Dict[str, Any]:
        """Compute and persist metrics for one or all feature views.

        Dates default to the [yesterday, today] window. Per-view failures are
        logged and skipped so one bad view does not abort the whole run.
        Returns a summary dict with counts, covered dates, and duration.
        """
        start_time = time.time()
        today = date.today()
        if end_date is None:
            end_date = today
        if start_date is None:
            start_date = end_date - timedelta(days=1)

        # Expand the date window to full UTC days for the source read.
        start_dt = datetime(
            start_date.year, start_date.month, start_date.day, tzinfo=timezone.utc
        )
        end_dt = datetime(
            end_date.year, end_date.month, end_date.day, 23, 59, 59, tzinfo=timezone.utc
        )

        feature_views = self._resolve_feature_views(project, feature_view_name)

        total_features = 0
        total_views = 0
        computed_dates: set = set()

        for fv in feature_views:
            try:
                fv_metrics = self._compute_for_feature_view(
                    project=project,
                    feature_view=fv,
                    feature_names=feature_names,
                    start_dt=start_dt,
                    end_dt=end_dt,
                    data_source_type=data_source_type,
                    set_baseline=set_baseline,
                )
                total_features += fv_metrics["feature_count"]
                # Only successfully computed views are counted.
                total_views += 1
                computed_dates.update(fv_metrics["dates"])
            except Exception:
                logger.exception(
                    "Failed to compute metrics for feature view '%s'", fv.name
                )

        # Service-level rollups are derived from the per-view metrics just stored.
        total_services = self._compute_feature_service_metrics(
            project=project,
            data_source_type=data_source_type,
            metric_dates=list(computed_dates),
            set_baseline=set_baseline,
        )

        duration_ms = int((time.time() - start_time) * 1000)

        return {
            "status": "completed",
            "data_source_type": data_source_type,
            "computed_features": total_features,
            "computed_feature_views": total_views,
            "computed_feature_services": total_services,
            "metric_dates": sorted(d.isoformat() for d in computed_dates),
            "duration_ms": duration_ms,
        }

    def get_feature_metrics(
        self,
        project: str,
        feature_service_name: Optional[str] = None,
        **kwargs,
    ) -> List[Dict[str, Any]]:
        """Fetch per-feature metrics, optionally scoped to one feature service.

        When a service is given, results are collected per member feature view;
        remaining filters are passed through to the store as kwargs.
        """
        if feature_service_name:
            return self._get_metrics_by_service(
                project,
                feature_service_name,
                lambda fv_name: self.monitoring_store.get_feature_metrics(
                    project=project, feature_view_name=fv_name, **kwargs
                ),
            )
        return self.monitoring_store.get_feature_metrics(project=project, **kwargs)

    def get_feature_view_metrics(
        self,
        project: str,
        feature_service_name: Optional[str] = None,
        **kwargs,
    ) -> List[Dict[str, Any]]:
        """Fetch per-feature-view metrics, optionally scoped to one feature service."""
        if feature_service_name:
            return self._get_metrics_by_service(
                project,
                feature_service_name,
                lambda fv_name: self.monitoring_store.get_feature_view_metrics(
                    project=project, feature_view_name=fv_name, **kwargs
                ),
            )
        return self.monitoring_store.get_feature_view_metrics(project=project, **kwargs)

    def get_feature_service_metrics(self, **kwargs) -> List[Dict[str, Any]]:
        """Pass-through to the store's feature-service metric query."""
        return self.monitoring_store.get_feature_service_metrics(**kwargs)

    def get_baseline(self, **kwargs) -> List[Dict[str, Any]]:
        """Pass-through to the store's baseline query."""
        return self.monitoring_store.get_baseline(**kwargs)

    def get_timeseries(
        self,
        project: str,
        feature_view_name: Optional[str] = None,
        feature_name: Optional[str] = None,
        feature_service_name: Optional[str] = None,
        data_source_type: Optional[str] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
    ) -> List[Dict[str, Any]]:
        """Convenience wrapper: time-series is the per-feature metric query
        with explicit (rather than kwargs) filter parameters."""
        return self.get_feature_metrics(
            project=project,
            feature_service_name=feature_service_name,
            feature_view_name=feature_view_name,
            feature_name=feature_name,
            data_source_type=data_source_type,
            start_date=start_date,
            end_date=end_date,
        )

    # -- Private --
_get_metrics_by_service( + self, project: str, feature_service_name: str, query_fn + ): + """Resolve feature views belonging to a service, then collect metrics for each.""" + fs = self._store.registry.get_feature_service( + name=feature_service_name, project=project + ) + fv_names = [proj.name for proj in fs.feature_view_projections] + results = [] + for fv_name in fv_names: + results.extend(query_fn(fv_name)) + return results + + def _resolve_feature_views(self, project: str, feature_view_name: Optional[str]): + if feature_view_name: + fv = self._store.registry.get_feature_view( + name=feature_view_name, project=project + ) + return [fv] + return self._store.registry.list_feature_views(project=project) + + def _compute_for_feature_view( + self, + project: str, + feature_view, + feature_names: Optional[List[str]], + start_dt: datetime, + end_dt: datetime, + data_source_type: str, + set_baseline: bool, + ) -> Dict[str, Any]: + fields = feature_view.features + if feature_names: + fields = [f for f in fields if f.name in feature_names] + + feature_fields = [] + for field in fields: + ftype = MetricsCalculator.classify_feature(field.dtype) + if ftype is None: + logger.warning( + "Unsupported dtype '%s' for feature '%s', skipping", + field.dtype, + field.name, + ) + continue + feature_fields.append((field.name, ftype)) + + if not feature_fields: + return {"feature_count": 0, "dates": set()} + + arrow_table = self._read_source_data( + feature_view=feature_view, + feature_fields=feature_fields, + start_dt=start_dt, + end_dt=end_dt, + data_source_type=data_source_type, + ) + + metrics_list = self._calculator.compute_all(arrow_table, feature_fields) + + now = datetime.now(timezone.utc) + metric_date = start_dt.date() + computed_dates = {metric_date} + + if set_baseline: + self.monitoring_store.clear_baseline( + project=project, + feature_view_name=feature_view.name, + data_source_type=data_source_type, + ) + + feature_metric_rows = [] + for m in metrics_list: + 
m["project_id"] = project + m["feature_view_name"] = feature_view.name + m["metric_date"] = metric_date + m["data_source_type"] = data_source_type + m["computed_at"] = now + m["is_baseline"] = set_baseline + feature_metric_rows.append(m) + + self.monitoring_store.save_feature_metrics(feature_metric_rows) + + null_rates = [ + m["null_rate"] for m in metrics_list if m.get("null_rate") is not None + ] + view_metric = { + "project_id": project, + "feature_view_name": feature_view.name, + "metric_date": metric_date, + "data_source_type": data_source_type, + "computed_at": now, + "is_baseline": set_baseline, + "total_row_count": metrics_list[0]["row_count"] if metrics_list else 0, + "total_features": len(metrics_list), + "features_with_nulls": sum( + 1 for m in metrics_list if (m.get("null_count") or 0) > 0 + ), + "avg_null_rate": sum(null_rates) / len(null_rates) if null_rates else 0.0, + "max_null_rate": max(null_rates) if null_rates else 0.0, + } + self.monitoring_store.save_feature_view_metrics([view_metric]) + + return {"feature_count": len(metrics_list), "dates": computed_dates} + + def _read_source_data( + self, + feature_view, + feature_fields, + start_dt: datetime, + end_dt: datetime, + data_source_type: str, + ): + if data_source_type == "log": + return self._read_log_source(feature_view, feature_fields, start_dt, end_dt) + + return self._read_batch_source(feature_view, feature_fields, start_dt, end_dt) + + def _read_batch_source(self, feature_view, feature_fields, start_dt, end_dt): + config = self._store.config + data_source = feature_view.batch_source + + join_key_columns = ( + [ + entity.name + for entity in self._store.registry.list_entities(project=config.project) + if entity.name in (feature_view.entities or []) + ] + or feature_view.entities + or [] + ) + + feature_name_columns = [name for name, _ in feature_fields] + timestamp_field = data_source.timestamp_field + created_timestamp_column = data_source.created_timestamp_column + + provider = 
self._store._get_provider() + offline_store = provider.offline_store + + retrieval_job = offline_store.pull_all_from_table_or_query( + config=config, + data_source=data_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_dt, + end_date=end_dt, + ) + + return retrieval_job.to_arrow() + + def _read_log_source(self, feature_view, feature_fields, start_dt, end_dt): + """Read feature data from a feature service's logging destination. + + Feature logs are stored per FeatureService via LoggingConfig. The log schema + namespaces feature columns as '{fv_name}__{feature_name}'. This method finds + the service that contains the given feature view, reads from its log sink, + and renames columns back to plain feature names. + """ + from feast.feature_logging import FeatureServiceLoggingSource + + config = self._store.config + project = config.project + + feature_service = self._find_logging_service_for_view(feature_view, project) + + logging_source = FeatureServiceLoggingSource(feature_service, project) + ts_column = logging_source.get_log_timestamp_column() + log_data_source = feature_service.logging_config.destination.to_data_source() + + fv_name = feature_view.name + log_feature_columns = [f"{fv_name}__{name}" for name, _ in feature_fields] + + provider = self._store._get_provider() + offline_store = provider.offline_store + + retrieval_job = offline_store.pull_all_from_table_or_query( + config=config, + data_source=log_data_source, + join_key_columns=[], + feature_name_columns=log_feature_columns, + timestamp_field=ts_column, + start_date=start_dt, + end_date=end_dt, + ) + + table = retrieval_job.to_arrow() + + rename_map = {f"{fv_name}__{name}": name for name, _ in feature_fields} + new_names = [rename_map.get(col, col) for col in table.column_names] + table = table.rename_columns(new_names) + + return table + + def 
_find_logging_service_for_view(self, feature_view, project: str): + """Find a FeatureService with logging configured that contains the given feature view.""" + feature_services = self._store.registry.list_feature_services(project=project) + + for fs in feature_services: + if fs.logging_config is None: + continue + fv_names = [proj.name for proj in fs.feature_view_projections] + if feature_view.name in fv_names: + return fs + + raise ValueError( + f"Failed to find a FeatureService with logging configured that " + f"contains feature view '{feature_view.name}'. " + f"To use data_source_type='log', configure logging on a FeatureService " + f"that includes this feature view." + ) + + def _compute_feature_service_metrics( + self, + project: str, + data_source_type: str, + metric_dates: List[date], + set_baseline: bool, + ) -> int: + if not metric_dates: + return 0 + + feature_services = self._store.registry.list_feature_services(project=project) + if not feature_services: + return 0 + + now = datetime.now(timezone.utc) + count = 0 + + for fs in feature_services: + try: + fv_names = [proj.name for proj in fs.feature_view_projections] + + for metric_date in metric_dates: + fv_metrics = self.monitoring_store.get_feature_view_metrics( + project=project, + data_source_type=data_source_type, + start_date=metric_date, + end_date=metric_date, + ) + + relevant = [ + m for m in fv_metrics if m.get("feature_view_name") in fv_names + ] + if not relevant: + continue + + null_rates = [ + m["avg_null_rate"] + for m in relevant + if m.get("avg_null_rate") is not None + ] + + total_features = sum(m.get("total_features", 0) for m in relevant) + + service_metric = { + "project_id": project, + "feature_service_name": fs.name, + "metric_date": metric_date + if isinstance(metric_date, date) + else date.fromisoformat(str(metric_date)), + "data_source_type": data_source_type, + "computed_at": now, + "is_baseline": set_baseline, + "total_feature_views": len(relevant), + "total_features": 
total_features, + "avg_null_rate": ( + sum(null_rates) / len(null_rates) if null_rates else 0.0 + ), + "max_null_rate": max(null_rates) if null_rates else 0.0, + } + self.monitoring_store.save_feature_service_metrics([service_metric]) + count += 1 + except Exception: + logger.exception("Failed to compute service metrics for '%s'", fs.name) + + return count diff --git a/sdk/python/feast/monitoring/monitoring_store.py b/sdk/python/feast/monitoring/monitoring_store.py new file mode 100644 index 00000000000..ee45b53aba7 --- /dev/null +++ b/sdk/python/feast/monitoring/monitoring_store.py @@ -0,0 +1,383 @@ +import json +import logging +from datetime import date, datetime +from typing import Any, Dict, List, Optional + +from psycopg import sql + +from feast.infra.utils.postgres.connection_utils import _get_conn +from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig + +logger = logging.getLogger(__name__) + +_FEATURE_METRICS_TABLE = "feast_monitoring_feature_metrics" +_FEATURE_VIEW_METRICS_TABLE = "feast_monitoring_feature_view_metrics" +_FEATURE_SERVICE_METRICS_TABLE = "feast_monitoring_feature_service_metrics" + +_FEATURE_METRICS_COLUMNS = [ + "project_id", + "feature_view_name", + "feature_name", + "metric_date", + "data_source_type", + "computed_at", + "is_baseline", + "feature_type", + "row_count", + "null_count", + "null_rate", + "mean", + "stddev", + "min_val", + "max_val", + "p50", + "p75", + "p90", + "p95", + "p99", + "histogram", +] + +_FEATURE_VIEW_METRICS_COLUMNS = [ + "project_id", + "feature_view_name", + "metric_date", + "data_source_type", + "computed_at", + "is_baseline", + "total_row_count", + "total_features", + "features_with_nulls", + "avg_null_rate", + "max_null_rate", +] + +_FEATURE_SERVICE_METRICS_COLUMNS = [ + "project_id", + "feature_service_name", + "metric_date", + "data_source_type", + "computed_at", + "is_baseline", + "total_feature_views", + "total_features", + "avg_null_rate", + "max_null_rate", +] + + +class 
import json
import logging
from datetime import date, datetime
from typing import Any, Dict, List, Optional

from psycopg import sql

from feast.infra.utils.postgres.connection_utils import _get_conn
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig

logger = logging.getLogger(__name__)

# Physical table names for persisted monitoring metrics.
_FEATURE_METRICS_TABLE = "feast_monitoring_feature_metrics"
_FEATURE_VIEW_METRICS_TABLE = "feast_monitoring_feature_view_metrics"
_FEATURE_SERVICE_METRICS_TABLE = "feast_monitoring_feature_service_metrics"

_FEATURE_METRICS_COLUMNS = [
    "project_id",
    "feature_view_name",
    "feature_name",
    "metric_date",
    "data_source_type",
    "computed_at",
    "is_baseline",
    "feature_type",
    "row_count",
    "null_count",
    "null_rate",
    "mean",
    "stddev",
    "min_val",
    "max_val",
    "p50",
    "p75",
    "p90",
    "p95",
    "p99",
    "histogram",
]

_FEATURE_VIEW_METRICS_COLUMNS = [
    "project_id",
    "feature_view_name",
    "metric_date",
    "data_source_type",
    "computed_at",
    "is_baseline",
    "total_row_count",
    "total_features",
    "features_with_nulls",
    "avg_null_rate",
    "max_null_rate",
]

_FEATURE_SERVICE_METRICS_COLUMNS = [
    "project_id",
    "feature_service_name",
    "metric_date",
    "data_source_type",
    "computed_at",
    "is_baseline",
    "total_feature_views",
    "total_features",
    "avg_null_rate",
    "max_null_rate",
]


class MonitoringStore:
    """PostgreSQL-backed persistence for feature monitoring metrics.

    Maintains three tables — per-feature, per-feature-view, and
    per-feature-service metrics — each keyed by project / name / metric date /
    data source type.  All writes are idempotent upserts.
    """

    def __init__(self, config: PostgreSQLConfig):
        self._config = config

    def ensure_tables(self) -> None:
        """Create the three metrics tables (and their indexes) if absent."""
        with _get_conn(self._config) as conn, conn.cursor() as cur:
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {_FEATURE_METRICS_TABLE} (
                    project_id VARCHAR(255) NOT NULL,
                    feature_view_name VARCHAR(255) NOT NULL,
                    feature_name VARCHAR(255) NOT NULL,
                    metric_date DATE NOT NULL,
                    data_source_type VARCHAR(50) NOT NULL DEFAULT 'batch',
                    computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    is_baseline BOOLEAN NOT NULL DEFAULT FALSE,
                    feature_type VARCHAR(50) NOT NULL,
                    row_count BIGINT,
                    null_count BIGINT,
                    null_rate DOUBLE PRECISION,
                    mean DOUBLE PRECISION,
                    stddev DOUBLE PRECISION,
                    min_val DOUBLE PRECISION,
                    max_val DOUBLE PRECISION,
                    p50 DOUBLE PRECISION,
                    p75 DOUBLE PRECISION,
                    p90 DOUBLE PRECISION,
                    p95 DOUBLE PRECISION,
                    p99 DOUBLE PRECISION,
                    histogram JSONB,
                    PRIMARY KEY (project_id, feature_view_name, feature_name, metric_date, data_source_type)
                );
                CREATE INDEX IF NOT EXISTS idx_fm_feature_metrics_project
                    ON {_FEATURE_METRICS_TABLE} (project_id);
                CREATE INDEX IF NOT EXISTS idx_fm_feature_metrics_view
                    ON {_FEATURE_METRICS_TABLE} (project_id, feature_view_name);
                CREATE INDEX IF NOT EXISTS idx_fm_feature_metrics_date
                    ON {_FEATURE_METRICS_TABLE} (metric_date);
                CREATE INDEX IF NOT EXISTS idx_fm_feature_metrics_source_type
                    ON {_FEATURE_METRICS_TABLE} (data_source_type);
                CREATE INDEX IF NOT EXISTS idx_fm_feature_metrics_baseline
                    ON {_FEATURE_METRICS_TABLE} (project_id, feature_view_name, feature_name)
                    WHERE is_baseline = TRUE;
            """)

            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {_FEATURE_VIEW_METRICS_TABLE} (
                    project_id VARCHAR(255) NOT NULL,
                    feature_view_name VARCHAR(255) NOT NULL,
                    metric_date DATE NOT NULL,
                    data_source_type VARCHAR(50) NOT NULL DEFAULT 'batch',
                    computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    is_baseline BOOLEAN NOT NULL DEFAULT FALSE,
                    total_row_count BIGINT,
                    total_features INTEGER,
                    features_with_nulls INTEGER,
                    avg_null_rate DOUBLE PRECISION,
                    max_null_rate DOUBLE PRECISION,
                    PRIMARY KEY (project_id, feature_view_name, metric_date, data_source_type)
                );
            """)

            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {_FEATURE_SERVICE_METRICS_TABLE} (
                    project_id VARCHAR(255) NOT NULL,
                    feature_service_name VARCHAR(255) NOT NULL,
                    metric_date DATE NOT NULL,
                    data_source_type VARCHAR(50) NOT NULL DEFAULT 'batch',
                    computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    is_baseline BOOLEAN NOT NULL DEFAULT FALSE,
                    total_feature_views INTEGER,
                    total_features INTEGER,
                    avg_null_rate DOUBLE PRECISION,
                    max_null_rate DOUBLE PRECISION,
                    PRIMARY KEY (project_id, feature_service_name, metric_date, data_source_type)
                );
            """)
            conn.commit()

    def save_feature_metrics(self, metrics: List[Dict[str, Any]]) -> None:
        """Upsert per-feature metric rows; no-op for an empty list."""
        if not metrics:
            return
        self._upsert(
            _FEATURE_METRICS_TABLE,
            _FEATURE_METRICS_COLUMNS,
            [
                "project_id",
                "feature_view_name",
                "feature_name",
                "metric_date",
                "data_source_type",
            ],
            metrics,
        )

    def save_feature_view_metrics(self, metrics: List[Dict[str, Any]]) -> None:
        """Upsert per-feature-view metric rows; no-op for an empty list."""
        if not metrics:
            return
        self._upsert(
            _FEATURE_VIEW_METRICS_TABLE,
            _FEATURE_VIEW_METRICS_COLUMNS,
            ["project_id", "feature_view_name", "metric_date", "data_source_type"],
            metrics,
        )

    def save_feature_service_metrics(self, metrics: List[Dict[str, Any]]) -> None:
        """Upsert per-feature-service metric rows; no-op for an empty list."""
        if not metrics:
            return
        self._upsert(
            _FEATURE_SERVICE_METRICS_TABLE,
            _FEATURE_SERVICE_METRICS_COLUMNS,
            ["project_id", "feature_service_name", "metric_date", "data_source_type"],
            metrics,
        )

    def get_feature_metrics(
        self,
        project: str,
        feature_view_name: Optional[str] = None,
        feature_name: Optional[str] = None,
        data_source_type: Optional[str] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch per-feature metrics; None filters are ignored."""
        return self._query_metrics(
            _FEATURE_METRICS_TABLE,
            _FEATURE_METRICS_COLUMNS,
            project=project,
            filters={
                "feature_view_name": feature_view_name,
                "feature_name": feature_name,
                "data_source_type": data_source_type,
            },
            start_date=start_date,
            end_date=end_date,
        )

    def get_feature_view_metrics(
        self,
        project: str,
        feature_view_name: Optional[str] = None,
        data_source_type: Optional[str] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch per-feature-view metrics; None filters are ignored."""
        return self._query_metrics(
            _FEATURE_VIEW_METRICS_TABLE,
            _FEATURE_VIEW_METRICS_COLUMNS,
            project=project,
            filters={
                "feature_view_name": feature_view_name,
                "data_source_type": data_source_type,
            },
            start_date=start_date,
            end_date=end_date,
        )

    def get_feature_service_metrics(
        self,
        project: str,
        feature_service_name: Optional[str] = None,
        data_source_type: Optional[str] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch per-feature-service metrics; None filters are ignored."""
        return self._query_metrics(
            _FEATURE_SERVICE_METRICS_TABLE,
            _FEATURE_SERVICE_METRICS_COLUMNS,
            project=project,
            filters={
                "feature_service_name": feature_service_name,
                "data_source_type": data_source_type,
            },
            start_date=start_date,
            end_date=end_date,
        )

    def get_baseline(
        self,
        project: str,
        feature_view_name: Optional[str] = None,
        feature_name: Optional[str] = None,
        data_source_type: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch only the rows currently marked as baseline."""
        return self._query_metrics(
            _FEATURE_METRICS_TABLE,
            _FEATURE_METRICS_COLUMNS,
            project=project,
            filters={
                "feature_view_name": feature_view_name,
                "feature_name": feature_name,
                "data_source_type": data_source_type,
                "is_baseline": True,
            },
        )

    def clear_baseline(
        self,
        project: str,
        feature_view_name: Optional[str] = None,
        feature_name: Optional[str] = None,
        data_source_type: Optional[str] = None,
    ) -> None:
        """Unset the baseline flag on every matching row."""
        where_parts = [sql.SQL("project_id = %s")]
        args: list = [project]

        # Optional narrowing filters; falsy values are skipped (as before).
        for clause, value in (
            (sql.SQL("feature_view_name = %s"), feature_view_name),
            (sql.SQL("feature_name = %s"), feature_name),
            (sql.SQL("data_source_type = %s"), data_source_type),
        ):
            if value:
                where_parts.append(clause)
                args.append(value)

        where_parts.append(sql.SQL("is_baseline = TRUE"))

        stmt = sql.SQL("UPDATE {} SET is_baseline = FALSE WHERE {}").format(
            sql.Identifier(_FEATURE_METRICS_TABLE),
            sql.SQL(" AND ").join(where_parts),
        )

        with _get_conn(self._config) as conn, conn.cursor() as cur:
            cur.execute(stmt, args)
            conn.commit()

    # -- Private helpers --

    def _upsert(
        self,
        table: str,
        columns: List[str],
        pk_columns: List[str],
        rows: List[Dict[str, Any]],
    ) -> None:
        """Insert rows idempotently: ON CONFLICT on the PK, update the rest."""
        update_cols = [c for c in columns if c not in pk_columns]

        stmt = sql.SQL(
            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {}"
        ).format(
            sql.Identifier(table),
            sql.SQL(", ").join(map(sql.Identifier, columns)),
            sql.SQL(", ").join(sql.Placeholder() for _ in columns),
            sql.SQL(", ").join(map(sql.Identifier, pk_columns)),
            sql.SQL(", ").join(
                sql.SQL("{} = EXCLUDED.{}").format(
                    sql.Identifier(c), sql.Identifier(c)
                )
                for c in update_cols
            ),
        )

        def _cell(column: str, value: Any) -> Any:
            # JSONB payloads are stored as serialized JSON text.
            if column == "histogram" and value is not None:
                return json.dumps(value)
            return value

        with _get_conn(self._config) as conn, conn.cursor() as cur:
            # One execute per row, matching the store's existing call pattern.
            for row in rows:
                cur.execute(stmt, [_cell(c, row.get(c)) for c in columns])
            conn.commit()

    def _query_metrics(
        self,
        table: str,
        columns: List[str],
        project: str,
        filters: Optional[Dict[str, Any]] = None,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
    ) -> List[Dict[str, Any]]:
        """Select matching metric rows, oldest first.

        Dates/timestamps come back ISO-formatted; histogram JSON is
        deserialized when the driver returns it as a string.
        """
        where_parts = [sql.SQL("project_id = %s")]
        args: list = [project]

        for key, value in (filters or {}).items():
            if value is not None:
                where_parts.append(sql.SQL("{} = %s").format(sql.Identifier(key)))
                args.append(value)

        if start_date:
            where_parts.append(sql.SQL("metric_date >= %s"))
            args.append(start_date)
        if end_date:
            where_parts.append(sql.SQL("metric_date <= %s"))
            args.append(end_date)

        stmt = sql.SQL("SELECT {} FROM {} WHERE {} ORDER BY metric_date ASC").format(
            sql.SQL(", ").join(map(sql.Identifier, columns)),
            sql.Identifier(table),
            sql.SQL(" AND ").join(where_parts),
        )

        with _get_conn(self._config) as conn, conn.cursor() as cur:
            conn.read_only = True
            cur.execute(stmt, args)
            fetched = cur.fetchall()

        results: List[Dict[str, Any]] = []
        for raw in fetched:
            item = dict(zip(columns, raw))
            if isinstance(item.get("histogram"), str):
                item["histogram"] = json.loads(item["histogram"])
            if isinstance(item.get("metric_date"), date):
                item["metric_date"] = item["metric_date"].isoformat()
            if isinstance(item.get("computed_at"), datetime):
                item["computed_at"] = item["computed_at"].isoformat()
            results.append(item)

        return results
"""
Integration tests for the Feast monitoring feature.

These tests require a running PostgreSQL instance and a configured Feast project.
Mark as integration tests via pytest marker.
"""

from datetime import date, datetime, timezone
from unittest.mock import MagicMock, patch

import pyarrow as pa

from feast.monitoring.monitoring_service import MonitoringService

# -- Shared fixtures and helpers --


def _mock_postgres_config():
    # Minimal stand-in for a PostgreSQL config; only the attributes the
    # connection helpers read are set here.
    config = MagicMock()
    config.host = "localhost"
    config.port = 5432
    config.database = "feast_test"
    config.db_schema = "public"
    config.user = "test"
    config.password = "test"  # pragma: allowlist secret
    config.sslmode = None
    config.sslkey_path = None
    config.sslcert_path = None
    config.sslrootcert_path = None
    return config


def _mock_feature_view(name="test_fv", features=None, entities=None):
    # Fake FeatureView with one numeric and one categorical feature by default.
    from feast.types import Float64, String

    fv = MagicMock()
    fv.name = name
    fv.entities = entities or ["entity_id"]
    fv.batch_source = MagicMock()
    fv.batch_source.timestamp_field = "event_timestamp"
    fv.batch_source.created_timestamp_column = ""

    if features is None:
        feat1 = MagicMock()
        feat1.name = "numeric_feat"
        feat1.dtype = Float64

        feat2 = MagicMock()
        feat2.name = "categorical_feat"
        feat2.dtype = String

        features = [feat1, feat2]

    fv.features = features
    return fv


def _mock_feature_service(name="test_fs", fv_names=None, logging_config=None):
    # Fake FeatureService whose projections reference the given view names.
    fs = MagicMock()
    fs.name = name
    fs.logging_config = logging_config
    projections = []
    for fv_name in fv_names or ["test_fv"]:
        proj = MagicMock()
        proj.name = fv_name
        projections.append(proj)
    fs.feature_view_projections = projections
    return fs


def _sample_arrow_table():
    # 5-row table with one null in each feature column.
    return pa.table(
        {
            "entity_id": pa.array([1, 2, 3, 4, 5]),
            "numeric_feat": pa.array([10.0, 20.0, 30.0, None, 50.0]),
            "categorical_feat": pa.array(["a", "b", "a", "c", None]),
            "event_timestamp": pa.array(
                [datetime(2025, 1, 15, tzinfo=timezone.utc)] * 5
            ),
        }
    )


def _mock_store_for_service(feature_views=None, feature_services=None):
    # Fake FeatureStore whose registry and offline store return canned data.
    store = MagicMock()
    store.project = "test_project"
    store.config.project = "test_project"
    store.config.offline_store = _mock_postgres_config()

    if feature_views is None:
        feature_views = [_mock_feature_view()]
    if feature_services is None:
        feature_services = [_mock_feature_service()]

    store.registry.list_feature_views.return_value = feature_views
    store.registry.get_feature_view.side_effect = lambda name, project: next(
        fv for fv in feature_views if fv.name == name
    )
    store.registry.list_feature_services.return_value = feature_services
    store.registry.list_entities.return_value = []

    retrieval_job = MagicMock()
    retrieval_job.to_arrow.return_value = _sample_arrow_table()
    provider = MagicMock()
    provider.offline_store.pull_all_from_table_or_query.return_value = retrieval_job
    store._get_provider.return_value = provider

    return store


def _mock_pg_conn():
    """Returns (patcher, mock_conn, mock_cursor) for Postgres connection mocking."""
    patcher = patch("feast.monitoring.monitoring_store._get_conn")
    mock_get_conn = patcher.start()

    mock_conn = MagicMock()
    mock_cursor = MagicMock()
    mock_cursor.fetchall.return_value = []
    # Wire up both the connection and cursor context-manager protocols.
    mock_conn.__enter__ = MagicMock(return_value=mock_conn)
    mock_conn.__exit__ = MagicMock(return_value=False)
    mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
    mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
    mock_get_conn.return_value = mock_conn

    return patcher, mock_conn, mock_cursor


def _create_test_app(store=None):
    """Create a FastAPI test app with the monitoring router."""
    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    from feast.api.registry.rest.monitoring import get_monitoring_router

    mock_server = MagicMock()
    mock_server.store = store or _mock_store_for_service()
    router = get_monitoring_router(grpc_handler=MagicMock(), server=mock_server)

    app = FastAPI()
    app.include_router(router)
    return TestClient(app)


# -- Tests --


class TestComputeMetricsBatchSource:
    def test_compute_metrics_end_to_end(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            store = _mock_store_for_service()
            svc = MonitoringService(store)

            result = svc.compute_metrics(
                project="test_project",
                feature_view_name="test_fv",
                start_date=date(2025, 1, 14),
                end_date=date(2025, 1, 15),
                data_source_type="batch",
            )

            assert result["status"] == "completed"
            assert result["computed_features"] == 2
            assert result["computed_feature_views"] == 1
            assert result["duration_ms"] >= 0
        finally:
            patcher.stop()


class TestBaselineFlow:
    def test_set_baseline(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            store = _mock_store_for_service()
            svc = MonitoringService(store)

            result = svc.compute_metrics(
                project="test_project",
                data_source_type="batch",
                set_baseline=True,
            )

            assert result["status"] == "completed"
            # Setting a new baseline should first clear the previous one
            # (an UPDATE ... is_baseline = FALSE statement).
            execute_calls = mock_cursor.execute.call_args_list
            has_clear_baseline = any(
                "is_baseline" in str(c) and "FALSE" in str(c) for c in execute_calls
            )
            assert has_clear_baseline
        finally:
            patcher.stop()


class TestFeatureViewAggregation:
    def test_view_metrics_populated(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            store = _mock_store_for_service()
            svc = MonitoringService(store)

            svc.compute_metrics(
                project="test_project",
                data_source_type="batch",
            )

            upsert_calls = [
                c for c in mock_cursor.execute.call_args_list if "INSERT" in str(c)
            ]
            # feature metrics (2 features) + view metric (1) = at least 3
            assert len(upsert_calls) >= 3
        finally:
            patcher.stop()


class TestFeatureServiceAggregation:
    def test_service_metrics_computed(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            store = _mock_store_for_service()
            svc = MonitoringService(store)

            result = svc.compute_metrics(
                project="test_project",
                data_source_type="batch",
            )

            assert result["computed_feature_services"] >= 0
        finally:
            patcher.stop()


class TestLogSource:
    def test_log_source_computes_from_service_logs(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            # Log tables namespace feature columns as '{fv}__{feature}'.
            log_table = pa.table(
                {
                    "test_fv__numeric_feat": pa.array([10.0, 20.0, 30.0]),
                    "test_fv__categorical_feat": pa.array(["a", "b", "a"]),
                    "__log_timestamp": pa.array(
                        [datetime(2025, 1, 15, tzinfo=timezone.utc)] * 3
                    ),
                }
            )

            log_destination = MagicMock()
            log_destination.to_data_source.return_value = MagicMock()
            logging_config = MagicMock()
            logging_config.destination = log_destination

            fs = _mock_feature_service(
                name="test_fs", fv_names=["test_fv"], logging_config=logging_config
            )

            store = _mock_store_for_service(feature_services=[fs])
            retrieval_job = MagicMock()
            retrieval_job.to_arrow.return_value = log_table
            provider = store._get_provider.return_value
            provider.offline_store.pull_all_from_table_or_query.return_value = (
                retrieval_job
            )

            svc = MonitoringService(store)
            result = svc.compute_metrics(
                project="test_project",
                feature_view_name="test_fv",
                start_date=date(2025, 1, 14),
                end_date=date(2025, 1, 15),
                data_source_type="log",
            )

            assert result["status"] == "completed"
            assert result["computed_features"] == 2
            assert result["data_source_type"] == "log"
        finally:
            patcher.stop()

    def test_log_source_no_logging_config_raises(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            # No service has logging configured, so the log read fails and
            # the run completes with zero computed features (best-effort).
            fs = _mock_feature_service(
                name="test_fs", fv_names=["test_fv"], logging_config=None
            )

            store = _mock_store_for_service(feature_services=[fs])
            svc = MonitoringService(store)

            result = svc.compute_metrics(
                project="test_project",
                feature_view_name="test_fv",
                data_source_type="log",
            )
            assert result["computed_features"] == 0
        finally:
            patcher.stop()


class TestComputeMetricsCLI:
    def test_monitor_run_batch(self):
        from click.testing import CliRunner

        from feast.cli.monitor import monitor_cmd

        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            with patch("feast.cli.monitor.create_feature_store") as mock_create_store:
                mock_create_store.return_value = _mock_store_for_service()

                runner = CliRunner()
                result = runner.invoke(
                    monitor_cmd,
                    ["run", "--data-source", "batch"],
                    catch_exceptions=False,
                )

                assert result.exit_code == 0
                assert "Status: completed" in result.output
                assert "Features computed: 2" in result.output
                assert "Duration:" in result.output
        finally:
            patcher.stop()

    def test_monitor_run_set_baseline(self):
        from click.testing import CliRunner

        from feast.cli.monitor import monitor_cmd

        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            with patch("feast.cli.monitor.create_feature_store") as mock_create_store:
                mock_create_store.return_value = _mock_store_for_service()

                runner = CliRunner()
                result = runner.invoke(
                    monitor_cmd,
                    ["run", "--data-source", "batch", "--set-baseline"],
                    catch_exceptions=False,
                )

                assert result.exit_code == 0
                assert "Baseline: SET" in result.output
        finally:
            patcher.stop()

    def test_monitor_run_help(self):
        from click.testing import CliRunner

        from feast.cli.monitor import monitor_cmd

        runner = CliRunner()
        result = runner.invoke(monitor_cmd, ["run", "--help"])

        assert result.exit_code == 0
        assert "--data-source" in result.output
        assert "--set-baseline" in result.output
        assert "--feature-view" in result.output
        assert "--start-date" in result.output


class TestComputeMetricsREST:
    """Test the REST API router with a mock server."""

    def test_compute_endpoint(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            client = _create_test_app()
            response = client.post(
                "/monitoring/compute",
                json={
                    "project": "test_project",
                    "data_source_type": "batch",
                    "feature_view_name": "test_fv",
                },
            )

            assert response.status_code == 200
            data = response.json()
            assert data["status"] == "completed"
            assert data["computed_features"] == 2
        finally:
            patcher.stop()

    def test_get_feature_metrics_endpoint(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            client = _create_test_app()
            response = client.get(
                "/monitoring/metrics/features",
                params={"project": "test_project"},
            )

            assert response.status_code == 200
            assert "metrics" in response.json()
        finally:
            patcher.stop()

    def test_get_baseline_endpoint(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            client = _create_test_app()
            response = client.get(
                "/monitoring/metrics/baseline",
                params={
                    "project": "test_project",
                    "feature_view_name": "test_fv",
                    "feature_name": "numeric_feat",
                },
            )

            assert response.status_code == 200
            assert "metrics" in response.json()
        finally:
            patcher.stop()

    def test_get_timeseries_endpoint(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            client = _create_test_app()
            response = client.get(
                "/monitoring/metrics/timeseries",
                params={
                    "project": "test_project",
                    "feature_view_name": "test_fv",
                    "start_date": "2025-01-01",
                    "end_date": "2025-01-31",
                },
            )

            assert response.status_code == 200
            data = response.json()
            assert "granularity" in data
            assert data["granularity"] == "daily"
            assert "timeseries" in data
        finally:
            patcher.stop()

    def test_invalid_data_source_type(self):
        client = _create_test_app()
        response = client.post(
            "/monitoring/compute",
            json={
                "project": "test_project",
                "data_source_type": "invalid",
            },
        )
        assert response.status_code == 422


class TestRBACEnforcement:
    def test_compute_calls_assert_permissions_update(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            from feast.permissions.action import AuthzedAction

            store = _mock_store_for_service()
            client = _create_test_app(store=store)

            with patch(
                "feast.api.registry.rest.monitoring.assert_permissions"
            ) as mock_assert:
                response = client.post(
                    "/monitoring/compute",
                    json={
                        "project": "test_project",
                        "data_source_type": "batch",
                        "feature_view_name": "test_fv",
                    },
                )

                assert response.status_code == 200
                mock_assert.assert_called_once()
                call_args = mock_assert.call_args
                assert AuthzedAction.UPDATE in call_args[1]["actions"]
        finally:
            patcher.stop()

    def test_get_metrics_calls_assert_permissions_describe(self):
        patcher, mock_conn, mock_cursor = _mock_pg_conn()
        try:
            from feast.permissions.action import AuthzedAction

            store = _mock_store_for_service()
            client = _create_test_app(store=store)

            with patch(
                "feast.api.registry.rest.monitoring.assert_permissions"
            ) as mock_assert:
                response = client.get(
                    "/monitoring/metrics/features",
                    params={
                        "project": "test_project",
                        "feature_view_name": "test_fv",
                    },
                )

                assert response.status_code == 200
                mock_assert.assert_called_once()
                call_args = mock_assert.call_args
                assert AuthzedAction.DESCRIBE in call_args[1]["actions"]
        finally:
            patcher.stop()
+import pytest + +from feast.monitoring.metrics_calculator import MetricsCalculator +from feast.types import ( + Bool, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) +from feast.value_type import ValueType + + +@pytest.fixture +def calculator(): + return MetricsCalculator(histogram_bins=5, top_n=3) + + +def _numeric_array(values, include_nulls=False): + if include_nulls: + return pa.array(values, type=pa.float64(), from_pandas=True) + return pa.array(values, type=pa.float64()) + + +def _string_array(values, include_nulls=False): + if include_nulls: + return pa.array(values, type=pa.string(), from_pandas=True) + return pa.array(values, type=pa.string()) + + +class TestClassifyFeature: + def test_numeric_primitive_types(self): + assert MetricsCalculator.classify_feature(Int32) == "numeric" + assert MetricsCalculator.classify_feature(Int64) == "numeric" + assert MetricsCalculator.classify_feature(Float32) == "numeric" + assert MetricsCalculator.classify_feature(Float64) == "numeric" + assert MetricsCalculator.classify_feature(UnixTimestamp) == "numeric" + + def test_categorical_primitive_types(self): + assert MetricsCalculator.classify_feature(String) == "categorical" + assert MetricsCalculator.classify_feature(Bool) == "categorical" + + def test_numeric_value_types(self): + assert MetricsCalculator.classify_feature(ValueType.INT32) == "numeric" + assert MetricsCalculator.classify_feature(ValueType.DOUBLE) == "numeric" + assert MetricsCalculator.classify_feature(ValueType.FLOAT) == "numeric" + + def test_categorical_value_types(self): + assert MetricsCalculator.classify_feature(ValueType.STRING) == "categorical" + assert MetricsCalculator.classify_feature(ValueType.BOOL) == "categorical" + + def test_unsupported_returns_none(self): + assert MetricsCalculator.classify_feature(ValueType.UNKNOWN) is None + assert MetricsCalculator.classify_feature(ValueType.MAP) is None + + +class TestComputeNumeric: + def test_basic(self, calculator): + arr = 
_numeric_array([1.0, 2.0, 3.0, 4.0, 5.0]) + result = calculator.compute_numeric(arr) + + assert result["feature_type"] == "numeric" + assert result["row_count"] == 5 + assert result["null_count"] == 0 + assert result["null_rate"] == 0.0 + assert result["mean"] == pytest.approx(3.0) + assert result["min_val"] == pytest.approx(1.0) + assert result["max_val"] == pytest.approx(5.0) + assert result["p50"] == pytest.approx(3.0) + assert result["histogram"] is not None + assert len(result["histogram"]["bins"]) == 6 # 5 bins + 1 edge + assert sum(result["histogram"]["counts"]) == 5 + + def test_with_nulls(self, calculator): + arr = _numeric_array([1.0, None, 3.0, None, 5.0], include_nulls=True) + result = calculator.compute_numeric(arr) + + assert result["row_count"] == 5 + assert result["null_count"] == 2 + assert result["null_rate"] == pytest.approx(0.4) + assert result["mean"] == pytest.approx(3.0) + assert result["min_val"] == pytest.approx(1.0) + assert result["max_val"] == pytest.approx(5.0) + + def test_all_nulls(self, calculator): + arr = _numeric_array([None, None, None], include_nulls=True) + result = calculator.compute_numeric(arr) + + assert result["row_count"] == 3 + assert result["null_count"] == 3 + assert result["null_rate"] == pytest.approx(1.0) + assert result["mean"] is None + assert result["stddev"] is None + assert result["min_val"] is None + assert result["histogram"] is None + + def test_empty(self, calculator): + arr = _numeric_array([]) + result = calculator.compute_numeric(arr) + + assert result["row_count"] == 0 + assert result["null_count"] == 0 + assert result["null_rate"] is None + assert result["mean"] is None + assert result["histogram"] is None + + def test_single_value(self, calculator): + arr = _numeric_array([42.0]) + result = calculator.compute_numeric(arr) + + assert result["row_count"] == 1 + assert result["mean"] == pytest.approx(42.0) + assert result["min_val"] == pytest.approx(42.0) + assert result["max_val"] == pytest.approx(42.0) 
+ assert result["p50"] == pytest.approx(42.0) + assert result["p99"] == pytest.approx(42.0) + + def test_histogram_bins(self, calculator): + arr = _numeric_array([10.0, 20.0, 30.0, 40.0, 50.0]) + result = calculator.compute_numeric(arr) + + histogram = result["histogram"] + assert len(histogram["counts"]) == 5 + assert histogram["bin_width"] > 0 + assert sum(histogram["counts"]) == 5 + + def test_identical_values_histogram(self, calculator): + arr = _numeric_array([5.0, 5.0, 5.0]) + result = calculator.compute_numeric(arr) + + histogram = result["histogram"] + assert histogram["bin_width"] == 0.0 + assert histogram["counts"] == [3] + + +class TestComputeCategorical: + def test_basic(self, calculator): + arr = _string_array(["a", "b", "a", "c", "a", "b"]) + result = calculator.compute_categorical(arr) + + assert result["feature_type"] == "categorical" + assert result["row_count"] == 6 + assert result["null_count"] == 0 + assert result["null_rate"] == 0.0 + + histogram = result["histogram"] + assert histogram["unique_count"] == 3 + assert histogram["other_count"] == 0 + values = {v["value"]: v["count"] for v in histogram["values"]} + assert values["a"] == 3 + assert values["b"] == 2 + assert values["c"] == 1 + + def test_with_nulls(self, calculator): + arr = _string_array(["a", None, "b", None], include_nulls=True) + result = calculator.compute_categorical(arr) + + assert result["row_count"] == 4 + assert result["null_count"] == 2 + assert result["null_rate"] == pytest.approx(0.5) + assert result["histogram"]["unique_count"] == 2 + + def test_high_cardinality(self, calculator): + values = [f"val_{i}" for i in range(20)] + arr = _string_array(values) + result = calculator.compute_categorical(arr) + + histogram = result["histogram"] + assert len(histogram["values"]) == 3 # top_n=3 + assert histogram["other_count"] == 17 + assert histogram["unique_count"] == 20 + + def test_all_nulls(self, calculator): + arr = _string_array([None, None], include_nulls=True) + result = 
calculator.compute_categorical(arr) + + assert result["null_rate"] == pytest.approx(1.0) + assert result["histogram"] is None + + def test_empty(self, calculator): + arr = _string_array([]) + result = calculator.compute_categorical(arr) + + assert result["row_count"] == 0 + assert result["null_rate"] is None + assert result["histogram"] is None + + +class TestComputeAll: + def test_mixed_types(self, calculator): + table = pa.table( + { + "numeric_col": pa.array([1.0, 2.0, 3.0]), + "string_col": pa.array(["a", "b", "a"]), + } + ) + feature_fields = [ + ("numeric_col", "numeric"), + ("string_col", "categorical"), + ] + results = calculator.compute_all(table, feature_fields) + + assert len(results) == 2 + numeric_result = next(r for r in results if r["feature_name"] == "numeric_col") + categorical_result = next( + r for r in results if r["feature_name"] == "string_col" + ) + assert numeric_result["feature_type"] == "numeric" + assert categorical_result["feature_type"] == "categorical" + + def test_missing_column_skipped(self, calculator): + table = pa.table({"existing_col": pa.array([1.0, 2.0])}) + feature_fields = [ + ("existing_col", "numeric"), + ("missing_col", "numeric"), + ] + results = calculator.compute_all(table, feature_fields) + + assert len(results) == 1 + assert results[0]["feature_name"] == "existing_col" diff --git a/sdk/python/tests/unit/monitoring/test_monitoring_store.py b/sdk/python/tests/unit/monitoring/test_monitoring_store.py new file mode 100644 index 00000000000..7d0d8e712a5 --- /dev/null +++ b/sdk/python/tests/unit/monitoring/test_monitoring_store.py @@ -0,0 +1,206 @@ +import json +from datetime import date, datetime, timezone +from unittest.mock import MagicMock, patch + +from feast.monitoring.monitoring_store import ( + _FEATURE_METRICS_COLUMNS, + _FEATURE_METRICS_TABLE, + _FEATURE_SERVICE_METRICS_TABLE, + _FEATURE_VIEW_METRICS_TABLE, + MonitoringStore, +) + + +def _make_config(): + config = MagicMock() + config.host = "localhost" + 
import json
from datetime import date, datetime, timezone
from unittest.mock import MagicMock, patch

from feast.monitoring.monitoring_store import (
    _FEATURE_METRICS_COLUMNS,
    _FEATURE_METRICS_TABLE,
    _FEATURE_SERVICE_METRICS_TABLE,
    _FEATURE_VIEW_METRICS_TABLE,
    MonitoringStore,
)


def _make_config():
    # Stand-in PostgreSQL config exposing only the attributes the store reads.
    config = MagicMock()
    config.host = "localhost"
    config.port = 5432
    config.database = "feast"
    config.db_schema = "public"
    config.user = "test"
    config.password = "test"  # pragma: allowlist secret
    config.sslmode = None
    config.sslkey_path = None
    config.sslcert_path = None
    config.sslrootcert_path = None
    return config


def _sample_feature_metric(
    project="test_project",
    feature_view="fv1",
    feature_name="feat1",
    metric_date=date(2025, 1, 15),
    data_source_type="batch",
    is_baseline=False,
):
    # A fully-populated per-feature metric row covering every column.
    return {
        "project_id": project,
        "feature_view_name": feature_view,
        "feature_name": feature_name,
        "metric_date": metric_date,
        "data_source_type": data_source_type,
        "computed_at": datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
        "is_baseline": is_baseline,
        "feature_type": "numeric",
        "row_count": 1000,
        "null_count": 50,
        "null_rate": 0.05,
        "mean": 42.5,
        "stddev": 10.2,
        "min_val": 1.0,
        "max_val": 100.0,
        "p50": 42.0,
        "p75": 55.0,
        "p90": 70.0,
        "p95": 80.0,
        "p99": 95.0,
        "histogram": {"bins": [0, 25, 50, 75, 100], "counts": [200, 300, 300, 200]},
    }


class TestEnsureTables:
    @patch("feast.monitoring.monitoring_store._get_conn")
    def test_creates_tables(self, mock_get_conn):
        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        mock_conn.__enter__ = MagicMock(return_value=mock_conn)
        mock_conn.__exit__ = MagicMock(return_value=False)
        mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
        mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
        mock_get_conn.return_value = mock_conn

        store = MonitoringStore(_make_config())
        store.ensure_tables()

        # One DDL execute per table, in declaration order, then one commit.
        assert mock_cursor.execute.call_count == 3
        calls = mock_cursor.execute.call_args_list
        assert _FEATURE_METRICS_TABLE in calls[0][0][0]
        assert _FEATURE_VIEW_METRICS_TABLE in calls[1][0][0]
        assert _FEATURE_SERVICE_METRICS_TABLE in calls[2][0][0]
        mock_conn.commit.assert_called_once()


class TestSaveAndGet:
    @patch("feast.monitoring.monitoring_store._get_conn")
    def test_save_feature_metrics_calls_upsert(self, mock_get_conn):
        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        mock_conn.__enter__ = MagicMock(return_value=mock_conn)
        mock_conn.__exit__ = MagicMock(return_value=False)
        mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
        mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
        mock_get_conn.return_value = mock_conn

        store = MonitoringStore(_make_config())
        metric = _sample_feature_metric()
        store.save_feature_metrics([metric])

        # One execute per row.
        assert mock_cursor.execute.call_count == 1
        mock_conn.commit.assert_called_once()

    @patch("feast.monitoring.monitoring_store._get_conn")
    def test_save_empty_list_is_noop(self, mock_get_conn):
        store = MonitoringStore(_make_config())
        store.save_feature_metrics([])
        # An empty batch must not even open a connection.
        mock_get_conn.assert_not_called()

    @patch("feast.monitoring.monitoring_store._get_conn")
    def test_get_feature_metrics_with_filters(self, mock_get_conn):
        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        mock_cursor.fetchall.return_value = []
        mock_conn.__enter__ = MagicMock(return_value=mock_conn)
        mock_conn.__exit__ = MagicMock(return_value=False)
        mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
        mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
        mock_get_conn.return_value = mock_conn

        store = MonitoringStore(_make_config())
        result = store.get_feature_metrics(
            project="test_project",
            feature_view_name="fv1",
            feature_name="feat1",
            data_source_type="batch",
            start_date=date(2025, 1, 1),
            end_date=date(2025, 1, 31),
        )

        assert result == []
        mock_cursor.execute.assert_called_once()
        # Every non-None filter must appear in the bound query parameters.
        call_args = mock_cursor.execute.call_args
        params = call_args[0][1]
        assert "test_project" in params
        assert "fv1" in params
        assert "feat1" in params
        assert "batch" in params


class TestBaseline:
    @patch("feast.monitoring.monitoring_store._get_conn")
    def test_clear_baseline(self, mock_get_conn):
        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        mock_conn.__enter__ = MagicMock(return_value=mock_conn)
        mock_conn.__exit__ = MagicMock(return_value=False)
        mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
        mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
        mock_get_conn.return_value = mock_conn

        store = MonitoringStore(_make_config())
        store.clear_baseline(
            project="test_project",
            feature_view_name="fv1",
            feature_name="feat1",
            data_source_type="batch",
        )

        mock_cursor.execute.assert_called_once()
        call_args = mock_cursor.execute.call_args
        params = call_args[0][1]
        assert "test_project" in params
        mock_conn.commit.assert_called_once()

    @patch("feast.monitoring.monitoring_store._get_conn")
    def test_get_baseline_filters_is_baseline(self, mock_get_conn):
        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        mock_cursor.fetchall.return_value = []
        mock_conn.__enter__ = MagicMock(return_value=mock_conn)
        mock_conn.__exit__ = MagicMock(return_value=False)
        mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
        mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
        mock_get_conn.return_value = mock_conn

        store = MonitoringStore(_make_config())
        result = store.get_baseline(project="test_project")

        assert result == []
        # The implicit is_baseline=True filter must be bound as a parameter.
        call_args = mock_cursor.execute.call_args
        params = call_args[0][1]
        assert "test_project" in params
        assert True in params
MagicMock(return_value=mock_cursor) + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + mock_get_conn.return_value = mock_conn + + store = MonitoringStore(_make_config()) + metric = _sample_feature_metric() + store.save_feature_metrics([metric]) + + call_args = mock_cursor.execute.call_args + values = call_args[0][1] + histogram_idx = _FEATURE_METRICS_COLUMNS.index("histogram") + histogram_val = values[histogram_idx] + parsed = json.loads(histogram_val) + assert "bins" in parsed + assert "counts" in parsed