diff --git a/.secrets.baseline b/.secrets.baseline index 96bf780809c..e1ad949904f 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1510,7 +1510,7 @@ "filename": "sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py", "hashed_secret": "95433727ea51026e1e0dc8deadaabd4a3baaaaf4", "is_verified": false, - "line_number": 19 + "line_number": 21 } ], "sdk/python/tests/universal/feature_repos/universal/online_store/singlestore.py": [ diff --git a/infra/scripts/compile-templates.py b/infra/scripts/compile-templates.py index e3130ab419a..6c582a90c94 100644 --- a/infra/scripts/compile-templates.py +++ b/infra/scripts/compile-templates.py @@ -28,7 +28,7 @@ def find_repo(path): # Template README.md ############################ roadmap_path = repo_root / "docs" / "roadmap.md" -with open(roadmap_path, "r") as f: +with open(roadmap_path, "r", encoding="utf-8") as f: # skip first lines since it has the title roadmap_contents_lines = f.readlines()[2:] @@ -36,7 +36,7 @@ def find_repo(path): roadmap_contents = "".join(roadmap_contents_lines) template_path = repo_root / "infra" / "templates" / "README.md.jinja2" -with open(template_path) as f: +with open(template_path, encoding="utf-8") as f: template = Template(f.read()) # Compile template @@ -49,5 +49,5 @@ def find_repo(path): ) readme_path = repo_root / "README.md" -with open(readme_path, "w") as f: +with open(readme_path, "w", encoding="utf-8") as f: f.write(readme_md) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 515a6c39b11..f814e76c781 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError): def __init__(self, store_name: str, version: int): super().__init__( f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. " - f"Currently only SQLite, PostgreSQL, MySQL, FAISS, Redis, and DynamoDB support version-qualified feature references. " + f"Currently only SQLite, PostgreSQL, MySQL, FAISS, Redis, DynamoDB, and SingleStore support version-qualified feature references. " ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f95bbf10c03..ed4be94a7c4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1335,7 +1335,9 @@ def teardown(self): entities = self.list_entities() - self._get_provider().teardown_infra(self.project, tables, entities) + self._get_provider().teardown_infra( + self.project, tables, entities, registry=self.registry + ) self.registry.teardown() def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/bigtable.py b/sdk/python/feast/infra/online_stores/bigtable.py index 3479f7f289a..4d35faf626a 100644 --- a/sdk/python/feast/infra/online_stores/bigtable.py +++ b/sdk/python/feast/infra/online_stores/bigtable.py @@ -13,6 +13,7 @@ from feast.feature_view import DUMMY_ENTITY_NAME from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -306,6 +307,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): # Because of historical reasons, Feast calls them tables. We use this alias for # readability. diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 0870bc709db..210f2515ac8 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -460,6 +460,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the database. diff --git a/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py b/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py index c80f9e1285c..086e5cbfb6f 100644 --- a/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py +++ b/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py @@ -274,6 +274,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the database. diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 9ae10792f5a..0c577521d9b 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -139,6 +139,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 1998167e4b0..b064944aad4 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -124,6 +124,8 @@ class DynamoDBOnlineStore(OnlineStore): _type_deserializer: Cached TypeDeserializer instance for performance. """ + supports_versioned_online_reads = True + _dynamodb_client = None _dynamodb_resource = None # Class-level cached TypeDeserializer to avoid per-request instantiation @@ -359,6 +361,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the DynamoDB Online Store. diff --git a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py index b78d003ac25..c5a799be248 100644 --- a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py @@ -263,6 +263,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/faiss_online_store.py b/sdk/python/feast/infra/online_stores/faiss_online_store.py index dfa7d6c376b..05e9d0627eb 100644 --- a/sdk/python/feast/infra/online_stores/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/faiss_online_store.py @@ -51,6 +51,8 @@ def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) class FaissOnlineStore(OnlineStore): _logger: logging.Logger = logging.getLogger(__name__) + supports_versioned_online_reads = True + def __init__(self): super().__init__() self._indices: Dict[str, faiss.IndexIVFFlat] = {} @@ -100,6 +102,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): versioning = config.registry.enable_online_feature_view_versioning for table in tables: diff --git a/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py b/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py index 21359b45bca..16a6a2ea9db 100644 --- a/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py +++ b/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py @@ -299,6 +299,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): online_store_config = config.online_store if not isinstance(online_store_config, HazelcastOnlineStoreConfig): diff --git a/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py index dc48d2c4efc..5942874a0b9 100644 --- a/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py @@ -212,6 +212,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the Hbase Online Store. diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 59ef9185c1f..5e4766a0706 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,9 +1,10 @@ import struct from datetime import datetime, timezone -from typing import Any, List +from typing import Any, List, Optional import mmh3 +from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.key_encoding_utils import ( serialize_entity_key, @@ -72,8 +73,25 @@ def _to_naive_utc(ts: datetime) -> datetime: return ts.astimezone(tz=timezone.utc).replace(tzinfo=None) -def compute_versioned_name(table: Any, enable_versioning: bool = False) -> str: - """Return the table name with a ``_v{N}`` suffix when versioning is enabled.""" +def online_store_table_id( + project: str, + table: FeatureView, + enable_versioning: bool = False, + version: Optional[int] = None, +) -> str: + name = table.name + if enable_versioning: + resolved_version = version + if resolved_version is None: + resolved_version = getattr(table.projection, "version_tag", None) + if resolved_version is None: + resolved_version = getattr(table, "current_version_number", None) + if resolved_version is not None and resolved_version > 0: + name = f"{table.name}_v{resolved_version}" + return f"{project}_{name}" + + +def compute_versioned_name(table: FeatureView, enable_versioning: bool = False) -> str: name = table.name if enable_versioning: version = getattr(table.projection, "version_tag", None) @@ -85,5 +103,4 @@ def compute_versioned_name(table: Any, enable_versioning: bool = False) -> str: def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str: - """Build the online-store table name, appending a version suffix when versioning is enabled.""" - return f"{project}_{compute_versioned_name(table, enable_versioning)}" + return online_store_table_id(project, table, enable_versioning) diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py index 8faefdbd344..31c3a67cdbb 100644 --- a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -47,6 +47,7 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, get_online_config_from_type @@ -294,6 +295,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Teardown all managed online stores for the given FeatureViews and Entities. @@ -324,4 +326,6 @@ def teardown( online_store = self._get_online_store(tribe, config) if online_store: config = RepoConfig(**self._prepare_repo_conf(config, tribe)) - online_store.teardown(config, tables, entities) + online_store.teardown( + config, tables, entities, registry=registry + ) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e33765c1ecf..8401d4e51ae 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -509,6 +509,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): self.client = self._connect(config) for table in tables: diff --git a/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py b/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py index 3e7a3db84c8..30bd50f44ae 100644 --- a/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py +++ b/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py @@ -260,6 +260,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Drop the backing collection and close the client. diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index 415184ea248..9dbe32842b2 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -12,6 +12,7 @@ from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.helpers import compute_table_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel @@ -43,6 +44,10 @@ class MySQLOnlineStore(OnlineStore): _conn: Optional[Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + def _get_conn(self, config: RepoConfig) -> Connection: online_store_config = config.online_store assert isinstance(online_store_config, MySQLOnlineStoreConfig) @@ -78,7 +83,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = to_naive_utc(timestamp) if created_ts is not None: @@ -100,14 +105,14 @@ def online_write_batch( if progress: progress(1) else: - batch_size = config.online_store.bacth_size + batch_size = config.online_store.batch_size if not batch_size or batch_size < 2: raise ValueError("Batch size must be at least 2") insert_values = [] for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=2, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = to_naive_utc(timestamp) if created_ts is not None: @@ -223,7 +228,7 @@ def online_read( for entity_key in entity_keys: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() cur.execute( @@ -296,6 +301,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: conn = self._get_conn(config) cur = conn.cursor() diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index c3fda86cc5e..877242f669d 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -39,6 +39,10 @@ class OnlineStore(ABC): The interface that Feast uses to interact with the storage system that handles online features. """ + @property + def supports_versioned_online_reads(self) -> bool: + return False + @property def async_supported(self) -> SupportedAsyncMethods: return SupportedAsyncMethods() @@ -189,7 +193,7 @@ def get_online_features( ) # Check for versioned reads on unsupported stores - self._check_versioned_read_support(grouped_refs) + self._check_versioned_read_support(grouped_refs, config) _track_read = False try: from feast.metrics import _config as _metrics_config @@ -253,55 +257,35 @@ def get_online_features( ) return OnlineResponse(online_features_response, feature_types=feature_types) - def _check_versioned_read_support(self, grouped_refs): + def _check_versioned_read_support( + self, grouped_refs, config: Optional[RepoConfig] = None + ): """Raise an error if versioned reads are attempted on unsupported stores.""" from feast.infra.online_stores.sqlite import SqliteOnlineStore - supported_types: list[type] = [SqliteOnlineStore] - try: - from feast.infra.online_stores.mysql_online_store.mysql import ( - MySQLOnlineStore, - ) - - supported_types.append(MySQLOnlineStore) - except ImportError: - pass - try: - from feast.infra.online_stores.postgres_online_store.postgres import ( - PostgreSQLOnlineStore, - ) - - supported_types.append(PostgreSQLOnlineStore) - except ImportError: - pass - try: - from feast.infra.online_stores.faiss_online_store import FaissOnlineStore - - supported_types.append(FaissOnlineStore) - except ImportError: - pass - try: - from feast.infra.online_stores.redis import RedisOnlineStore - - supported_types.append(RedisOnlineStore) - except Exception: - pass - try: - from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore - - supported_types.append(DynamoDBOnlineStore) - except Exception: - pass - - if isinstance(self, tuple(supported_types)): + if config is None: return for table, _ in grouped_refs: version_tag = getattr(table.projection, "version_tag", None) - if version_tag is not None: + if version_tag is None: + continue + + # Version-qualified refs (e.g. @v2) are only supported when online versioning is enabled. + if not config.registry.enable_online_feature_view_versioning: raise VersionedOnlineReadNotSupported( self.__class__.__name__, version_tag ) + # Online versioning enabled: allow stores that implement versioned routing. + if self.supports_versioned_online_reads: + continue + + # SQLite is always allowed, since it is the reference implementation. + if isinstance(self, SqliteOnlineStore): + continue + + raise VersionedOnlineReadNotSupported(self.__class__.__name__, version_tag) + async def get_online_features_async( self, config: RepoConfig, @@ -346,7 +330,7 @@ async def get_online_features_async( ) # Check for versioned reads on unsupported stores - self._check_versioned_read_support(grouped_refs) + self._check_versioned_read_support(grouped_refs, config) async def query_table(table, requested_features): # Get the correct set of entity values with the correct join keys. @@ -488,6 +472,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Tears down all cloud resources for the specified set of Feast objects. diff --git a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py index 4a79349e91c..1fd68b8e948 100644 --- a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py +++ b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py @@ -56,6 +56,10 @@ class PostgreSQLOnlineStore(OnlineStore): _conn_async: Optional[AsyncConnection] = None _conn_pool_async: Optional[AsyncConnectionPool] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + @contextlib.contextmanager def _get_conn( self, config: RepoConfig, autocommit: bool = False @@ -385,6 +389,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project schema_name = config.online_store.db_schema or config.online_store.user diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 29a6edf30ad..a221fdb7043 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -247,6 +247,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 1868a32792d..41416b4286e 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -39,6 +39,7 @@ compute_versioned_name, ) from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel @@ -108,6 +109,8 @@ class RedisOnlineStore(OnlineStore): None ) + supports_versioned_online_reads = True + def delete_entity_values(self, config: RepoConfig, join_keys: List[str]): client = self._get_client(config.online_store) deleted_count = 0 @@ -190,6 +193,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ We delete the keys in redis for tables/views being removed. diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 9bead1fcb9d..96b5c16cd09 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -24,6 +24,7 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.online_stores.helpers import _to_naive_utc from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -617,6 +618,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): pass diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index eb598ec5e7a..319f38d5733 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import logging from collections import defaultdict from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple @@ -11,12 +12,15 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.key_encoding_utils import serialize_entity_key -from feast.infra.online_stores.helpers import _to_naive_utc +from feast.infra.online_stores.helpers import _to_naive_utc, online_store_table_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +logger = logging.getLogger(__name__) + class SingleStoreOnlineStoreConfig(FeastConfigBaseModel): """ @@ -41,6 +45,10 @@ class SingleStoreOnlineStore(OnlineStore): _conn: Optional[Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + def _init_conn(self, config: RepoConfig) -> Connection: online_store_config = config.online_store assert isinstance(online_store_config, SingleStoreOnlineStoreConfig) @@ -80,7 +88,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = _to_naive_utc(timestamp) if created_ts is not None: @@ -102,7 +110,7 @@ def online_write_batch( current_batch = insert_values[i : i + batch_size] cur.executemany( f""" - INSERT INTO {_table_id(project, table)} + INSERT INTO {_quote_identifier(_table_id(project, table, config.registry.enable_online_feature_view_versioning))} (entity_key, feature_name, value, event_ts, created_ts) values (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE @@ -130,7 +138,7 @@ def online_read( keys.append( serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() ) @@ -138,7 +146,7 @@ def online_read( entity_key_placeholders = ",".join(["%s" for _ in keys]) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)} + SELECT entity_key, feature_name, value, event_ts FROM {_quote_identifier(_table_id(project, table, config.registry.enable_online_feature_view_versioning))} WHERE entity_key IN ({entity_key_placeholders}) ORDER BY event_ts; """, @@ -151,7 +159,7 @@ def online_read( ) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)} + SELECT entity_key, feature_name, value, event_ts FROM {_quote_identifier(_table_id(project, table, config.registry.enable_online_feature_view_versioning))} WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders}) ORDER BY event_ts; """, @@ -191,39 +199,105 @@ def update( partial: bool, ) -> None: project = config.project + versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: # We don't create any special state for the entities in this implementation. for table in tables_to_keep: + table_name = _table_id(project, table, versioning) cur.execute( - f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), + f"""CREATE TABLE IF NOT EXISTS {_quote_identifier(table_name)} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp NULL DEFAULT NULL, created_ts timestamp NULL DEFAULT NULL, PRIMARY KEY(entity_key, feature_name), - INDEX {_table_id(project, table)}_ek (entity_key))""" + INDEX {_quote_identifier(table_name + "_ek")} (entity_key))""" ) for table in tables_to_delete: - _drop_table_and_index(cur, project, table) + _drop_table_and_index(cur, project, table, versioning) def teardown( self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: project = config.project + versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: for table in tables: - _drop_table_and_index(cur, project, table) + if versioning: + _drop_all_version_tables(cur, project, table) + else: + _drop_table_and_index(cur, project, table, enable_versioning=False) -def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None: - table_name = _table_id(project, table) - cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};") - cur.execute(f"DROP TABLE IF EXISTS {table_name}") +def _drop_table_and_index( + cur: Cursor, + project: str, + table: FeatureView, + enable_versioning: bool, + version: Optional[int] = None, +) -> None: + table_name = online_store_table_id(project, table, enable_versioning, version) + table_name_quoted = _quote_identifier(table_name) + index_name_quoted = _quote_identifier(f"{table_name}_ek") + cur.execute(f"DROP INDEX IF EXISTS {index_name_quoted} ON {table_name_quoted};") + cur.execute(f"DROP TABLE IF EXISTS {table_name_quoted}") + + +def _drop_all_version_tables(cur: Cursor, project: str, table: FeatureView) -> None: + base = online_store_table_id(project, table, enable_versioning=False) + cur.execute( + "SELECT table_name FROM information_schema.tables " + "WHERE table_schema = DATABASE() AND (table_name = %s OR table_name REGEXP %s)", + (base, f"^{base}_v[0-9]+$"), + ) + for (name,) in cur.fetchall() or []: + index_name = f"{name}_ek" + cur.execute( + f"DROP INDEX IF EXISTS {_quote_identifier(index_name)} ON {_quote_identifier(name)};" + ) + cur.execute(f"DROP TABLE IF EXISTS {_quote_identifier(name)}") -def _table_id(project: str, table: FeatureView) -> str: - return f"{project}_{table.name}" +def _quote_identifier(identifier: str) -> str: + escaped = identifier.replace("`", "``") + return f"`{escaped}`" + + +def _table_id( + project: str, + table: FeatureView, + enable_versioning: bool = False, + version: Optional[int] = None, +) -> str: + return online_store_table_id(project, table, enable_versioning, version) + + +def _drop_discovered_versioned_tables( + cur: Cursor, project: str, table: FeatureView +) -> None: + base_table_name = online_store_table_id(project, table, enable_versioning=False) + escaped_base_table_name = base_table_name.replace("\\", "\\\\") + escaped_base_table_name = escaped_base_table_name.replace("%", "\\%") + escaped_base_table_name = escaped_base_table_name.replace("_", "\\_") + like_pattern = f"{escaped_base_table_name}\\_v%" + try: + cur.execute("SHOW TABLES LIKE %s", (like_pattern,)) + rows = cur.fetchall() or [] + for row in rows: + table_name = row[0] + index_name = f"{table_name}_ek" + cur.execute( + f"DROP INDEX IF EXISTS {_quote_identifier(index_name)} ON {_quote_identifier(table_name)};" + ) + cur.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table_name)}") + except Exception as e: + logger.warning( + "Failed to discover/drop versioned tables for %s during teardown fallback. Error: %s", + table.name, + e, + ) diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py index d2df674ed94..02a3e7ca3c7 100644 --- a/sdk/python/feast/infra/online_stores/snowflake.py +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -10,6 +10,7 @@ from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils.snowflake.snowflake_utils import ( GetSnowflakeConnection, execute_snowflake_statement, @@ -253,6 +254,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 009e9a8c405..0a32b1d1206 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -45,6 +45,7 @@ from feast.infra.online_stores.helpers import compute_table_id from feast.infra.online_stores.online_store import OnlineStore from feast.infra.online_stores.vector_store import VectorStoreConfig +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto @@ -125,6 +126,10 @@ class SqliteOnlineStore(OnlineStore): _conn: Optional[sqlite3.Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + @staticmethod def _get_db_path(config: RepoConfig) -> str: assert ( @@ -339,6 +344,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): try: os.unlink(self._get_db_path(config)) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 20334e53a2e..9fa93295566 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -178,9 +178,12 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: if self.online_store: - self.online_store.teardown(self.repo_config, tables, entities) + self.online_store.teardown( + self.repo_config, tables, entities, registry=registry + ) if self.batch_engine: self.batch_engine.teardown_infra(project, tables, entities) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 9bdf681fb69..c78c3956331 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -109,6 +109,7 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Tears down all cloud resources for the specified set of Feast objects. diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini index 1ad76b978e4..ec89cdc71bb 100644 --- a/sdk/python/pytest.ini +++ b/sdk/python/pytest.ini @@ -4,7 +4,6 @@ env = IS_TEST=True filterwarnings = error::_pytest.warning_types.PytestConfigWarning - error::_pytest.warning_types.PytestUnhandledCoroutineWarning ignore::DeprecationWarning:pyspark.sql.pandas.*: ignore::DeprecationWarning:pyspark.sql.connect.*: ignore::DeprecationWarning:httpx.*: diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 8302e313a2d..6eca8297b09 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -85,7 +85,7 @@ def pytest_configure(config): - if platform in ["darwin", "windows"]: + if platform in ["darwin"] or platform.startswith("win"): multiprocessing.set_start_method("spawn", force=True) else: multiprocessing.set_start_method("fork") diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 82cfc7fb513..b5cad523419 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -67,6 +67,7 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): pass diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index 45426c63b8d..fda5f87088c 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -31,7 +31,7 @@ def test_registration_and_retrieval_from_custom_s3_endpoint( "It may be better to deduplicate AWS configuration or use sub-processes for isolation" ) - os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" + os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" # pragma: allowlist secret os.environ["AWS_SECRET_ACCESS_KEY"] = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" with construct_test_environment(config) as environment: diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 0c27585139e..b17544cee8d 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -283,6 +283,77 @@ def test_write_to_online_store(environment, universal_data_sources): assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.85, 1e-6) +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["singlestore"]) +def test_singlestore_versioned_online_reads(environment, universal_data_sources): + fs = environment.feature_store + fs.config.registry.enable_online_feature_view_versioning = True + + entities, datasets, data_sources = universal_data_sources + driver_entity = driver() + + # Apply v0 + driver_hourly_stats_v0 = create_driver_hourly_stats_feature_view( + data_sources.driver + ) + fs.apply([driver_hourly_stats_v0, driver_entity]) + + # Write v0 data + df_v0 = pd.DataFrame( + { + "driver_id": [1], + "conv_rate": [0.1], + "acc_rate": [0.2], + "avg_daily_trips": [10], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], + } + ) + fs.write_to_online_store("driver_stats", df_v0) + + # Apply a schema change to create v1 + driver_hourly_stats_v1 = FeatureView( + name="driver_stats", + entities=[driver_entity], + schema=driver_hourly_stats_v0.schema + + [Field(name="new_feature", dtype=Float32)], + source=data_sources.driver, + ttl=driver_hourly_stats_v0.ttl, + tags=TAGS, + ) + fs.apply([driver_hourly_stats_v1, driver_entity]) + + # Write v1 data + df_v1 = pd.DataFrame( + { + "driver_id": [1], + "conv_rate": [0.1], + "acc_rate": [0.2], + "avg_daily_trips": [20], + "new_feature": [1.0], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], + } + ) + fs.write_to_online_store("driver_stats", df_v1) + + # Read v0 and v1 explicitly + df = fs.get_online_features( + features=["driver_stats@v0:avg_daily_trips", "driver_stats@v1:avg_daily_trips"], + entity_rows=[{"driver_id": 1}], + full_feature_names=True, + ).to_df() + + assertpy.assert_that(df["driver_stats@v0__avg_daily_trips"].iloc[0]).is_equal_to(10) + assertpy.assert_that(df["driver_stats@v1__avg_daily_trips"].iloc[0]).is_equal_to(20) + + def _get_online_features_dict_remotely( endpoint: str, features: Union[List[str], FeatureService], diff --git a/sdk/python/tests/unit/infra/online_store/test_online_store_base.py b/sdk/python/tests/unit/infra/online_store/test_online_store_base.py index 11195521deb..3b023c0bd0d 100644 --- a/sdk/python/tests/unit/infra/online_store/test_online_store_base.py +++ b/sdk/python/tests/unit/infra/online_store/test_online_store_base.py @@ -3,6 +3,7 @@ import pytest from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.supported_async_methods import SupportedAsyncMethods @@ -26,7 +27,7 @@ def update( ): pass - def teardown(self, config, tables, entities): + def teardown(self, config, tables, entities, registry: BaseRegistry = None): pass