From b4eed3e9fdd939d3b8142a59d0183be6ba470ca7 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Sun, 29 Mar 2026 23:35:30 +0100 Subject: [PATCH 01/18] Add SingleStore online store support for feature view versioning Signed-off-by: antznette1 --- sdk/python/feast/errors.py | 2 +- .../feast/infra/online_stores/online_store.py | 43 +++++------ .../singlestore_online_store/singlestore.py | 38 ++++++---- sdk/python/pytest.ini | 1 - sdk/python/tests/conftest.py | 2 +- .../online_store/test_universal_online.py | 71 +++++++++++++++++++ 6 files changed, 116 insertions(+), 41 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index fb35ff79de8..c7e11e75e62 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError): def __init__(self, store_name: str, version: int): super().__init__( f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. " - f"Currently only SQLite, PostgreSQL, and MySQL support version-qualified feature references. " + f"Currently only SQLite, PostgreSQL, MySQL, and SingleStore support version-qualified feature references. " ) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 41555ccfb2b..fd5ab8be387 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -189,7 +189,7 @@ def get_online_features( ) # Check for versioned reads on unsupported stores - self._check_versioned_read_support(grouped_refs) + self._check_versioned_read_support(grouped_refs, config) _track_read = False try: from feast.metrics import _config as _metrics_config @@ -253,37 +253,30 @@ def get_online_features( ) return OnlineResponse(online_features_response, feature_types=feature_types) - def _check_versioned_read_support(self, grouped_refs): + def _check_versioned_read_support(self, grouped_refs, config: RepoConfig): """Raise an error if versioned reads are attempted on unsupported stores.""" from feast.infra.online_stores.sqlite import SqliteOnlineStore - - supported_types: list[type] = [SqliteOnlineStore] - try: - from feast.infra.online_stores.mysql_online_store.mysql import ( - MySQLOnlineStore, - ) - - supported_types.append(MySQLOnlineStore) - except ImportError: - pass - try: - from feast.infra.online_stores.postgres_online_store.postgres import ( - PostgreSQLOnlineStore, - ) - - supported_types.append(PostgreSQLOnlineStore) - except ImportError: - pass - - if isinstance(self, tuple(supported_types)): - return for table, _ in grouped_refs: version_tag = getattr(table.projection, "version_tag", None) - if version_tag is not None: + if version_tag is None: + continue + + # Version-qualified refs (e.g. @v2) are only supported when online versioning is enabled. + if not config.registry.enable_online_feature_view_versioning: raise VersionedOnlineReadNotSupported( self.__class__.__name__, version_tag ) + # Online versioning enabled: allow stores that implement versioned routing. + if self.supports_versioned_online_reads: + continue + + # SQLite is always allowed, since it is the reference implementation. + if isinstance(self, SqliteOnlineStore): + continue + + raise VersionedOnlineReadNotSupported(self.__class__.__name__, version_tag) + async def get_online_features_async( self, config: RepoConfig, @@ -328,7 +321,7 @@ async def get_online_features_async( ) # Check for versioned reads on unsupported stores - self._check_versioned_read_support(grouped_refs) + self._check_versioned_read_support(grouped_refs, config) async def query_table(table, requested_features): # Get the correct set of entity values with the correct join keys. diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index eb598ec5e7a..b5ae1178cc1 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -80,7 +80,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = _to_naive_utc(timestamp) if created_ts is not None: @@ -102,7 +102,7 @@ def online_write_batch( current_batch = insert_values[i : i + batch_size] cur.executemany( f""" - INSERT INTO {_table_id(project, table)} + INSERT INTO {_table_id(project, table, config.registry.enable_online_feature_view_versioning)} (entity_key, feature_name, value, event_ts, created_ts) values (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE @@ -130,7 +130,7 @@ def online_read( keys.append( serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() ) @@ -138,7 +138,7 @@ def online_read( entity_key_placeholders = ",".join(["%s" for _ in keys]) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)} + SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table, config.registry.enable_online_feature_view_versioning)} WHERE entity_key IN ({entity_key_placeholders}) ORDER BY event_ts; """, @@ -151,7 +151,7 @@ def online_read( ) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)} + SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table, config.registry.enable_online_feature_view_versioning)} WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders}) ORDER BY event_ts; """, @@ -191,21 +191,23 @@ def update( partial: bool, ) -> None: project = config.project + versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: # We don't create any special state for the entities in this implementation. for table in tables_to_keep: + table_name = _table_id(project, table, versioning) cur.execute( - f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), + f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp NULL DEFAULT NULL, created_ts timestamp NULL DEFAULT NULL, PRIMARY KEY(entity_key, feature_name), - INDEX {_table_id(project, table)}_ek (entity_key))""" + INDEX {table_name}_ek (entity_key))""" ) for table in tables_to_delete: - _drop_table_and_index(cur, project, table) + _drop_table_and_index(cur, project, table, versioning) def teardown( self, @@ -214,16 +216,26 @@ def teardown( entities: Sequence[Entity], ) -> None: project = config.project + versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: for table in tables: - _drop_table_and_index(cur, project, table) + _drop_table_and_index(cur, project, table, versioning) -def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None: - table_name = _table_id(project, table) +def _drop_table_and_index( + cur: Cursor, project: str, table: FeatureView, enable_versioning: bool +) -> None: + table_name = _table_id(project, table, enable_versioning) cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};") cur.execute(f"DROP TABLE IF EXISTS {table_name}") -def _table_id(project: str, table: FeatureView) -> str: - return f"{project}_{table.name}" +def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str: + name = table.name + if enable_versioning: + version = getattr(table.projection, "version_tag", None) + if version is None: + version = getattr(table, "current_version_number", None) + if version is not None and version > 0: + name = f"{table.name}_v{version}" + return f"{project}_{name}" diff --git a/sdk/python/pytest.ini b/sdk/python/pytest.ini index 1ad76b978e4..ec89cdc71bb 100644 --- a/sdk/python/pytest.ini +++ b/sdk/python/pytest.ini @@ -4,7 +4,6 @@ env = IS_TEST=True filterwarnings = error::_pytest.warning_types.PytestConfigWarning - error::_pytest.warning_types.PytestUnhandledCoroutineWarning ignore::DeprecationWarning:pyspark.sql.pandas.*: ignore::DeprecationWarning:pyspark.sql.connect.*: ignore::DeprecationWarning:httpx.*: diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 8302e313a2d..6eca8297b09 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -85,7 +85,7 @@ def pytest_configure(config): - if platform in ["darwin", "windows"]: + if platform in ["darwin"] or platform.startswith("win"): multiprocessing.set_start_method("spawn", force=True) else: multiprocessing.set_start_method("fork") diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 0c27585139e..b17544cee8d 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -283,6 +283,77 @@ def test_write_to_online_store(environment, universal_data_sources): assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.85, 1e-6) +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["singlestore"]) +def test_singlestore_versioned_online_reads(environment, universal_data_sources): + fs = environment.feature_store + fs.config.registry.enable_online_feature_view_versioning = True + + entities, datasets, data_sources = universal_data_sources + driver_entity = driver() + + # Apply v0 + driver_hourly_stats_v0 = create_driver_hourly_stats_feature_view( + data_sources.driver + ) + fs.apply([driver_hourly_stats_v0, driver_entity]) + + # Write v0 data + df_v0 = pd.DataFrame( + { + "driver_id": [1], + "conv_rate": [0.1], + "acc_rate": [0.2], + "avg_daily_trips": [10], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], + } + ) + fs.write_to_online_store("driver_stats", df_v0) + + # Apply a schema change to create v1 + driver_hourly_stats_v1 = FeatureView( + name="driver_stats", + entities=[driver_entity], + schema=driver_hourly_stats_v0.schema + + [Field(name="new_feature", dtype=Float32)], + source=data_sources.driver, + ttl=driver_hourly_stats_v0.ttl, + tags=TAGS, + ) + fs.apply([driver_hourly_stats_v1, driver_entity]) + + # Write v1 data + df_v1 = pd.DataFrame( + { + "driver_id": [1], + "conv_rate": [0.1], + "acc_rate": [0.2], + "avg_daily_trips": [20], + "new_feature": [1.0], + "driver_metadata": [None], + "driver_config": [None], + "driver_profile": [None], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], + } + ) + fs.write_to_online_store("driver_stats", df_v1) + + # Read v0 and v1 explicitly + df = fs.get_online_features( + features=["driver_stats@v0:avg_daily_trips", "driver_stats@v1:avg_daily_trips"], + entity_rows=[{"driver_id": 1}], + full_feature_names=True, + ).to_df() + + assertpy.assert_that(df["driver_stats@v0__avg_daily_trips"].iloc[0]).is_equal_to(10) + assertpy.assert_that(df["driver_stats@v1__avg_daily_trips"].iloc[0]).is_equal_to(20) + + def _get_online_features_dict_remotely( endpoint: str, features: Union[List[str], FeatureService], From d550c71c57206e48d01d62eccfec9e63fd7a7bd8 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Wed, 1 Apr 2026 22:59:43 +0100 Subject: [PATCH 02/18] Add versioned table ids + teardown for SingleStore online store Signed-off-by: antznette1 --- .../feast/infra/online_stores/helpers.py | 27 ++++-- .../feast/infra/online_stores/online_store.py | 4 + .../singlestore_online_store/singlestore.py | 88 ++++++++++++++----- .../feast/infra/online_stores/sqlite.py | 5 ++ 4 files changed, 96 insertions(+), 28 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 40e0f50a62c..819ba41103e 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,6 +1,6 @@ import struct from datetime import datetime, timezone -from typing import Any, List +from typing import Any, List, Optional import mmh3 @@ -11,6 +11,7 @@ ) from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.feature_view import FeatureView def get_online_store_from_config(online_store_config: Any) -> OnlineStore: @@ -72,13 +73,23 @@ def _to_naive_utc(ts: datetime) -> datetime: return ts.astimezone(tz=timezone.utc).replace(tzinfo=None) -def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str: - """Build the online-store table name, appending a version suffix when versioning is enabled.""" +def online_store_table_id( + project: str, + table: FeatureView, + enable_versioning: bool = False, + version: Optional[int] = None, +) -> str: name = table.name if enable_versioning: - version = getattr(table.projection, "version_tag", None) - if version is None: - version = getattr(table, "current_version_number", None) - if version is not None and version > 0: - name = f"{table.name}_v{version}" + resolved_version = version + if resolved_version is None: + resolved_version = getattr(table.projection, "version_tag", None) + if resolved_version is None: + resolved_version = getattr(table, "current_version_number", None) + if resolved_version is not None and resolved_version > 0: + name = f"{table.name}_v{resolved_version}" return f"{project}_{name}" + + +def compute_table_id(project: str, table: Any, enable_versioning: bool = False) -> str: + return online_store_table_id(project, table, enable_versioning) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index fd5ab8be387..62113250985 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -39,6 +39,10 @@ class OnlineStore(ABC): The interface that Feast uses to interact with the storage system that handles online features. """ + @property + def supports_versioned_online_reads(self) -> bool: + return False + @property def async_supported(self) -> SupportedAsyncMethods: return SupportedAsyncMethods() diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index b5ae1178cc1..9d21bb22dea 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -10,8 +10,9 @@ from singlestoredb.exceptions import InterfaceError from feast import Entity, FeatureView, RepoConfig +from feast.importer import import_class from feast.infra.key_encoding_utils import serialize_entity_key -from feast.infra.online_stores.helpers import _to_naive_utc +from feast.infra.online_stores.helpers import _to_naive_utc, online_store_table_id from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -41,6 +42,10 @@ class SingleStoreOnlineStore(OnlineStore): _conn: Optional[Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + def _init_conn(self, config: RepoConfig) -> Connection: online_store_config = config.online_store assert isinstance(online_store_config, SingleStoreOnlineStoreConfig) @@ -102,7 +107,7 @@ def online_write_batch( current_batch = insert_values[i : i + batch_size] cur.executemany( f""" - INSERT INTO {_table_id(project, table, config.registry.enable_online_feature_view_versioning)} + INSERT INTO {online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning)} (entity_key, feature_name, value, event_ts, created_ts) values (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE @@ -138,7 +143,7 @@ def online_read( entity_key_placeholders = ",".join(["%s" for _ in keys]) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table, config.registry.enable_online_feature_view_versioning)} + SELECT entity_key, feature_name, value, event_ts FROM {online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning)} WHERE entity_key IN ({entity_key_placeholders}) ORDER BY event_ts; """, @@ -151,7 +156,7 @@ def online_read( ) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table, config.registry.enable_online_feature_view_versioning)} + SELECT entity_key, feature_name, value, event_ts FROM {online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning)} WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders}) ORDER BY event_ts; """, @@ -195,7 +200,7 @@ def update( with self._get_cursor(config) as cur: # We don't create any special state for the entities in this implementation. for table in tables_to_keep: - table_name = _table_id(project, table, versioning) + table_name = online_store_table_id(project, table, versioning) cur.execute( f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512), feature_name VARCHAR(256), @@ -219,23 +224,66 @@ def teardown( versioning = config.registry.enable_online_feature_view_versioning with self._get_cursor(config) as cur: for table in tables: - _drop_table_and_index(cur, project, table, versioning) + if not versioning: + _drop_table_and_index(cur, project, table, enable_versioning=False) + continue + + versions = [] + try: + from feast.repo_config import REGISTRY_CLASS_FOR_TYPE + + registry_type = getattr(config.registry, "registry_type", "file") + registry_class_path = REGISTRY_CLASS_FOR_TYPE[registry_type] + module_name, class_name = registry_class_path.rsplit(".", 1) + registry_cls = import_class(module_name, class_name, "Registry") + if registry_type == "file": + registry = registry_cls( + project=project, + registry_config=config.registry, + repo_path=config.repo_path, + ) + elif registry_type in {"sql", "remote", "snowflake.registry"}: + registry = registry_cls( + registry_config=config.registry, + project=project, + repo_path=config.repo_path, + ) + else: + registry = registry_cls( + project=project, + registry_config=config.registry, + repo_path=config.repo_path, + ) + versions = registry.list_feature_view_versions( + name=table.name, project=project + ) + except Exception: + versions = [] + + if not versions: + _drop_table_and_index(cur, project, table, enable_versioning=True) + continue + + for record in versions: + version_number = record.get("version_number") + if version_number is None: + continue + _drop_table_and_index( + cur, + project, + table, + enable_versioning=True, + version=version_number, + ) def _drop_table_and_index( - cur: Cursor, project: str, table: FeatureView, enable_versioning: bool + cur: Cursor, + project: str, + table: FeatureView, + enable_versioning: bool, + version: Optional[int] = None, ) -> None: - table_name = _table_id(project, table, enable_versioning) - cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};") + table_name = online_store_table_id(project, table, enable_versioning, version) + cur.execute(f"DROP INDEX IF EXISTS {table_name}_ek ON {table_name};") cur.execute(f"DROP TABLE IF EXISTS {table_name}") - - -def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str: - name = table.name - if enable_versioning: - version = getattr(table.projection, "version_tag", None) - if version is None: - version = getattr(table, "current_version_number", None) - if version is not None and version > 0: - name = f"{table.name}_v{version}" - return f"{project}_{name}" diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 009e9a8c405..73566bd53ed 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -59,6 +59,7 @@ _serialize_vector_to_float_list, to_naive_utc, ) +from feast.infra.online_stores.helpers import online_store_table_id def adapt_date_iso(val: date): @@ -125,6 +126,10 @@ class SqliteOnlineStore(OnlineStore): _conn: Optional[sqlite3.Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + @staticmethod def _get_db_path(config: RepoConfig) -> str: assert ( From e551b513e5ab0ecaa118b6b87811d4aec3bc0881 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Thu, 2 Apr 2026 00:40:47 +0100 Subject: [PATCH 03/18] Harden SingleStore teardown and quote identifiers Signed-off-by: antznette1 --- .../feast/infra/online_stores/helpers.py | 2 +- .../singlestore_online_store/singlestore.py | 60 ++++++++++++++++--- .../feast/infra/online_stores/sqlite.py | 3 +- 3 files changed, 55 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 819ba41103e..a5678ba1d35 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -4,6 +4,7 @@ import mmh3 +from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.key_encoding_utils import ( serialize_entity_key, @@ -11,7 +12,6 @@ ) from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.feature_view import FeatureView def get_online_store_from_config(online_store_config: Any) -> OnlineStore: diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index 9d21bb22dea..5e55fdcafe3 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -2,6 +2,7 @@ from collections import defaultdict from datetime import datetime +import logging from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple import singlestoredb @@ -18,6 +19,8 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +logger = logging.getLogger(__name__) + class SingleStoreOnlineStoreConfig(FeastConfigBaseModel): """ @@ -107,7 +110,7 @@ def online_write_batch( current_batch = insert_values[i : i + batch_size] cur.executemany( f""" - INSERT INTO {online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning)} + INSERT INTO {_quote_identifier(online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning))} (entity_key, feature_name, value, event_ts, created_ts) values (%s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE @@ -143,7 +146,7 @@ def online_read( entity_key_placeholders = ",".join(["%s" for _ in keys]) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning)} + SELECT entity_key, feature_name, value, event_ts FROM {_quote_identifier(online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning))} WHERE entity_key IN ({entity_key_placeholders}) ORDER BY event_ts; """, @@ -156,7 +159,7 @@ def online_read( ) cur.execute( f""" - SELECT entity_key, feature_name, value, event_ts FROM {online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning)} + SELECT entity_key, feature_name, value, event_ts FROM {_quote_identifier(online_store_table_id(project, table, config.registry.enable_online_feature_view_versioning))} WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders}) ORDER BY event_ts; """, @@ -202,13 +205,13 @@ def update( for table in tables_to_keep: table_name = online_store_table_id(project, table, versioning) cur.execute( - f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512), + f"""CREATE TABLE IF NOT EXISTS {_quote_identifier(table_name)} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp NULL DEFAULT NULL, created_ts timestamp NULL DEFAULT NULL, PRIMARY KEY(entity_key, feature_name), - INDEX {table_name}_ek (entity_key))""" + INDEX {_quote_identifier(table_name + '_ek')} (entity_key))""" ) for table in tables_to_delete: @@ -257,11 +260,17 @@ def teardown( versions = registry.list_feature_view_versions( name=table.name, project=project ) - except Exception: + except Exception as e: + logger.warning( + "Failed to list feature view versions for %s during teardown; will fall back to dropping discovered versioned tables. Error: %s", + table.name, + e, + ) versions = [] if not versions: _drop_table_and_index(cur, project, table, enable_versioning=True) + _drop_discovered_versioned_tables(cur, project, table) continue for record in versions: @@ -276,6 +285,8 @@ def teardown( version=version_number, ) + _drop_discovered_versioned_tables(cur, project, table) + def _drop_table_and_index( cur: Cursor, @@ -285,5 +296,38 @@ def _drop_table_and_index( version: Optional[int] = None, ) -> None: table_name = online_store_table_id(project, table, enable_versioning, version) - cur.execute(f"DROP INDEX IF EXISTS {table_name}_ek ON {table_name};") - cur.execute(f"DROP TABLE IF EXISTS {table_name}") + table_name_quoted = _quote_identifier(table_name) + index_name_quoted = _quote_identifier(f"{table_name}_ek") + cur.execute(f"DROP INDEX IF EXISTS {index_name_quoted} ON {table_name_quoted};") + cur.execute(f"DROP TABLE IF EXISTS {table_name_quoted}") + + +def _quote_identifier(identifier: str) -> str: + """Quote a SingleStore/MySQL identifier safely using backticks. + + Backticks are escaped by doubling them. This does not validate existence, + but prevents SQL injection through identifier interpolation. + """ + + escaped = identifier.replace("`", "``") + return f"`{escaped}`" + + +def _drop_discovered_versioned_tables(cur: Cursor, project: str, table: FeatureView) -> None: + base_table_name = online_store_table_id(project, table, enable_versioning=False) + like_pattern = f"{base_table_name}_v%" + try: + cur.execute("SHOW TABLES LIKE %s", (like_pattern,)) + rows = cur.fetchall() or [] + for row in rows: + table_name = row[0] + cur.execute( + f"DROP INDEX IF EXISTS {_quote_identifier(table_name + '_ek')} ON {_quote_identifier(table_name)};" + ) + cur.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table_name)}") + except Exception as e: + logger.warning( + "Failed to discover/drop versioned tables for %s during teardown fallback. Error: %s", + table.name, + e, + ) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 73566bd53ed..bac688f5f86 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -39,6 +39,8 @@ from feast.infra.infra_object import SQLITE_INFRA_OBJECT_CLASS_TYPE, InfraObject from feast.infra.key_encoding_utils import ( deserialize_entity_key, + deserialize_f32, + deserialize_val, serialize_entity_key, serialize_f32, ) @@ -59,7 +61,6 @@ _serialize_vector_to_float_list, to_naive_utc, ) -from feast.infra.online_stores.helpers import online_store_table_id def adapt_date_iso(val: date): From 31c809431f2bd65c2c2f2b481bfed1883de32cd5 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Thu, 2 Apr 2026 13:17:32 +0100 Subject: [PATCH 04/18] Fix SingleStore teardown LIKE escaping Signed-off-by: antznette1 --- .../singlestore_online_store/singlestore.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index 5e55fdcafe3..73471902d66 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -1,8 +1,8 @@ from __future__ import absolute_import +import logging from collections import defaultdict from datetime import datetime -import logging from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple import singlestoredb @@ -211,7 +211,7 @@ def update( event_ts timestamp NULL DEFAULT NULL, created_ts timestamp NULL DEFAULT NULL, PRIMARY KEY(entity_key, feature_name), - INDEX {_quote_identifier(table_name + '_ek')} (entity_key))""" + INDEX {_quote_identifier(table_name + "_ek")} (entity_key))""" ) for table in tables_to_delete: @@ -313,16 +313,22 @@ def _quote_identifier(identifier: str) -> str: return f"`{escaped}`" -def _drop_discovered_versioned_tables(cur: Cursor, project: str, table: FeatureView) -> None: +def _drop_discovered_versioned_tables( + cur: Cursor, project: str, table: FeatureView +) -> None: base_table_name = online_store_table_id(project, table, enable_versioning=False) - like_pattern = f"{base_table_name}_v%" + escaped_base_table_name = base_table_name.replace("\\", "\\\\") + escaped_base_table_name = escaped_base_table_name.replace("%", "\\%") + escaped_base_table_name = escaped_base_table_name.replace("_", "\\_") + like_pattern = f"{escaped_base_table_name}\\_v%" try: - cur.execute("SHOW TABLES LIKE %s", (like_pattern,)) + cur.execute("SHOW TABLES LIKE %s ESCAPE '\\\\'", (like_pattern,)) rows = cur.fetchall() or [] for row in rows: table_name = row[0] + index_name = f"{table_name}_ek" cur.execute( - f"DROP INDEX IF EXISTS {_quote_identifier(table_name + '_ek')} ON {_quote_identifier(table_name)};" + f"DROP INDEX IF EXISTS {_quote_identifier(index_name)} ON {_quote_identifier(table_name)};" ) cur.execute(f"DROP TABLE IF EXISTS {_quote_identifier(table_name)}") except Exception as e: From 7d720e55eb52e42376ba6bf59b377e82c405c0f3 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Sun, 29 Mar 2026 23:35:30 +0100 Subject: [PATCH 05/18] Add SingleStore online store support for feature view versioning Signed-off-by: antznette1 --- .../singlestore_online_store/singlestore.py | 56 +++++-------------- 1 file changed, 13 insertions(+), 43 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index 73471902d66..bf06b398c73 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -11,10 +11,10 @@ from singlestoredb.exceptions import InterfaceError from feast import Entity, FeatureView, RepoConfig -from feast.importer import import_class from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.helpers import _to_naive_utc, online_store_table_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel @@ -222,6 +222,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: project = config.project versioning = config.registry.enable_online_feature_view_versioning @@ -232,41 +233,18 @@ def teardown( continue versions = [] - try: - from feast.repo_config import REGISTRY_CLASS_FOR_TYPE - - registry_type = getattr(config.registry, "registry_type", "file") - registry_class_path = REGISTRY_CLASS_FOR_TYPE[registry_type] - module_name, class_name = registry_class_path.rsplit(".", 1) - registry_cls = import_class(module_name, class_name, "Registry") - if registry_type == "file": - registry = registry_cls( - project=project, - registry_config=config.registry, - repo_path=config.repo_path, + if registry is not None: + try: + versions = registry.list_feature_view_versions( + name=table.name, project=project ) - elif registry_type in {"sql", "remote", "snowflake.registry"}: - registry = registry_cls( - registry_config=config.registry, - project=project, - repo_path=config.repo_path, + except Exception as e: + logger.warning( + "Failed to list feature view versions for %s during teardown; will fall back to dropping discovered versioned tables. Error: %s", + table.name, + e, ) - else: - registry = registry_cls( - project=project, - registry_config=config.registry, - repo_path=config.repo_path, - ) - versions = registry.list_feature_view_versions( - name=table.name, project=project - ) - except Exception as e: - logger.warning( - "Failed to list feature view versions for %s during teardown; will fall back to dropping discovered versioned tables. Error: %s", - table.name, - e, - ) - versions = [] + versions = [] if not versions: _drop_table_and_index(cur, project, table, enable_versioning=True) @@ -303,19 +281,11 @@ def _drop_table_and_index( def _quote_identifier(identifier: str) -> str: - """Quote a SingleStore/MySQL identifier safely using backticks. - - Backticks are escaped by doubling them. This does not validate existence, - but prevents SQL injection through identifier interpolation. - """ - escaped = identifier.replace("`", "``") return f"`{escaped}`" -def _drop_discovered_versioned_tables( - cur: Cursor, project: str, table: FeatureView -) -> None: +def _drop_discovered_versioned_tables(cur: Cursor, project: str, table: FeatureView) -> None: base_table_name = online_store_table_id(project, table, enable_versioning=False) escaped_base_table_name = base_table_name.replace("\\", "\\\\") escaped_base_table_name = escaped_base_table_name.replace("%", "\\%") From 17819055de095cc82b6c287e4bee1092f6618dc3 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Fri, 3 Apr 2026 11:21:52 +0100 Subject: [PATCH 06/18] Pass registry to online store teardown Signed-off-by: antznette1 --- sdk/python/feast/feature_store.py | 4 +- .../feast/infra/online_stores/bigtable.py | 1 + .../cassandra_online_store.py | 1 + .../couchbase_online_store/couchbase.py | 1 + .../feast/infra/online_stores/datastore.py | 1 + .../feast/infra/online_stores/dynamodb.py | 1 + .../elasticsearch.py | 1 + .../infra/online_stores/faiss_online_store.py | 1 + .../hazelcast_online_store.py | 1 + .../online_stores/hbase_online_store/hbase.py | 1 + .../hybrid_online_store.py | 39 +++++-------------- .../milvus_online_store/milvus.py | 1 + .../mongodb_online_store/mongodb.py | 1 + .../online_stores/mysql_online_store/mysql.py | 3 +- .../feast/infra/online_stores/online_store.py | 1 + .../postgres_online_store/postgres.py | 1 + .../qdrant_online_store/qdrant.py | 1 + sdk/python/feast/infra/online_stores/redis.py | 1 + .../feast/infra/online_stores/remote.py | 1 + .../feast/infra/online_stores/snowflake.py | 1 + .../feast/infra/online_stores/sqlite.py | 2 + .../feast/infra/passthrough_provider.py | 5 ++- sdk/python/feast/infra/provider.py | 1 + sdk/python/tests/foo_provider.py | 1 + .../online_store/test_online_store_base.py | 3 +- 25 files changed, 42 insertions(+), 33 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f95bbf10c03..ed4be94a7c4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1335,7 +1335,9 @@ def teardown(self): entities = self.list_entities() - self._get_provider().teardown_infra(self.project, tables, entities) + self._get_provider().teardown_infra( + self.project, tables, entities, registry=self.registry + ) self.registry.teardown() def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/bigtable.py b/sdk/python/feast/infra/online_stores/bigtable.py index 3479f7f289a..0405d4509b5 100644 --- a/sdk/python/feast/infra/online_stores/bigtable.py +++ b/sdk/python/feast/infra/online_stores/bigtable.py @@ -306,6 +306,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): # Because of historical reasons, Feast calls them tables. We use this alias for # readability. diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 0870bc709db..210f2515ac8 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -460,6 +460,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the database. diff --git a/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py b/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py index c80f9e1285c..086e5cbfb6f 100644 --- a/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py +++ b/sdk/python/feast/infra/online_stores/couchbase_online_store/couchbase.py @@ -274,6 +274,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the database. diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 9ae10792f5a..0c577521d9b 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -139,6 +139,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 814058c77e5..3fbaa9a61e5 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -359,6 +359,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the DynamoDB Online Store. diff --git a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py index b78d003ac25..c5a799be248 100644 --- a/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/elasticsearch_online_store/elasticsearch.py @@ -263,6 +263,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/faiss_online_store.py b/sdk/python/feast/infra/online_stores/faiss_online_store.py index 3e3d92cde6d..5ef801f9976 100644 --- a/sdk/python/feast/infra/online_stores/faiss_online_store.py +++ b/sdk/python/feast/infra/online_stores/faiss_online_store.py @@ -86,6 +86,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): self._index = None self._in_memory_store.teardown() diff --git a/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py b/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py index 21359b45bca..16a6a2ea9db 100644 --- a/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py +++ b/sdk/python/feast/infra/online_stores/hazelcast_online_store/hazelcast_online_store.py @@ -299,6 +299,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): online_store_config = config.online_store if not isinstance(online_store_config, HazelcastOnlineStoreConfig): diff --git a/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py index dc48d2c4efc..5942874a0b9 100644 --- a/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/hbase_online_store/hbase.py @@ -212,6 +212,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Delete tables from the Hbase Online Store. diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py index 8faefdbd344..baf4f232058 100644 --- a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -289,39 +289,20 @@ def update( f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." ) + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + def teardown( self, config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): - """ - Teardown all managed online stores for the given FeatureViews and Entities. + """Teardown all managed online stores for the given FeatureViews and Entities.""" - Args: - config: Feast RepoConfig. - tables: Sequence of FeatureViews to teardown. - entities: Sequence of Entities to teardown. - """ - # Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance - tribes_seen = set() - online_stores_cfg = getattr(config.online_store, "online_stores", []) - tag_name = getattr(config.online_store, "routing_tag", "tribe") - for table in tables: - tribe = table.tags.get(tag_name) - if not tribe: - continue - # Find all store configs matching this tribe (supporting multiple instances of the same type) - for store_cfg in online_stores_cfg: - store_type = store_cfg.type - # Use id(store_cfg.conf) to distinguish different configs of the same type - key = (tribe, store_type, id(store_cfg.conf)) - if key in tribes_seen: - continue - tribes_seen.add(key) - # Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility) - if tribe.lower() == store_type.split(".")[-1].lower(): - online_store = self._get_online_store(tribe, config) - if online_store: - config = RepoConfig(**self._prepare_repo_conf(config, tribe)) - online_store.teardown(config, tables, entities) + self._initialize_online_stores(config) + for store in self.online_stores.values(): + store.teardown(config, tables, entities, registry=registry) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index ee2534684cc..c9c43259707 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -505,6 +505,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): self.client = self._connect(config) for table in tables: diff --git a/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py b/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py index 3e7a3db84c8..30bd50f44ae 100644 --- a/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py +++ b/sdk/python/feast/infra/online_stores/mongodb_online_store/mongodb.py @@ -260,6 +260,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): """ Drop the backing collection and close the client. diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index 415184ea248..2e43c254525 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -100,7 +100,7 @@ def online_write_batch( if progress: progress(1) else: - batch_size = config.online_store.bacth_size + batch_size = config.online_store.batch_size if not batch_size or batch_size < 2: raise ValueError("Batch size must be at least 2") insert_values = [] @@ -296,6 +296,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: conn = self._get_conn(config) cur = conn.cursor() diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 62113250985..f27cf732dce 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -467,6 +467,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Tears down all cloud resources for the specified set of Feast objects. diff --git a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py index 4a79349e91c..f8cf6b0ce7a 100644 --- a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py +++ b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py @@ -385,6 +385,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project schema_name = config.online_store.db_schema or config.online_store.user diff --git a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py index 29a6edf30ad..a221fdb7043 100644 --- a/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py +++ b/sdk/python/feast/infra/online_stores/qdrant_online_store/qdrant.py @@ -247,6 +247,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry=None, ): project = config.project try: diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index aeeb540b910..be46f2f0b2d 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -177,6 +177,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ We delete the keys in redis for tables/views being removed. diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 9bead1fcb9d..56b25ffca3b 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -617,6 +617,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): pass diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py index d2df674ed94..8940192efd7 100644 --- a/sdk/python/feast/infra/online_stores/snowflake.py +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -253,6 +253,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index bac688f5f86..b300b7b509c 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -46,6 +46,7 @@ ) from feast.infra.online_stores.helpers import compute_table_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.online_stores.vector_store import VectorStoreConfig from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -345,6 +346,7 @@ def teardown( config: RepoConfig, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): try: os.unlink(self._get_db_path(config)) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 20334e53a2e..9fa93295566 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -178,9 +178,12 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ) -> None: if self.online_store: - self.online_store.teardown(self.repo_config, tables, entities) + self.online_store.teardown( + self.repo_config, tables, entities, registry=registry + ) if self.batch_engine: self.batch_engine.teardown_infra(project, tables, entities) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 9bdf681fb69..c78c3956331 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -109,6 +109,7 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): """ Tears down all cloud resources for the specified set of Feast objects. diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 82cfc7fb513..b5cad523419 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -67,6 +67,7 @@ def teardown_infra( project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], + registry: Optional[BaseRegistry] = None, ): pass diff --git a/sdk/python/tests/unit/infra/online_store/test_online_store_base.py b/sdk/python/tests/unit/infra/online_store/test_online_store_base.py index 11195521deb..3b023c0bd0d 100644 --- a/sdk/python/tests/unit/infra/online_store/test_online_store_base.py +++ b/sdk/python/tests/unit/infra/online_store/test_online_store_base.py @@ -3,6 +3,7 @@ import pytest from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.supported_async_methods import SupportedAsyncMethods @@ -26,7 +27,7 @@ def update( ): pass - def teardown(self, config, tables, entities): + def teardown(self, config, tables, entities, registry: BaseRegistry = None): pass From bf4c562b289638d47f71edbd3b4159e87d6ec9c7 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Fri, 3 Apr 2026 11:48:39 +0100 Subject: [PATCH 07/18] Fix BaseRegistry typing imports Signed-off-by: antznette1 --- sdk/python/feast/infra/online_stores/bigtable.py | 1 + sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py | 1 + sdk/python/feast/infra/online_stores/redis.py | 1 + sdk/python/feast/infra/online_stores/remote.py | 1 + sdk/python/feast/infra/online_stores/snowflake.py | 1 + 5 files changed, 5 insertions(+) diff --git a/sdk/python/feast/infra/online_stores/bigtable.py b/sdk/python/feast/infra/online_stores/bigtable.py index 0405d4509b5..4d35faf626a 100644 --- a/sdk/python/feast/infra/online_stores/bigtable.py +++ b/sdk/python/feast/infra/online_stores/bigtable.py @@ -13,6 +13,7 @@ from feast.feature_view import DUMMY_ENTITY_NAME from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index 2e43c254525..39ae1a8da98 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -12,6 +12,7 @@ from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.helpers import compute_table_id from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index be46f2f0b2d..06da85a4070 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -34,6 +34,7 @@ from feast import Entity, FeatureView, RepoConfig, utils from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py index 56b25ffca3b..96b5c16cd09 100644 --- a/sdk/python/feast/infra/online_stores/remote.py +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -24,6 +24,7 @@ from feast import Entity, FeatureView, RepoConfig from feast.infra.online_stores.helpers import _to_naive_utc from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py index 8940192efd7..02a3e7ca3c7 100644 --- a/sdk/python/feast/infra/online_stores/snowflake.py +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -10,6 +10,7 @@ from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.registry.base_registry import BaseRegistry from feast.infra.utils.snowflake.snowflake_utils import ( GetSnowflakeConnection, execute_snowflake_statement, From 3093f6e2109378b1f0c2734e389db9401830fde2 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Fri, 3 Apr 2026 21:52:14 +0100 Subject: [PATCH 08/18] Fix online store update/teardown and pre-commit issues Signed-off-by: antznette1 --- infra/scripts/compile-templates.py | 6 +++--- .../hybrid_online_store/hybrid_online_store.py | 5 ----- .../online_stores/singlestore_online_store/singlestore.py | 4 +++- sdk/python/feast/infra/online_stores/sqlite.py | 2 +- .../integration/offline_store/test_s3_custom_endpoint.py | 2 +- 5 files changed, 8 insertions(+), 11 deletions(-) diff --git a/infra/scripts/compile-templates.py b/infra/scripts/compile-templates.py index e3130ab419a..6c582a90c94 100644 --- a/infra/scripts/compile-templates.py +++ b/infra/scripts/compile-templates.py @@ -28,7 +28,7 @@ def find_repo(path): # Template README.md ############################ roadmap_path = repo_root / "docs" / "roadmap.md" -with open(roadmap_path, "r") as f: +with open(roadmap_path, "r", encoding="utf-8") as f: # skip first lines since it has the title roadmap_contents_lines = f.readlines()[2:] @@ -36,7 +36,7 @@ def find_repo(path): roadmap_contents = "".join(roadmap_contents_lines) template_path = repo_root / "infra" / "templates" / "README.md.jinja2" -with open(template_path) as f: +with open(template_path, encoding="utf-8") as f: template = Template(f.read()) # Compile template @@ -49,5 +49,5 @@ def find_repo(path): ) readme_path = repo_root / "README.md" -with open(readme_path, "w") as f: +with open(readme_path, "w", encoding="utf-8") as f: f.write(readme_md) diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py index baf4f232058..1503e3cf02d 100644 --- a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -289,11 +289,6 @@ def update( f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." ) - tag_name = getattr(config.online_store, "routing_tag", "tribe") - raise ValueError( - f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." - ) - def teardown( self, config: RepoConfig, diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index bf06b398c73..994d0323057 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -285,7 +285,9 @@ def _quote_identifier(identifier: str) -> str: return f"`{escaped}`" -def _drop_discovered_versioned_tables(cur: Cursor, project: str, table: FeatureView) -> None: +def _drop_discovered_versioned_tables( + cur: Cursor, project: str, table: FeatureView +) -> None: base_table_name = online_store_table_id(project, table, enable_versioning=False) escaped_base_table_name = base_table_name.replace("\\", "\\\\") escaped_base_table_name = escaped_base_table_name.replace("%", "\\%") diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index b300b7b509c..24d8be30fa4 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -46,8 +46,8 @@ ) from feast.infra.online_stores.helpers import compute_table_id from feast.infra.online_stores.online_store import OnlineStore -from feast.infra.registry.base_registry import BaseRegistry from feast.infra.online_stores.vector_store import VectorStoreConfig +from feast.infra.registry.base_registry import BaseRegistry from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto diff --git a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py index 45426c63b8d..fda5f87088c 100644 --- a/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py +++ b/sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py @@ -31,7 +31,7 @@ def test_registration_and_retrieval_from_custom_s3_endpoint( "It may be better to deduplicate AWS configuration or use sub-processes for isolation" ) - os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" + os.environ["AWS_ACCESS_KEY_ID"] = "AKIAIOSFODNN7EXAMPLE" # pragma: allowlist secret os.environ["AWS_SECRET_ACCESS_KEY"] = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" with construct_test_environment(config) as environment: From 969ea69c656f2585fe8e6290f4a07fffdec63ed6 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Sat, 4 Apr 2026 00:03:01 +0100 Subject: [PATCH 09/18] Fix HybridOnlineStore teardown routing Signed-off-by: antznette1 --- .../hybrid_online_store.py | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py index 1503e3cf02d..575be6ab8f8 100644 --- a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -299,5 +299,27 @@ def teardown( """Teardown all managed online stores for the given FeatureViews and Entities.""" self._initialize_online_stores(config) - for store in self.online_stores.values(): - store.teardown(config, tables, entities, registry=registry) + tables_by_tribe: Dict[str, List[FeatureView]] = {} + for table in tables: + tribe = self._get_routing_tag_value(table, config) + if not tribe: + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + tables_by_tribe.setdefault(tribe, []).append(table) + + for tribe, tribe_tables in tables_by_tribe.items(): + online_store = self._get_online_store(tribe, config) + if not online_store: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." + ) + + tribe_config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + online_store.teardown( + tribe_config, + tribe_tables, + entities, + registry=registry, + ) From 26e4a7ef7fc99f359b78ffca1e76100b929936e0 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Sat, 4 Apr 2026 00:45:00 +0100 Subject: [PATCH 10/18] Fix SingleStore teardown fallback table dropping Signed-off-by: antznette1 --- .../infra/online_stores/singlestore_online_store/singlestore.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index 994d0323057..3927f575c3e 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -247,6 +247,7 @@ def teardown( versions = [] if not versions: + _drop_table_and_index(cur, project, table, enable_versioning=False) _drop_table_and_index(cur, project, table, enable_versioning=True) _drop_discovered_versioned_tables(cur, project, table) continue From 18f9fa2dbe7e66382d2875dfcf7832eddd90ebb6 Mon Sep 17 00:00:00 2001 From: Anthonette Adanyin <106275232+antznette1@users.noreply.github.com> Date: Sat, 4 Apr 2026 00:59:01 +0100 Subject: [PATCH 11/18] Update sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: antznette1 --- .../infra/online_stores/singlestore_online_store/singlestore.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index 3927f575c3e..4bf5179cff6 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -264,6 +264,8 @@ def teardown( version=version_number, ) + # Always drop the base (unversioned) table as well + _drop_table_and_index(cur, project, table, enable_versioning=False) _drop_discovered_versioned_tables(cur, project, table) From ded5edd6f13776dbf6913c5276ca5a7bcfd4c4cc Mon Sep 17 00:00:00 2001 From: Anthonette Adanyin <106275232+antznette1@users.noreply.github.com> Date: Sat, 4 Apr 2026 00:59:13 +0100 Subject: [PATCH 12/18] Update sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Signed-off-by: antznette1 --- .../infra/online_stores/singlestore_online_store/singlestore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py index 4bf5179cff6..dc8e4b2340c 100644 --- a/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py @@ -297,7 +297,7 @@ def _drop_discovered_versioned_tables( escaped_base_table_name = escaped_base_table_name.replace("_", "\\_") like_pattern = f"{escaped_base_table_name}\\_v%" try: - cur.execute("SHOW TABLES LIKE %s ESCAPE '\\\\'", (like_pattern,)) + cur.execute("SHOW TABLES LIKE %s", (like_pattern,)) rows = cur.fetchall() or [] for row in rows: table_name = row[0] From 2241b8c7df0f9947e3ecf157d742986dbf43de55 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Sat, 4 Apr 2026 02:44:29 +0100 Subject: [PATCH 13/18] Stabilize CI: pgvector readiness + operator base images Signed-off-by: antznette1 --- .secrets.baseline | 2 +- infra/feast-operator/Dockerfile | 9 +++-- .../universal/online_store/postgres.py | 37 +++++++++++++++++-- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 96bf780809c..e1ad949904f 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1510,7 +1510,7 @@ "filename": "sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py", "hashed_secret": "95433727ea51026e1e0dc8deadaabd4a3baaaaf4", "is_verified": false, - "line_number": 19 + "line_number": 21 } ], "sdk/python/tests/universal/feature_repos/universal/online_store/singlestore.py": [ diff --git a/infra/feast-operator/Dockerfile b/infra/feast-operator/Dockerfile index f0814b25576..7e210d54459 100644 --- a/infra/feast-operator/Dockerfile +++ b/infra/feast-operator/Dockerfile @@ -1,8 +1,10 @@ # Build the manager binary -FROM registry.access.redhat.com/ubi9/go-toolset:1.22.9 AS builder +FROM golang:1.22.9 AS builder ARG TARGETOS ARG TARGETARCH +WORKDIR /workspace + # Copy the Go Modules manifests COPY go.mod go.mod COPY go.sum go.sum @@ -22,9 +24,8 @@ COPY internal/controller/ internal/controller/ # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go -FROM registry.access.redhat.com/ubi9/ubi-minimal:9.5 +FROM gcr.io/distroless/base-debian12:nonroot WORKDIR / -COPY --from=builder /opt/app-root/src/manager . -USER 65532:65532 +COPY --from=builder /workspace/manager . ENTRYPOINT ["/manager"] diff --git a/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py b/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py index b9fda20d26a..571ec342f2c 100644 --- a/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py +++ b/sdk/python/tests/universal/feature_repos/universal/online_store/postgres.py @@ -1,6 +1,8 @@ import os +import time from typing import Any, Dict +import psycopg from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.postgres import PostgresContainer @@ -55,20 +57,47 @@ def create_online_store(self) -> Dict[str, Any]: self.container.start() log_string_to_wait_for = "database system is ready to accept connections" wait_for_logs( - container=self.container, predicate=log_string_to_wait_for, timeout=10 + container=self.container, predicate=log_string_to_wait_for, timeout=60 ) init_log_string_to_wait_for = "PostgreSQL init process complete" wait_for_logs( - container=self.container, predicate=init_log_string_to_wait_for, timeout=10 + container=self.container, predicate=init_log_string_to_wait_for, timeout=60 ) + + host = "localhost" + port = int(self.container.get_exposed_port(5432)) + + deadline = time.time() + 60 + last_exc: Exception | None = None + while time.time() < deadline: + try: + conn = psycopg.connect( + host=host, + port=port, + user="root", + password="test!@#$%", + dbname="test", + connect_timeout=2, + sslmode="disable", + ) + conn.close() + last_exc = None + break + except psycopg.OperationalError as e: + last_exc = e + time.sleep(1) + + if last_exc is not None: + raise last_exc + return { - "host": "localhost", + "host": host, "type": "postgres", "user": "root", "password": "test!@#$%", "database": "test", "vector_enabled": True, - "port": self.container.get_exposed_port(5432), + "port": port, "sslmode": "disable", } From 4b9469700586d209560452478c64355bc193a4ff Mon Sep 17 00:00:00 2001 From: antznette1 Date: Sat, 4 Apr 2026 12:46:09 +0100 Subject: [PATCH 14/18] Make metrics import resilient when prometheus_client is unavailable Signed-off-by: antznette1 --- sdk/python/feast/metrics.py | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index 7786af6f2f5..8f415039de5 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -102,7 +102,33 @@ def _cleanup_multiprocess_dir(): atexit.register(_cleanup_multiprocess_dir) # Now safe to import prometheus_client — it will detect the env var. -from prometheus_client import Counter, Gauge, Histogram # noqa: E402 +_prometheus_available = True +try: + from prometheus_client import Counter, Gauge, Histogram # noqa: E402 +except Exception: + _prometheus_available = False + + class _NoOpMetric: + def labels(self, **kwargs): + return self + + def inc(self, amount: float = 1): + return None + + def observe(self, amount: float): + return None + + def set(self, value: float): + return None + + def Counter(*args, **kwargs): # type: ignore + return _NoOpMetric() + + def Gauge(*args, **kwargs): # type: ignore + return _NoOpMetric() + + def Histogram(*args, **kwargs): # type: ignore + return _NoOpMetric() # --------------------------------------------------------------------------- @@ -473,6 +499,12 @@ def start_metrics_server( """ global _config + if not _prometheus_available: + logger.warning( + "Prometheus metrics are unavailable because prometheus_client could not be imported." + ) + return + if metrics_config is not None: _config = metrics_config else: From b06b1ce729ee0f0b4917cd0f4e6736118881bcf9 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Mon, 6 Apr 2026 17:11:20 +0100 Subject: [PATCH 15/18] Revert operator Dockerfile changes Signed-off-by: antznette1 --- infra/feast-operator/Dockerfile | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/infra/feast-operator/Dockerfile b/infra/feast-operator/Dockerfile index 7e210d54459..f0814b25576 100644 --- a/infra/feast-operator/Dockerfile +++ b/infra/feast-operator/Dockerfile @@ -1,10 +1,8 @@ # Build the manager binary -FROM golang:1.22.9 AS builder +FROM registry.access.redhat.com/ubi9/go-toolset:1.22.9 AS builder ARG TARGETOS ARG TARGETARCH -WORKDIR /workspace - # Copy the Go Modules manifests COPY go.mod go.mod COPY go.sum go.sum @@ -24,8 +22,9 @@ COPY internal/controller/ internal/controller/ # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go -FROM gcr.io/distroless/base-debian12:nonroot +FROM registry.access.redhat.com/ubi9/ubi-minimal:9.5 WORKDIR / -COPY --from=builder /workspace/manager . +COPY --from=builder /opt/app-root/src/manager . +USER 65532:65532 ENTRYPOINT ["/manager"] From 8d02cabf0af3717b1c7c316892b9311d25c2eb1e Mon Sep 17 00:00:00 2001 From: antznette1 Date: Wed, 8 Apr 2026 12:59:30 +0100 Subject: [PATCH 16/18] Fix MySQL/Postgres versioned online read support Signed-off-by: antznette1 --- .../feast/infra/online_stores/mysql_online_store/mysql.py | 4 ++++ .../infra/online_stores/postgres_online_store/postgres.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index 39ae1a8da98..92348109415 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -44,6 +44,10 @@ class MySQLOnlineStore(OnlineStore): _conn: Optional[Connection] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + def _get_conn(self, config: RepoConfig) -> Connection: online_store_config = config.online_store assert isinstance(online_store_config, MySQLOnlineStoreConfig) diff --git a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py index f8cf6b0ce7a..1fd68b8e948 100644 --- a/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py +++ b/sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py @@ -56,6 +56,10 @@ class PostgreSQLOnlineStore(OnlineStore): _conn_async: Optional[AsyncConnection] = None _conn_pool_async: Optional[AsyncConnectionPool] = None + @property + def supports_versioned_online_reads(self) -> bool: + return True + @contextlib.contextmanager def _get_conn( self, config: RepoConfig, autocommit: bool = False From 6d545df2792c2a6fc4fb701cbcd0f6fe00402342 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Wed, 8 Apr 2026 13:14:46 +0100 Subject: [PATCH 17/18] Fix MySQL batch write entity key serialization Signed-off-by: antznette1 --- .../feast/infra/online_stores/mysql_online_store/mysql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py index 92348109415..9dbe32842b2 100644 --- a/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py @@ -83,7 +83,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = to_naive_utc(timestamp) if created_ts is not None: @@ -112,7 +112,7 @@ def online_write_batch( for entity_key, values, timestamp, created_ts in data: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=2, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() timestamp = to_naive_utc(timestamp) if created_ts is not None: @@ -228,7 +228,7 @@ def online_read( for entity_key in entity_keys: entity_key_bin = serialize_entity_key( entity_key, - entity_key_serialization_version=3, + entity_key_serialization_version=config.entity_key_serialization_version, ).hex() cur.execute( From f97a9978c5e5f21b3e3773f4582ccb14bdbec1c6 Mon Sep 17 00:00:00 2001 From: antznette1 Date: Thu, 9 Apr 2026 10:39:08 +0100 Subject: [PATCH 18/18] Run format-files pre-commit fixes Signed-off-by: antznette1 --- sdk/python/feast/infra/online_stores/online_store.py | 1 + sdk/python/feast/infra/online_stores/sqlite.py | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index f27cf732dce..bbd613be1a9 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -260,6 +260,7 @@ def get_online_features( def _check_versioned_read_support(self, grouped_refs, config: RepoConfig): """Raise an error if versioned reads are attempted on unsupported stores.""" from feast.infra.online_stores.sqlite import SqliteOnlineStore + for table, _ in grouped_refs: version_tag = getattr(table.projection, "version_tag", None) if version_tag is None: diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 24d8be30fa4..0a32b1d1206 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -39,8 +39,6 @@ from feast.infra.infra_object import SQLITE_INFRA_OBJECT_CLASS_TYPE, InfraObject from feast.infra.key_encoding_utils import ( deserialize_entity_key, - deserialize_f32, - deserialize_val, serialize_entity_key, serialize_f32, )