diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index cf10a528bd1..88691d82f90 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -79,6 +79,7 @@ * [PostgreSQL (contrib)](reference/offline-stores/postgres.md) * [Online stores](reference/online-stores/README.md) * [SQLite](reference/online-stores/sqlite.md) + * [Snowflake](reference/online-stores/snowflake.md) * [Redis](reference/online-stores/redis.md) * [Datastore](reference/online-stores/datastore.md) * [DynamoDB](reference/online-stores/dynamodb.md) diff --git a/docs/reference/offline-stores/snowflake.md b/docs/reference/offline-stores/snowflake.md index aa006b43bb0..e2afaef90d8 100644 --- a/docs/reference/offline-stores/snowflake.md +++ b/docs/reference/offline-stores/snowflake.md @@ -2,7 +2,7 @@ ## Description -The Snowflake offline store provides support for reading [SnowflakeSources](../data-sources/snowflake.md). +The [Snowflake](https://trial.snowflake.com) offline store provides support for reading [SnowflakeSources](../data-sources/snowflake.md). * Snowflake tables and views are allowed as sources. * All joins happen within Snowflake. 
@@ -11,7 +11,7 @@ The Snowflake offline store provides support for reading [SnowflakeSources](../d * This allows you to call * `to_snowflake` to save the dataset into Snowflake * `to_sql` to get the SQL query that would execute on `to_df` - * `to_arrow_chunks` to get the result in batches ([Snowflake python connector docs](https://docs.snowflake.com/en/user-guide/python-connector-api.html#get_result_batches)) + * `to_arrow_chunks` to get the result in batches ([Snowflake python connector docs](https://docs.snowflake.com/en/user-guide/python-connector-api.html#get_result_batches)) ## Example diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index b3578b85394..5eb566af3c5 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -4,6 +4,8 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli {% page-ref page="sqlite.md" %} +{% page-ref page="snowflake.md" %} + {% page-ref page="redis.md" %} {% page-ref page="datastore.md" %} diff --git a/docs/reference/online-stores/snowflake.md b/docs/reference/online-stores/snowflake.md new file mode 100644 index 00000000000..ccf3d526dae --- /dev/null +++ b/docs/reference/online-stores/snowflake.md @@ -0,0 +1,35 @@ +# Snowflake + +## Description + +The [Snowflake](https://trial.snowflake.com) online store provides support for materializing feature values into a Snowflake Transient Table for serving online features. 
+ +* Only the latest feature values are persisted + +The data model for using a Snowflake Transient Table as an online store follows a tall format (one row per feature): +* "entity_feature_key" (BINARY) -- unique key used when reading a specific feature_view x entity combination +* "entity_key" (BINARY) -- serialized entity key, currently unused for reads +* "feature_name" (VARCHAR) +* "value" (BINARY) +* "event_ts" (TIMESTAMP) +* "created_ts" (TIMESTAMP) + + (This model may be subject to change when Snowflake Hybrid Tables are released) + +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: snowflake.online + account: SNOWFLAKE_DEPLOYMENT_URL + user: SNOWFLAKE_USER + password: SNOWFLAKE_PASSWORD + role: SNOWFLAKE_ROLE + warehouse: SNOWFLAKE_WAREHOUSE + database: SNOWFLAKE_DATABASE +``` +{% endcode %} diff --git a/docs/roadmap.md b/docs/roadmap.md index efe2164b9ca..e481453dffb 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -27,6 +27,7 @@ The list below contains the functionality that contributors are planning to deve * [x] [In-memory / Pandas](https://docs.feast.dev/reference/offline-stores/file) * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store) * **Online Stores** + * [x] [Snowflake](https://docs.feast.dev/reference/online-stores/snowflake) * [x] [DynamoDB](https://docs.feast.dev/reference/online-stores/dynamodb) * [x] [Redis](https://docs.feast.dev/reference/online-stores/redis) * [x] [Datastore](https://docs.feast.dev/reference/online-stores/datastore) @@ -59,4 +60,4 @@ The list below contains the functionality that contributors are planning to deve * [x] Model-centric feature tracking (feature services) * [x] Amundsen integration (see [Feast extractor](https://github.com/amundsen-io/amundsen/blob/main/databuilder/databuilder/extractor/feast_extractor.py)) * [x] DataHub integration (see
[DataHub Feast docs](https://datahubproject.io/docs/generated/ingestion/sources/feast/)) - * [x] Feast Web UI (Alpha release. See [docs](https://docs.feast.dev/reference/alpha-web-ui)) \ No newline at end of file + * [x] Feast Web UI (Alpha release. See [docs](https://docs.feast.dev/reference/alpha-web-ui)) diff --git a/docs/tutorials/driver-stats-on-snowflake.md b/docs/tutorials/driver-stats-on-snowflake.md index 01b158cb1a1..306ae2f59b2 100644 --- a/docs/tutorials/driver-stats-on-snowflake.md +++ b/docs/tutorials/driver-stats-on-snowflake.md @@ -1,6 +1,6 @@ --- description: >- - Initial demonstration of Snowflake as an offline store with Feast, using the Snowflake demo template. + Initial demonstration of Snowflake as an offline+online store with Feast, using the Snowflake demo template. --- # Drivers stats on Snowflake @@ -61,6 +61,14 @@ offline_store: role: ROLE_NAME #case sensitive warehouse: WAREHOUSE_NAME #case sensitive database: DATABASE_NAME #case cap sensitive +online_store: + type: snowflake.online + account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com + user: USERNAME + password: PASSWORD + role: ROLE_NAME #case sensitive + warehouse: WAREHOUSE_NAME #case sensitive + database: DATABASE_NAME #case sensitive ``` {% endcode %} diff --git a/sdk/python/docs/source/conf.py b/sdk/python/docs/source/conf.py index b311a196647..8f873d21b61 100644 --- a/sdk/python/docs/source/conf.py +++ b/sdk/python/docs/source/conf.py @@ -115,7 +115,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = [] +html_static_path = ["_static"] # -- Options for HTMLHelp output ------------------------------------------ diff --git a/sdk/python/docs/source/feast.infra.online_stores.rst b/sdk/python/docs/source/feast.infra.online_stores.rst index 842522c9d7c..65758c409c0 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.rst @@ -52,6 +52,14 @@ feast.infra.online\_stores.redis module :undoc-members: :show-inheritance: +feast.infra.online\_stores.snowflake module +------------------------------------------- + +.. automodule:: feast.infra.online_stores.snowflake + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.sqlite module ---------------------------------------- diff --git a/sdk/python/docs/source/index.rst b/sdk/python/docs/source/index.rst index bc034c295d4..07b9d9a77e1 100644 --- a/sdk/python/docs/source/index.rst +++ b/sdk/python/docs/source/index.rst @@ -250,18 +250,21 @@ Sqlite Online Store .. automodule:: feast.infra.online_stores.sqlite :members: + :noindex: Datastore Online Store ---------------------- .. automodule:: feast.infra.online_stores.datastore :members: + :noindex: DynamoDB Online Store --------------------- .. 
automodule:: feast.infra.online_stores.dynamodb :members: + :noindex: Redis Online Store ------------------ diff --git a/sdk/python/feast/infra/online_stores/snowflake.py b/sdk/python/feast/infra/online_stores/snowflake.py new file mode 100644 index 00000000000..e5e7b680be2 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/snowflake.py @@ -0,0 +1,266 @@ +import itertools +import os +from binascii import hexlify +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +import pandas as pd +import pytz +from pydantic import Field +from pydantic.schema import Literal + +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.online_store import OnlineStore +from feast.infra.utils.snowflake_utils import get_snowflake_conn, write_pandas_binary +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.usage import log_exceptions_and_usage + + +class SnowflakeOnlineStoreConfig(FeastConfigBaseModel): + """Online store config for Snowflake""" + + type: Literal["snowflake.online"] = "snowflake.online" + """ Online store type selector""" + + config_path: Optional[str] = ( + Path(os.environ["HOME"]) / ".snowsql/config" + ).__str__() + """ Snowflake config path -- absolute path required (Can't use ~)""" + + account: Optional[str] = None + """ Snowflake deployment identifier -- drop .snowflakecomputing.com""" + + user: Optional[str] = None + """ Snowflake user name """ + + password: Optional[str] = None + """ Snowflake password """ + + role: Optional[str] = None + """ Snowflake role name""" + + warehouse: Optional[str] = None + """ Snowflake warehouse name """ + + database: Optional[str] = None + """ Snowflake database 
name """ + + schema_: Optional[str] = Field("PUBLIC", alias="schema") + """ Snowflake schema name """ + + class Config: + allow_population_by_field_name = True + + +class SnowflakeOnlineStore(OnlineStore): + @log_exceptions_and_usage(online_store="snowflake") + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) + + dfs = [None] * len(data) + for i, (entity_key, values, timestamp, created_ts) in enumerate(data): + df = pd.DataFrame( + columns=[ + "entity_feature_key", + "entity_key", + "feature_name", + "value", + "event_ts", + "created_ts", + ], + index=range(0, len(values)), + ) + + timestamp = _to_naive_utc(timestamp) + if created_ts is not None: + created_ts = _to_naive_utc(created_ts) + + entity_key_serialization_version = ( + config.entity_key_serialization_version + if config.entity_key_serialization_version + else 2 + ) + for j, (feature_name, val) in enumerate(values.items()): + df.loc[j, "entity_feature_key"] = serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ) + bytes(feature_name, encoding="utf-8") + df.loc[j, "entity_key"] = serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ) + df.loc[j, "feature_name"] = feature_name + df.loc[j, "value"] = val.SerializeToString() + df.loc[j, "event_ts"] = timestamp + df.loc[j, "created_ts"] = created_ts + + dfs[i] = df + + if dfs: + agg_df = pd.concat(dfs) + + # This combines both the data upload plus the overwrite in the same transaction + with get_snowflake_conn(config.online_store, autocommit=False) as conn: + write_pandas_binary( + conn, agg_df, f"[online-transient] {config.project}_{table.name}" + ) # special function for writing binary to snowflake + + query 
= f""" + INSERT OVERWRITE INTO "{config.online_store.database}"."{config.online_store.schema_}"."[online-transient] {config.project}_{table.name}" + SELECT + "entity_feature_key", + "entity_key", + "feature_name", + "value", + "event_ts", + "created_ts" + FROM + (SELECT + *, + ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row" + FROM + "{config.online_store.database}"."{config.online_store.schema_}"."[online-transient] {config.project}_{table.name}") + WHERE + "_feast_row" = 1; + """ + + conn.cursor().execute(query) + + if progress: + progress(len(data)) + + return None + + @log_exceptions_and_usage(online_store="snowflake") + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: List[str], + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) + + result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + + entity_fetch_str = ",".join( + [ + ( + "TO_BINARY(" + + hexlify( + serialize_entity_key(combo[0]) + + bytes(combo[1], encoding="utf-8") + ).__str__()[1:] + + ")" + ) + for combo in itertools.product(entity_keys, requested_features) + ] + ) + + with get_snowflake_conn(config.online_store) as conn: + + df = ( + conn.cursor() + .execute( + f""" + SELECT + "entity_key", "feature_name", "value", "event_ts" + FROM + "{config.online_store.database}"."{config.online_store.schema_}"."[online-transient] {config.project}_{table.name}" + WHERE + "entity_feature_key" IN ({entity_fetch_str}) + """, + ) + .fetch_pandas_all() + ) + + entity_key_serialization_version = ( + config.entity_key_serialization_version + if config.entity_key_serialization_version + else 2 + ) + for entity_key in entity_keys: + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=entity_key_serialization_version, + ) + 
res = {} + res_ts = None + for index, row in df[df["entity_key"] == entity_key_bin].iterrows(): + val = ValueProto() + val.ParseFromString(row["value"]) + res[row["feature_name"]] = val + res_ts = row["event_ts"].to_pydatetime() + + if not res: + result.append((None, None)) + else: + result.append((res_ts, res)) + return result + + @log_exceptions_and_usage(online_store="snowflake") + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) + + with get_snowflake_conn(config.online_store) as conn: + + for table in tables_to_keep: + + conn.cursor().execute( + f"""CREATE TRANSIENT TABLE IF NOT EXISTS "{config.online_store.database}"."{config.online_store.schema_}"."[online-transient] {config.project}_{table.name}" ( + "entity_feature_key" BINARY, + "entity_key" BINARY, + "feature_name" VARCHAR, + "value" BINARY, + "event_ts" TIMESTAMP, + "created_ts" TIMESTAMP + )""" + ) + + for table in tables_to_delete: + + conn.cursor().execute( + f'DROP TABLE IF EXISTS "{config.online_store.database}"."{config.online_store.schema_}"."[online-transient] {config.project}_{table.name}"' + ) + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + assert isinstance(config.online_store, SnowflakeOnlineStoreConfig) + + with get_snowflake_conn(config.online_store) as conn: + + for table in tables: + query = f'DROP TABLE IF EXISTS "{config.online_store.database}"."{config.online_store.schema_}"."[online-transient] {config.project}_{table.name}"' + conn.cursor().execute(query) + + +def _to_naive_utc(ts: datetime): + if ts.tzinfo is None: + return ts + else: + return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/utils/snowflake_utils.py 
b/sdk/python/feast/infra/utils/snowflake_utils.py index 78d505bd083..f54288e45d8 100644 --- a/sdk/python/feast/infra/utils/snowflake_utils.py +++ b/sdk/python/feast/infra/utils/snowflake_utils.py @@ -44,8 +44,12 @@ def execute_snowflake_statement(conn: SnowflakeConnection, query) -> SnowflakeCu def get_snowflake_conn(config, autocommit=True) -> SnowflakeConnection: - assert config.type == "snowflake.offline" - config_header = "connections.feast_offline_store" + assert config.type in ["snowflake.offline", "snowflake.online"] + + if config.type == "snowflake.offline": + config_header = "connections.feast_offline_store" + elif config.type == "snowflake.online": + config_header = "connections.feast_online_store" config_dict = dict(config) @@ -122,8 +126,8 @@ def write_pandas( conn: Connection to be used to communicate with Snowflake. df: Dataframe we'd like to write back. table_name: Table name where we want to insert into. - database: Database schema and table is in, if not provided the default one will be used (Default value = None). - schema: Schema table is in, if not provided the default one will be used (Default value = None). + database: Database table is in, if not provided the connection one will be used. + schema: Schema table is in, if not provided the connection one will be used. chunk_size: Number of elements to be inserted once, if not provided all elements will be dumped once (Default value = None). compression: The compression used on the Parquet files, can only be gzip, or snappy. 
Gzip gives supposedly a @@ -432,3 +436,176 @@ def parse_private_key_path(key_path: str, private_key_passphrase: str) -> bytes: ) return pkb + + +def write_pandas_binary( + conn: SnowflakeConnection, + df: pd.DataFrame, + table_name: str, + database: Optional[str] = None, + schema: Optional[str] = None, + chunk_size: Optional[int] = None, + compression: str = "gzip", + on_error: str = "abort_statement", + parallel: int = 4, + quote_identifiers: bool = True, + auto_create_table: bool = False, + create_temp_table: bool = False, +): + """Allows users to most efficiently write back a pandas DataFrame to Snowflake. + + It works by dumping the DataFrame into Parquet files, uploading them and finally copying their data into the table. + + Returns whether all files were ingested correctly, number of chunks uploaded, and number of rows ingested + with all of the COPY INTO command's output for debugging purposes. + + Example usage: + import pandas + from snowflake.connector.pandas_tools import write_pandas + + df = pandas.DataFrame([('Mark', 10), ('Luke', 20)], columns=['name', 'balance']) + success, nchunks, nrows, _ = write_pandas(cnx, df, 'customers') + + Args: + conn: Connection to be used to communicate with Snowflake. + df: Dataframe we'd like to write back. + table_name: Table name where we want to insert into. + database: Database table is in, if not provided the connection one will be used. + schema: Schema table is in, if not provided the connection one will be used. + chunk_size: Number of elements to be inserted once, if not provided all elements will be dumped once + (Default value = None). + compression: The compression used on the Parquet files, can only be gzip, or snappy. Gzip gives supposedly a + better compression, while snappy is faster. Use whichever is more appropriate (Default value = 'gzip'). 
+ on_error: Action to take when COPY INTO statements fail, default follows documentation at: + https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions + (Default value = 'abort_statement'). + parallel: Number of threads to be used when uploading chunks, default follows documentation at: + https://docs.snowflake.com/en/sql-reference/sql/put.html#optional-parameters (Default value = 4). + quote_identifiers: By default, identifiers, specifically database, schema, table and column names + (from df.columns) will be quoted. If set to False, identifiers are passed on to Snowflake without quoting. + I.e. identifiers will be coerced to uppercase by Snowflake. (Default value = True) + auto_create_table: When true, will automatically create a table with corresponding columns for each column in + the passed in DataFrame. The table will not be created if it already exists + create_temp_table: Will make the auto-created table as a temporary table + """ + if database is not None and schema is None: + raise ProgrammingError( + "Schema has to be provided to write_pandas when a database is provided" + ) + # This dictionary maps the compression algorithm to Snowflake put copy into command type + # https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#type-parquet + compression_map = {"gzip": "auto", "snappy": "snappy"} + if compression not in compression_map.keys(): + raise ProgrammingError( + "Invalid compression '{}', only acceptable values are: {}".format( + compression, compression_map.keys() + ) + ) + if quote_identifiers: + location = ( + (('"' + database + '".') if database else "") + + (('"' + schema + '".') if schema else "") + + ('"' + table_name + '"') + ) + else: + location = ( + (database + "." if database else "") + + (schema + "." 
if schema else "") + + (table_name) + ) + if chunk_size is None: + chunk_size = len(df) + cursor: SnowflakeCursor = conn.cursor() + stage_name = create_temporary_sfc_stage(cursor) + + with TemporaryDirectory() as tmp_folder: + for i, chunk in chunk_helper(df, chunk_size): + chunk_path = os.path.join(tmp_folder, "file{}.txt".format(i)) + # Dump chunk into parquet file + chunk.to_parquet( + chunk_path, + compression=compression, + use_deprecated_int96_timestamps=True, + ) + # Upload parquet file + upload_sql = ( + "PUT /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + "'file://{path}' @\"{stage_name}\" PARALLEL={parallel}" + ).format( + path=chunk_path.replace("\\", "\\\\").replace("'", "\\'"), + stage_name=stage_name, + parallel=parallel, + ) + logger.debug(f"uploading files with '{upload_sql}'") + cursor.execute(upload_sql, _is_internal=True) + # Remove chunk file + os.remove(chunk_path) + if quote_identifiers: + columns = '"' + '","'.join(list(df.columns)) + '"' + else: + columns = ",".join(list(df.columns)) + + if auto_create_table: + file_format_name = create_file_format(compression, compression_map, cursor) + infer_schema_sql = f"SELECT COLUMN_NAME, TYPE FROM table(infer_schema(location=>'@\"{stage_name}\"', file_format=>'{file_format_name}'))" + logger.debug(f"inferring schema with '{infer_schema_sql}'") + result_cursor = cursor.execute(infer_schema_sql, _is_internal=True) + if result_cursor is None: + raise SnowflakeQueryUnknownError(infer_schema_sql) + result = cast(List[Tuple[str, str]], result_cursor.fetchall()) + column_type_mapping: Dict[str, str] = dict(result) + # Infer schema can return the columns out of order depending on the chunking we do when uploading + # so we have to iterate through the dataframe columns to make sure we create the table with its + # columns in order + quote = '"' if quote_identifiers else "" + create_table_columns = ", ".join( + [f"{quote}{c}{quote} {column_type_mapping[c]}" for c in df.columns] + ) + 
create_table_sql = ( + f"CREATE {'TEMP ' if create_temp_table else ''}TABLE IF NOT EXISTS {location} " + f"({create_table_columns})" + f" /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + ) + logger.debug(f"auto creating table with '{create_table_sql}'") + cursor.execute(create_table_sql, _is_internal=True) + drop_file_format_sql = f"DROP FILE FORMAT IF EXISTS {file_format_name}" + logger.debug(f"dropping file format with '{drop_file_format_sql}'") + cursor.execute(drop_file_format_sql, _is_internal=True) + + # in Snowflake, all parquet data is stored in a single column, $1, so we must select columns explicitly + # see (https://docs.snowflake.com/en/user-guide/script-data-load-transform-parquet.html) + if quote_identifiers: + parquet_columns = ",".join( + f'TO_BINARY($1:"{c}")' + if c in ["entity_feature_key", "entity_key", "value"] + else f'$1:"{c}"' + for c in df.columns + ) + else: + parquet_columns = ",".join( + f"TO_BINARY($1:{c})" + if c in ["entity_feature_key", "entity_key", "value"] + else f"$1:{c}" + for c in df.columns + ) + + copy_into_sql = ( + "COPY INTO {location} /* Python:snowflake.connector.pandas_tools.write_pandas() */ " + "({columns}) " + 'FROM (SELECT {parquet_columns} FROM @"{stage_name}") ' + "FILE_FORMAT=(TYPE=PARQUET COMPRESSION={compression} BINARY_AS_TEXT = FALSE) " + "PURGE=TRUE ON_ERROR={on_error}" + ).format( + location=location, + columns=columns, + parquet_columns=parquet_columns, + stage_name=stage_name, + compression=compression_map[compression], + on_error=on_error, + ) + logger.debug("copying into with '{}'".format(copy_into_sql)) + # Snowflake returns the original cursor if the query execution succeeded. 
+ result_cursor = cursor.execute(copy_into_sql, _is_internal=True) + if result_cursor is None: + raise SnowflakeQueryUnknownError(copy_into_sql) + result_cursor.close() diff --git a/sdk/python/feast/templates/snowflake/bootstrap.py b/sdk/python/feast/templates/snowflake/bootstrap.py index 194ba08c08b..1663a1fb8b0 100644 --- a/sdk/python/feast/templates/snowflake/bootstrap.py +++ b/sdk/python/feast/templates/snowflake/bootstrap.py @@ -13,7 +13,6 @@ def bootstrap(): from feast.driver_test_data import create_driver_hourly_stats_df repo_path = pathlib.Path(__file__).parent.absolute() - config_file = repo_path / "feature_store.yaml" project_name = str(repo_path)[str(repo_path).rfind("/") + 1 :] @@ -23,7 +22,6 @@ def bootstrap(): driver_entities = [1001, 1002, 1003, 1004, 1005] driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) - repo_path = pathlib.Path(__file__).parent.absolute() data_path = repo_path / "data" data_path.mkdir(exist_ok=True) driver_stats_path = data_path / "driver_stats.parquet" @@ -38,6 +36,17 @@ def bootstrap(): snowflake_warehouse = click.prompt("Snowflake Warehouse Name (Case Sensitive):") snowflake_database = click.prompt("Snowflake Database Name (Case Sensitive):") + config_file = repo_path / "feature_store.yaml" + for i in range(2): + replace_str_in_file( + config_file, "SNOWFLAKE_DEPLOYMENT_URL", snowflake_deployment_url + ) + replace_str_in_file(config_file, "SNOWFLAKE_USER", snowflake_user) + replace_str_in_file(config_file, "SNOWFLAKE_PASSWORD", snowflake_password) + replace_str_in_file(config_file, "SNOWFLAKE_ROLE", snowflake_role) + replace_str_in_file(config_file, "SNOWFLAKE_WAREHOUSE", snowflake_warehouse) + replace_str_in_file(config_file, "SNOWFLAKE_DATABASE", snowflake_database) + if click.confirm( f'Should I upload example data to Snowflake (overwriting "{project_name}_feast_driver_hourly_stats" table)?', default=True, @@ -66,20 +75,6 @@ def bootstrap(): ) conn.close() - repo_path = 
pathlib.Path(__file__).parent.absolute() - config_file = repo_path / "feature_store.yaml" - driver_file = repo_path / "driver_repo.py" - replace_str_in_file( - config_file, "SNOWFLAKE_DEPLOYMENT_URL", snowflake_deployment_url - ) - replace_str_in_file(config_file, "SNOWFLAKE_USER", snowflake_user) - replace_str_in_file(config_file, "SNOWFLAKE_PASSWORD", snowflake_password) - replace_str_in_file(config_file, "SNOWFLAKE_ROLE", snowflake_role) - replace_str_in_file(config_file, "SNOWFLAKE_WAREHOUSE", snowflake_warehouse) - replace_str_in_file(config_file, "SNOWFLAKE_DATABASE", snowflake_database) - - replace_str_in_file(driver_file, "SNOWFLAKE_WAREHOUSE", snowflake_warehouse) - def replace_str_in_file(file_path, match_str, sub_str): with open(file_path, "r") as f: diff --git a/sdk/python/feast/templates/snowflake/driver_repo.py b/sdk/python/feast/templates/snowflake/driver_repo.py index 297a3f5ef0f..54f6b67126c 100644 --- a/sdk/python/feast/templates/snowflake/driver_repo.py +++ b/sdk/python/feast/templates/snowflake/driver_repo.py @@ -2,8 +2,7 @@ import yaml -from feast import Entity, FeatureService, FeatureView, Field, SnowflakeSource -from feast.types import Float32, Int64 +from feast import Entity, FeatureService, FeatureView, SnowflakeSource # Define an entity for the driver. Entities can be thought of as primary keys used to # retrieve features. 
Entities are also used to join multiple tables/views during the @@ -25,7 +24,6 @@ # The Snowflake table where features can be found database=yaml.safe_load(open("feature_store.yaml"))["offline_store"]["database"], table=f"{project_name}_feast_driver_hourly_stats", - warehouse="SNOWFLAKE_WAREHOUSE", # The event timestamp is used for point-in-time joins and for ensuring only # features within the TTL are returned timestamp_field="event_timestamp", @@ -51,14 +49,6 @@ # amount of historical scanning required for historical feature values # during retrieval ttl=timedelta(weeks=52), - # The list of features defined below act as a schema to both define features - # for both materialization of features into a store, and are used as references - # during retrieval for building a training dataset or serving features - schema=[ - Field(name="conv_rate", dtype=Float32), - Field(name="acc_rate", dtype=Float32), - Field(name="avg_daily_trips", dtype=Int64), - ], # Batch sources are used to find feature values. 
In the case of this feature # view we will query a source table on Redshift for driver statistics # features diff --git a/sdk/python/feast/templates/snowflake/feature_store.yaml b/sdk/python/feast/templates/snowflake/feature_store.yaml index 948869897bd..3e2e3c3ceaa 100644 --- a/sdk/python/feast/templates/snowflake/feature_store.yaml +++ b/sdk/python/feast/templates/snowflake/feature_store.yaml @@ -9,4 +9,11 @@ offline_store: role: SNOWFLAKE_ROLE warehouse: SNOWFLAKE_WAREHOUSE database: SNOWFLAKE_DATABASE -entity_key_serialization_version: 2 +online_store: + type: snowflake.online + account: SNOWFLAKE_DEPLOYMENT_URL + user: SNOWFLAKE_USER + password: SNOWFLAKE_PASSWORD + role: SNOWFLAKE_ROLE + warehouse: SNOWFLAKE_WAREHOUSE + database: SNOWFLAKE_DATABASE diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index bad2f529064..8f1aa51b19c 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -73,6 +73,17 @@ "connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003", } +SNOWFLAKE_CONFIG = { + "type": "snowflake.online", + "account": os.environ["SNOWFLAKE_CI_DEPLOYMENT"], + "user": os.environ["SNOWFLAKE_CI_USER"], + "password": os.environ["SNOWFLAKE_CI_PASSWORD"], + "role": os.environ["SNOWFLAKE_CI_ROLE"], + "warehouse": os.environ["SNOWFLAKE_CI_WAREHOUSE"], + "database": "FEAST", + "schema": "ONLINE", +} + OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, DataSourceCreator] = { "file": ("local", FileDataSourceCreator), "bigquery": ("gcp", BigQueryDataSourceCreator), @@ -103,6 +114,7 @@ AVAILABLE_ONLINE_STORES["redis"] = (REDIS_CONFIG, None) AVAILABLE_ONLINE_STORES["dynamodb"] = (DYNAMO_CONFIG, None) AVAILABLE_ONLINE_STORES["datastore"] = ("datastore", None) + AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None) full_repo_configs_module = 
os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME)
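
The tall-format key scheme introduced by this patch can be sketched outside of Feast. The sketch below mimics how `online_write_batch` builds `entity_feature_key` (serialized entity key bytes concatenated with the UTF-8 feature name) and how `online_read` turns each (entity key, feature) combination into a `TO_BINARY('<hex>')` literal for its SQL `IN` predicate. The entity-key byte strings are illustrative placeholders; in the real store they come from `feast.infra.key_encoding_utils.serialize_entity_key`.

```python
from binascii import hexlify
from itertools import product


def entity_feature_key(entity_key_bytes: bytes, feature_name: str) -> bytes:
    """Tall-format row key: serialized entity key + UTF-8 feature name,
    mirroring the concatenation done in online_write_batch."""
    return entity_key_bytes + feature_name.encode("utf-8")


def entity_fetch_predicate(entity_keys, feature_names):
    """Builds the TO_BINARY(...) IN-list that online_read interpolates into
    its SELECT: one entry per (entity key, feature) combination."""
    return ",".join(
        "TO_BINARY('" + hexlify(entity_feature_key(ek, f)).decode() + "')"
        for ek, f in product(entity_keys, feature_names)
    )


# Placeholder entity key bytes, standing in for serialize_entity_key output
preds = entity_fetch_predicate([b"\x01\x02"], ["conv_rate"])
print(preds)  # → TO_BINARY('0102636f6e765f72617465')
```

Because the predicate only filters on `entity_feature_key`, the redundant `entity_key` column is fetched but never used for lookups, which matches the "currently unused for reads" note in the new online-store doc.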