Skip to content
Merged
Prev Previous commit
Next Next commit
fix: drop all versioned tables on teardown and extract shared _table_id
Teardown was only dropping the current version's table, orphaning older
versioned tables (e.g. _v1, _v2). Now queries the database catalog to
find and drop all versioned tables for each feature view.

Also extracts the duplicated _table_id logic from SQLite, MySQL, and
Postgres into a shared compute_table_id helper.

Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com>
  • Loading branch information
YassinNouh21 authored and ntkathole committed Apr 6, 2026
commit 4abdda3134d2f48fbe1f924cdc6de79e8270b38e
14 changes: 14 additions & 0 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,17 @@ def _to_naive_utc(ts: datetime) -> datetime:
return ts
else:
return ts.astimezone(tz=timezone.utc).replace(tzinfo=None)


def compute_table_id(
project: str, table: Any, enable_versioning: bool = False
) -> str:
"""Build the online-store table name, appending a version suffix when versioning is enabled."""
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return f"{project}_{name}"
44 changes: 29 additions & 15 deletions sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.helpers import compute_table_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
Expand Down Expand Up @@ -285,7 +286,10 @@ def update(
)

for table in tables_to_delete:
_drop_table_and_index(cur, project, table, versioning)
if versioning:
_drop_all_version_tables(cur, project, table)
else:
_drop_table_and_index(cur, _table_id(project, table))

def teardown(
self,
Expand All @@ -299,23 +303,33 @@ def teardown(
versioning = config.registry.enable_online_feature_view_versioning

for table in tables:
_drop_table_and_index(cur, project, table, versioning)
if versioning:
_drop_all_version_tables(cur, project, table)
else:
_drop_table_and_index(cur, _table_id(project, table))


def _drop_table_and_index(
cur: Cursor, project: str, table: FeatureView, enable_versioning: bool = False
) -> None:
table_name = _table_id(project, table, enable_versioning)
def _drop_table_and_index(cur: Cursor, table_name: str) -> None:
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
cur.execute(f"DROP TABLE IF EXISTS {table_name}")


def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return f"{project}_{name}"
def _drop_all_version_tables(
cur: Cursor, project: str, table: FeatureView
) -> None:
"""Drop the base table and all versioned tables (e.g. _v1, _v2, ...)."""
base = f"{project}_{table.name}"
cur.execute(
"SELECT table_name FROM information_schema.tables "
"WHERE table_schema = DATABASE() AND (table_name = %s OR table_name LIKE %s)",
(base, f"{base}_v%"),
Comment thread
YassinNouh21 marked this conversation as resolved.
Outdated
)
for (name,) in cur.fetchall():
cur.execute(f"DROP INDEX IF EXISTS {name}_ek ON {name};")
cur.execute(f"DROP TABLE IF EXISTS {name}")


def _table_id(
project: str, table: FeatureView, enable_versioning: bool = False
) -> str:
return compute_table_id(project, table, enable_versioning)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from feast import Entity, FeatureView, ValueType
from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key
from feast.infra.online_stores.helpers import _to_naive_utc
from feast.infra.online_stores.helpers import _to_naive_utc, compute_table_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.vector_store import VectorStoreConfig
from feast.infra.utils.postgres.connection_utils import (
Expand Down Expand Up @@ -326,8 +326,11 @@ def update(

versioning = config.registry.enable_online_feature_view_versioning
for table in tables_to_delete:
table_name = _table_id(project, table, versioning)
cur.execute(_drop_table_and_index(table_name))
if versioning:
_drop_all_version_tables(cur, project, table)
else:
table_name = _table_id(project, table)
cur.execute(_drop_table_and_index(table_name))

for table in tables_to_keep:
table_name = _table_id(project, table, versioning)
Expand Down Expand Up @@ -388,8 +391,11 @@ def teardown(
try:
with self._get_conn(config) as conn, conn.cursor() as cur:
for table in tables:
table_name = _table_id(project, table, versioning)
cur.execute(_drop_table_and_index(table_name))
if versioning:
_drop_all_version_tables(cur, project, table)
else:
table_name = _table_id(project, table)
cur.execute(_drop_table_and_index(table_name))
conn.commit()
except Exception:
logging.exception("Teardown failed")
Expand Down Expand Up @@ -822,15 +828,10 @@ def retrieve_online_documents_v2(
return result


def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return f"{project}_{name}"
def _table_id(
project: str, table: FeatureView, enable_versioning: bool = False
) -> str:
return compute_table_id(project, table, enable_versioning)


def _drop_table_and_index(table_name):
Expand All @@ -843,3 +844,16 @@ def _drop_table_and_index(table_name):
sql.Identifier(table_name),
sql.Identifier(f"{table_name}_ek"),
)


def _drop_all_version_tables(cur, project: str, table: FeatureView) -> None:
"""Drop the base table and all versioned tables (e.g. _v1, _v2, ...)."""
base = f"{project}_{table.name}"
cur.execute(
sql.SQL(
"SELECT tablename FROM pg_tables WHERE tablename = %s OR tablename LIKE %s"
),
(base, f"{base}_v%"),
Comment thread
YassinNouh21 marked this conversation as resolved.
Outdated
)
for (name,) in cur.fetchall():
cur.execute(_drop_table_and_index(name))
16 changes: 5 additions & 11 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
serialize_entity_key,
serialize_f32,
)
from feast.infra.online_stores.helpers import compute_table_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.vector_store import VectorStoreConfig
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
Expand Down Expand Up @@ -715,17 +716,10 @@ def _initialize_conn(
return db


def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
name = table.name
if enable_versioning:
# Prefer version_tag from the projection (set by version-qualified refs like @v2)
# over current_version_number (the FV's active version in metadata).
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return f"{project}_{name}"
def _table_id(
project: str, table: FeatureView, enable_versioning: bool = False
) -> str:
return compute_table_id(project, table, enable_versioning)


class SqliteTable(InfraObject):
Expand Down