Skip to content

Commit df6a16d

Browse files
Adds support for arrays in snowflake
Signed-off-by: john.lemmon <john.lemmon@medely.com>
1 parent 052182b commit df6a16d

File tree

9 files changed

+350
-19
lines changed

9 files changed

+350
-19
lines changed

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import contextlib
2+
import json
23
import os
34
import uuid
45
import warnings
@@ -51,6 +52,17 @@
5152
)
5253
from feast.repo_config import FeastConfigBaseModel, RepoConfig
5354
from feast.saved_dataset import SavedDatasetStorage
55+
from feast.types import (
56+
Array,
57+
Bool,
58+
Bytes,
59+
Float32,
60+
Float64,
61+
Int32,
62+
Int64,
63+
String,
64+
UnixTimestamp,
65+
)
5466
from feast.usage import log_exceptions_and_usage
5567

5668
try:
@@ -320,6 +332,7 @@ def query_generator() -> Iterator[str]:
320332
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
321333
feature_refs, project, registry
322334
),
335+
feature_views=feature_views,
323336
metadata=RetrievalMetadata(
324337
features=feature_refs,
325338
keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
@@ -398,9 +411,12 @@ def __init__(
398411
config: RepoConfig,
399412
full_feature_names: bool,
400413
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
414+
feature_views: Optional[List[FeatureView]] = None,
401415
metadata: Optional[RetrievalMetadata] = None,
402416
):
403417

418+
if feature_views is None:
419+
feature_views = []
404420
if not isinstance(query, str):
405421
self._query_generator = query
406422
else:
@@ -416,6 +432,7 @@ def query_generator() -> Iterator[str]:
416432
self.config = config
417433
self._full_feature_names = full_feature_names
418434
self._on_demand_feature_views = on_demand_feature_views or []
435+
self._feature_views = feature_views
419436
self._metadata = metadata
420437
self.export_path: Optional[str]
421438
if self.config.offline_store.blob_export_location:
@@ -436,6 +453,20 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
436453
self.snowflake_conn, self.to_sql()
437454
).fetch_pandas_all()
438455

456+
for feature_view in self._feature_views:
457+
for feature in feature_view.features:
458+
if feature.dtype in [
459+
Array(String),
460+
Array(Bytes),
461+
Array(Int32),
462+
Array(Int64),
463+
Array(UnixTimestamp),
464+
Array(Float64),
465+
Array(Float32),
466+
Array(Bool),
467+
]:
468+
df[feature.name] = [json.loads(x) for x in df[feature.name]]
469+
439470
return df
440471

441472
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:

sdk/python/feast/infra/offline_stores/snowflake_source.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,12 @@ def get_table_column_names_and_types(
279279
else:
280280
row["snowflake_type"] = "NUMBERwSCALE"
281281

282-
elif row["type_code"] in [5, 9, 10, 12]:
282+
elif row["type_code"] in [5, 9, 12]:
283283
error = snowflake_unsupported_map[row["type_code"]]
284284
raise NotImplementedError(
285285
f"The following Snowflake Data Type is not supported: {error}"
286286
)
287-
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]:
287+
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]:
288288
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
289289
else:
290290
raise NotImplementedError(
@@ -305,14 +305,14 @@ def get_table_column_names_and_types(
305305
6: "TIMESTAMP_LTZ",
306306
7: "TIMESTAMP_TZ",
307307
8: "TIMESTAMP_NTZ",
308+
10: "ARRAY",
308309
11: "BINARY",
309310
13: "BOOLEAN",
310311
}
311312

312313
snowflake_unsupported_map = {
313314
5: "VARIANT -- Try converting to VARCHAR",
314315
9: "OBJECT -- Try converting to VARCHAR",
315-
10: "ARRAY -- Try converting to VARCHAR",
316316
12: "TIME -- Try converting to VARCHAR",
317317
}
318318

sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_python_udfs_creation.sql

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_pro
1414
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto'
1515
IMPORTS = ('@STAGE_HOLDER/feast.zip');
1616

17+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
18+
RETURNS BINARY
19+
LANGUAGE PYTHON
20+
RUNTIME_VERSION = '3.8'
21+
PACKAGES = ('protobuf', 'pandas')
22+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
23+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
24+
25+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY)
26+
RETURNS BINARY
27+
LANGUAGE PYTHON
28+
RUNTIME_VERSION = '3.8'
29+
PACKAGES = ('protobuf', 'pandas')
30+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
31+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
32+
33+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY)
34+
RETURNS BINARY
35+
LANGUAGE PYTHON
36+
RUNTIME_VERSION = '3.8'
37+
PACKAGES = ('protobuf', 'pandas')
38+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
39+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
40+
41+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY)
42+
RETURNS BINARY
43+
LANGUAGE PYTHON
44+
RUNTIME_VERSION = '3.8'
45+
PACKAGES = ('protobuf', 'pandas')
46+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
47+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
48+
49+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY)
50+
RETURNS BINARY
51+
LANGUAGE PYTHON
52+
RUNTIME_VERSION = '3.8'
53+
PACKAGES = ('protobuf', 'pandas')
54+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
55+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
56+
57+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
58+
RETURNS BINARY
59+
LANGUAGE PYTHON
60+
RUNTIME_VERSION = '3.8'
61+
PACKAGES = ('protobuf', 'pandas')
62+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
63+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
64+
65+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
66+
RETURNS BINARY
67+
LANGUAGE PYTHON
68+
RUNTIME_VERSION = '3.8'
69+
PACKAGES = ('protobuf', 'pandas')
70+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
71+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
72+
1773
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER)
1874
RETURNS BINARY
1975
LANGUAGE PYTHON

sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import sys
22
from binascii import unhexlify
33

4+
import numpy as np
45
import pandas
56
from _snowflake import vectorized
67

@@ -59,6 +60,180 @@ def feast_snowflake_varchar_to_string_proto(df):
5960
return df
6061

6162

63+
"""
64+
CREATE OR REPLACE FUNCTION feast_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
65+
RETURNS BINARY
66+
LANGUAGE PYTHON
67+
RUNTIME_VERSION = '3.8'
68+
PACKAGES = ('protobuf', 'pandas')
69+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
70+
IMPORTS = ('@feast_stage/feast.zip');
71+
"""
72+
# ValueType.STRING_LIST = 12
73+
@vectorized(input=pandas.DataFrame)
74+
def feast_snowflake_array_bytes_to_list_bytes_proto(df):
75+
sys._xoptions["snowflake_partner_attribution"].append("feast")
76+
77+
# Sometimes bytes come in as strings so we need to convert back to float
78+
numpy_arrays = np.asarray(df[0].to_list()).astype(bytes)
79+
80+
df = list(
81+
map(
82+
ValueProto.SerializeToString,
83+
python_values_to_proto_values(numpy_arrays, ValueType.BYTES_LIST),
84+
)
85+
)
86+
return df
87+
88+
89+
"""
90+
CREATE OR REPLACE FUNCTION feast_snowflake_array_varchar_to_list_string_proto(df ARRAY)
91+
RETURNS BINARY
92+
LANGUAGE PYTHON
93+
RUNTIME_VERSION = '3.8'
94+
PACKAGES = ('protobuf', 'pandas')
95+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
96+
IMPORTS = ('@feast_stage/feast.zip');
97+
"""
98+
99+
100+
@vectorized(input=pandas.DataFrame)
101+
def feast_snowflake_array_varchar_to_list_string_proto(df):
102+
sys._xoptions["snowflake_partner_attribution"].append("feast")
103+
104+
df = list(
105+
map(
106+
ValueProto.SerializeToString,
107+
python_values_to_proto_values(df[0].to_numpy(), ValueType.STRING_LIST),
108+
)
109+
)
110+
return df
111+
112+
113+
"""
114+
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int32_proto(df ARRAY)
115+
RETURNS BINARY
116+
LANGUAGE PYTHON
117+
RUNTIME_VERSION = '3.8'
118+
PACKAGES = ('protobuf', 'pandas')
119+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
120+
IMPORTS = ('@feast_stage/feast.zip');
121+
"""
122+
123+
124+
@vectorized(input=pandas.DataFrame)
125+
def feast_snowflake_array_number_to_list_int32_proto(df):
126+
sys._xoptions["snowflake_partner_attribution"].append("feast")
127+
128+
df = list(
129+
map(
130+
ValueProto.SerializeToString,
131+
python_values_to_proto_values(df[0].to_numpy(), ValueType.INT32_LIST),
132+
)
133+
)
134+
return df
135+
136+
137+
"""
138+
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int64_proto(df ARRAY)
139+
RETURNS BINARY
140+
LANGUAGE PYTHON
141+
RUNTIME_VERSION = '3.8'
142+
PACKAGES = ('protobuf', 'pandas')
143+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
144+
IMPORTS = ('@feast_stage/feast.zip');
145+
"""
146+
147+
148+
@vectorized(input=pandas.DataFrame)
149+
def feast_snowflake_array_number_to_list_int64_proto(df):
150+
sys._xoptions["snowflake_partner_attribution"].append("feast")
151+
152+
df = list(
153+
map(
154+
ValueProto.SerializeToString,
155+
python_values_to_proto_values(df[0].to_numpy(), ValueType.INT64_LIST),
156+
)
157+
)
158+
return df
159+
160+
161+
"""
162+
CREATE OR REPLACE FUNCTION feast_snowflake_array_float_to_list_double_proto(df ARRAY)
163+
RETURNS BINARY
164+
LANGUAGE PYTHON
165+
RUNTIME_VERSION = '3.8'
166+
PACKAGES = ('protobuf', 'pandas')
167+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
168+
IMPORTS = ('@feast_stage/feast.zip');
169+
"""
170+
171+
172+
@vectorized(input=pandas.DataFrame)
173+
def feast_snowflake_array_float_to_list_double_proto(df):
174+
sys._xoptions["snowflake_partner_attribution"].append("feast")
175+
176+
numpy_arrays = np.asarray(df[0].to_list()).astype(float)
177+
178+
df = list(
179+
map(
180+
ValueProto.SerializeToString,
181+
python_values_to_proto_values(numpy_arrays, ValueType.DOUBLE_LIST),
182+
)
183+
)
184+
return df
185+
186+
187+
"""
188+
CREATE OR REPLACE FUNCTION feast_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
189+
RETURNS BINARY
190+
LANGUAGE PYTHON
191+
RUNTIME_VERSION = '3.8'
192+
PACKAGES = ('protobuf', 'pandas')
193+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
194+
IMPORTS = ('@feast_stage/feast.zip');
195+
"""
196+
197+
198+
@vectorized(input=pandas.DataFrame)
199+
def feast_snowflake_array_boolean_to_list_bool_proto(df):
200+
sys._xoptions["snowflake_partner_attribution"].append("feast")
201+
202+
df = list(
203+
map(
204+
ValueProto.SerializeToString,
205+
python_values_to_proto_values(df[0].to_numpy(), ValueType.BOOL_LIST),
206+
)
207+
)
208+
return df
209+
210+
211+
"""
212+
CREATE OR REPLACE FUNCTION feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
213+
RETURNS BINARY
214+
LANGUAGE PYTHON
215+
RUNTIME_VERSION = '3.8'
216+
PACKAGES = ('protobuf', 'pandas')
217+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
218+
IMPORTS = ('@feast_stage/feast.zip');
219+
"""
220+
221+
222+
@vectorized(input=pandas.DataFrame)
223+
def feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df):
224+
sys._xoptions["snowflake_partner_attribution"].append("feast")
225+
226+
numpy_arrays = np.asarray(df[0].to_list()).astype(np.datetime64)
227+
228+
df = list(
229+
map(
230+
ValueProto.SerializeToString,
231+
python_values_to_proto_values(numpy_arrays, ValueType.UNIX_TIMESTAMP_LIST),
232+
)
233+
)
234+
return df
235+
236+
62237
"""
63238
CREATE OR REPLACE FUNCTION feast_snowflake_number_to_int32_proto(df NUMBER)
64239
RETURNS BINARY

sdk/python/feast/type_map.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,14 @@ def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) ->
665665
"FLOAT": f"feast_{project_name}_snowflake_float_to_double_proto",
666666
"BOOL": f"feast_{project_name}_snowflake_boolean_to_bool_proto",
667667
"UNIX_TIMESTAMP": f"feast_{project_name}_snowflake_timestamp_to_unix_timestamp_proto",
668+
"BYTES_LIST": f"feast_{project_name}_snowflake_array_bytes_to_list_bytes_proto",
669+
"STRING_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto",
670+
"INT32_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int32_proto",
671+
"INT64_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int64_proto",
672+
"DOUBLE_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto",
673+
"FLOAT_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto",
674+
"BOOL_LIST": f"feast_{project_name}_snowflake_array_boolean_to_list_bool_proto",
675+
"UNIX_TIMESTAMP_LIST": f"feast_{project_name}_snowflake_array_timestamp_to_list_unix_timestamp_proto",
668676
}
669677
return name_map[value_name].upper()
670678

sdk/python/tests/data/data_creator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def get_feature_values_for_dtype(
5959
"int64": [1, 2, 3, 4, 5],
6060
"float": [1.0, None, 3.0, 4.0, 5.0],
6161
"string": ["1", None, "3", "4", "5"],
62+
"bytes": [b"1", None, b"3", b"4", b"5"],
6263
"bool": [True, None, False, True, False],
6364
"datetime": [
6465
datetime(1980, 1, 1),

sdk/python/tests/integration/feature_repos/repo_configuration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@
8383
"password": os.getenv("SNOWFLAKE_CI_PASSWORD", ""),
8484
"role": os.getenv("SNOWFLAKE_CI_ROLE", ""),
8585
"warehouse": os.getenv("SNOWFLAKE_CI_WAREHOUSE", ""),
86-
"database": "FEAST",
87-
"schema": "ONLINE",
86+
"database": os.getenv("SNOWFLAKE_CI_DATABASE", "FEAST"),
87+
"schema": os.getenv("SNOWFLAKE_CI_SCHEMA_ONLINE", "ONLINE"),
8888
}
8989

9090
BIGTABLE_CONFIG = {

0 commit comments

Comments
 (0)