Skip to content

Commit f66c037

Browse files
Adds support for arrays in snowflake
Signed-off-by: john.lemmon <john.lemmon@medely.com>
1 parent 052182b commit f66c037

File tree

8 files changed

+413
-13
lines changed

8 files changed

+413
-13
lines changed

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import contextlib
2+
import json
23
import os
34
import uuid
45
import warnings
@@ -51,6 +52,16 @@
5152
)
5253
from feast.repo_config import FeastConfigBaseModel, RepoConfig
5354
from feast.saved_dataset import SavedDatasetStorage
55+
from feast.types import (
56+
Array,
57+
Bool,
58+
Float32,
59+
Float64,
60+
Int32,
61+
Int64,
62+
String,
63+
UnixTimestamp,
64+
)
5465
from feast.usage import log_exceptions_and_usage
5566

5667
try:
@@ -320,6 +331,7 @@ def query_generator() -> Iterator[str]:
320331
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
321332
feature_refs, project, registry
322333
),
334+
feature_views=feature_views,
323335
metadata=RetrievalMetadata(
324336
features=feature_refs,
325337
keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
@@ -398,9 +410,12 @@ def __init__(
398410
config: RepoConfig,
399411
full_feature_names: bool,
400412
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
413+
feature_views: Optional[List[FeatureView]] = None,
401414
metadata: Optional[RetrievalMetadata] = None,
402415
):
403416

417+
if feature_views is None:
418+
feature_views = []
404419
if not isinstance(query, str):
405420
self._query_generator = query
406421
else:
@@ -416,6 +431,7 @@ def query_generator() -> Iterator[str]:
416431
self.config = config
417432
self._full_feature_names = full_feature_names
418433
self._on_demand_feature_views = on_demand_feature_views or []
434+
self._feature_views = feature_views
419435
self._metadata = metadata
420436
self.export_path: Optional[str]
421437
if self.config.offline_store.blob_export_location:
@@ -436,6 +452,19 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
436452
self.snowflake_conn, self.to_sql()
437453
).fetch_pandas_all()
438454

455+
for feature_view in self._feature_views:
456+
for feature in feature_view.features:
457+
if feature.dtype in [
458+
Array(String),
459+
Array(Int32),
460+
Array(Int64),
461+
Array(UnixTimestamp),
462+
Array(Float64),
463+
Array(Float32),
464+
Array(Bool),
465+
]:
466+
df[feature.name] = [json.loads(x) for x in df[feature.name]]
467+
439468
return df
440469

441470
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:

sdk/python/feast/infra/offline_stores/snowflake_source.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,12 @@ def get_table_column_names_and_types(
279279
else:
280280
row["snowflake_type"] = "NUMBERwSCALE"
281281

282-
elif row["type_code"] in [5, 9, 10, 12]:
282+
elif row["type_code"] in [9, 12]:
283283
error = snowflake_unsupported_map[row["type_code"]]
284284
raise NotImplementedError(
285285
f"The following Snowflake Data Type is not supported: {error}"
286286
)
287-
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]:
287+
elif row["type_code"] in [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 13]:
288288
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
289289
else:
290290
raise NotImplementedError(
@@ -302,17 +302,17 @@ def get_table_column_names_and_types(
302302
2: "VARCHAR",
303303
3: "DATE",
304304
4: "TIMESTAMP",
305+
5: "VARIANT",
305306
6: "TIMESTAMP_LTZ",
306307
7: "TIMESTAMP_TZ",
307308
8: "TIMESTAMP_NTZ",
309+
10: "ARRAY",
308310
11: "BINARY",
309311
13: "BOOLEAN",
310312
}
311313

312314
snowflake_unsupported_map = {
313-
5: "VARIANT -- Try converting to VARCHAR",
314315
9: "OBJECT -- Try converting to VARCHAR",
315-
10: "ARRAY -- Try converting to VARCHAR",
316316
12: "TIME -- Try converting to VARCHAR",
317317
}
318318

sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_python_udfs_creation.sql

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_pro
1414
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto'
1515
IMPORTS = ('@STAGE_HOLDER/feast.zip');
1616

17+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
18+
RETURNS BINARY
19+
LANGUAGE PYTHON
20+
RUNTIME_VERSION = '3.8'
21+
PACKAGES = ('protobuf', 'pandas')
22+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
23+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
24+
25+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY)
26+
RETURNS BINARY
27+
LANGUAGE PYTHON
28+
RUNTIME_VERSION = '3.8'
29+
PACKAGES = ('protobuf', 'pandas')
30+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
31+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
32+
33+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY)
34+
RETURNS BINARY
35+
LANGUAGE PYTHON
36+
RUNTIME_VERSION = '3.8'
37+
PACKAGES = ('protobuf', 'pandas')
38+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
39+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
40+
41+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY)
42+
RETURNS BINARY
43+
LANGUAGE PYTHON
44+
RUNTIME_VERSION = '3.8'
45+
PACKAGES = ('protobuf', 'pandas')
46+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
47+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
48+
49+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY)
50+
RETURNS BINARY
51+
LANGUAGE PYTHON
52+
RUNTIME_VERSION = '3.8'
53+
PACKAGES = ('protobuf', 'pandas')
54+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
55+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
56+
57+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
58+
RETURNS BINARY
59+
LANGUAGE PYTHON
60+
RUNTIME_VERSION = '3.8'
61+
PACKAGES = ('protobuf', 'pandas')
62+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
63+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
64+
65+
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
66+
RETURNS BINARY
67+
LANGUAGE PYTHON
68+
RUNTIME_VERSION = '3.8'
69+
PACKAGES = ('protobuf', 'pandas')
70+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
71+
IMPORTS = ('@STAGE_HOLDER/feast.zip');
72+
1773
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER)
1874
RETURNS BINARY
1975
LANGUAGE PYTHON

sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import sys
22
from binascii import unhexlify
3+
from datetime import datetime
34

5+
import numpy as np
46
import pandas
57
from _snowflake import vectorized
68

@@ -13,6 +15,13 @@
1315
)
1416
from feast.value_type import ValueType
1517

18+
19+
def _cast_array_list_elements_to(array: np.ndarray, mapper):
20+
for row in array:
21+
for i, elem in enumerate(row):
22+
row[i] = mapper(elem)
23+
24+
1625
"""
1726
CREATE OR REPLACE FUNCTION feast_snowflake_binary_to_bytes_proto(df BINARY)
1827
RETURNS BINARY
@@ -59,6 +68,185 @@ def feast_snowflake_varchar_to_string_proto(df):
5968
return df
6069

6170

71+
"""
72+
CREATE OR REPLACE FUNCTION feast_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
73+
RETURNS BINARY
74+
LANGUAGE PYTHON
75+
RUNTIME_VERSION = '3.8'
76+
PACKAGES = ('protobuf', 'pandas')
77+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
78+
IMPORTS = ('@feast_stage/feast.zip');
79+
"""
80+
# ValueType.BYTES_LIST
81+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_bytes_to_list_bytes_proto(df):
    """Serialize column 0 of ``df`` as ``ValueType.BYTES_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, converts the column with
    ``python_values_to_proto_values`` and returns a list holding the
    ``ValueProto.SerializeToString`` result for each converted value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    column_values = df[0].to_numpy()
    protos = python_values_to_proto_values(column_values, ValueType.BYTES_LIST)
    return [ValueProto.SerializeToString(proto) for proto in protos]
92+
93+
94+
"""
95+
CREATE OR REPLACE FUNCTION feast_snowflake_array_varchar_to_list_string_proto(df ARRAY)
96+
RETURNS BINARY
97+
LANGUAGE PYTHON
98+
RUNTIME_VERSION = '3.8'
99+
PACKAGES = ('protobuf', 'pandas')
100+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
101+
IMPORTS = ('@feast_stage/feast.zip');
102+
"""
103+
104+
105+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_varchar_to_list_string_proto(df):
    """Serialize column 0 of ``df`` as ``ValueType.STRING_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, converts the column with
    ``python_values_to_proto_values`` and returns a list holding the
    ``ValueProto.SerializeToString`` result for each converted value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    column_values = df[0].to_numpy()
    protos = python_values_to_proto_values(column_values, ValueType.STRING_LIST)
    return [ValueProto.SerializeToString(proto) for proto in protos]
116+
117+
118+
"""
119+
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int32_proto(df ARRAY)
120+
RETURNS BINARY
121+
LANGUAGE PYTHON
122+
RUNTIME_VERSION = '3.8'
123+
PACKAGES = ('protobuf', 'pandas')
124+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
125+
IMPORTS = ('@feast_stage/feast.zip');
126+
"""
127+
128+
129+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int32_proto(df):
    """Serialize column 0 of ``df`` as ``ValueType.INT32_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, converts the column with
    ``python_values_to_proto_values`` and returns a list holding the
    ``ValueProto.SerializeToString`` result for each converted value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    column_values = df[0].to_numpy()
    protos = python_values_to_proto_values(column_values, ValueType.INT32_LIST)
    return [ValueProto.SerializeToString(proto) for proto in protos]
140+
141+
142+
"""
143+
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int64_proto(df ARRAY)
144+
RETURNS BINARY
145+
LANGUAGE PYTHON
146+
RUNTIME_VERSION = '3.8'
147+
PACKAGES = ('protobuf', 'pandas')
148+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
149+
IMPORTS = ('@feast_stage/feast.zip');
150+
"""
151+
152+
153+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int64_proto(df):
    """Serialize column 0 of ``df`` as ``ValueType.INT64_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, converts the column with
    ``python_values_to_proto_values`` and returns a list holding the
    ``ValueProto.SerializeToString`` result for each converted value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    column_values = df[0].to_numpy()
    protos = python_values_to_proto_values(column_values, ValueType.INT64_LIST)
    return [ValueProto.SerializeToString(proto) for proto in protos]
164+
165+
166+
"""
167+
CREATE OR REPLACE FUNCTION feast_snowflake_array_float_to_list_double_proto(df ARRAY)
168+
RETURNS BINARY
169+
LANGUAGE PYTHON
170+
RUNTIME_VERSION = '3.8'
171+
PACKAGES = ('protobuf', 'pandas')
172+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
173+
IMPORTS = ('@feast_stage/feast.zip');
174+
"""
175+
176+
177+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_float_to_list_double_proto(df):
    """Serialize column 0 of ``df`` as ``ValueType.DOUBLE_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, casts every array element to ``float`` in place,
    converts the column with ``python_values_to_proto_values`` and returns a
    list holding the ``ValueProto.SerializeToString`` result for each
    converted value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    rows = df[0].to_numpy()
    # Sometimes floats come in as ints so we need to convert back to float
    _cast_array_list_elements_to(rows, float)

    protos = python_values_to_proto_values(rows, ValueType.DOUBLE_LIST)
    return [ValueProto.SerializeToString(proto) for proto in protos]
192+
193+
194+
"""
195+
CREATE OR REPLACE FUNCTION feast_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
196+
RETURNS BINARY
197+
LANGUAGE PYTHON
198+
RUNTIME_VERSION = '3.8'
199+
PACKAGES = ('protobuf', 'pandas')
200+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
201+
IMPORTS = ('@feast_stage/feast.zip');
202+
"""
203+
204+
205+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_boolean_to_list_bool_proto(df):
    """Serialize column 0 of ``df`` as ``ValueType.BOOL_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, converts the column with
    ``python_values_to_proto_values`` and returns a list holding the
    ``ValueProto.SerializeToString`` result for each converted value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    column_values = df[0].to_numpy()
    protos = python_values_to_proto_values(column_values, ValueType.BOOL_LIST)
    return [ValueProto.SerializeToString(proto) for proto in protos]
216+
217+
218+
"""
219+
CREATE OR REPLACE FUNCTION feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
220+
RETURNS BINARY
221+
LANGUAGE PYTHON
222+
RUNTIME_VERSION = '3.8'
223+
PACKAGES = ('protobuf', 'pandas')
224+
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
225+
IMPORTS = ('@feast_stage/feast.zip');
226+
"""
227+
228+
229+
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df):
    """Serialize column 0 of ``df`` as ``UNIX_TIMESTAMP_LIST`` value protos.

    Appends "feast" to the ``snowflake_partner_attribution`` entry of
    ``sys._xoptions``, parses every array element from a string into a
    ``datetime`` using the format ``"%Y-%m-%d %H:%M:%S.%f"``, converts the
    parsed rows with ``python_values_to_proto_values`` and returns a list
    holding the ``ValueProto.SerializeToString`` result for each value.
    """
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    numpy_arrays = df[0].to_numpy()
    # Timestamps are coming in as strings so we should convert to timestamps
    _cast_array_list_elements_to(
        numpy_arrays, lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f")
    )

    df = list(
        map(
            ValueProto.SerializeToString,
            # Serialize the rows that were just converted rather than calling
            # to_numpy() a second time, which is not guaranteed to hand back
            # the same (already parsed) row objects.
            python_values_to_proto_values(
                numpy_arrays, ValueType.UNIX_TIMESTAMP_LIST
            ),
        )
    )
    return df
248+
249+
62250
"""
63251
CREATE OR REPLACE FUNCTION feast_snowflake_number_to_int32_proto(df NUMBER)
64252
RETURNS BINARY

0 commit comments

Comments
 (0)