@staticmethod
def _update_tags(dynamodb_client, table_name: str, new_tags: list[dict[str, str]]):
    """Reconcile a DynamoDB table's tags with ``new_tags`` using a diff.

    Instead of removing all tags and re-adding them (which is vulnerable to
    the eventual-consistency window between UntagResource and TagResource),
    this method computes the minimal set of changes needed:

    - Only removes tags that are no longer present in ``new_tags``.
    - Only adds/updates tags whose value has changed or that are new.

    This avoids the race condition described in
    https://github.com/feast-dev/feast/issues/6418 where calling
    TagResource immediately after UntagResource can leave a table with no
    tags due to DynamoDB's asynchronous tag operations.

    The diff is only correct when it is computed against the *complete* set
    of current tags, so every page of ``list_tags_of_resource`` is read by
    following ``NextToken`` before anything is changed.

    Args:
        dynamodb_client: Client object exposing ``describe_table``,
            ``list_tags_of_resource``, ``untag_resource`` and
            ``tag_resource`` with boto3-style keyword arguments.
        table_name: Name of the table whose tags are reconciled.
        new_tags: Desired full tag set as ``{"Key": ..., "Value": ...}``
            dicts. ``None`` or an empty list means "no tags". If a key is
            repeated, the last occurrence wins.
    """
    table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][
        "TableArn"
    ]

    # Collect the current tags across all result pages.
    current_tag_map: dict[str, str] = {}
    list_kwargs = {"ResourceArn": table_arn}
    while True:
        page = dynamodb_client.list_tags_of_resource(**list_kwargs)
        for tag in page.get("Tags") or []:
            current_tag_map[tag["Key"]] = tag["Value"]
        next_token = page.get("NextToken")
        # Stop on the last page; also stop if the same token is handed back
        # twice so a misbehaving endpoint or stub cannot loop forever.
        if not next_token or next_token == list_kwargs.get("NextToken"):
            break
        list_kwargs["NextToken"] = next_token

    # ``new_tags or []`` keeps the pre-diff behaviour of accepting a falsy
    # value (e.g. None) as "no tags desired".
    new_tag_map = {tag["Key"]: tag["Value"] for tag in new_tags or []}

    # Remove only tags that are no longer in new_tags.
    keys_to_remove = [key for key in current_tag_map if key not in new_tag_map]
    # Add / update only tags that are new or whose value has changed.
    tags_to_add = [
        {"Key": key, "Value": value}
        for key, value in new_tag_map.items()
        if key not in current_tag_map or current_tag_map[key] != value
    ]

    if keys_to_remove:
        dynamodb_client.untag_resource(
            ResourceArn=table_arn, TagKeys=keys_to_remove
        )
    if tags_to_add:
        dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=tags_to_add)
# ---------------------------------------------------------------------------
# Tests for the diff-based _update_tags (fix for GitHub issue #6418)
#
# DynamoDB's TagResource / UntagResource are eventually consistent, so the
# old approach of "remove all → re-add all" could leave tables tag-less due
# to the async propagation gap.  The new approach only touches tags that
# actually need to change.
# ---------------------------------------------------------------------------


def _make_mock_dynamodb_client(
    current_tags: list[dict[str, str]],
    table_arn: str = "arn:aws:dynamodb:us-west-2:123456789012:table/test_table",
):
    """Return a MagicMock that stands in for a DynamoDB client in tag tests.

    ``describe_table`` answers with ``table_arn`` and
    ``list_tags_of_resource`` answers with ``current_tags``. A pure mock is
    used to avoid Moto's authentication issues with DynamoDB tag APIs.
    """
    from unittest.mock import MagicMock

    canned_responses = {
        "describe_table.return_value": {"Table": {"TableArn": table_arn}},
        "list_tags_of_resource.return_value": {"Tags": current_tags},
    }
    return MagicMock(**canned_responses)


def test_update_tags_no_change_makes_no_api_calls():
    """Identical current and desired tags: neither write API is invoked."""
    tags = [{"Key": "env", "Value": "prod"}, {"Key": "team", "Value": "ml"}]
    mock_client = _make_mock_dynamodb_client(current_tags=tags)

    DynamoDBOnlineStore._update_tags(mock_client, f"{TABLE_NAME}_no_change", tags)

    mock_client.untag_resource.assert_not_called()
    mock_client.tag_resource.assert_not_called()


def test_update_tags_adds_new_tags_without_untag():
    """A brand-new key goes through tag_resource only; nothing is untagged."""
    mock_client = _make_mock_dynamodb_client(
        current_tags=[{"Key": "env", "Value": "prod"}]
    )
    desired = [{"Key": "env", "Value": "prod"}, {"Key": "team", "Value": "ml"}]

    DynamoDBOnlineStore._update_tags(mock_client, f"{TABLE_NAME}_add_only", desired)

    mock_client.untag_resource.assert_not_called()
    mock_client.tag_resource.assert_called_once()
    _, tag_kwargs = mock_client.tag_resource.call_args
    assert tag_kwargs["Tags"] == [{"Key": "team", "Value": "ml"}]


def test_update_tags_removes_obsolete_tags_only():
    """Only keys missing from the desired set are handed to untag_resource."""
    mock_client = _make_mock_dynamodb_client(
        current_tags=[
            {"Key": "env", "Value": "prod"},
            {"Key": "old-key", "Value": "old"},
        ]
    )
    desired = [{"Key": "env", "Value": "prod"}]

    DynamoDBOnlineStore._update_tags(
        mock_client, f"{TABLE_NAME}_remove_only", desired
    )

    mock_client.untag_resource.assert_called_once()
    _, untag_kwargs = mock_client.untag_resource.call_args
    assert untag_kwargs["TagKeys"] == ["old-key"]

    mock_client.tag_resource.assert_not_called()


def test_update_tags_updates_changed_value():
    """A changed value is written with tag_resource, never via untag_resource."""
    mock_client = _make_mock_dynamodb_client(
        current_tags=[{"Key": "env", "Value": "staging"}]
    )
    desired = [{"Key": "env", "Value": "prod"}]

    DynamoDBOnlineStore._update_tags(
        mock_client, f"{TABLE_NAME}_change_value", desired
    )

    mock_client.untag_resource.assert_not_called()
    mock_client.tag_resource.assert_called_once()
    _, tag_kwargs = mock_client.tag_resource.call_args
    assert tag_kwargs["Tags"] == [{"Key": "env", "Value": "prod"}]


def test_update_tags_full_replace():
    """Swapping the whole tag set removes every old key and adds every new one."""
    mock_client = _make_mock_dynamodb_client(
        current_tags=[{"Key": "old1", "Value": "v1"}, {"Key": "old2", "Value": "v2"}]
    )
    desired = [{"Key": "new1", "Value": "n1"}, {"Key": "new2", "Value": "n2"}]

    DynamoDBOnlineStore._update_tags(
        mock_client, f"{TABLE_NAME}_full_replace", desired
    )

    mock_client.untag_resource.assert_called_once()
    _, untag_kwargs = mock_client.untag_resource.call_args
    assert set(untag_kwargs["TagKeys"]) == {"old1", "old2"}

    mock_client.tag_resource.assert_called_once()
    _, tag_kwargs = mock_client.tag_resource.call_args
    assert {entry["Key"] for entry in tag_kwargs["Tags"]} == {"new1", "new2"}


def test_update_tags_clear_all_tags():
    """An empty desired list untags everything and never calls tag_resource."""
    mock_client = _make_mock_dynamodb_client(
        current_tags=[{"Key": "env", "Value": "prod"}]
    )

    DynamoDBOnlineStore._update_tags(mock_client, f"{TABLE_NAME}_clear_all", [])

    mock_client.untag_resource.assert_called_once()
    _, untag_kwargs = mock_client.untag_resource.call_args
    assert untag_kwargs["TagKeys"] == ["env"]

    mock_client.tag_resource.assert_not_called()


def test_update_tags_no_existing_tags_no_new_tags():
    """No current tags and no desired tags: no write API calls at all."""
    mock_client = _make_mock_dynamodb_client(current_tags=[])

    DynamoDBOnlineStore._update_tags(mock_client, f"{TABLE_NAME}_empty_both", [])

    mock_client.untag_resource.assert_not_called()
    mock_client.tag_resource.assert_not_called()