Skip to content

Commit 39aeea3

Browse files
feat: Add tag kwarg to set Snowflake online store table path (feast-dev#3176)
Signed-off-by: Miles Adkins <miles.adkins@snowflake.com> Signed-off-by: Miles Adkins <miles.adkins@snowflake.com>
1 parent dfdd0ca commit 39aeea3

4 files changed

Lines changed: 49 additions & 17 deletions

File tree

docs/reference/online-stores/snowflake.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ The data model for using a Snowflake Transient Table as an online store follows
1717
(This model may be subject to change when Snowflake Hybrid Tables are released)
1818

1919
## Example
20-
2120
{% code title="feature_store.yaml" %}
2221
```yaml
2322
project: my_feature_repo
@@ -34,14 +33,27 @@ online_store:
3433
```
3534
{% endcode %}
3635
36+
## Tags KWARGs Actions:
37+
38+
"ONLINE_PATH": Adding the "ONLINE_PATH" key to a FeatureView tags parameter allows you to choose the online table path for the online serving table (ex. "{database}"."{schema}").
39+
40+
{% code title="example_config.py" %}
41+
```python
42+
driver_stats_fv = FeatureView(
43+
...
44+
tags={"snowflake-online-store/online_path": '"FEAST"."ONLINE"'},
45+
)
46+
```
47+
{% endcode %}
48+
3749
The full set of configuration options is available in [SnowflakeOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.snowflake.SnowflakeOnlineStoreConfig).
3850

3951
## Functionality Matrix
4052

4153
The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
4254
Below is a matrix indicating which functionality is supported by the Snowflake online store.
4355

44-
| | Snowflake |
56+
| | Snowflake |
4557
| :-------------------------------------------------------- | :-- |
4658
| write feature values to the online store | yes |
4759
| read feature values from the online store | yes |

sdk/python/feast/infra/materialization/snowflake_engine.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
assert_snowflake_feature_names,
2929
execute_snowflake_statement,
3030
get_snowflake_conn,
31+
get_snowflake_online_store_path,
3132
package_snowpark_zip,
3233
)
3334
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -370,8 +371,6 @@ def materialize_to_snowflake_online_store(
370371
) -> None:
371372
assert_snowflake_feature_names(feature_view)
372373

373-
online_table = f"""{repo_config .online_store.database}"."{repo_config.online_store.schema_}"."[online-transient] {project}_{feature_view.name}"""
374-
375374
feature_names_str = '", "'.join(
376375
[feature.name for feature in feature_view.features]
377376
)
@@ -381,8 +380,13 @@ def materialize_to_snowflake_online_store(
381380
else:
382381
fv_created_str = None
383382

383+
online_path = get_snowflake_online_store_path(repo_config, feature_view)
384+
online_table = (
385+
f'{online_path}."[online-transient] {project}_{feature_view.name}"'
386+
)
387+
384388
query = f"""
385-
MERGE INTO "{online_table}" online_table
389+
MERGE INTO {online_table} online_table
386390
USING (
387391
SELECT
388392
"entity_key" || TO_BINARY("feature_name", 'UTF-8') AS "entity_feature_key",

sdk/python/feast/infra/online_stores/snowflake.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from feast.infra.utils.snowflake.snowflake_utils import (
1616
execute_snowflake_statement,
1717
get_snowflake_conn,
18+
get_snowflake_online_store_path,
1819
write_pandas_binary,
1920
)
2021
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -112,9 +113,7 @@ def online_write_batch(
112113
agg_df = pd.concat(dfs)
113114

114115
# This combines both the data upload plus the overwrite in the same transaction
115-
table_path = (
116-
f'"{config.online_store.database}"."{config.online_store.schema_}"'
117-
)
116+
online_path = get_snowflake_online_store_path(config, table)
118117
with get_snowflake_conn(config.online_store, autocommit=False) as conn:
119118
write_pandas_binary(
120119
conn,
@@ -125,7 +124,7 @@ def online_write_batch(
125124
) # special function for writing binary to snowflake
126125

127126
query = f"""
128-
INSERT OVERWRITE INTO {table_path}."[online-transient] {config.project}_{table.name}"
127+
INSERT OVERWRITE INTO {online_path}."[online-transient] {config.project}_{table.name}"
129128
SELECT
130129
"entity_feature_key",
131130
"entity_key",
@@ -138,7 +137,7 @@ def online_write_batch(
138137
*,
139138
ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row"
140139
FROM
141-
{table_path}."[online-transient] {config.project}_{table.name}")
140+
{online_path}."[online-transient] {config.project}_{table.name}")
142141
WHERE
143142
"_feast_row" = 1;
144143
"""
@@ -178,13 +177,13 @@ def online_read(
178177
]
179178
)
180179

181-
table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
180+
online_path = get_snowflake_online_store_path(config, table)
182181
with get_snowflake_conn(config.online_store) as conn:
183182
query = f"""
184183
SELECT
185184
"entity_key", "feature_name", "value", "event_ts"
186185
FROM
187-
{table_path}."[online-transient] {config.project}_{table.name}"
186+
{online_path}."[online-transient] {config.project}_{table.name}"
188187
WHERE
189188
"entity_feature_key" IN ({entity_fetch_str})
190189
"""
@@ -221,11 +220,11 @@ def update(
221220
):
222221
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)
223222

224-
table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
225223
with get_snowflake_conn(config.online_store) as conn:
226224
for table in tables_to_keep:
225+
online_path = get_snowflake_online_store_path(config, table)
227226
query = f"""
228-
CREATE TRANSIENT TABLE IF NOT EXISTS {table_path}."[online-transient] {config.project}_{table.name}" (
227+
CREATE TRANSIENT TABLE IF NOT EXISTS {online_path}."[online-transient] {config.project}_{table.name}" (
229228
"entity_feature_key" BINARY,
230229
"entity_key" BINARY,
231230
"feature_name" VARCHAR,
@@ -237,7 +236,8 @@ def update(
237236
execute_snowflake_statement(conn, query)
238237

239238
for table in tables_to_delete:
240-
query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"'
239+
online_path = get_snowflake_online_store_path(config, table)
240+
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
241241
execute_snowflake_statement(conn, query)
242242

243243
def teardown(
@@ -248,8 +248,8 @@ def teardown(
248248
):
249249
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)
250250

251-
table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
252251
with get_snowflake_conn(config.online_store) as conn:
253252
for table in tables:
254-
query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"'
253+
online_path = get_snowflake_online_store_path(config, table)
254+
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
255255
execute_snowflake_statement(conn, query)

sdk/python/feast/infra/utils/snowflake/snowflake_utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import feast
2323
from feast.errors import SnowflakeIncompleteConfig, SnowflakeQueryUnknownError
2424
from feast.feature_view import FeatureView
25+
from feast.repo_config import RepoConfig
2526

2627
try:
2728
import snowflake.connector
@@ -104,6 +105,21 @@ def get_snowflake_conn(config, autocommit=True) -> SnowflakeConnection:
104105
raise SnowflakeIncompleteConfig(e)
105106

106107

108+
def get_snowflake_online_store_path(
109+
config: RepoConfig,
110+
feature_view: FeatureView,
111+
) -> str:
112+
path_tag = "snowflake-online-store/online_path"
113+
if path_tag in feature_view.tags:
114+
online_path = feature_view.tags[path_tag]
115+
else:
116+
online_path = (
117+
f'"{config.online_store.database}"."{config.online_store.schema_}"'
118+
)
119+
120+
return online_path
121+
122+
107123
def package_snowpark_zip(project_name) -> Tuple[str, str]:
108124
path = os.path.dirname(feast.__file__)
109125
copy_path = path + f"/snowflake_feast_{project_name}"

0 commit comments

Comments
 (0)