Skip to content
Merged
Next Next commit
feat: Add feature view versioning support to PostgreSQL and MySQL onl…
…ine stores

Add versioned read/write support so that version-qualified feature references
(e.g., driver_stats@v2:trips_today) resolve to the correct versioned table in
both PostgreSQL and MySQL online stores.

Changes:
- PostgreSQL: Updated _table_id() and all callers to support enable_versioning
- MySQL: Updated _table_id(), _execute_batch(), write_to_table(), and
  _drop_table_and_index() to thread versioning flag through
- online_store.py: Registered PostgreSQLOnlineStore and MySQLOnlineStore in
  _check_versioned_read_support()
- errors.py: Updated VersionedOnlineReadNotSupported message
- Unit tests split per store in tests/unit/infra/online_store/
- Integration tests in tests/integration/online_store/ (Docker, testcontainers)

Closes #6168
Closes #6169
Part of #2728

Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com>
  • Loading branch information
YassinNouh21 authored and ntkathole committed Apr 6, 2026
commit bd35805b19be9ca3c811e9f7f526ebd447b031b5
2 changes: 1 addition & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError):
def __init__(self, store_name: str, version: int):
super().__init__(
f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. "
f"Currently only SQLite supports version-qualified feature references. "
f"Currently only SQLite, PostgreSQL, and MySQL support version-qualified feature references. "
)


Expand Down
62 changes: 44 additions & 18 deletions sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def online_write_batch(
cur = conn.cursor()

project = config.project
versioning = config.registry.enable_online_feature_view_versioning

batch_write = config.online_store.batch_write
if not batch_write:
Expand All @@ -92,6 +93,7 @@ def online_write_batch(
table,
timestamp,
val,
versioning,
)
conn.commit()
if progress:
Expand Down Expand Up @@ -124,7 +126,9 @@ def online_write_batch(

if len(insert_values) >= batch_size:
try:
self._execute_batch(cur, project, table, insert_values)
self._execute_batch(
cur, project, table, insert_values, versioning
)
conn.commit()
if progress:
progress(len(insert_values))
Expand All @@ -135,17 +139,20 @@ def online_write_batch(

if insert_values:
try:
self._execute_batch(cur, project, table, insert_values)
self._execute_batch(cur, project, table, insert_values, versioning)
conn.commit()
if progress:
progress(len(insert_values))
except Exception as e:
conn.rollback()
raise e

def _execute_batch(self, cur, project, table, insert_values):
sql = f"""
INSERT INTO {_table_id(project, table)}
def _execute_batch(
self, cur, project, table, insert_values, enable_versioning=False
):
table_name = _table_id(project, table, enable_versioning)
stmt = f"""
INSERT INTO {table_name}
(entity_key, feature_name, value, event_ts, created_ts)
values (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
Expand All @@ -154,22 +161,29 @@ def _execute_batch(self, cur, project, table, insert_values):
created_ts = VALUES(created_ts);
"""
try:
cur.executemany(sql, insert_values)
cur.executemany(stmt, insert_values)
except Exception as e:
# Log SQL info for debugging without leaking sensitive data
first_sample = insert_values[0] if insert_values else None
raise RuntimeError(
f"Failed to execute batch insert into table '{_table_id(project, table)}' "
f"Failed to execute batch insert into table '{table_name}' "
f"(rows={len(insert_values)}, sample={first_sample}): {e}"
) from e

@staticmethod
def write_to_table(
created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val
created_ts,
cur,
entity_key_bin,
feature_name,
project,
table,
timestamp,
val,
enable_versioning=False,
) -> None:
cur.execute(
f"""
INSERT INTO {_table_id(project, table)}
INSERT INTO {_table_id(project, table, enable_versioning)}
(entity_key, feature_name, value, event_ts, created_ts)
values (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
Expand Down Expand Up @@ -204,14 +218,15 @@ def online_read(
result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = []

project = config.project
versioning = config.registry.enable_online_feature_view_versioning
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=3,
).hex()

cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table, versioning)} WHERE entity_key = %s",
(entity_key_bin,),
)

Expand Down Expand Up @@ -243,10 +258,11 @@ def update(
conn = self._get_conn(config)
cur = conn.cursor()
project = config.project
versioning = config.registry.enable_online_feature_view_versioning

# We don't create any special state for the entities in this implementation.
for table in tables_to_keep:
table_name = _table_id(project, table)
table_name = _table_id(project, table, versioning)
index_name = f"{table_name}_ek"
cur.execute(
f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512),
Expand All @@ -269,7 +285,7 @@ def update(
)

for table in tables_to_delete:
_drop_table_and_index(cur, project, table)
_drop_table_and_index(cur, project, table, versioning)

def teardown(
self,
Expand All @@ -280,16 +296,26 @@ def teardown(
conn = self._get_conn(config)
cur = conn.cursor()
project = config.project
versioning = config.registry.enable_online_feature_view_versioning

for table in tables:
_drop_table_and_index(cur, project, table)
_drop_table_and_index(cur, project, table, versioning)
Comment thread
YassinNouh21 marked this conversation as resolved.
Outdated


def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
table_name = _table_id(project, table)
def _drop_table_and_index(
cur: Cursor, project: str, table: FeatureView, enable_versioning: bool = False
) -> None:
table_name = _table_id(project, table, enable_versioning)
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
cur.execute(f"DROP TABLE IF EXISTS {table_name}")


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"
def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return f"{project}_{name}"
10 changes: 9 additions & 1 deletion sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,17 @@ def get_online_features(

def _check_versioned_read_support(self, grouped_refs):
"""Raise an error if versioned reads are attempted on unsupported stores."""
from feast.infra.online_stores.mysql_online_store.mysql import (
MySQLOnlineStore,
)
from feast.infra.online_stores.postgres_online_store.postgres import (
PostgreSQLOnlineStore,
)
from feast.infra.online_stores.sqlite import SqliteOnlineStore

if isinstance(self, SqliteOnlineStore):
if isinstance(
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Outdated
self, (SqliteOnlineStore, PostgreSQLOnlineStore, MySQLOnlineStore)
):
return
for table, _ in grouped_refs:
version_tag = getattr(table.projection, "version_tag", None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,15 @@ def online_write_batch(
event_ts = EXCLUDED.event_ts,
created_ts = EXCLUDED.created_ts;
"""
).format(sql.Identifier(_table_id(config.project, table)))
).format(
sql.Identifier(
_table_id(
config.project,
table,
config.registry.enable_online_feature_view_versioning,
)
)
)

# Push data into the online store
with self._get_conn(config) as conn, conn.cursor() as cur:
Expand Down Expand Up @@ -214,7 +222,13 @@ def _construct_query_and_params(
FROM {} WHERE entity_key = ANY(%s) AND feature_name = ANY(%s);
"""
).format(
sql.Identifier(_table_id(config.project, table)),
sql.Identifier(
_table_id(
config.project,
table,
config.registry.enable_online_feature_view_versioning,
)
),
)
params = (keys, requested_features)
else:
Expand All @@ -224,7 +238,13 @@ def _construct_query_and_params(
FROM {} WHERE entity_key = ANY(%s);
"""
).format(
sql.Identifier(_table_id(config.project, table)),
sql.Identifier(
_table_id(
config.project,
table,
config.registry.enable_online_feature_view_versioning,
)
),
)
params = (keys, [])
return query, params
Expand Down Expand Up @@ -304,12 +324,13 @@ def update(
),
)

versioning = config.registry.enable_online_feature_view_versioning
for table in tables_to_delete:
table_name = _table_id(project, table)
table_name = _table_id(project, table, versioning)
cur.execute(_drop_table_and_index(table_name))

for table in tables_to_keep:
table_name = _table_id(project, table)
table_name = _table_id(project, table, versioning)
if config.online_store.vector_enabled:
vector_value_type = "vector"
else:
Expand Down Expand Up @@ -363,10 +384,11 @@ def teardown(
entities: Sequence[Entity],
):
project = config.project
versioning = config.registry.enable_online_feature_view_versioning
try:
with self._get_conn(config) as conn, conn.cursor() as cur:
for table in tables:
table_name = _table_id(project, table)
table_name = _table_id(project, table, versioning)
cur.execute(_drop_table_and_index(table_name))
conn.commit()
except Exception:
Expand Down Expand Up @@ -432,7 +454,9 @@ def retrieve_online_documents(
]
] = []
with self._get_conn(config, autocommit=True) as conn, conn.cursor() as cur:
table_name = _table_id(project, table)
table_name = _table_id(
project, table, config.registry.enable_online_feature_view_versioning
)

# Search query template to find the top k items that are closest to the given embedding
# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
Expand Down Expand Up @@ -533,7 +557,11 @@ def retrieve_online_documents_v2(
and feature.name in requested_features
]

table_name = _table_id(config.project, table)
table_name = _table_id(
config.project,
table,
config.registry.enable_online_feature_view_versioning,
)

with self._get_conn(config, autocommit=True) as conn, conn.cursor() as cur:
query = None
Expand Down Expand Up @@ -794,8 +822,15 @@ def retrieve_online_documents_v2(
return result


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"
def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
Comment thread
YassinNouh21 marked this conversation as resolved.
name = table.name
if enable_versioning:
version = getattr(table.projection, "version_tag", None)
if version is None:
version = getattr(table, "current_version_number", None)
if version is not None and version > 0:
name = f"{table.name}_v{version}"
return f"{project}_{name}"


def _drop_table_and_index(table_name):
Expand Down
Loading