Skip to content

Commit a752211

Browse files
authored
feat: Add workgroup to athena offline store config (#3139)
* fix: FeastModuleImportError
  Signed-off-by: derek1032 <dchang@health2sync.com>
* chore: Add aws region for athena universal tests
  Signed-off-by: derek1032 <dchang@health2sync.com>
* feat: Add workgroup attribute for Athena offline store config
  Signed-off-by: derek1032 <dchang@health2sync.com>
* fix: Add fail_if_exists condition to query string
  Signed-off-by: derek1032 <dchang@health2sync.com>
* chore: Format python codes
  Signed-off-by: derek1032 <dchang@health2sync.com>
* chore: use os.getenv and default value
  Signed-off-by: derek1032 <dchang@health2sync.com>

Signed-off-by: derek1032 <dchang@health2sync.com>
1 parent c1c71da commit a752211

File tree

6 files changed

+85
-50
lines changed

6 files changed

+85
-50
lines changed

Makefile

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,19 @@ test-python-universal-mssql:
159159
sdk/python/tests
160160

161161

162-
#To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
163-
#Modify environment variables ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_S3_BUCKET_NAME if you want to change the data source, database, and bucket name of S3 to use.
164-
#If tests fail with the pytest -n 8 option, change the number to 1.
162+
# To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS.
163+
# https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
164+
# Modify environment variables ATHENA_REGION, ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_WORKGROUP or
165+
# ATHENA_S3_BUCKET_NAME according to your needs. If tests fail with the pytest -n 8 option, change the number to 1.
165166
test-python-universal-athena:
166167
PYTHONPATH='.' \
167168
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \
168169
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \
169170
FEAST_USAGE=False IS_TEST=True \
171+
ATHENA_REGION=ap-northeast-2 \
170172
ATHENA_DATA_SOURCE=AwsDataCatalog \
171173
ATHENA_DATABASE=default \
174+
ATHENA_WORKGROUP=primary \
172175
ATHENA_S3_BUCKET_NAME=feast-integration-tests \
173176
python -m pytest -n 8 --integration \
174177
-k "not test_go_feature_server and \

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ class AthenaOfflineStoreConfig(FeastConfigBaseModel):
6060
database: StrictStr
6161
""" Athena database name """
6262

63+
workgroup: StrictStr
64+
""" Athena workgroup name """
65+
6366
s3_staging_location: StrictStr
6467
""" S3 path for importing & exporting data to Athena """
6568

@@ -243,6 +246,7 @@ def query_generator() -> Iterator[str]:
243246
athena_client,
244247
config.offline_store.data_source,
245248
config.offline_store.database,
249+
config.offline_store.workgroup,
246250
f"DROP TABLE IF EXISTS {config.offline_store.database}.{table_name}",
247251
)
248252

@@ -293,6 +297,7 @@ def write_logged_features(
293297
athena_client=athena_client,
294298
data_source=config.offline_store.data_source,
295299
database=config.offline_store.database,
300+
workgroup=config.offline_store.workgroup,
296301
s3_resource=s3_resource,
297302
s3_path=s3_path,
298303
table_name=destination.table_name,
@@ -378,6 +383,7 @@ def _to_df_internal(self) -> pd.DataFrame:
378383
self._athena_client,
379384
self._config.offline_store.data_source,
380385
self._config.offline_store.database,
386+
self._config.offline_store.workgroup,
381387
self._s3_resource,
382388
temp_external_location,
383389
self.get_temp_table_dml_header(temp_table_name, temp_external_location)
@@ -394,6 +400,7 @@ def _to_arrow_internal(self) -> pa.Table:
394400
self._athena_client,
395401
self._config.offline_store.data_source,
396402
self._config.offline_store.database,
403+
self._config.offline_store.workgroup,
397404
self._s3_resource,
398405
temp_external_location,
399406
self.get_temp_table_dml_header(temp_table_name, temp_external_location)
@@ -432,6 +439,7 @@ def to_athena(self, table_name: str) -> None:
432439
self._athena_client,
433440
self._config.offline_store.data_source,
434441
self._config.offline_store.database,
442+
self._config.offline_store.workgroup,
435443
query,
436444
)
437445

@@ -449,6 +457,7 @@ def _upload_entity_df(
449457
athena_client,
450458
config.offline_store.data_source,
451459
config.offline_store.database,
460+
config.offline_store.workgroup,
452461
s3_resource,
453462
f"{config.offline_store.s3_staging_location}/entity_df/{table_name}/{table_name}.parquet",
454463
table_name,
@@ -460,6 +469,7 @@ def _upload_entity_df(
460469
athena_client,
461470
config.offline_store.data_source,
462471
config.offline_store.database,
472+
config.offline_store.workgroup,
463473
f"CREATE TABLE {table_name} AS ({entity_df})",
464474
)
465475
else:
@@ -514,6 +524,7 @@ def _get_entity_df_event_timestamp_range(
514524
athena_client,
515525
config.offline_store.data_source,
516526
config.offline_store.database,
527+
config.offline_store.workgroup,
517528
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max "
518529
f"FROM ({entity_df})",
519530
)

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ def get_table_column_names_and_types(
225225
client,
226226
config.offline_store.data_source,
227227
config.offline_store.database,
228+
config.offline_store.workgroup,
228229
f"SELECT * FROM ({self.query}) LIMIT 1",
229230
)
230231
columns = aws_utils.get_athena_query_result(client, statement_id)[

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,20 @@ class AthenaDataSourceCreator(DataSourceCreator):
2727

2828
def __init__(self, project_name: str, *args, **kwargs):
2929
super().__init__(project_name)
30-
self.client = aws_utils.get_athena_data_client("ap-northeast-2")
31-
self.s3 = aws_utils.get_s3_resource("ap-northeast-2")
32-
data_source = (
33-
os.environ.get("ATHENA_DATA_SOURCE")
34-
if os.environ.get("ATHENA_DATA_SOURCE")
35-
else "AwsDataCatalog"
36-
)
37-
database = (
38-
os.environ.get("ATHENA_DATABASE")
39-
if os.environ.get("ATHENA_DATABASE")
40-
else "default"
41-
)
42-
bucket_name = (
43-
os.environ.get("ATHENA_S3_BUCKET_NAME")
44-
if os.environ.get("ATHENA_S3_BUCKET_NAME")
45-
else "feast-integration-tests"
46-
)
30+
31+
region = os.getenv("ATHENA_REGION", "ap-northeast-2")
32+
data_source = os.getenv("ATHENA_DATA_SOURCE", "AwsDataCatalog")
33+
database = os.getenv("ATHENA_DATABASE", "default")
34+
workgroup = os.getenv("ATHENA_WORKGROUP", "primary")
35+
bucket_name = os.getenv("ATHENA_S3_BUCKET_NAME", "feast-integration-tests")
36+
37+
self.client = aws_utils.get_athena_data_client(region)
38+
self.s3 = aws_utils.get_s3_resource(region)
4739
self.offline_store_config = AthenaOfflineStoreConfig(
48-
data_source=f"{data_source}",
49-
region="ap-northeast-2",
50-
database=f"{database}",
40+
data_source=data_source,
41+
region=region,
42+
database=database,
43+
workgroup=workgroup,
5144
s3_staging_location=f"s3://{bucket_name}/test_dir",
5245
)
5346

@@ -77,6 +70,7 @@ def create_data_source(
7770
self.client,
7871
self.offline_store_config.data_source,
7972
self.offline_store_config.database,
73+
self.offline_store_config.workgroup,
8074
self.s3,
8175
s3_target,
8276
table_name,
@@ -126,5 +120,6 @@ def teardown(self):
126120
self.client,
127121
self.offline_store_config.data_source,
128122
self.offline_store_config.database,
123+
self.offline_store_config.workgroup,
129124
f"DROP TABLE IF EXISTS {table}",
130125
)

sdk/python/feast/infra/offline_stores/contrib/athena_repo_configuration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
from feast.infra.offline_stores.contrib.athena_offline_store.tests.data_source import (
2+
AthenaDataSourceCreator,
3+
)
14
from tests.integration.feature_repos.integration_test_repo_config import (
25
IntegrationTestRepoConfig,
36
)
4-
from tests.integration.feature_repos.universal.data_sources.athena import (
5-
AthenaDataSourceCreator,
6-
)
77

88
FULL_REPO_CONFIGS = [
99
IntegrationTestRepoConfig(

0 commit comments

Comments
 (0)