Skip to content

Commit 88a6e5b

Browse files
committed
Add SingleStore online store support for feature view versioning
Signed-off-by: antznette1 <ochiezeanthonette@gmail.com>
1 parent 73805d3 commit 88a6e5b

File tree

5 files changed

+112
-21
lines changed

5 files changed

+112
-21
lines changed

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def get_online_features(
188188
)
189189

190190
# Check for versioned reads on unsupported stores
191-
self._check_versioned_read_support(grouped_refs)
191+
self._check_versioned_read_support(grouped_refs, config)
192192
_track_read = False
193193
try:
194194
from feast.metrics import _config as _metrics_config
@@ -254,19 +254,30 @@ def get_online_features(
254254
)
255255
return OnlineResponse(online_features_response)
256256

257-
def _check_versioned_read_support(self, grouped_refs):
257+
def _check_versioned_read_support(self, grouped_refs, config: RepoConfig):
258258
"""Raise an error if versioned reads are attempted on unsupported stores."""
259259
from feast.infra.online_stores.sqlite import SqliteOnlineStore
260+
from feast.infra.online_stores.singlestore_online_store.singlestore import (
261+
SingleStoreOnlineStore,
262+
)
260263

261-
if isinstance(self, SqliteOnlineStore):
262-
return
263264
for table, _ in grouped_refs:
264265
version_tag = getattr(table.projection, "version_tag", None)
265-
if version_tag is not None:
266+
if version_tag is None:
267+
continue
268+
269+
# Version-qualified refs (e.g. @v2) are only supported when online versioning is enabled.
270+
if not config.registry.enable_online_feature_view_versioning:
266271
raise VersionedOnlineReadNotSupported(
267272
self.__class__.__name__, version_tag
268273
)
269274

275+
# Online versioning enabled: allow stores that implement versioned routing.
276+
if isinstance(self, (SqliteOnlineStore, SingleStoreOnlineStore)):
277+
continue
278+
279+
raise VersionedOnlineReadNotSupported(self.__class__.__name__, version_tag)
280+
270281
async def get_online_features_async(
271282
self,
272283
config: RepoConfig,
@@ -311,7 +322,7 @@ async def get_online_features_async(
311322
)
312323

313324
# Check for versioned reads on unsupported stores
314-
self._check_versioned_read_support(grouped_refs)
325+
self._check_versioned_read_support(grouped_refs, config)
315326

316327
async def query_table(table, requested_features):
317328
# Get the correct set of entity values with the correct join keys.

sdk/python/feast/infra/online_stores/singlestore_online_store/singlestore.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def online_write_batch(
8080
for entity_key, values, timestamp, created_ts in data:
8181
entity_key_bin = serialize_entity_key(
8282
entity_key,
83-
entity_key_serialization_version=3,
83+
entity_key_serialization_version=config.entity_key_serialization_version,
8484
).hex()
8585
timestamp = _to_naive_utc(timestamp)
8686
if created_ts is not None:
@@ -102,7 +102,7 @@ def online_write_batch(
102102
current_batch = insert_values[i : i + batch_size]
103103
cur.executemany(
104104
f"""
105-
INSERT INTO {_table_id(project, table)}
105+
INSERT INTO {_table_id(project, table, config.registry.enable_online_feature_view_versioning)}
106106
(entity_key, feature_name, value, event_ts, created_ts)
107107
values (%s, %s, %s, %s, %s)
108108
ON DUPLICATE KEY UPDATE
@@ -130,15 +130,15 @@ def online_read(
130130
keys.append(
131131
serialize_entity_key(
132132
entity_key,
133-
entity_key_serialization_version=3,
133+
entity_key_serialization_version=config.entity_key_serialization_version,
134134
).hex()
135135
)
136136

137137
if not requested_features:
138138
entity_key_placeholders = ",".join(["%s" for _ in keys])
139139
cur.execute(
140140
f"""
141-
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
141+
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table, config.registry.enable_online_feature_view_versioning)}
142142
WHERE entity_key IN ({entity_key_placeholders})
143143
ORDER BY event_ts;
144144
""",
@@ -151,7 +151,7 @@ def online_read(
151151
)
152152
cur.execute(
153153
f"""
154-
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
154+
SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table, config.registry.enable_online_feature_view_versioning)}
155155
WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders})
156156
ORDER BY event_ts;
157157
""",
@@ -191,21 +191,23 @@ def update(
191191
partial: bool,
192192
) -> None:
193193
project = config.project
194+
versioning = config.registry.enable_online_feature_view_versioning
194195
with self._get_cursor(config) as cur:
195196
# We don't create any special state for the entities in this implementation.
196197
for table in tables_to_keep:
198+
table_name = _table_id(project, table, versioning)
197199
cur.execute(
198-
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
200+
f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512),
199201
feature_name VARCHAR(256),
200202
value BLOB,
201203
event_ts timestamp NULL DEFAULT NULL,
202204
created_ts timestamp NULL DEFAULT NULL,
203205
PRIMARY KEY(entity_key, feature_name),
204-
INDEX {_table_id(project, table)}_ek (entity_key))"""
206+
INDEX {table_name}_ek (entity_key))"""
205207
)
206208

207209
for table in tables_to_delete:
208-
_drop_table_and_index(cur, project, table)
210+
_drop_table_and_index(cur, project, table, versioning)
209211

210212
def teardown(
211213
self,
@@ -214,16 +216,26 @@ def teardown(
214216
entities: Sequence[Entity],
215217
) -> None:
216218
project = config.project
219+
versioning = config.registry.enable_online_feature_view_versioning
217220
with self._get_cursor(config) as cur:
218221
for table in tables:
219-
_drop_table_and_index(cur, project, table)
222+
_drop_table_and_index(cur, project, table, versioning)
220223

221224

222-
def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
223-
table_name = _table_id(project, table)
225+
def _drop_table_and_index(
226+
cur: Cursor, project: str, table: FeatureView, enable_versioning: bool
227+
) -> None:
228+
table_name = _table_id(project, table, enable_versioning)
224229
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
225230
cur.execute(f"DROP TABLE IF EXISTS {table_name}")
226231

227232

228-
def _table_id(project: str, table: FeatureView) -> str:
229-
return f"{project}_{table.name}"
233+
def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str:
234+
name = table.name
235+
if enable_versioning:
236+
version = getattr(table.projection, "version_tag", None)
237+
if version is None:
238+
version = getattr(table, "current_version_number", None)
239+
if version is not None and version > 0:
240+
name = f"{table.name}_v{version}"
241+
return f"{project}_{name}"

sdk/python/pytest.ini

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ env =
44
IS_TEST=True
55
filterwarnings =
66
error::_pytest.warning_types.PytestConfigWarning
7-
error::_pytest.warning_types.PytestUnhandledCoroutineWarning
87
ignore::DeprecationWarning:pyspark.sql.pandas.*:
98
ignore::DeprecationWarning:pyspark.sql.connect.*:
109
ignore::DeprecationWarning:httpx.*:

sdk/python/tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585

8686

8787
def pytest_configure(config):
88-
if platform in ["darwin", "windows"]:
88+
if platform in ["darwin"] or platform.startswith("win"):
8989
multiprocessing.set_start_method("spawn", force=True)
9090
else:
9191
multiprocessing.set_start_method("fork")

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,75 @@ def test_write_to_online_store(environment, universal_data_sources):
283283
assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.85, 1e-6)
284284

285285

286+
@pytest.mark.integration
287+
@pytest.mark.universal_online_stores(only=["singlestore"])
288+
def test_singlestore_versioned_online_reads(environment, universal_data_sources):
289+
fs = environment.feature_store
290+
fs.config.registry.enable_online_feature_view_versioning = True
291+
292+
entities, datasets, data_sources = universal_data_sources
293+
driver_entity = driver()
294+
295+
# Apply v0
296+
driver_hourly_stats_v0 = create_driver_hourly_stats_feature_view(data_sources.driver)
297+
fs.apply([driver_hourly_stats_v0, driver_entity])
298+
299+
# Write v0 data
300+
df_v0 = pd.DataFrame(
301+
{
302+
"driver_id": [1],
303+
"conv_rate": [0.1],
304+
"acc_rate": [0.2],
305+
"avg_daily_trips": [10],
306+
"driver_metadata": [None],
307+
"driver_config": [None],
308+
"driver_profile": [None],
309+
"event_timestamp": [pd.Timestamp(_utc_now()).round("ms")],
310+
"created": [pd.Timestamp(_utc_now()).round("ms")],
311+
}
312+
)
313+
fs.write_to_online_store("driver_stats", df_v0)
314+
315+
# Apply a schema change to create v1
316+
driver_hourly_stats_v1 = FeatureView(
317+
name="driver_stats",
318+
entities=[driver_entity],
319+
schema=driver_hourly_stats_v0.schema
320+
+ [Field(name="new_feature", dtype=Float32)],
321+
source=data_sources.driver,
322+
ttl=driver_hourly_stats_v0.ttl,
323+
tags=TAGS,
324+
)
325+
fs.apply([driver_hourly_stats_v1, driver_entity])
326+
327+
# Write v1 data
328+
df_v1 = pd.DataFrame(
329+
{
330+
"driver_id": [1],
331+
"conv_rate": [0.1],
332+
"acc_rate": [0.2],
333+
"avg_daily_trips": [20],
334+
"new_feature": [1.0],
335+
"driver_metadata": [None],
336+
"driver_config": [None],
337+
"driver_profile": [None],
338+
"event_timestamp": [pd.Timestamp(_utc_now()).round("ms")],
339+
"created": [pd.Timestamp(_utc_now()).round("ms")],
340+
}
341+
)
342+
fs.write_to_online_store("driver_stats", df_v1)
343+
344+
# Read v0 and v1 explicitly
345+
df = fs.get_online_features(
346+
features=["driver_stats@v0:avg_daily_trips", "driver_stats@v1:avg_daily_trips"],
347+
entity_rows=[{"driver_id": 1}],
348+
full_feature_names=True,
349+
).to_df()
350+
351+
assertpy.assert_that(df["driver_stats@v0__avg_daily_trips"].iloc[0]).is_equal_to(10)
352+
assertpy.assert_that(df["driver_stats@v1__avg_daily_trips"].iloc[0]).is_equal_to(20)
353+
354+
286355
def _get_online_features_dict_remotely(
287356
endpoint: str,
288357
features: Union[List[str], FeatureService],

0 commit comments

Comments
 (0)