Skip to content

Commit bad2b7d

Browse files
fix(dynamodb): Avoid tag race condition by using diff-based tag updates (#6479)
* fix(dynamodb): Avoid tag race condition by using diff-based tag updates DynamoDB's TagResource and UntagResource are eventually consistent. The old implementation removed all tags then re-added all desired tags immediately, creating a race window where repeated store.apply() calls would leave tables without any tags. Fix: compute a diff between current and desired tags, then: - Only remove keys that are truly obsolete (not in new config) - Only add/update tags whose value is new or has changed - If nothing changed, make zero API calls (idempotent) This eliminates the race condition and also reduces unnecessary API calls on repeated store.apply() with unchanged configuration. Adds 7 unit tests covering: idempotency, add-only, remove-only, value change, full replace, clear-all, and empty-both cases. Fixes #6418 Signed-off-by: Harsh <thestarharsh@gmail.com> * style: run ruff auto-formatter to fix linting issues Signed-off-by: Harsh <thestarharsh@gmail.com> --------- Signed-off-by: Harsh <thestarharsh@gmail.com>
1 parent 2321c07 commit bad2b7d

2 files changed

Lines changed: 165 additions & 5 deletions

File tree

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,18 +236,45 @@ def _table_tags(online_config, table_instance) -> list[dict[str, str]]:
236236

237237
@staticmethod
238238
def _update_tags(dynamodb_client, table_name: str, new_tags: list[dict[str, str]]):
239+
"""Update DynamoDB table tags using a diff-based approach.
240+
241+
Instead of removing all tags and re-adding them (which is vulnerable to
242+
the eventual-consistency window between UntagResource and TagResource),
243+
this method computes the minimal set of changes needed:
244+
245+
- Only removes tags that are no longer present in new_tags.
246+
- Only adds/updates tags whose value has changed or that are new.
247+
248+
This avoids the race condition described in
249+
https://github.com/feast-dev/feast/issues/6418 where calling
250+
TagResource immediately after UntagResource can leave a table with no
251+
tags due to DynamoDB's asynchronous tag operations.
252+
"""
239253
table_arn = dynamodb_client.describe_table(TableName=table_name)["Table"][
240254
"TableArn"
241255
]
242256
current_tags = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn)[
243257
"Tags"
244258
]
245-
if current_tags:
246-
remove_keys = [tag["Key"] for tag in current_tags]
247-
dynamodb_client.untag_resource(ResourceArn=table_arn, TagKeys=remove_keys)
248259

249-
if new_tags:
250-
dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=new_tags)
260+
current_tag_map = {tag["Key"]: tag["Value"] for tag in current_tags}
261+
new_tag_map = {tag["Key"]: tag["Value"] for tag in new_tags}
262+
263+
# Remove only tags that are no longer in new_tags
264+
keys_to_remove = [k for k in current_tag_map if k not in new_tag_map]
265+
# Add / update only tags whose value is new or has changed
266+
tags_to_add = [
267+
{"Key": k, "Value": v}
268+
for k, v in new_tag_map.items()
269+
if current_tag_map.get(k) != v
270+
]
271+
272+
if keys_to_remove:
273+
dynamodb_client.untag_resource(
274+
ResourceArn=table_arn, TagKeys=keys_to_remove
275+
)
276+
if tags_to_add:
277+
dynamodb_client.tag_resource(ResourceArn=table_arn, Tags=tags_to_add)
251278

252279
def update(
253280
self,

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,3 +1154,136 @@ def test_dynamodb_online_store_online_read_requested_features_parallel(
11541154
assert "age" in feat_dict
11551155
assert "name" not in feat_dict
11561156
assert "avg_orders_day" not in feat_dict
1157+
1158+
1159+
# ---------------------------------------------------------------------------
1160+
# Tests for the diff-based _update_tags (fix for GitHub issue #6418)
1161+
#
1162+
# DynamoDB's TagResource / UntagResource are eventually consistent, so the
1163+
# old approach of "remove all → re-add all" could leave tables tag-less due
1164+
# to the async propagation gap. The new approach only touches tags that
1165+
# actually need to change.
1166+
# ---------------------------------------------------------------------------
1167+
1168+
1169+
def _make_mock_dynamodb_client(
1170+
current_tags: list[dict[str, str]],
1171+
table_arn: str = "arn:aws:dynamodb:us-west-2:123456789012:table/test_table",
1172+
):
1173+
"""Helper: create a pure MagicMock DynamoDB client for tag tests.
1174+
This avoids Moto's authentication issues with DynamoDB tag APIs.
1175+
"""
1176+
from unittest.mock import MagicMock
1177+
1178+
client = MagicMock()
1179+
# Mock describe_table
1180+
client.describe_table.return_value = {"Table": {"TableArn": table_arn}}
1181+
# Mock list_tags_of_resource
1182+
client.list_tags_of_resource.return_value = {"Tags": current_tags}
1183+
return client
1184+
1185+
1186+
def test_update_tags_no_change_makes_no_api_calls():
1187+
"""Idempotency: calling _update_tags with identical tags must not call
1188+
untag_resource or tag_resource (avoids unnecessary eventual-consistency exposure).
1189+
"""
1190+
table_name = f"{TABLE_NAME}_no_change"
1191+
initial_tags = [{"Key": "env", "Value": "prod"}, {"Key": "team", "Value": "ml"}]
1192+
client = _make_mock_dynamodb_client(current_tags=initial_tags)
1193+
1194+
DynamoDBOnlineStore._update_tags(client, table_name, initial_tags)
1195+
1196+
client.untag_resource.assert_not_called()
1197+
client.tag_resource.assert_not_called()
1198+
1199+
1200+
def test_update_tags_adds_new_tags_without_untag():
1201+
"""Adding a new tag key must call tag_resource but NOT untag_resource."""
1202+
table_name = f"{TABLE_NAME}_add_only"
1203+
existing = [{"Key": "env", "Value": "prod"}]
1204+
client = _make_mock_dynamodb_client(current_tags=existing)
1205+
1206+
new_tags = [{"Key": "env", "Value": "prod"}, {"Key": "team", "Value": "ml"}]
1207+
DynamoDBOnlineStore._update_tags(client, table_name, new_tags)
1208+
1209+
client.untag_resource.assert_not_called()
1210+
client.tag_resource.assert_called_once()
1211+
added = client.tag_resource.call_args[1]["Tags"]
1212+
assert added == [{"Key": "team", "Value": "ml"}]
1213+
1214+
1215+
def test_update_tags_removes_obsolete_tags_only():
1216+
"""Only truly obsolete tag keys (not in new_tags) must be passed to untag_resource."""
1217+
table_name = f"{TABLE_NAME}_remove_only"
1218+
existing = [{"Key": "env", "Value": "prod"}, {"Key": "old-key", "Value": "old"}]
1219+
client = _make_mock_dynamodb_client(current_tags=existing)
1220+
1221+
new_tags = [{"Key": "env", "Value": "prod"}]
1222+
DynamoDBOnlineStore._update_tags(client, table_name, new_tags)
1223+
1224+
client.untag_resource.assert_called_once()
1225+
removed_keys = client.untag_resource.call_args[1]["TagKeys"]
1226+
assert removed_keys == ["old-key"]
1227+
1228+
client.tag_resource.assert_not_called()
1229+
1230+
1231+
def test_update_tags_updates_changed_value():
1232+
"""A tag whose value changed must be passed to tag_resource (not untag_resource)."""
1233+
table_name = f"{TABLE_NAME}_change_value"
1234+
existing = [{"Key": "env", "Value": "staging"}]
1235+
client = _make_mock_dynamodb_client(current_tags=existing)
1236+
1237+
new_tags = [{"Key": "env", "Value": "prod"}]
1238+
1239+
DynamoDBOnlineStore._update_tags(client, table_name, new_tags)
1240+
1241+
client.untag_resource.assert_not_called()
1242+
client.tag_resource.assert_called_once()
1243+
updated = client.tag_resource.call_args[1]["Tags"]
1244+
assert updated == [{"Key": "env", "Value": "prod"}]
1245+
1246+
1247+
def test_update_tags_full_replace():
1248+
"""Replacing an entire tag set removes old keys and adds new ones correctly."""
1249+
table_name = f"{TABLE_NAME}_full_replace"
1250+
existing = [{"Key": "old1", "Value": "v1"}, {"Key": "old2", "Value": "v2"}]
1251+
client = _make_mock_dynamodb_client(current_tags=existing)
1252+
1253+
new_tags = [{"Key": "new1", "Value": "n1"}, {"Key": "new2", "Value": "n2"}]
1254+
1255+
DynamoDBOnlineStore._update_tags(client, table_name, new_tags)
1256+
1257+
client.untag_resource.assert_called_once()
1258+
removed = set(client.untag_resource.call_args[1]["TagKeys"])
1259+
assert removed == {"old1", "old2"}
1260+
1261+
client.tag_resource.assert_called_once()
1262+
added = {t["Key"] for t in client.tag_resource.call_args[1]["Tags"]}
1263+
assert added == {"new1", "new2"}
1264+
1265+
1266+
def test_update_tags_clear_all_tags():
1267+
"""Passing an empty new_tags list removes all existing tags without calling tag_resource."""
1268+
table_name = f"{TABLE_NAME}_clear_all"
1269+
existing = [{"Key": "env", "Value": "prod"}]
1270+
client = _make_mock_dynamodb_client(current_tags=existing)
1271+
1272+
DynamoDBOnlineStore._update_tags(client, table_name, [])
1273+
1274+
client.untag_resource.assert_called_once()
1275+
removed = client.untag_resource.call_args[1]["TagKeys"]
1276+
assert removed == ["env"]
1277+
1278+
client.tag_resource.assert_not_called()
1279+
1280+
1281+
def test_update_tags_no_existing_tags_no_new_tags():
1282+
"""Both old and new tag sets are empty: no API calls at all."""
1283+
table_name = f"{TABLE_NAME}_empty_both"
1284+
client = _make_mock_dynamodb_client(current_tags=[])
1285+
1286+
DynamoDBOnlineStore._update_tags(client, table_name, [])
1287+
1288+
client.untag_resource.assert_not_called()
1289+
client.tag_resource.assert_not_called()

0 commit comments

Comments
 (0)