"""SQL-backed Feast registry.

Defines the SQLAlchemy Core schema (one table per registry object type, plus
feature-view version history and a per-project key/value metadata table) used
by ``SqlRegistry`` and its ``SqlRegistryConfig``.
"""

import logging
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union, cast

from pydantic import StrictInt, StrictStr
from sqlalchemy import (  # type: ignore
    BigInteger,
    Column,
    Index,
    Integer,
    LargeBinary,
    MetaData,
    String,
    Table,
    Text,
    create_engine,
    delete,
    func,
    insert,
    select,
    update,
)
from sqlalchemy.engine import Engine
from sqlalchemy.exc import IntegrityError

from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
from feast.errors import (
    ConcurrentVersionConflict,
    DataSourceObjectNotFoundException,
    EntityNotFoundException,
    FeatureServiceNotFoundException,
    FeatureViewNotFoundException,
    FeatureViewPinConflict,
    FeatureViewVersionNotFound,
    PermissionNotFoundException,
    ProjectNotFoundException,
    ProjectObjectNotFoundException,
    SavedDatasetNotFound,
    ValidationReferenceNotFound,
)
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.infra.infra_object import Infra
from feast.infra.registry.caching_registry import CachingRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.FeatureService_pb2 import (
    FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
    OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.Permission_pb2 import Permission as PermissionProto
from feast.protos.feast.core.Project_pb2 import Project as ProjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto
from feast.protos.feast.core.StreamFeatureView_pb2 import (
    StreamFeatureView as StreamFeatureViewProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    ValidationReference as ValidationReferenceProto,
)
from feast.repo_config import RegistryConfig
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now
from feast.version_utils import (
    generate_version_id,
    parse_version,
    version_tag,
)

# Shared SQLAlchemy MetaData: every table below registers itself here, and
# SqlRegistry.__init__ calls metadata.create_all(write_engine) to create any
# missing tables. The table/column/index names and column order below are the
# on-disk contract with existing databases -- change them only with a migration.
# NOTE(review): SqlRegistry.teardown() clears most of these tables but not
# projects, managed_infra or feast_metadata.
metadata = MetaData()

# One row per project, keyed by project_id; project_proto holds the
# serialized Project protobuf.
projects = Table(
    "projects",
    metadata,
    Column("project_id", String(255), primary_key=True),
    Column("project_name", String(255), nullable=False),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("project_proto", LargeBinary, nullable=False),
)
# NOTE(review): project_id is already this table's sole primary key, so this
# index is likely redundant; kept as-is to avoid changing existing schemas.
Index("idx_projects_project_id", projects.c.project_id)

# The "object" tables below share one shape: a composite primary key of
# (<object>_name, project_id), an integer last_updated_timestamp (the writes
# visible in this module store whole epoch seconds), and a *_proto column with
# the serialized protobuf of the object. Each gets a secondary index on
# project_id because registry queries filter by project.
entities = Table(
    "entities",
    metadata,
    Column("entity_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("entity_proto", LargeBinary, nullable=False),
)
Index("idx_entities_project_id", entities.c.project_id)
data_sources = Table(
    "data_sources",
    metadata,
    Column("data_source_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("data_source_proto", LargeBinary, nullable=False),
)
Index("idx_data_sources_project_id", data_sources.c.project_id)

# Batch feature views. Besides the common columns:
#   materialized_intervals -- nullable blob.
#     NOTE(review): no write to this column is visible in this module; confirm
#     whether it is still populated.
#   user_metadata -- nullable opaque bytes, written by
#     SqlRegistry.apply_user_metadata and read by get_user_metadata.
feature_views = Table(
    "feature_views",
    metadata,
    Column("feature_view_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("materialized_intervals", LargeBinary, nullable=True),
    Column("feature_view_proto", LargeBinary, nullable=False),
    Column("user_metadata", LargeBinary, nullable=True),
)
Index("idx_feature_views_project_id", feature_views.c.project_id)

# Stream feature views: same layout as feature_views minus
# materialized_intervals. The name column is "feature_view_name" so that
# code iterating over all three FV tables can filter uniformly.
stream_feature_views = Table(
    "stream_feature_views",
    metadata,
    Column("feature_view_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("feature_view_proto", LargeBinary, nullable=False),
    Column("user_metadata", LargeBinary, nullable=True),
)
Index("idx_stream_feature_views_project_id", stream_feature_views.c.project_id)

# On-demand feature views: same layout as stream_feature_views.
on_demand_feature_views = Table(
    "on_demand_feature_views",
    metadata,
    Column("feature_view_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("feature_view_proto", LargeBinary, nullable=False),
    Column("user_metadata", LargeBinary, nullable=True),
)
Index("idx_on_demand_feature_views_project_id", on_demand_feature_views.c.project_id)
feature_services = Table(
    "feature_services",
    metadata,
    Column("feature_service_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("feature_service_proto", LargeBinary, nullable=False),
)
Index("idx_feature_services_project_id", feature_services.c.project_id)
saved_datasets = Table(
    "saved_datasets",
    metadata,
    Column("saved_dataset_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("saved_dataset_proto", LargeBinary, nullable=False),
)
Index("idx_saved_datasets_project_id", saved_datasets.c.project_id)
validation_references = Table(
    "validation_references",
    metadata,
    Column("validation_reference_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("validation_reference_proto", LargeBinary, nullable=False),
)
Index("idx_validation_references_project_id", validation_references.c.project_id)

# Managed infrastructure. SqlRegistry.update_infra / _get_infra always use the
# fixed infra_name "infra_obj", so in practice there is one row per project.
managed_infra = Table(
    "managed_infra",
    metadata,
    Column("infra_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("infra_proto", LargeBinary, nullable=False),
)
Index("idx_managed_infra_project_id", managed_infra.c.project_id)
permissions = Table(
    "permissions",
    metadata,
    Column("permission_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("last_updated_timestamp", BigInteger, nullable=False),
    Column("permission_proto", LargeBinary, nullable=False),
)
Index("idx_permissions_project_id", permissions.c.project_id)

# Immutable snapshots of feature-view definitions, one row per version.
# The composite primary key (feature_view_name, project_id, version_number)
# is load-bearing: a second INSERT of the same version raises IntegrityError,
# which SqlRegistry.apply_feature_view uses to detect concurrent appliers
# (retrying or raising ConcurrentVersionConflict).
#   feature_view_type -- "feature_view", "stream_feature_view" or
#     "on_demand_feature_view"; selects the proto/python class on read.
#   created_timestamp -- whole epoch seconds (int(_utc_now().timestamp())).
#   description       -- currently written as "" by _save_version_snapshot.
#   version_id        -- value from generate_version_id(); 36 chars fits a
#     canonical UUID string (assumption -- confirm in feast.version_utils).
# Rows for a feature view are removed by SqlRegistry.delete_feature_view.
feature_view_version_history = Table(
    "feature_view_version_history",
    metadata,
    Column("feature_view_name", String(255), primary_key=True),
    Column("project_id", String(255), primary_key=True),
    Column("version_number", Integer, primary_key=True),
    Column("feature_view_type", String(50), nullable=False),
    Column("feature_view_proto", LargeBinary, nullable=False),
    Column("created_timestamp", BigInteger, nullable=False),
    Column("description", Text, nullable=True),
    Column("version_id", String(36), nullable=False),
)
Index(
    "idx_fv_version_history_project_id",
    feature_view_version_history.c.project_id,
)


class FeastMetadataKeys(Enum):
    """Allowed values of ``feast_metadata.metadata_key``.

    PROJECT_UUID rows are what SqlRegistry reads to sync legacy per-project
    metadata into the ``projects`` table and to build ProjectMetadata.
    """

    # NOTE(review): no reader/writer of this key is visible in this module
    # chunk; presumably used by _set_last_updated_metadata -- confirm.
    LAST_UPDATED_TIMESTAMP = "last_updated_timestamp"
    PROJECT_UUID = "project_uuid"


# Per-project key/value metadata, keyed by (project_id, metadata_key) where
# metadata_key is a FeastMetadataKeys value. On start-up SqlRegistry copies
# projects found here (PROJECT_UUID rows) into the projects table, and when
# purge_feast_metadata is enabled it deletes those rows afterwards.
feast_metadata = Table(
    "feast_metadata",
    metadata,
    Column("project_id", String(255), primary_key=True),
    Column("metadata_key", String(50), primary_key=True),
    Column("metadata_value", Text, nullable=False),
    Column("last_updated_timestamp", BigInteger, nullable=False),
)
Index("idx_feast_metadata_project_id", feast_metadata.c.project_id) logger = logging.getLogger(__name__) class SqlRegistryConfig(RegistryConfig): registry_type: StrictStr = "sql" """ str: Provider name or a class name that implements Registry.""" path: StrictStr = "" """ str: Path to metadata store. If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """ read_path: Optional[StrictStr] = None """ str: Read Path to metadata store if different from path. If registry_type is 'sql', then this is a Read Endpoint for database URL. If not set, path will be used for read and write. """ sqlalchemy_config_kwargs: Dict[str, Any] = {"echo": False} """ Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """ thread_pool_executor_worker_count: StrictInt = 0 """ int: Number of worker threads to use for asynchronous caching in SQL Registry. If set to 0, it doesn't use ThreadPoolExecutor. """ class SqlRegistry(CachingRegistry): def __init__( self, registry_config, project: str, repo_path: Optional[Path], ): assert registry_config is not None and isinstance( registry_config, SqlRegistryConfig ), "SqlRegistry needs a valid registry_config" self.registry_config = registry_config self.write_engine: Engine = create_engine( registry_config.path, **registry_config.sqlalchemy_config_kwargs ) if registry_config.read_path: self.read_engine: Engine = create_engine( registry_config.read_path, **registry_config.sqlalchemy_config_kwargs, ) else: self.read_engine = self.write_engine metadata.create_all(self.write_engine) self.thread_pool_executor_worker_count = ( registry_config.thread_pool_executor_worker_count ) self.purge_feast_metadata = registry_config.purge_feast_metadata self.enable_online_versioning = ( registry_config.enable_online_feature_view_versioning ) super().__init__( project=project, cache_ttl_seconds=registry_config.cache_ttl_seconds, cache_mode=registry_config.cache_mode, ) # Sync feast_metadata to projects table # when 
purge_feast_metadata is set to True, Delete data from # feast_metadata table and list_project_metadata will not return any data self._sync_feast_metadata_to_projects_table() if not self.purge_feast_metadata: self._maybe_init_project_metadata(project) def _sync_feast_metadata_to_projects_table(self): feast_metadata_projects: dict = {} projects_set: set = [] with self.read_engine.begin() as conn: stmt = select(feast_metadata).where( feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value ) rows = conn.execute(stmt).all() for row in rows: feast_metadata_projects[row._mapping["project_id"]] = int( row._mapping["last_updated_timestamp"] ) if len(feast_metadata_projects) > 0: with self.read_engine.begin() as conn: stmt = select(projects) rows = conn.execute(stmt).all() for row in rows: projects_set.append(row._mapping["project_id"]) # Find object in feast_metadata_projects but not in projects projects_to_sync = set(feast_metadata_projects.keys()) - set(projects_set) for project_name in projects_to_sync: try: self.apply_project( Project( name=project_name, created_timestamp=datetime.fromtimestamp( feast_metadata_projects[project_name], tz=timezone.utc ), ), commit=True, ) except IntegrityError: logger.info( "Project %s already created in projects table by another process.", project_name, ) if self.purge_feast_metadata: with self.write_engine.begin() as conn: for project_name in feast_metadata_projects: stmt = delete(feast_metadata).where( feast_metadata.c.project_id == project_name ) conn.execute(stmt) def teardown(self): for t in { entities, data_sources, feature_views, stream_feature_views, feature_services, on_demand_feature_views, saved_datasets, validation_references, permissions, feature_view_version_history, }: with self.write_engine.begin() as conn: stmt = delete(t) conn.execute(stmt) def _get_stream_feature_view(self, name: str, project: str): return self._get_object( table=stream_feature_views, name=name, project=project, 
proto_class=StreamFeatureViewProto, python_class=StreamFeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", not_found_exception=FeatureViewNotFoundException, ) def _list_stream_feature_views( self, project: str, tags: Optional[dict[str, str]] ) -> List[StreamFeatureView]: return self._list_objects( stream_feature_views, project, StreamFeatureViewProto, StreamFeatureView, "feature_view_proto", tags=tags, ) def apply_entity(self, entity: Entity, project: str, commit: bool = True): return self._apply_object( table=entities, project=project, id_field_name="entity_name", obj=entity, proto_field_name="entity_proto", ) def _get_entity(self, name: str, project: str) -> Entity: return self._get_object( table=entities, name=name, project=project, proto_class=EntityProto, python_class=Entity, id_field_name="entity_name", proto_field_name="entity_proto", not_found_exception=EntityNotFoundException, ) def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView: fv = self._get_object( table=feature_views, name=name, project=project, proto_class=FeatureViewProto, python_class=FeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", not_found_exception=None, ) if not fv: fv = self._get_object( table=on_demand_feature_views, name=name, project=project, proto_class=OnDemandFeatureViewProto, python_class=OnDemandFeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", not_found_exception=None, ) if not fv: fv = self._get_object( table=stream_feature_views, name=name, project=project, proto_class=StreamFeatureViewProto, python_class=StreamFeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", not_found_exception=FeatureViewNotFoundException, ) return fv def _list_all_feature_views( self, project: str, tags: Optional[dict[str, str]] ) -> List[BaseFeatureView]: return ( cast( list[BaseFeatureView], self._list_feature_views(project=project, 
tags=tags), ) + cast( list[BaseFeatureView], self._list_stream_feature_views(project=project, tags=tags), ) + cast( list[BaseFeatureView], self._list_on_demand_feature_views(project=project, tags=tags), ) ) def _get_feature_view(self, name: str, project: str) -> FeatureView: return self._get_object( table=feature_views, name=name, project=project, proto_class=FeatureViewProto, python_class=FeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", not_found_exception=FeatureViewNotFoundException, ) def _get_on_demand_feature_view( self, name: str, project: str ) -> OnDemandFeatureView: return self._get_object( table=on_demand_feature_views, name=name, project=project, proto_class=OnDemandFeatureViewProto, python_class=OnDemandFeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", not_found_exception=FeatureViewNotFoundException, ) def _get_feature_service(self, name: str, project: str) -> FeatureService: return self._get_object( table=feature_services, name=name, project=project, proto_class=FeatureServiceProto, python_class=FeatureService, id_field_name="feature_service_name", proto_field_name="feature_service_proto", not_found_exception=FeatureServiceNotFoundException, ) def _get_saved_dataset(self, name: str, project: str) -> SavedDataset: return self._get_object( table=saved_datasets, name=name, project=project, proto_class=SavedDatasetProto, python_class=SavedDataset, id_field_name="saved_dataset_name", proto_field_name="saved_dataset_proto", not_found_exception=SavedDatasetNotFound, ) def _get_validation_reference(self, name: str, project: str) -> ValidationReference: return self._get_object( table=validation_references, name=name, project=project, proto_class=ValidationReferenceProto, python_class=ValidationReference, id_field_name="validation_reference_name", proto_field_name="validation_reference_proto", not_found_exception=ValidationReferenceNotFound, ) def _list_validation_references( self, 
project: str, tags: Optional[dict[str, str]] = None ) -> List[ValidationReference]: return self._list_objects( table=validation_references, project=project, proto_class=ValidationReferenceProto, python_class=ValidationReference, proto_field_name="validation_reference_proto", tags=tags, ) def _list_entities( self, project: str, tags: Optional[dict[str, str]] ) -> List[Entity]: return self._list_objects( entities, project, EntityProto, Entity, "entity_proto", tags=tags ) def delete_entity(self, name: str, project: str, commit: bool = True): return self._delete_object( entities, name, project, "entity_name", EntityNotFoundException ) def delete_feature_view(self, name: str, project: str, commit: bool = True): with self.write_engine.begin() as conn: deleted_count = 0 for table in { feature_views, on_demand_feature_views, stream_feature_views, }: stmt = delete(table).where( table.c.feature_view_name == name, table.c.project_id == project, ) rows = conn.execute(stmt) deleted_count += rows.rowcount if deleted_count == 0: raise FeatureViewNotFoundException(name, project) # Clean up version history in the same transaction stmt = delete(feature_view_version_history).where( feature_view_version_history.c.feature_view_name == name, feature_view_version_history.c.project_id == project, ) conn.execute(stmt) self.apply_project( self.get_project(name=project, allow_cache=False), commit=True ) if not self.purge_feast_metadata: with self.write_engine.begin() as conn: self._set_last_updated_metadata(_utc_now(), project, conn) if self.cache_mode == "sync": self.refresh() def delete_feature_service(self, name: str, project: str, commit: bool = True): return self._delete_object( feature_services, name, project, "feature_service_name", FeatureServiceNotFoundException, ) def _get_data_source(self, name: str, project: str) -> DataSource: return self._get_object( table=data_sources, name=name, project=project, proto_class=DataSourceProto, python_class=DataSource, 
id_field_name="data_source_name", proto_field_name="data_source_proto", not_found_exception=DataSourceObjectNotFoundException, ) def _list_data_sources( self, project: str, tags: Optional[dict[str, str]] ) -> List[DataSource]: return self._list_objects( data_sources, project, DataSourceProto, DataSource, "data_source_proto", tags=tags, ) def apply_data_source( self, data_source: DataSource, project: str, commit: bool = True ): return self._apply_object( data_sources, project, "data_source_name", data_source, "data_source_proto" ) def apply_feature_view( self, feature_view: BaseFeatureView, project: str, commit: bool = True, no_promote: bool = False, ): feature_view.ensure_valid() self._ensure_feature_view_name_is_unique(feature_view, project) fv_table = self._infer_fv_table(feature_view) fv_type_str = self._infer_fv_type_string(feature_view) is_latest, pin_version = parse_version(feature_view.version) if not is_latest: # Explicit version: check if it exists (pin/revert) or not (forward declaration) snapshot = self._get_version_snapshot( feature_view.name, project, pin_version ) if snapshot is not None: # Version exists → pin/revert to that snapshot # Check that the user hasn't also modified the definition. # Compare user's FV (with version="latest") against active FV. 
try: active_fv = self._get_any_feature_view(feature_view.name, project) user_fv_copy = feature_view.__copy__() user_fv_copy.version = "latest" active_fv.version = "latest" # Clear metadata that differs due to registry state user_fv_copy.created_timestamp = active_fv.created_timestamp user_fv_copy.last_updated_timestamp = ( active_fv.last_updated_timestamp ) user_fv_copy.current_version_number = ( active_fv.current_version_number ) if hasattr(active_fv, "materialization_intervals"): user_fv_copy.materialization_intervals = ( active_fv.materialization_intervals ) if user_fv_copy != active_fv: raise FeatureViewPinConflict( feature_view.name, version_tag(pin_version) ) except FeatureViewNotFoundException: pass snap_type, snap_proto_bytes = snapshot proto_class, python_class = self._proto_class_for_type(snap_type) snap_proto = proto_class.FromString(snap_proto_bytes) restored_fv = python_class.from_proto(snap_proto) restored_fv.version = feature_view.version restored_fv.current_version_number = pin_version return self._apply_object( fv_table, project, "feature_view_name", restored_fv, "feature_view_proto", ) else: # Version doesn't exist → forward declaration: create it feature_view.current_version_number = pin_version snapshot_proto = feature_view.to_proto() snapshot_proto.spec.project = project snapshot_proto_bytes = snapshot_proto.SerializeToString() try: self._save_version_snapshot( feature_view.name, project, pin_version, fv_type_str, snapshot_proto_bytes, ) except IntegrityError: raise ConcurrentVersionConflict( f"Version v{pin_version} of '{feature_view.name}' was just created by " f"another concurrent apply. Pull latest and retry." ) # Apply the FV as active return self._apply_object( fv_table, project, "feature_view_name", feature_view, "feature_view_proto", ) # Normal (latest) apply: snapshot old version if changed, then save new # First check if the FV already exists so we can snapshot the old one. 
# Use write_engine for both reads to avoid read replica lag issues. old_proto_bytes = None with self.write_engine.begin() as conn: stmt = select(fv_table).where( fv_table.c.feature_view_name == feature_view.name, fv_table.c.project_id == project, ) row = conn.execute(stmt).first() if row: old_proto_bytes = row._mapping["feature_view_proto"] # Apply the object (handles idempotency check internally) # We need to detect if _apply_object actually made a change # by checking before/after self._apply_object( fv_table, project, "feature_view_name", feature_view, "feature_view_proto" ) # After apply, read the current proto to see if it changed with self.write_engine.begin() as conn: stmt = select(fv_table).where( fv_table.c.feature_view_name == feature_view.name, fv_table.c.project_id == project, ) row = conn.execute(stmt).first() if row: new_proto_bytes = row._mapping["feature_view_proto"] else: return # shouldn't happen if old_proto_bytes is not None: # Deserialize both versions to compare schema/UDF changes proto_class, fv_class = self._proto_class_for_type(fv_type_str) old_proto = proto_class.FromString(old_proto_bytes) new_proto = proto_class.FromString(new_proto_bytes) old_fv = fv_class.from_proto(old_proto) new_fv = fv_class.from_proto(new_proto) if not new_fv._schema_or_udf_changed(old_fv): # No version-significant change, skip version creation return # Something changed (or new FV). Save version snapshot(s). if old_proto_bytes is not None: # Snapshot the old version first (if not already in history) next_ver = self._get_next_version_number(feature_view.name, project) if next_ver == 0: # First time versioning: save old as v0 self._save_version_snapshot( feature_view.name, project, 0, fv_type_str, old_proto_bytes, ) next_ver = 1 # Retry loop: if a concurrent apply claimed the same version number, # re-read MAX+1 and try again. The client said "latest" so the # exact number doesn't matter. 
max_retries = 3 for attempt in range(max_retries): # Update current_version_number before saving snapshot feature_view.current_version_number = next_ver snapshot_proto = feature_view.to_proto() snapshot_proto.spec.project = project snapshot_proto_bytes = snapshot_proto.SerializeToString() try: # Save new as next version (with correct current_version_number) self._save_version_snapshot( feature_view.name, project, next_ver, fv_type_str, snapshot_proto_bytes, ) break except IntegrityError: if attempt == max_retries - 1: raise ConcurrentVersionConflict( f"Failed to assign version for '{feature_view.name}' after " f"{max_retries} attempts due to concurrent applies. " f"Please retry." ) # Re-read the next available version number next_ver = self._get_next_version_number(feature_view.name, project) if no_promote: # Save version snapshot but skip updating the active row. # The new version is accessible only via explicit @v reads. return # Re-serialize with updated version number with self.write_engine.begin() as conn: update_stmt = ( update(fv_table) .where( fv_table.c.feature_view_name == feature_view.name, fv_table.c.project_id == project, ) .values( feature_view_proto=snapshot_proto_bytes, ) ) conn.execute(update_stmt) else: # New FV: save as v0 feature_view.current_version_number = 0 snapshot_proto = feature_view.to_proto() snapshot_proto.spec.project = project snapshot_proto_bytes = snapshot_proto.SerializeToString() self._save_version_snapshot( feature_view.name, project, 0, fv_type_str, snapshot_proto_bytes, ) with self.write_engine.begin() as conn: update_stmt = ( update(fv_table) .where( fv_table.c.feature_view_name == feature_view.name, fv_table.c.project_id == project, ) .values( feature_view_proto=snapshot_proto_bytes, ) ) conn.execute(update_stmt) def apply_feature_service( self, feature_service: FeatureService, project: str, commit: bool = True ): return self._apply_object( feature_services, project, "feature_service_name", feature_service, 
"feature_service_proto", ) def delete_data_source(self, name: str, project: str, commit: bool = True): with self.write_engine.begin() as conn: stmt = delete(data_sources).where( data_sources.c.data_source_name == name, data_sources.c.project_id == project, ) rows = conn.execute(stmt) if rows.rowcount < 1: raise DataSourceObjectNotFoundException(name, project) def _list_feature_services( self, project: str, tags: Optional[dict[str, str]] ) -> List[FeatureService]: return self._list_objects( feature_services, project, FeatureServiceProto, FeatureService, "feature_service_proto", tags=tags, ) def _list_feature_views( self, project: str, tags: Optional[dict[str, str]] ) -> List[FeatureView]: return self._list_objects( feature_views, project, FeatureViewProto, FeatureView, "feature_view_proto", tags=tags, ) def _list_saved_datasets( self, project: str, tags: Optional[dict[str, str]] = None ) -> List[SavedDataset]: return self._list_objects( saved_datasets, project, SavedDatasetProto, SavedDataset, "saved_dataset_proto", tags=tags, ) def _list_on_demand_feature_views( self, project: str, tags: Optional[dict[str, str]] ) -> List[OnDemandFeatureView]: return self._list_objects( on_demand_feature_views, project, OnDemandFeatureViewProto, OnDemandFeatureView, "feature_view_proto", tags=tags, ) def _list_project_metadata(self, project: str) -> List[ProjectMetadata]: with self.read_engine.begin() as conn: stmt = select(feast_metadata).where( feast_metadata.c.project_id == project, ) rows = conn.execute(stmt).all() if rows: project_metadata = ProjectMetadata(project_name=project) for row in rows: if ( row._mapping["metadata_key"] == FeastMetadataKeys.PROJECT_UUID.value ): project_metadata.project_uuid = row._mapping["metadata_value"] break # TODO(adchia): Add other project metadata in a structured way return [project_metadata] return [] def apply_saved_dataset( self, saved_dataset: SavedDataset, project: str, commit: bool = True, ): return self._apply_object( saved_datasets, 
project, "saved_dataset_name", saved_dataset, "saved_dataset_proto", ) def apply_validation_reference( self, validation_reference: ValidationReference, project: str, commit: bool = True, ): return self._apply_object( validation_references, project, "validation_reference_name", validation_reference, "validation_reference_proto", ) def apply_materialization( self, feature_view: Union[FeatureView, OnDemandFeatureView], project: str, start_date: datetime, end_date: datetime, commit: bool = True, ): table = self._infer_fv_table(feature_view) python_class, proto_class = self._infer_fv_classes(feature_view) if python_class in {OnDemandFeatureView}: raise ValueError( f"Cannot apply materialization for feature {feature_view.name} of type {python_class}" ) fv: Union[FeatureView, StreamFeatureView] = self._get_object( table, feature_view.name, project, proto_class, python_class, "feature_view_name", "feature_view_proto", FeatureViewNotFoundException, ) fv.materialization_intervals.append((start_date, end_date)) self._apply_object( table, project, "feature_view_name", fv, "feature_view_proto" ) def delete_validation_reference(self, name: str, project: str, commit: bool = True): self._delete_object( validation_references, name, project, "validation_reference_name", ValidationReferenceNotFound, ) def update_infra(self, infra: Infra, project: str, commit: bool = True): self._apply_object( table=managed_infra, project=project, id_field_name="infra_name", obj=infra, proto_field_name="infra_proto", name="infra_obj", ) def _get_infra(self, project: str) -> Infra: infra_object = self._get_object( table=managed_infra, name="infra_obj", project=project, proto_class=InfraProto, python_class=Infra, id_field_name="infra_name", proto_field_name="infra_proto", not_found_exception=None, ) if infra_object: return infra_object return Infra() def apply_user_metadata( self, project: str, feature_view: BaseFeatureView, metadata_bytes: Optional[bytes], ): table = self._infer_fv_table(feature_view) 
name = feature_view.name with self.write_engine.begin() as conn: stmt = select(table).where( getattr(table.c, "feature_view_name") == name, table.c.project_id == project, ) row = conn.execute(stmt).first() update_datetime = _utc_now() update_time = int(update_datetime.timestamp()) if row: values = { "user_metadata": metadata_bytes, "last_updated_timestamp": update_time, } update_stmt = ( update(table) .where( getattr(table.c, "feature_view_name") == name, table.c.project_id == project, ) .values( values, ) ) conn.execute(update_stmt) else: raise FeatureViewNotFoundException(feature_view.name, project=project) def _infer_fv_table(self, feature_view): if isinstance(feature_view, StreamFeatureView): table = stream_feature_views elif isinstance(feature_view, FeatureView): table = feature_views elif isinstance(feature_view, OnDemandFeatureView): table = on_demand_feature_views else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") return table def _infer_fv_classes(self, feature_view): if isinstance(feature_view, StreamFeatureView): python_class, proto_class = StreamFeatureView, StreamFeatureViewProto elif isinstance(feature_view, FeatureView): python_class, proto_class = FeatureView, FeatureViewProto elif isinstance(feature_view, OnDemandFeatureView): python_class, proto_class = OnDemandFeatureView, OnDemandFeatureViewProto else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") return python_class, proto_class def _infer_fv_type_string(self, feature_view) -> str: if isinstance(feature_view, StreamFeatureView): return "stream_feature_view" elif isinstance(feature_view, FeatureView): return "feature_view" elif isinstance(feature_view, OnDemandFeatureView): return "on_demand_feature_view" else: raise ValueError(f"Unexpected feature view type: {type(feature_view)}") def _proto_class_for_type(self, fv_type: str): if fv_type == "stream_feature_view": return StreamFeatureViewProto, StreamFeatureView elif fv_type == "feature_view": 
return FeatureViewProto, FeatureView elif fv_type == "on_demand_feature_view": return OnDemandFeatureViewProto, OnDemandFeatureView else: raise ValueError(f"Unknown feature view type: {fv_type}") def _get_next_version_number(self, name: str, project: str) -> int: with self.write_engine.begin() as conn: stmt = select( func.coalesce( func.max(feature_view_version_history.c.version_number) + 1, 0 ) ).where( feature_view_version_history.c.feature_view_name == name, feature_view_version_history.c.project_id == project, ) result = conn.execute(stmt).scalar() return result or 0 def _save_version_snapshot( self, name: str, project: str, version_number: int, fv_type: str, proto_bytes: bytes, ): now = int(_utc_now().timestamp()) vid = generate_version_id() with self.write_engine.begin() as conn: stmt = insert(feature_view_version_history).values( feature_view_name=name, project_id=project, version_number=version_number, feature_view_type=fv_type, feature_view_proto=proto_bytes, created_timestamp=now, description="", version_id=vid, ) conn.execute(stmt) def _get_version_snapshot( self, name: str, project: str, version_number: int ) -> Optional[tuple]: with self.read_engine.begin() as conn: stmt = select(feature_view_version_history).where( feature_view_version_history.c.feature_view_name == name, feature_view_version_history.c.project_id == project, feature_view_version_history.c.version_number == version_number, ) row = conn.execute(stmt).first() if row: return ( row._mapping["feature_view_type"], row._mapping["feature_view_proto"], ) return None def get_feature_view_by_version( self, name: str, project: str, version_number: int, allow_cache: bool = False ) -> BaseFeatureView: snapshot = self._get_version_snapshot(name, project, version_number) if snapshot is None: raise FeatureViewVersionNotFound(name, version_tag(version_number), project) snap_type, snap_proto_bytes = snapshot proto_class, python_class = self._proto_class_for_type(snap_type) snap_proto = 
proto_class.FromString(snap_proto_bytes) fv = python_class.from_proto(snap_proto) fv.current_version_number = version_number return fv def list_feature_view_versions( self, name: str, project: str ) -> List[Dict[str, Any]]: with self.read_engine.begin() as conn: stmt = ( select(feature_view_version_history) .where( feature_view_version_history.c.feature_view_name == name, feature_view_version_history.c.project_id == project, ) .order_by(feature_view_version_history.c.version_number) ) rows = conn.execute(stmt).all() return [ { "version": version_tag(row._mapping["version_number"]), "version_number": row._mapping["version_number"], "feature_view_type": row._mapping["feature_view_type"], "created_timestamp": datetime.fromtimestamp( row._mapping["created_timestamp"], tz=timezone.utc ), "version_id": row._mapping["version_id"], } for row in rows ] def get_user_metadata( self, project: str, feature_view: BaseFeatureView ) -> Optional[bytes]: table = self._infer_fv_table(feature_view) name = feature_view.name with self.read_engine.begin() as conn: stmt = select(table).where(getattr(table.c, "feature_view_name") == name) row = conn.execute(stmt).first() if row: return row._mapping["user_metadata"] else: raise FeatureViewNotFoundException(feature_view.name, project=project) def proto(self) -> RegistryProto: r = RegistryProto() last_updated_timestamps = [] def process_project(project: Project): nonlocal r, last_updated_timestamps project_name = project.name last_updated_timestamp = project.last_updated_timestamp r.projects.extend([project.to_proto()]) last_updated_timestamps.append(last_updated_timestamp) for lister, registry_proto_field in [ (self.list_entities, r.entities), (self.list_feature_views, r.feature_views), (self.list_data_sources, r.data_sources), (self.list_on_demand_feature_views, r.on_demand_feature_views), (self.list_stream_feature_views, r.stream_feature_views), (self.list_feature_services, r.feature_services), (self.list_saved_datasets, r.saved_datasets), 
(self.list_validation_references, r.validation_references), (self.list_permissions, r.permissions), ]: objs: List[Any] = lister(project_name, allow_cache=False) # type: ignore if objs: obj_protos = [obj.to_proto() for obj in objs] for obj_proto in obj_protos: if "spec" in obj_proto.DESCRIPTOR.fields_by_name: obj_proto.spec.project = project_name else: obj_proto.project = project_name registry_proto_field.extend(obj_protos) # This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783, # the registry proto only has a single infra field, which we're currently setting as the "last" project. r.infra.CopyFrom(self.get_infra(project_name).to_proto()) projects_list = self.list_projects(allow_cache=False) if self.thread_pool_executor_worker_count == 0: for project in projects_list: process_project(project) else: with ThreadPoolExecutor( max_workers=self.thread_pool_executor_worker_count ) as executor: executor.map(process_project, projects_list) if last_updated_timestamps: r.last_updated.FromDatetime(max(last_updated_timestamps)) return r def commit(self): # This method is a no-op since we're always writing values eagerly to the db. 
pass def _initialize_project_if_not_exists(self, project_name: str): try: self.get_project(project_name, allow_cache=True) return except ProjectObjectNotFoundException: try: self.get_project(project_name, allow_cache=False) return except ProjectObjectNotFoundException: self.apply_project(Project(name=project_name), commit=True) def _apply_object( self, table: Table, project: str, id_field_name: str, obj: Any, proto_field_name: str, name: Optional[str] = None, ): if not self.purge_feast_metadata: self._maybe_init_project_metadata(project) # Initialize project is necessary because FeatureStore object can apply objects individually without "feast apply" cli option if not isinstance(obj, Project): self._initialize_project_if_not_exists(project_name=project) name = name or (obj.name if hasattr(obj, "name") else None) assert name, f"name needs to be provided for {obj}" with self.write_engine.begin() as conn: update_datetime = _utc_now() update_time = int(update_datetime.timestamp()) stmt = select(table).where( getattr(table.c, id_field_name) == name, table.c.project_id == project ) row = conn.execute(stmt).first() if row: if proto_field_name in [ "entity_proto", "saved_dataset_proto", "feature_view_proto", "feature_service_proto", "permission_proto", "project_proto", "data_source_proto", ]: deserialized_proto = self.deserialize_registry_values( row._mapping[proto_field_name], type(obj) ) if deserialized_proto is not None: # Check if the object has actually changed (same as feature views) existing_obj = type(obj).from_proto(deserialized_proto) if existing_obj == obj: return # No changes, exit early # Object has changed, preserve created_timestamp, update last_updated_timestamp obj.created_timestamp = deserialized_proto.meta.created_timestamp.ToDatetime().replace( tzinfo=timezone.utc ) if hasattr(obj, "last_updated_timestamp"): obj.last_updated_timestamp = update_datetime if isinstance(obj, (FeatureView, StreamFeatureView)): obj.update_materialization_intervals( type(obj) 
.from_proto(deserialized_proto) .materialization_intervals ) values = { proto_field_name: obj.to_proto().SerializeToString(), "last_updated_timestamp": update_time, } update_stmt = ( update(table) .where( getattr(table.c, id_field_name) == name, table.c.project_id == project, ) .values( values, ) ) conn.execute(update_stmt) else: # Creating new object - set timestamps consistently for all objects if hasattr(obj, "created_timestamp"): obj.created_timestamp = update_datetime if hasattr(obj, "last_updated_timestamp"): obj.last_updated_timestamp = update_datetime obj_proto = obj.to_proto() if hasattr(obj_proto, "meta") and hasattr( obj_proto.meta, "created_timestamp" ): if not obj_proto.meta.HasField("created_timestamp"): obj_proto.meta.created_timestamp.FromDatetime(update_datetime) values = { id_field_name: name, proto_field_name: obj_proto.SerializeToString(), "last_updated_timestamp": update_time, "project_id": project, } try: with conn.begin_nested(): conn.execute(insert(table).values(values)) except IntegrityError: logger.info( "Object %s in project %s already created by another " "process, skipping.", name, project, ) if not isinstance(obj, Project): self.apply_project( self.get_project(name=project, allow_cache=False), commit=True ) if not self.purge_feast_metadata: self._set_last_updated_metadata(update_datetime, project, conn) if self.cache_mode == "sync": self.refresh() def _maybe_init_project_metadata(self, project): # Initialize project metadata if needed with self.write_engine.begin() as conn: update_datetime = _utc_now() update_time = int(update_datetime.timestamp()) stmt = select(feast_metadata).where( feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value, feast_metadata.c.project_id == project, ) row = conn.execute(stmt).first() if not row: new_project_uuid = f"{uuid.uuid4()}" values = { "metadata_key": FeastMetadataKeys.PROJECT_UUID.value, "metadata_value": new_project_uuid, "last_updated_timestamp": update_time, "project_id": project, 
} try: with conn.begin_nested(): conn.execute(insert(feast_metadata).values(values)) except IntegrityError: logger.info( "Project metadata for %s already initialized by " "another process.", project, ) def _delete_object( self, table: Table, name: str, project: str, id_field_name: str, not_found_exception: Optional[Callable], ): with self.write_engine.begin() as conn: stmt = delete(table).where( getattr(table.c, id_field_name) == name, table.c.project_id == project ) rows = conn.execute(stmt) if rows.rowcount < 1 and not_found_exception: raise not_found_exception(name, project) self.apply_project( self.get_project(name=project, allow_cache=False), commit=True ) if not self.purge_feast_metadata: self._set_last_updated_metadata(_utc_now(), project, conn) if self.cache_mode == "sync": self.refresh() return rows.rowcount def _get_object( self, table: Table, name: str, project: str, proto_class: Any, python_class: Any, id_field_name: str, proto_field_name: str, not_found_exception: Optional[Callable], ): with self.read_engine.begin() as conn: stmt = select(table).where( getattr(table.c, id_field_name) == name, table.c.project_id == project ) row = conn.execute(stmt).first() if row: _proto = proto_class.FromString(row._mapping[proto_field_name]) return python_class.from_proto(_proto) if not_found_exception: raise not_found_exception(name, project) else: return None def _list_objects( self, table: Table, project: str, proto_class: Any, python_class: Any, proto_field_name: str, tags: Optional[dict[str, str]] = None, ): with self.read_engine.begin() as conn: stmt = select(table).where(table.c.project_id == project) rows = conn.execute(stmt).all() if rows: objects = [] for row in rows: obj = python_class.from_proto( proto_class.FromString(row._mapping[proto_field_name]) ) if utils.has_all_tags(obj.tags, tags): objects.append(obj) return objects return [] def _set_last_updated_metadata( self, last_updated: datetime, project: str, conn=None ): if conn is None: with 
self.write_engine.begin() as conn: self._set_last_updated_metadata(last_updated, project, conn) return stmt = select(feast_metadata).where( feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, feast_metadata.c.project_id == project, ) row = conn.execute(stmt).first() update_time = int(last_updated.timestamp()) values = { "metadata_key": FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, "metadata_value": f"{update_time}", "last_updated_timestamp": update_time, "project_id": project, } if row: update_stmt = ( update(feast_metadata) .where( feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, feast_metadata.c.project_id == project, ) .values(values) ) conn.execute(update_stmt) else: insert_stmt = insert(feast_metadata).values( values, ) conn.execute(insert_stmt) def _get_last_updated_metadata(self, project: str): with self.read_engine.begin() as conn: stmt = select(feast_metadata).where( feast_metadata.c.metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value, feast_metadata.c.project_id == project, ) row = conn.execute(stmt).first() if not row: return None update_time = int(row._mapping["last_updated_timestamp"]) return datetime.fromtimestamp(update_time, tz=timezone.utc) def _get_permission(self, name: str, project: str) -> Permission: return self._get_object( table=permissions, name=name, project=project, proto_class=PermissionProto, python_class=Permission, id_field_name="permission_name", proto_field_name="permission_proto", not_found_exception=PermissionNotFoundException, ) def _list_permissions( self, project: str, tags: Optional[dict[str, str]] ) -> List[Permission]: return self._list_objects( permissions, project, PermissionProto, Permission, "permission_proto", tags=tags, ) def apply_permission( self, permission: Permission, project: str, commit: bool = True ): return self._apply_object( permissions, project, "permission_name", permission, "permission_proto" ) def delete_permission(self, name: str, 
project: str, commit: bool = True): with self.write_engine.begin() as conn: stmt = delete(permissions).where( permissions.c.permission_name == name, permissions.c.project_id == project, ) rows = conn.execute(stmt) if rows.rowcount < 1: raise PermissionNotFoundException(name, project) def _list_projects( self, tags: Optional[dict[str, str]], ) -> List[Project]: with self.read_engine.begin() as conn: stmt = select(projects) rows = conn.execute(stmt).all() if rows: objects = [] for row in rows: obj = Project.from_proto( ProjectProto.FromString(row._mapping["project_proto"]) ) if utils.has_all_tags(obj.tags, tags): objects.append(obj) return objects return [] def _get_project( self, name: str, ) -> Project: return self._get_object( table=projects, name=name, project=name, proto_class=ProjectProto, python_class=Project, id_field_name="project_name", proto_field_name="project_proto", not_found_exception=ProjectObjectNotFoundException, ) def apply_project( self, project: Project, commit: bool = True, ): return self._apply_object( projects, project.name, "project_name", project, "project_proto" ) def delete_project( self, name: str, commit: bool = True, ): project = self.get_project(name, allow_cache=False) if project: with self.write_engine.begin() as conn: for t in { managed_infra, saved_datasets, validation_references, feature_services, feature_views, on_demand_feature_views, stream_feature_views, data_sources, entities, permissions, feast_metadata, projects, }: stmt = delete(t).where(t.c.project_id == name) conn.execute(stmt) return raise ProjectNotFoundException(name) def set_project_metadata(self, project: str, key: str, value: str, conn=None): """Set a custom project metadata key-value pair in the feast_metadata table.""" from feast.utils import _utc_now update_time = int(_utc_now().timestamp()) if conn is None: with self.write_engine.begin() as conn: self.set_project_metadata(project, key, value, conn) return stmt = select(feast_metadata).where( 
feast_metadata.c.project_id == project, feast_metadata.c.metadata_key == key, ) row = conn.execute(stmt).first() values = { "metadata_key": key, "metadata_value": value, "last_updated_timestamp": update_time, "project_id": project, } if row: update_stmt = ( update(feast_metadata) .where( feast_metadata.c.project_id == project, feast_metadata.c.metadata_key == key, ) .values(values) ) conn.execute(update_stmt) else: insert_stmt = insert(feast_metadata).values(values) conn.execute(insert_stmt) def get_project_metadata(self, project: str, key: str) -> Optional[str]: """Get a custom project metadata value by key from the feast_metadata table.""" with self.read_engine.begin() as conn: stmt = select(feast_metadata).where( feast_metadata.c.project_id == project, feast_metadata.c.metadata_key == key, ) row = conn.execute(stmt).first() if row: return row._mapping["metadata_value"] return None