Skip to content

Commit 977034b

Browse files
committed
feat: Add feature view versioning support to Snowflake online store
Introduces `_snowflake_table_name()` helper that appends a `_v{N}` suffix when `enable_online_feature_view_versioning` is set, routing write_batch, online_read, update, and teardown to the correct versioned transient table. Registers SnowflakeOnlineStore in the supported-types list so versioned reads do not raise VersionedOnlineReadNotSupported. Unit tests follow the same pattern as the DynamoDB and Redis versioning test suites. Closes #6167 Signed-off-by: Abhishek8108 <87538407+Abhishek8108@users.noreply.github.com>
1 parent bedc0ef commit 977034b

3 files changed

Lines changed: 202 additions & 7 deletions

File tree

sdk/python/feast/infra/online_stores/online_store.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,12 @@ def _check_versioned_read_support(self, grouped_refs):
300300
supported_types.append(MilvusOnlineStore)
301301
except ImportError:
302302
pass
303+
try:
304+
from feast.infra.online_stores.snowflake import SnowflakeOnlineStore
305+
306+
supported_types.append(SnowflakeOnlineStore)
307+
except ImportError:
308+
pass
303309

304310
if isinstance(self, tuple(supported_types)):
305311
return

sdk/python/feast/infra/online_stores/snowflake.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from feast.entity import Entity
1010
from feast.feature_view import FeatureView
1111
from feast.infra.key_encoding_utils import serialize_entity_key
12+
from feast.infra.online_stores.helpers import compute_versioned_name
1213
from feast.infra.online_stores.online_store import OnlineStore
1314
from feast.infra.utils.snowflake.snowflake_utils import (
1415
GetSnowflakeConnection,
@@ -22,6 +23,12 @@
2223
from feast.utils import to_naive_utc
2324

2425

26+
def _snowflake_table_name(
27+
project: str, table: FeatureView, enable_versioning: bool = False
28+
) -> str:
29+
return f"[online-transient] {project}_{compute_versioned_name(table, enable_versioning)}"
30+
31+
2532
class SnowflakeOnlineStoreConfig(FeastConfigBaseModel):
2633
"""Online store config for Snowflake"""
2734

@@ -120,17 +127,19 @@ def online_write_batch(
120127

121128
# This combines both the data upload plus the overwrite in the same transaction
122129
online_path = get_snowflake_online_store_path(config, table)
130+
versioning = config.registry.enable_online_feature_view_versioning
131+
tbl = _snowflake_table_name(config.project, table, versioning)
123132
with GetSnowflakeConnection(config.online_store, autocommit=False) as conn:
124133
write_pandas_binary(
125134
conn,
126135
agg_df,
127-
table_name=f"[online-transient] {config.project}_{table.name}",
136+
table_name=tbl,
128137
database=f"{config.online_store.database}",
129138
schema=f"{config.online_store.schema_}",
130139
) # special function for writing binary to snowflake
131140

132141
query = f"""
133-
INSERT OVERWRITE INTO {online_path}."[online-transient] {config.project}_{table.name}"
142+
INSERT OVERWRITE INTO {online_path}."{tbl}"
134143
SELECT
135144
"entity_feature_key",
136145
"entity_key",
@@ -143,7 +152,7 @@ def online_write_batch(
143152
*,
144153
ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row"
145154
FROM
146-
{online_path}."[online-transient] {config.project}_{table.name}")
155+
{online_path}."{tbl}")
147156
WHERE
148157
"_feast_row" = 1;
149158
"""
@@ -191,12 +200,15 @@ def online_read(
191200
)
192201

193202
online_path = get_snowflake_online_store_path(config, table)
203+
tbl = _snowflake_table_name(
204+
config.project, table, config.registry.enable_online_feature_view_versioning
205+
)
194206
with GetSnowflakeConnection(config.online_store) as conn:
195207
query = f"""
196208
SELECT
197209
"entity_key", "feature_name", "value", "event_ts"
198210
FROM
199-
{online_path}."[online-transient] {config.project}_{table.name}"
211+
{online_path}."{tbl}"
200212
WHERE
201213
"entity_feature_key" IN ({entity_fetch_str})
202214
"""
@@ -228,11 +240,13 @@ def update(
228240
):
229241
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)
230242

243+
versioning = config.registry.enable_online_feature_view_versioning
231244
with GetSnowflakeConnection(config.online_store) as conn:
232245
for table in tables_to_keep:
233246
online_path = get_snowflake_online_store_path(config, table)
247+
tbl = _snowflake_table_name(config.project, table, versioning)
234248
query = f"""
235-
CREATE TRANSIENT TABLE IF NOT EXISTS {online_path}."[online-transient] {config.project}_{table.name}" (
249+
CREATE TRANSIENT TABLE IF NOT EXISTS {online_path}."{tbl}" (
236250
"entity_feature_key" BINARY,
237251
"entity_key" BINARY,
238252
"feature_name" VARCHAR,
@@ -245,7 +259,8 @@ def update(
245259

246260
for table in tables_to_delete:
247261
online_path = get_snowflake_online_store_path(config, table)
248-
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
262+
tbl = _snowflake_table_name(config.project, table, versioning)
263+
query = f'DROP TABLE IF EXISTS {online_path}."{tbl}"'
249264
execute_snowflake_statement(conn, query)
250265

251266
def teardown(
@@ -256,8 +271,10 @@ def teardown(
256271
):
257272
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)
258273

274+
versioning = config.registry.enable_online_feature_view_versioning
259275
with GetSnowflakeConnection(config.online_store) as conn:
260276
for table in tables:
261277
online_path = get_snowflake_online_store_path(config, table)
262-
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
278+
tbl = _snowflake_table_name(config.project, table, versioning)
279+
query = f'DROP TABLE IF EXISTS {online_path}."{tbl}"'
263280
execute_snowflake_statement(conn, query)
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
"""Unit tests for Snowflake online store feature view versioning."""
2+
3+
import sys
4+
from datetime import timedelta
5+
from types import ModuleType
6+
from unittest.mock import MagicMock
7+
8+
from feast import Entity, FeatureView
9+
from feast.field import Field
10+
from feast.types import Float32
11+
from feast.value_type import ValueType
12+
13+
14+
def _stub_snowflake_modules():
15+
"""Stub out Snowflake connector and cryptography so the online store can be imported."""
16+
17+
# Build a proper package hierarchy so submodule imports don't fail.
18+
def _mod(name):
19+
m = ModuleType(name)
20+
sys.modules[name] = m
21+
return m
22+
23+
if "cryptography" not in sys.modules:
24+
crypto = _mod("cryptography")
25+
hazmat = _mod("cryptography.hazmat")
26+
backends = _mod("cryptography.hazmat.backends")
27+
backends.default_backend = MagicMock()
28+
primitives = _mod("cryptography.hazmat.primitives")
29+
serialization = _mod("cryptography.hazmat.primitives.serialization")
30+
serialization.Encoding = MagicMock()
31+
serialization.PrivateFormat = MagicMock()
32+
serialization.NoEncryption = MagicMock()
33+
crypto.hazmat = hazmat
34+
hazmat.backends = backends
35+
hazmat.primitives = primitives
36+
primitives.serialization = serialization
37+
38+
if "snowflake" not in sys.modules:
39+
sf = _mod("snowflake")
40+
connector = _mod("snowflake.connector")
41+
connector.ProgrammingError = Exception
42+
connector.SnowflakeConnection = MagicMock()
43+
cursor_mod = _mod("snowflake.connector.cursor")
44+
cursor_mod.SnowflakeCursor = MagicMock()
45+
sf.connector = connector
46+
connector.cursor = cursor_mod
47+
48+
if "tenacity" not in sys.modules:
49+
tenacity = _mod("tenacity")
50+
tenacity.retry = lambda *a, **kw: lambda f: f
51+
tenacity.retry_if_exception_type = MagicMock()
52+
tenacity.stop_after_attempt = MagicMock()
53+
tenacity.wait_exponential = MagicMock()
54+
55+
56+
_stub_snowflake_modules()
57+
58+
59+
def _make_feature_view(name="driver_stats", version_number=None, version_tag=None):
60+
entity = Entity(
61+
name="driver_id",
62+
join_keys=["driver_id"],
63+
value_type=ValueType.INT64,
64+
)
65+
fv = FeatureView(
66+
name=name,
67+
entities=[entity],
68+
ttl=timedelta(days=1),
69+
schema=[Field(name="trips_today", dtype=Float32)],
70+
)
71+
if version_number is not None:
72+
fv.current_version_number = version_number
73+
if version_tag is not None:
74+
fv.projection.version_tag = version_tag
75+
return fv
76+
77+
78+
class TestSnowflakeTableName:
79+
"""Test _snowflake_table_name with versioning enabled/disabled."""
80+
81+
def test_no_versioning(self):
82+
from feast.infra.online_stores.snowflake import _snowflake_table_name
83+
84+
fv = _make_feature_view()
85+
assert (
86+
_snowflake_table_name("test_project", fv, False)
87+
== "[online-transient] test_project_driver_stats"
88+
)
89+
90+
def test_versioning_disabled_ignores_version(self):
91+
from feast.infra.online_stores.snowflake import _snowflake_table_name
92+
93+
fv = _make_feature_view(version_number=5)
94+
assert (
95+
_snowflake_table_name("test_project", fv, False)
96+
== "[online-transient] test_project_driver_stats"
97+
)
98+
99+
def test_versioning_enabled_no_version_set(self):
100+
from feast.infra.online_stores.snowflake import _snowflake_table_name
101+
102+
fv = _make_feature_view()
103+
assert (
104+
_snowflake_table_name("test_project", fv, True)
105+
== "[online-transient] test_project_driver_stats"
106+
)
107+
108+
def test_versioning_enabled_with_current_version_number(self):
109+
from feast.infra.online_stores.snowflake import _snowflake_table_name
110+
111+
fv = _make_feature_view(version_number=2)
112+
assert (
113+
_snowflake_table_name("test_project", fv, True)
114+
== "[online-transient] test_project_driver_stats_v2"
115+
)
116+
117+
def test_version_zero_no_suffix(self):
118+
from feast.infra.online_stores.snowflake import _snowflake_table_name
119+
120+
fv = _make_feature_view(version_number=0)
121+
assert (
122+
_snowflake_table_name("test_project", fv, True)
123+
== "[online-transient] test_project_driver_stats"
124+
)
125+
126+
def test_projection_version_tag_takes_priority(self):
127+
from feast.infra.online_stores.snowflake import _snowflake_table_name
128+
129+
fv = _make_feature_view(version_number=1, version_tag=3)
130+
assert (
131+
_snowflake_table_name("test_project", fv, True)
132+
== "[online-transient] test_project_driver_stats_v3"
133+
)
134+
135+
def test_projection_version_tag_zero_no_suffix(self):
136+
from feast.infra.online_stores.snowflake import _snowflake_table_name
137+
138+
fv = _make_feature_view(version_tag=0, version_number=3)
139+
assert (
140+
_snowflake_table_name("test_project", fv, True)
141+
== "[online-transient] test_project_driver_stats"
142+
)
143+
144+
def test_different_versions_produce_different_table_names(self):
145+
from feast.infra.online_stores.snowflake import _snowflake_table_name
146+
147+
fv_v1 = _make_feature_view(version_number=1)
148+
fv_v2 = _make_feature_view(version_number=2)
149+
name_v1 = _snowflake_table_name("prod", fv_v1, True)
150+
name_v2 = _snowflake_table_name("prod", fv_v2, True)
151+
assert name_v1 != name_v2
152+
assert name_v1 == "[online-transient] prod_driver_stats_v1"
153+
assert name_v2 == "[online-transient] prod_driver_stats_v2"
154+
155+
156+
class TestSnowflakeVersionedReadSupport:
157+
"""Test that SnowflakeOnlineStore passes _check_versioned_read_support."""
158+
159+
def test_allowed_with_version_tag(self):
160+
from feast.infra.online_stores.snowflake import SnowflakeOnlineStore
161+
162+
store = SnowflakeOnlineStore()
163+
fv = _make_feature_view()
164+
fv.projection.version_tag = 2
165+
store._check_versioned_read_support([(fv, ["trips_today"])])
166+
167+
def test_allowed_without_version_tag(self):
168+
from feast.infra.online_stores.snowflake import SnowflakeOnlineStore
169+
170+
store = SnowflakeOnlineStore()
171+
fv = _make_feature_view()
172+
store._check_versioned_read_support([(fv, ["trips_today"])])

0 commit comments

Comments
 (0)