From 5b8fb41760d2f04f252c5923d898732cf2165fca Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 6 Jul 2022 13:06:23 -0700 Subject: [PATCH 01/14] chore: Implement to_remote_storage for RedshiftRetrievalJob Signed-off-by: Achal Shah --- .../feast/infra/offline_stores/redshift.py | 7 +++++++ sdk/python/feast/infra/utils/aws_utils.py | 19 ++++++++++++++++--- .../test_universal_historical_retrieval.py | 6 +++--- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 5f071a814f3..1d7b79727ec 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -490,6 +490,13 @@ def persist(self, storage: SavedDatasetStorage): def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata + def supports_remote_storage_export(self) -> bool: + return True + + def to_remote_storage(self) -> List[str]: + path = self.to_s3() + return aws_utils.list_s3_files(self._config.offline_store.region, path) + def _upload_entity_df( entity_df: Union[pd.DataFrame, str], diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 7badda98460..dae9d84d5d0 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -3,7 +3,7 @@ import tempfile import uuid from pathlib import Path -from typing import Any, Dict, Iterator, Optional, Tuple, Union +from typing import Any, Dict, Iterator, Optional, Tuple, Union, List import pandas as pd import pyarrow @@ -433,7 +433,7 @@ def download_s3_directory(s3_resource, bucket: str, key: str, local_dir: str): if key != "" and not key.endswith("/"): key = key + "/" for obj in bucket_obj.objects.filter(Prefix=key): - local_file_path = local_dir + "/" + obj.key[len(key) :] + local_file_path = local_dir + "/" + obj.key[len(key):] local_file_dir = os.path.dirname(local_file_path) os.makedirs(local_file_dir, exist_ok=True) bucket_obj.download_file(obj.key, local_file_path) @@ -473,7 +473,7 @@ def execute_redshift_query_and_unload_to_s3( # Run the query, unload the results to S3 unique_table_name = "_" + str(uuid.uuid4()).replace("-", "") query = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n" - query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET" + query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' FORMAT AS PARQUET" execute_redshift_statement(redshift_data_client, cluster_id, database, user, query) @@ -632,3 +632,16 @@ def delete_api_gateway(api_gateway_client, api_gateway_id: str) -> Dict: def get_account_id() -> str: """Get AWS Account ID""" return boto3.client("sts").get_caller_identity().get("Account") + + +def list_s3_files(aws_region: str, path: str) -> List[str]: + s3 = boto3.client("s3", config=Config(region_name=aws_region)) + if path.startswith("s3://"): + path = path[len("s3://"):] + bucket, prefix = path.split("/", 1) + objects = s3.list_objects_v2( + Bucket=bucket, + Prefix=prefix) + contents = objects["Contents"] + files = [f"s3://{bucket}/{content['Key']}" for content in contents] + return files diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 0b2965084d8..9213b8f2539 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -359,9 +359,9 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "customer_profile:current_balance", "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", - "conv_rate_plus_100:conv_rate_plus_100", - "conv_rate_plus_100:conv_rate_plus_100_rounded", - "conv_rate_plus_100:conv_rate_plus_val_to_add", + # "conv_rate_plus_100:conv_rate_plus_100", + # "conv_rate_plus_100:conv_rate_plus_100_rounded", + # "conv_rate_plus_100:conv_rate_plus_val_to_add", "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", From e9df8ace2d07b2023486c3e65ab87351ee9305b6 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 6 Jul 2022 13:26:31 -0700 Subject: [PATCH 02/14] Implement to_remote_storage for BigQuery Signed-off-by: Achal Shah --- .../feast/infra/offline_stores/bigquery.py | 45 +++++++++++++++++++ sdk/python/feast/infra/utils/aws_utils.py | 10 ++--- .../test_universal_historical_retrieval.py | 6 +-- 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index cb5b3a045af..7e2fc0d7953 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -57,6 +57,7 @@ from google.cloud import bigquery from google.cloud.bigquery import Client, SchemaField, Table from google.cloud.bigquery._pandas_helpers import ARROW_SCALAR_IDS_TO_BQ + from google.cloud.storage import Client as StorageClient except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -83,6 +84,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): For more information on BigQuery data locations see: https://cloud.google.com/bigquery/docs/locations """ + gcs_staging_location: Optional[str] = None + """ (optional) GCS location used for offloading BigQuery results as parquet files.""" + class BigQueryOfflineStore(OfflineStore): @staticmethod @@ -386,6 +390,14 @@ def query_generator() -> Iterator[str]: on_demand_feature_views if on_demand_feature_views else [] ) self._metadata = metadata + if self.config.offline_store.gcs_staging_location: + self._gcs_path = ( + self.config.offline_store.gcs_staging_location + + f"/{self.config.project}/export/" + + str(uuid.uuid4()) + ) + else: + self._gcs_path = None @property def full_feature_names(self) -> bool: @@ -478,6 +490,39 @@ def persist(self, storage: SavedDatasetStorage): def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata + def supports_remote_storage_export(self) -> bool: + return True + + def to_remote_storage(self) -> List[str]: + if self.config.offline_store.gcs_staging_location or not self._gcs_path: + raise ValueError( + "gcs_staging_location needs to be specified for the big query " + "offline store when executing `to_remote_storage()`" + ) + + table = self.to_bigquery() + + job_config = bigquery.job.ExtractJobConfig() + job_config.destination_format = "PARQUET" + + extract_job = self.client.extract_table( + table, + destination_uris=[f"{self._gcs_path}/*.parquet"], + location=self.config.offline_store.location, + job_config=job_config, + ) + extract_job.result() + + storage_client = StorageClient(project=self.config.offline_store.project_id) + bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1) + prefix = prefix.rsplit("/", 1)[0] + + blobs = storage_client.list_blobs(bucket, prefix=prefix) + results = [] + for b in blobs: + results.append(f"gs://{b.bucket.name}/{b.name}") + return results + def block_until_done( client: Client, diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index dae9d84d5d0..51aecbf8a72 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -3,7 +3,7 @@ import tempfile import uuid from pathlib import Path -from typing import Any, Dict, Iterator, Optional, Tuple, Union, List +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union import pandas as pd import pyarrow @@ -433,7 +433,7 @@ def download_s3_directory(s3_resource, bucket: str, key: str, local_dir: str): if key != "" and not key.endswith("/"): key = key + "/" for obj in bucket_obj.objects.filter(Prefix=key): - local_file_path = local_dir + "/" + obj.key[len(key):] + local_file_path = local_dir + "/" + obj.key[len(key) :] local_file_dir = os.path.dirname(local_file_path) os.makedirs(local_file_dir, exist_ok=True) bucket_obj.download_file(obj.key, local_file_path) @@ -637,11 +637,9 @@ def get_account_id() -> str: def list_s3_files(aws_region: str, path: str) -> List[str]: s3 = boto3.client("s3", config=Config(region_name=aws_region)) if path.startswith("s3://"): - path = path[len("s3://"):] + path = path[len("s3://") :] bucket, prefix = path.split("/", 1) - objects = s3.list_objects_v2( - Bucket=bucket, - Prefix=prefix) + objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) contents = objects["Contents"] files = [f"s3://{bucket}/{content['Key']}" for content in contents] return files diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 9213b8f2539..0b2965084d8 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -359,9 +359,9 @@ def test_historical_features(environment, universal_data_sources, full_feature_n "customer_profile:current_balance", "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", - # "conv_rate_plus_100:conv_rate_plus_100", - # "conv_rate_plus_100:conv_rate_plus_100_rounded", - # "conv_rate_plus_100:conv_rate_plus_val_to_add", + "conv_rate_plus_100:conv_rate_plus_100", + "conv_rate_plus_100:conv_rate_plus_100_rounded", + "conv_rate_plus_100:conv_rate_plus_val_to_add", "order:order_is_success", "global_stats:num_rides", "global_stats:avg_ride_length", From 62265770d13c67f2ff0d1bf73f9d6d83bf9bc849 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 6 Jul 2022 15:55:44 -0700 Subject: [PATCH 03/14] add for snowflake as well Signed-off-by: Achal Shah --- .../feast/infra/offline_stores/snowflake.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index a5befc33e23..444dd6c7361 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -1,5 +1,6 @@ import contextlib import os +import uuid from datetime import datetime from pathlib import Path from typing import ( @@ -90,6 +91,10 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): schema_: Optional[str] = Field(None, alias="schema") """ Snowflake schema name """ + storage_integration_name: Optional[str] = None + + blob_export_location: Optional[str] = None + class Config: allow_population_by_field_name = True @@ -453,6 +458,33 @@ def persist(self, storage: SavedDatasetStorage): def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata + def supports_remote_storage_export(self) -> bool: + return True + + def to_remote_storage(self) -> List[str]: + if not self.config.offline_store.s3_staging_location: + raise ValueError( + "to_remote_storage() requires `s3_staging_location` to be specified in config" + ) + if not self.config.offline_store.storage_integration_name: + raise ValueError( + "to_remote_storage() requires `storage_integration_name` to be specified in config" + ) + + table = f"temporary_{uuid.uuid4()}" + self.to_snowflake(table) + + copy_into_query = ( + f'COPY INTO "{self.config.offline_store.s3_staging_location}/{table}" from {table}\n' + f" storage_integration = {self.config.offline_store.storage_integration_name}\n" + f" file_format = (TYPE = PARQUET)" + f" DETAILED_OUTPUT = TRUE;\n" + ) + + execute_snowflake_statement(self.snowflake_conn, copy_into_query) + # TODO(achal) what next here?? + return [] + def _get_entity_schema( entity_df: Union[pd.DataFrame, str], From 5672110c90f2c913394625abdd3d1624c2b67ba8 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 11:17:51 -0700 Subject: [PATCH 04/14] fully fleshed for snowflake Signed-off-by: Achal Shah --- .../feast/infra/offline_stores/snowflake.py | 43 ++++++++++++------- .../test_universal_historical_retrieval.py | 3 ++ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 444dd6c7361..cb727f095a6 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -383,6 +383,11 @@ def query_generator() -> Iterator[str]: on_demand_feature_views if on_demand_feature_views else [] ) self._metadata = metadata + self.export_path: Optional[str] + if self.config.offline_store.blob_export_location: + self.export_path = f"{self.config.offline_store.blob_export_location}/{self.config.project}/{uuid.uuid4()}" + else: + self.export_path = None @property def full_feature_names(self) -> bool: @@ -418,7 +423,7 @@ def _to_arrow_internal(self) -> pa.Table: pd.DataFrame(columns=[md.name for md in empty_result.description]) ) - def to_snowflake(self, table_name: str) -> None: + def to_snowflake(self, table_name: str, temporary=False) -> None: """Save dataset as a new Snowflake table""" if self.on_demand_feature_views is not None: transformed_df = self.to_df() @@ -430,7 +435,7 @@ def to_snowflake(self, table_name: str) -> None: return None with self._query_generator() as query: - query = f'CREATE TABLE IF NOT EXISTS "{table_name}" AS ({query});\n' + query = f'CREATE {"TEMPORARY" if temporary else ""} TABLE IF NOT EXISTS "{table_name}" AS ({query});\n' execute_snowflake_statement(self.snowflake_conn, query) @@ -459,31 +464,39 @@ def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata def supports_remote_storage_export(self) -> bool: - return True + return ( + self.config.offline_store.storage_integration_name + and self.config.offline_store.blob_export_location + ) def to_remote_storage(self) -> List[str]: - if not self.config.offline_store.s3_staging_location: + if not self.export_path: raise ValueError( - "to_remote_storage() requires `s3_staging_location` to be specified in config" + "to_remote_storage() requires `blob_export_location` to be specified in config" ) if not self.config.offline_store.storage_integration_name: raise ValueError( "to_remote_storage() requires `storage_integration_name` to be specified in config" ) - table = f"temporary_{uuid.uuid4()}" + table = f"temporary_{uuid.uuid4().hex}" self.to_snowflake(table) - copy_into_query = ( - f'COPY INTO "{self.config.offline_store.s3_staging_location}/{table}" from {table}\n' - f" storage_integration = {self.config.offline_store.storage_integration_name}\n" - f" file_format = (TYPE = PARQUET)" - f" DETAILED_OUTPUT = TRUE;\n" - ) + copy_into_query = f"""copy into '{self.config.offline_store.blob_export_location}/{table}' from "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n + storage_integration = {self.config.offline_store.storage_integration_name}\n + file_format = (TYPE = PARQUET)\n + DETAILED_OUTPUT = TRUE\n + HEADER = TRUE;\n + """ - execute_snowflake_statement(self.snowflake_conn, copy_into_query) - # TODO(achal) what next here?? - return [] + cursor = execute_snowflake_statement(self.snowflake_conn, copy_into_query) + all_rows = ( + cursor.fetchall() + ) # This may be need pagination at some point in the future. + file_name_column_index = [ + idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME" + ][0] + return [f"{self.export_path}/{row[file_name_column_index]}" for row in all_rows] def _get_entity_schema( diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 0b2965084d8..8efddbc9797 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -370,6 +370,9 @@ def test_historical_features(environment, universal_data_sources, full_feature_n full_feature_names=full_feature_names, ) + if job_from_df.supports_remote_storage_export: + print(job_from_df.to_remote_storage()) + start_time = datetime.utcnow() actual_df_from_df_entities = job_from_df.to_df() From a8eb5d5c2e7c7a6ea6cf93f1b52170588586495c Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 11:25:54 -0700 Subject: [PATCH 05/14] better test config : Signed-off-by: Achal Shah --- .../feature_repos/universal/data_sources/bigquery.py | 4 +++- .../feature_repos/universal/data_sources/snowflake.py | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 620f444159b..83bc1ef308f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -51,7 +51,9 @@ def teardown(self): self.dataset = None def create_offline_store_config(self): - return BigQueryOfflineStoreConfig() + return BigQueryOfflineStoreConfig( + location="US", gcs_staging_location="gs://feast-export/" + ) def create_data_source( self, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index 23466bc00c0..ae83ea8eb08 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -34,6 +34,8 @@ def __init__(self, project_name: str, *args, **kwargs): warehouse=os.environ["SNOWFLAKE_CI_WAREHOUSE"], database="FEAST", schema="OFFLINE", + storage_integration_name="FEAST_S3", + blob_export_location="s3://feast-snowflake-offload/export", ) def create_data_source( From b91162efec1d6ca1d46e732031886842206f3da5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 11:55:14 -0700 Subject: [PATCH 06/14] fix tests: Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/file.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index d60d4681741..ae98f8d0c29 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -105,6 +105,9 @@ def persist(self, storage: SavedDatasetStorage): def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata + def supports_remote_storage_export(self) -> bool: + return False + class FileOfflineStore(OfflineStore): @staticmethod From 2bb9af36ede1d67401777a813505156f841e28e2 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 12:02:11 -0700 Subject: [PATCH 07/14] fix tests Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 7e2fc0d7953..7c5581d069a 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -494,7 +494,7 @@ def supports_remote_storage_export(self) -> bool: return True def to_remote_storage(self) -> List[str]: - if self.config.offline_store.gcs_staging_location or not self._gcs_path: + if not self._gcs_path: raise ValueError( "gcs_staging_location needs to be specified for the big query " "offline store when executing `to_remote_storage()`" From 9eef0bbef920da40e59408360b5b60bd27128f8c Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 12:06:56 -0700 Subject: [PATCH 08/14] more fixes Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- sdk/python/feast/infra/offline_stores/snowflake.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 7c5581d069a..1dcd098af66 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -491,7 +491,7 @@ def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata def supports_remote_storage_export(self) -> bool: - return True + return self._gcs_path def to_remote_storage(self) -> List[str]: if not self._gcs_path: diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index cb727f095a6..71394c44038 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -92,8 +92,10 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel): """ Snowflake schema name """ storage_integration_name: Optional[str] = None + """ Storage integration name in snowflake """ blob_export_location: Optional[str] = None + """ Location (in S3, Google storage or Azure storage) where data is offloaded """ class Config: allow_population_by_field_name = True From 5c7ba228dab38c4cd0dee585c55a3c15d717f658 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 12:52:03 -0700 Subject: [PATCH 09/14] more fixes Signed-off-by: Achal Shah --- .../offline_store/test_universal_historical_retrieval.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 8efddbc9797..160bf688d64 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -370,7 +370,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n full_feature_names=full_feature_names, ) - if job_from_df.supports_remote_storage_export: + if job_from_df.supports_remote_storage_export(): print(job_from_df.to_remote_storage()) start_time = datetime.utcnow() From dcfba45a0e75a4f1abcd010accafe16316e4f16d Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 12:57:04 -0700 Subject: [PATCH 10/14] more fixes Signed-off-by: Achal Shah --- .../offline_store/test_universal_historical_retrieval.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 160bf688d64..2a425aeb297 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -371,7 +371,9 @@ def test_historical_features(environment, universal_data_sources, full_feature_n ) if job_from_df.supports_remote_storage_export(): - print(job_from_df.to_remote_storage()) + files = job_from_df.to_remote_storage() + print(files) + assert len(files) > 0 # This should be way more detailed start_time = datetime.utcnow() actual_df_from_df_entities = job_from_df.to_df() From 7fbc1194ac97a4330f35a43b1ffe7511903ee6dc Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 14:18:41 -0700 Subject: [PATCH 11/14] fix bigquery Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/bigquery.py | 6 +++++- .../offline_store/test_universal_historical_retrieval.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 1dcd098af66..81d00d11f66 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -513,9 +513,13 @@ def to_remote_storage(self) -> List[str]: ) extract_job.result() - storage_client = StorageClient(project=self.config.offline_store.project_id) + bucket: str + prefix: str + storage_client = StorageClient(project=self.client.project) bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1) prefix = prefix.rsplit("/", 1)[0] + if prefix.startswith("/"): + prefix = prefix[1:] blobs = storage_client.list_blobs(bucket, prefix=prefix) results = [] diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 2a425aeb297..abaf1622c08 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -373,7 +373,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n if job_from_df.supports_remote_storage_export(): files = job_from_df.to_remote_storage() print(files) - assert len(files) > 0 # This should be way more detailed + assert len(files) > 0 # This test should be way more detailed start_time = datetime.utcnow() actual_df_from_df_entities = job_from_df.to_df() From d3e6533d40648fae1f8c30ae3b3b09e409fe1fb5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 14:27:43 -0700 Subject: [PATCH 12/14] use temp table for entity df table Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 81d00d11f66..b0d09358f76 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -603,7 +603,7 @@ def _upload_entity_df( """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table""" if isinstance(entity_df, str): - job = client.query(f"CREATE TABLE {table_name} AS ({entity_df})") + job = client.query(f"CREATE TEMP TABLE {table_name} AS ({entity_df})") elif isinstance(entity_df, pd.DataFrame): # Drop the index so that we don't have unnecessary columns From 95c548db0320d2a07d63062e582fb9c3ac79dcb6 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 14:30:34 -0700 Subject: [PATCH 13/14] simplify condition Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index b0d09358f76..f81063e0f95 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -491,7 +491,7 @@ def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata def supports_remote_storage_export(self) -> bool: - return self._gcs_path + return self._gcs_path is not None def to_remote_storage(self) -> List[str]: if not self._gcs_path: From 4421b668b32c20ce739498afa269d137b097f0e5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 7 Jul 2022 14:56:35 -0700 Subject: [PATCH 14/14] remove temp Signed-off-by: Achal Shah --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index f81063e0f95..3bf340acf92 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -603,7 +603,7 @@ def _upload_entity_df( """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table""" if isinstance(entity_df, str): - job = client.query(f"CREATE TEMP TABLE {table_name} AS ({entity_df})") + job = client.query(f"CREATE TABLE {table_name} AS ({entity_df})") elif isinstance(entity_df, pd.DataFrame): # Drop the index so that we don't have unnecessary columns