Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tag update helper method
Signed-off-by: Rob Howley <howley.robert@gmail.com>
  • Loading branch information
robhowley committed Apr 23, 2025
commit 009f82936d95a3bde5fda24bc37d54fe41c0bbb1
64 changes: 45 additions & 19 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,43 @@ async def close(self):
def async_supported(self) -> SupportedAsyncMethods:
return SupportedAsyncMethods(read=True, write=True)

@staticmethod
def _table_tags(online_config, table_instance) -> list[dict[str, str]]:
online_tags = online_config.tags or {}
common_tags = [
{"Key": key, "Value": value} for key, value in online_tags.items()
]
Comment on lines +146 to +154
Copy link
Copy Markdown
Contributor Author

@robhowley robhowley Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the table level tags override the global where applicable. eg

# in yaml, default to platform team owns the infra
team: platform
# in feature view py file we override for one particular instance
tags={"team": "product-team"}

return common_tags + (
[
{"Key": key, "Value": value}
for key, value in table_instance.tags.items()
if key not in online_tags
]
if table_instance.tags
else []
)

@staticmethod
def _update_tags(dynamodb_client, table_name, new_tags):
table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][
"TableArn"
]
current_tags = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn).get(
"Tags"
)
if current_tags:
remove_keys = [
tag["Key"]
for tag in current_tags
if tag["Key"] not in new_tags or tag["Value"] != new_tags[tag["Key"]]
]
if remove_keys:
dynamodb_client.untag_resource(
ResourceArn=table_arn, TagKeys=remove_keys
)
if new_tags:
dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=new_tags)

def update(
self,
config: RepoConfig,
Expand Down Expand Up @@ -168,26 +205,12 @@ def update(
online_config.session_based_auth,
)

online_tags = online_config.tags or {}
common_tags = [
{"Key": key, "Value": value}
for key, value in online_tags.items()
]

for table_instance in tables_to_keep:
table_tags = common_tags + (
[
{"Key": key, "Value": value}
for key, value in table_instance.tags.items()
if key not in online_tags
]
if table_instance.tags
else []
)

# Add Tags attribute to creation request only if configured to prevent
# TagResource permission issues, even with an empty Tags array.
table_tags = self._table_tags(online_config, table_instance)
kwargs = {"Tags": table_tags} if table_tags else {}

try:
dynamodb_resource.create_table(
TableName=_get_table_name(online_config, config, table_instance),
Expand All @@ -206,9 +229,12 @@ def update(
raise

for table_instance in tables_to_keep:
dynamodb_client.get_waiter("table_exists").wait(
TableName=_get_table_name(online_config, config, table_instance)
)
table_name = _get_table_name(online_config, config, table_instance)
dynamodb_client.get_waiter("table_exists").wait(TableName=table_name)
# once table is confirmed to exist, update the tags.
# tags won't be updated in the create_table call if the table already exists
tags = self._table_tags(online_config, table_instance)
self._update_tags(dynamodb_client, table_name, tags)

for table_to_delete in tables_to_delete:
_delete_table_idempotent(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import boto3
import pytest
Expand Down Expand Up @@ -32,6 +33,7 @@
@dataclass
class MockFeatureView:
name: str
tags: Optional[dict[str, str]] = None


@pytest.fixture
Expand Down Expand Up @@ -209,6 +211,13 @@ def test_dynamodb_online_store_online_write_batch(
assert [item[1] for item in stored_items] == list(features)


def _get_tags(dynamodb_client, table_name):
table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][
"TableArn"
]
return dynamodb_client.list_tags_of_resource(ResourceArn=table_arn).get("Tags")


@mock_dynamodb
def test_dynamodb_online_store_update(repo_config, dynamodb_online_store):
"""Test DynamoDBOnlineStore update method."""
Expand All @@ -222,7 +231,7 @@ def test_dynamodb_online_store_update(repo_config, dynamodb_online_store):
dynamodb_online_store.update(
config=repo_config,
tables_to_delete=[MockFeatureView(name=db_table_delete_name)],
tables_to_keep=[MockFeatureView(name=db_table_keep_name)],
tables_to_keep=[MockFeatureView(name=db_table_keep_name, tags={"some": "tag"})],
entities_to_delete=None,
entities_to_keep=None,
partial=None,
Expand All @@ -237,6 +246,62 @@ def test_dynamodb_online_store_update(repo_config, dynamodb_online_store):
assert len(existing_tables) == 1
assert existing_tables[0] == f"test_aws.{db_table_keep_name}"

assert _get_tags(dynamodb_client, existing_tables[0]) == [
{"Key": "some", "Value": "tag"}
]


@mock_dynamodb
def test_dynamodb_online_store_update_tags(repo_config, dynamodb_online_store):
"""Test DynamoDBOnlineStore update method."""
# create dummy table to update with new tags and tag values
table_name = f"{TABLE_NAME}_keep_update_tags"
create_test_table(PROJECT, table_name, REGION)

# add tags on update
dynamodb_online_store.update(
config=repo_config,
tables_to_delete=[],
tables_to_keep=[MockFeatureView(name=table_name, tags={"some": "tag"})],
entities_to_delete=[],
entities_to_keep=[],
partial=None,
)

# update tags
dynamodb_online_store.update(
config=repo_config,
tables_to_delete=[],
tables_to_keep=[
MockFeatureView(name=table_name, tags={"some": "new-tag", "another": "tag"})
],
entities_to_delete=[],
entities_to_keep=[],
partial=None,
)

# check only db_table_keep_name exists
dynamodb_client = dynamodb_online_store._get_dynamodb_client(REGION)
existing_tables = dynamodb_client.list_tables().get("TableNames", None)

expected_tags = [
{"Key": "some", "Value": "new-tag"},
{"Key": "another", "Value": "tag"},
]
assert _get_tags(dynamodb_client, existing_tables[0]) == expected_tags

# and then remove all tags
dynamodb_online_store.update(
config=repo_config,
tables_to_delete=[],
tables_to_keep=[MockFeatureView(name=table_name, tags=None)],
entities_to_delete=[],
entities_to_keep=[],
partial=None,
)

assert _get_tags(dynamodb_client, existing_tables[0]) == []


@mock_dynamodb
def test_dynamodb_online_store_teardown(repo_config, dynamodb_online_store):
Expand Down