Skip to content

Commit 0e56ae2

Browse files
committed
feat: Add feature view versioning support to PostgreSQL and MySQL online stores
Add versioned read/write support so that version-qualified feature references (e.g., driver_stats@v2:trips_today) resolve to the correct versioned table in both PostgreSQL and MySQL online stores. Changes: - PostgreSQL: Updated _table_id() and all callers to support enable_versioning - MySQL: Updated _table_id(), _execute_batch(), write_to_table(), and _drop_table_and_index() to thread versioning flag through - online_store.py: Registered PostgreSQLOnlineStore and MySQLOnlineStore in _check_versioned_read_support() - errors.py: Updated VersionedOnlineReadNotSupported message - Unit tests split per store in tests/unit/infra/online_store/ - Integration tests in tests/integration/online_store/ (Docker, testcontainers) Closes #6168 Closes #6169 Part of #2728 Signed-off-by: yassinnouh21 <yassinnouh21@gmail.com>
1 parent 73805d3 commit 0e56ae2

File tree

8 files changed

+687
-30
lines changed

8 files changed

+687
-30
lines changed

sdk/python/feast/errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class VersionedOnlineReadNotSupported(FeastError):
142142
def __init__(self, store_name: str, version: int):
143143
super().__init__(
144144
f"Versioned feature reads (@v{version}) are not yet supported by {store_name}. "
145-
f"Currently only SQLite supports version-qualified feature references. "
145+
f"Currently only SQLite, PostgreSQL, and MySQL support version-qualified feature references. "
146146
)
147147

148148

sdk/python/feast/infra/online_stores/mysql_online_store/mysql.py

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def online_write_batch(
7070
cur = conn.cursor()
7171

7272
project = config.project
73+
versioning = config.registry.enable_online_feature_view_versioning
7374

7475
batch_write = config.online_store.batch_write
7576
if not batch_write:
@@ -92,6 +93,7 @@ def online_write_batch(
9293
table,
9394
timestamp,
9495
val,
96+
versioning,
9597
)
9698
conn.commit()
9799
if progress:
@@ -124,7 +126,9 @@ def online_write_batch(
124126

125127
if len(insert_values) >= batch_size:
126128
try:
127-
self._execute_batch(cur, project, table, insert_values)
129+
self._execute_batch(
130+
cur, project, table, insert_values, versioning
131+
)
128132
conn.commit()
129133
if progress:
130134
progress(len(insert_values))
@@ -135,17 +139,20 @@ def online_write_batch(
135139

136140
if insert_values:
137141
try:
138-
self._execute_batch(cur, project, table, insert_values)
142+
self._execute_batch(cur, project, table, insert_values, versioning)
139143
conn.commit()
140144
if progress:
141145
progress(len(insert_values))
142146
except Exception as e:
143147
conn.rollback()
144148
raise e
145149

146-
def _execute_batch(self, cur, project, table, insert_values):
147-
sql = f"""
148-
INSERT INTO {_table_id(project, table)}
150+
def _execute_batch(
151+
self, cur, project, table, insert_values, enable_versioning=False
152+
):
153+
table_name = _table_id(project, table, enable_versioning)
154+
stmt = f"""
155+
INSERT INTO {table_name}
149156
(entity_key, feature_name, value, event_ts, created_ts)
150157
values (%s, %s, %s, %s, %s)
151158
ON DUPLICATE KEY UPDATE
@@ -154,22 +161,29 @@ def _execute_batch(self, cur, project, table, insert_values):
154161
created_ts = VALUES(created_ts);
155162
"""
156163
try:
157-
cur.executemany(sql, insert_values)
164+
cur.executemany(stmt, insert_values)
158165
except Exception as e:
159-
# Log SQL info for debugging without leaking sensitive data
160166
first_sample = insert_values[0] if insert_values else None
161167
raise RuntimeError(
162-
f"Failed to execute batch insert into table '{_table_id(project, table)}' "
168+
f"Failed to execute batch insert into table '{table_name}' "
163169
f"(rows={len(insert_values)}, sample={first_sample}): {e}"
164170
) from e
165171

166172
@staticmethod
167173
def write_to_table(
168-
created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val
174+
created_ts,
175+
cur,
176+
entity_key_bin,
177+
feature_name,
178+
project,
179+
table,
180+
timestamp,
181+
val,
182+
enable_versioning=False,
169183
) -> None:
170184
cur.execute(
171185
f"""
172-
INSERT INTO {_table_id(project, table)}
186+
INSERT INTO {_table_id(project, table, enable_versioning)}
173187
(entity_key, feature_name, value, event_ts, created_ts)
174188
values (%s, %s, %s, %s, %s)
175189
ON DUPLICATE KEY UPDATE
@@ -204,14 +218,15 @@ def online_read(
204218
result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = []
205219

206220
project = config.project
221+
versioning = config.registry.enable_online_feature_view_versioning
207222
for entity_key in entity_keys:
208223
entity_key_bin = serialize_entity_key(
209224
entity_key,
210225
entity_key_serialization_version=3,
211226
).hex()
212227

213228
cur.execute(
214-
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
229+
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table, versioning)} WHERE entity_key = %s",
215230
(entity_key_bin,),
216231
)
217232

@@ -243,10 +258,11 @@ def update(
243258
conn = self._get_conn(config)
244259
cur = conn.cursor()
245260
project = config.project
261+
versioning = config.registry.enable_online_feature_view_versioning
246262

247263
# We don't create any special state for the entities in this implementation.
248264
for table in tables_to_keep:
249-
table_name = _table_id(project, table)
265+
table_name = _table_id(project, table, versioning)
250266
index_name = f"{table_name}_ek"
251267
cur.execute(
252268
f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512),
@@ -269,7 +285,7 @@ def update(
269285
)
270286

271287
for table in tables_to_delete:
272-
_drop_table_and_index(cur, project, table)
288+
_drop_table_and_index(cur, project, table, versioning)
273289

274290
def teardown(
275291
self,
@@ -280,16 +296,26 @@ def teardown(
280296
conn = self._get_conn(config)
281297
cur = conn.cursor()
282298
project = config.project
299+
versioning = config.registry.enable_online_feature_view_versioning
283300

284301
for table in tables:
285-
_drop_table_and_index(cur, project, table)
302+
_drop_table_and_index(cur, project, table, versioning)
286303

287304

288-
def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
289-
table_name = _table_id(project, table)
305+
def _drop_table_and_index(
306+
cur: Cursor, project: str, table: FeatureView, enable_versioning: bool = False
307+
) -> None:
308+
table_name = _table_id(project, table, enable_versioning)
290309
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
291310
cur.execute(f"DROP TABLE IF EXISTS {table_name}")
292311

293312

294-
def _table_id(project: str, table: FeatureView) -> str:
295-
return f"{project}_{table.name}"
313+
def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
314+
name = table.name
315+
if enable_versioning:
316+
version = getattr(table.projection, "version_tag", None)
317+
if version is None:
318+
version = getattr(table, "current_version_number", None)
319+
if version is not None and version > 0:
320+
name = f"{table.name}_v{version}"
321+
return f"{project}_{name}"

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,17 @@ def get_online_features(
256256

257257
def _check_versioned_read_support(self, grouped_refs):
258258
"""Raise an error if versioned reads are attempted on unsupported stores."""
259+
from feast.infra.online_stores.mysql_online_store.mysql import (
260+
MySQLOnlineStore,
261+
)
262+
from feast.infra.online_stores.postgres_online_store.postgres import (
263+
PostgreSQLOnlineStore,
264+
)
259265
from feast.infra.online_stores.sqlite import SqliteOnlineStore
260266

261-
if isinstance(self, SqliteOnlineStore):
267+
if isinstance(
268+
self, (SqliteOnlineStore, PostgreSQLOnlineStore, MySQLOnlineStore)
269+
):
262270
return
263271
for table, _ in grouped_refs:
264272
version_tag = getattr(table.projection, "version_tag", None)

sdk/python/feast/infra/online_stores/postgres_online_store/postgres.py

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,15 @@ def online_write_batch(
152152
event_ts = EXCLUDED.event_ts,
153153
created_ts = EXCLUDED.created_ts;
154154
"""
155-
).format(sql.Identifier(_table_id(config.project, table)))
155+
).format(
156+
sql.Identifier(
157+
_table_id(
158+
config.project,
159+
table,
160+
config.registry.enable_online_feature_view_versioning,
161+
)
162+
)
163+
)
156164

157165
# Push data into the online store
158166
with self._get_conn(config) as conn, conn.cursor() as cur:
@@ -214,7 +222,13 @@ def _construct_query_and_params(
214222
FROM {} WHERE entity_key = ANY(%s) AND feature_name = ANY(%s);
215223
"""
216224
).format(
217-
sql.Identifier(_table_id(config.project, table)),
225+
sql.Identifier(
226+
_table_id(
227+
config.project,
228+
table,
229+
config.registry.enable_online_feature_view_versioning,
230+
)
231+
),
218232
)
219233
params = (keys, requested_features)
220234
else:
@@ -224,7 +238,13 @@ def _construct_query_and_params(
224238
FROM {} WHERE entity_key = ANY(%s);
225239
"""
226240
).format(
227-
sql.Identifier(_table_id(config.project, table)),
241+
sql.Identifier(
242+
_table_id(
243+
config.project,
244+
table,
245+
config.registry.enable_online_feature_view_versioning,
246+
)
247+
),
228248
)
229249
params = (keys, [])
230250
return query, params
@@ -304,12 +324,13 @@ def update(
304324
),
305325
)
306326

327+
versioning = config.registry.enable_online_feature_view_versioning
307328
for table in tables_to_delete:
308-
table_name = _table_id(project, table)
329+
table_name = _table_id(project, table, versioning)
309330
cur.execute(_drop_table_and_index(table_name))
310331

311332
for table in tables_to_keep:
312-
table_name = _table_id(project, table)
333+
table_name = _table_id(project, table, versioning)
313334
if config.online_store.vector_enabled:
314335
vector_value_type = "vector"
315336
else:
@@ -363,10 +384,11 @@ def teardown(
363384
entities: Sequence[Entity],
364385
):
365386
project = config.project
387+
versioning = config.registry.enable_online_feature_view_versioning
366388
try:
367389
with self._get_conn(config) as conn, conn.cursor() as cur:
368390
for table in tables:
369-
table_name = _table_id(project, table)
391+
table_name = _table_id(project, table, versioning)
370392
cur.execute(_drop_table_and_index(table_name))
371393
conn.commit()
372394
except Exception:
@@ -432,7 +454,9 @@ def retrieve_online_documents(
432454
]
433455
] = []
434456
with self._get_conn(config, autocommit=True) as conn, conn.cursor() as cur:
435-
table_name = _table_id(project, table)
457+
table_name = _table_id(
458+
project, table, config.registry.enable_online_feature_view_versioning
459+
)
436460

437461
# Search query template to find the top k items that are closest to the given embedding
438462
# SELECT * FROM items ORDER BY embedding <-> '[3,1,2]' LIMIT 5;
@@ -533,7 +557,11 @@ def retrieve_online_documents_v2(
533557
and feature.name in requested_features
534558
]
535559

536-
table_name = _table_id(config.project, table)
560+
table_name = _table_id(
561+
config.project,
562+
table,
563+
config.registry.enable_online_feature_view_versioning,
564+
)
537565

538566
with self._get_conn(config, autocommit=True) as conn, conn.cursor() as cur:
539567
query = None
@@ -794,8 +822,15 @@ def retrieve_online_documents_v2(
794822
return result
795823

796824

797-
def _table_id(project: str, table: FeatureView) -> str:
798-
return f"{project}_{table.name}"
825+
def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
826+
name = table.name
827+
if enable_versioning:
828+
version = getattr(table.projection, "version_tag", None)
829+
if version is None:
830+
version = getattr(table, "current_version_number", None)
831+
if version is not None and version > 0:
832+
name = f"{table.name}_v{version}"
833+
return f"{project}_{name}"
799834

800835

801836
def _drop_table_and_index(table_name):

0 commit comments

Comments
 (0)