Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit 58d61f3

Browse files
authored
DynamoDB: refactor the store for proper type annotations (#13256)
1 parent 4f9792f commit 58d61f3

6 files changed

Lines changed: 194 additions & 186 deletions

File tree

localstack-core/localstack/services/dynamodb/models.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@
33

44
from localstack.aws.api.dynamodb import (
55
AttributeMap,
6+
BackupDetails,
7+
ContinuousBackupsDescription,
8+
GlobalTableDescription,
69
Key,
10+
KinesisDataStreamDestination,
711
RegionName,
812
ReplicaDescription,
913
StreamViewType,
14+
TableDescription,
1015
TableName,
1116
TimeToLiveSpecification,
1217
)
@@ -91,9 +96,20 @@ class TableRecords(TypedDict):
9196
RecordsMap = dict[TableName, TableRecords]
9297

9398

99+
class TableProperties(TypedDict, total=False):
100+
ContinuousBackupsDescription: ContinuousBackupsDescription
101+
102+
103+
@dataclasses.dataclass
104+
class Backup:
105+
details: BackupDetails
106+
backup_file: str
107+
table_name: str
108+
109+
94110
class DynamoDBStore(BaseStore):
95111
# maps global table names to configurations (for the legacy v.2017 tables)
96-
GLOBAL_TABLES: dict[str, dict] = CrossRegionAttribute(default=dict)
112+
GLOBAL_TABLES: dict[str, GlobalTableDescription] = CrossRegionAttribute(default=dict)
97113

98114
# Maps table name to the region they exist in on DDBLocal (for v.2019 global tables)
99115
TABLE_REGION: dict[TableName, RegionName] = CrossRegionAttribute(default=dict)
@@ -104,19 +120,24 @@ class DynamoDBStore(BaseStore):
104120
)
105121

106122
# cache table taggings - maps table ARN to tags dict
107-
TABLE_TAGS: dict[str, dict] = CrossRegionAttribute(default=dict)
123+
TABLE_TAGS: dict[str, dict[str, str]] = CrossRegionAttribute(default=dict)
108124

109125
# maps table names to cached table definitions
110-
table_definitions: dict[str, dict] = LocalAttribute(default=dict)
126+
table_definitions: dict[str, TableDescription] = LocalAttribute(default=dict)
127+
128+
# map table name to streaming destinations
129+
streaming_destinations: dict[str, list[KinesisDataStreamDestination]] = LocalAttribute(
130+
default=dict
131+
)
111132

112133
# maps table names to additional table properties that are not stored upstream (e.g., ReplicaUpdates)
113-
table_properties: dict[str, dict] = LocalAttribute(default=dict)
134+
table_properties: dict[str, TableProperties] = LocalAttribute(default=dict)
114135

115136
# maps table names to TTL specifications
116137
ttl_specifications: dict[str, TimeToLiveSpecification] = LocalAttribute(default=dict)
117138

118139
# maps backups
119-
backups: dict[str, dict] = LocalAttribute(default=dict)
140+
backups: dict[str, Backup] = LocalAttribute(default=dict)
120141

121142

122143
dynamodb_stores = AccountRegionBundle("dynamodb", DynamoDBStore)

localstack-core/localstack/services/dynamodb/provider.py

Lines changed: 61 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import re
77
import threading
88
import time
9-
import traceback
109
from collections import defaultdict
1110
from concurrent.futures import ThreadPoolExecutor
1211
from contextlib import contextmanager
@@ -64,6 +63,7 @@
6463
GetItemOutput,
6564
GlobalTableAlreadyExistsException,
6665
GlobalTableNotFoundException,
66+
KinesisDataStreamDestination,
6767
KinesisStreamingDestinationOutput,
6868
ListGlobalTablesOutput,
6969
ListTablesInputLimit,
@@ -274,7 +274,12 @@ def forward_to_kinesis_stream(
274274
table_arn = arns.dynamodb_table_arn(table_name, account_id, region_name)
275275
records = table_records["records"]
276276
table_def = store.table_definitions.get(table_name) or {}
277-
stream_arn = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"]
277+
destinations = store.streaming_destinations.get(table_name)
278+
if not destinations:
279+
LOG.debug("Table %s has no Kinesis streaming destinations enabled", table_name)
280+
continue
281+
282+
stream_arn = destinations[-1]["StreamArn"]
278283
for record in records:
279284
kinesis_record = dict(
280285
tableName=table_name,
@@ -1665,34 +1670,32 @@ def enable_kinesis_streaming_destination(
16651670
if not stream:
16661671
raise ValidationException("User does not have a permission to use kinesis stream")
16671672

1668-
table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1669-
table_name, {}
1670-
)
1671-
1672-
dest_status = table_def.get("KinesisDataStreamDestinationStatus")
1673-
if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
1674-
raise ValidationException(
1675-
"Table is not in a valid state to enable Kinesis Streaming "
1676-
"Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1677-
"to perform ENABLE operation."
1678-
)
1673+
store = get_store(context.account_id, context.region)
1674+
streaming_destinations = store.streaming_destinations.get(table_name) or []
16791675

1680-
table_def.setdefault("KinesisDataStreamDestinations", [])
1676+
destinations = [d for d in streaming_destinations if d["StreamArn"] == stream_arn]
1677+
if destinations:
1678+
status = destinations[0].get("DestinationStatus", None)
1679+
if status not in ["DISABLED", "ENABLED_FAILED", None]:
1680+
raise ValidationException(
1681+
"Table is not in a valid state to enable Kinesis Streaming "
1682+
"Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1683+
"to perform ENABLE operation."
1684+
)
16811685

16821686
# remove the stream destination if already present
1683-
table_def["KinesisDataStreamDestinations"] = [
1684-
t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
1687+
store.streaming_destinations[table_name] = [
1688+
_d for _d in streaming_destinations if _d["StreamArn"] != stream_arn
16851689
]
16861690
# append the active stream destination at the end of the list
1687-
table_def["KinesisDataStreamDestinations"].append(
1688-
{
1689-
"DestinationStatus": DestinationStatus.ACTIVE,
1690-
"DestinationStatusDescription": "Stream is active",
1691-
"StreamArn": stream_arn,
1692-
"ApproximateCreationDateTimePrecision": ApproximateCreationDateTimePrecision.MILLISECOND,
1693-
}
1691+
store.streaming_destinations[table_name].append(
1692+
KinesisDataStreamDestination(
1693+
DestinationStatus=DestinationStatus.ACTIVE,
1694+
DestinationStatusDescription="Stream is active",
1695+
StreamArn=stream_arn,
1696+
ApproximateCreationDateTimePrecision=ApproximateCreationDateTimePrecision.MILLISECOND,
1697+
)
16941698
)
1695-
table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.ACTIVE
16961699
return KinesisStreamingDestinationOutput(
16971700
DestinationStatus=DestinationStatus.ENABLING,
16981701
StreamArn=stream_arn,
@@ -1715,34 +1718,25 @@ def disable_kinesis_streaming_destination(
17151718
error_message=f"Requested resource not found: Table: {table_name} not found",
17161719
)
17171720

1718-
# TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE
1719-
17201721
stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
17211722
if not stream:
17221723
raise ValidationException(
17231724
"User does not have a permission to use kinesis stream",
17241725
)
17251726

1726-
table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1727-
table_name, {}
1728-
)
1729-
1730-
stream_destinations = table_def.get("KinesisDataStreamDestinations")
1731-
if stream_destinations:
1732-
if table_def["KinesisDataStreamDestinationStatus"] == DestinationStatus.ACTIVE:
1733-
for dest in stream_destinations:
1734-
if (
1735-
dest["StreamArn"] == stream_arn
1736-
and dest["DestinationStatus"] == DestinationStatus.ACTIVE
1737-
):
1738-
dest["DestinationStatus"] = DestinationStatus.DISABLED
1739-
dest["DestinationStatusDescription"] = ("Stream is disabled",)
1740-
table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.DISABLED
1741-
return KinesisStreamingDestinationOutput(
1742-
DestinationStatus=DestinationStatus.DISABLING,
1743-
StreamArn=stream_arn,
1744-
TableName=table_name,
1745-
)
1727+
store = get_store(context.account_id, context.region)
1728+
streaming_destinations = store.streaming_destinations.get(table_name) or []
1729+
1730+
# Get the right destination based on the arn
1731+
destinations = [d for d in streaming_destinations if d["StreamArn"] == stream_arn]
1732+
if destinations:
1733+
destinations[0]["DestinationStatus"] = DestinationStatus.DISABLED
1734+
destinations[0]["DestinationStatusDescription"] = "Stream is disabled"
1735+
return KinesisStreamingDestinationOutput(
1736+
DestinationStatus=DestinationStatus.DISABLING,
1737+
StreamArn=stream_arn,
1738+
TableName=table_name,
1739+
)
17461740
raise ValidationException(
17471741
"Table is not in a valid state to disable Kinesis Streaming Destination:"
17481742
"DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
@@ -1753,12 +1747,9 @@ def describe_kinesis_streaming_destination(
17531747
) -> DescribeKinesisStreamingDestinationOutput:
17541748
self.ensure_table_exists(context.account_id, context.region, table_name)
17551749

1756-
table_def = (
1757-
get_store(context.account_id, context.region).table_definitions.get(table_name) or {}
1758-
)
1759-
1760-
stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
1761-
stream_destinations = copy.deepcopy(stream_destinations)
1750+
store = get_store(context.account_id, context.region)
1751+
table_destinations = store.streaming_destinations.get(table_name) or []
1752+
stream_destinations = copy.deepcopy(table_destinations)
17621753

17631754
for destination in stream_destinations:
17641755
destination.pop("ApproximateCreationDateTimePrecision", None)
@@ -1799,31 +1790,28 @@ def update_kinesis_streaming_destination(
17991790
)
18001791

18011792
store = get_store(context.account_id, context.region)
1793+
table_destinations = store.streaming_destinations.get(table_name) or []
18021794

1803-
table_def = store.table_definitions.get(table_name) or {}
1804-
table_def.setdefault("KinesisDataStreamDestinations", [])
1805-
1806-
table_id = table_def["TableId"]
1807-
1808-
destination = None
1809-
for stream in table_def["KinesisDataStreamDestinations"]:
1810-
if stream["StreamArn"] == stream_arn:
1811-
destination = stream
1812-
1813-
if destination is None:
1795+
# filter the right destination based on the stream ARN
1796+
destinations = [d for d in table_destinations if d["StreamArn"] == stream_arn]
1797+
if not destinations:
18141798
raise ValidationException(
18151799
"Table is not in a valid state to enable Kinesis Streaming Destination: "
18161800
f"No streaming destination with streamArn: {stream_arn} found for table with tableName: {table_name}"
18171801
)
18181802

1803+
destination = destinations[0]
1804+
table_def = store.table_definitions.get(table_name) or {}
1805+
table_def.setdefault("KinesisDataStreamDestinations", [])
1806+
1807+
table_id = store.table_definitions.get(table_name, {}).get("TableId")
18191808
if (
18201809
existing_precision := destination["ApproximateCreationDateTimePrecision"]
18211810
) == update_kinesis_streaming_configuration["ApproximateCreationDateTimePrecision"]:
18221811
raise ValidationException(
18231812
f"Invalid Request: Precision is already set to the desired value of {existing_precision} "
18241813
f"for tableId: {table_id}, kdsArn: {stream_arn}"
18251814
)
1826-
18271815
destination["ApproximateCreationDateTimePrecision"] = time_precision
18281816

18291817
return UpdateKinesisStreamingDestinationOutput(
@@ -2317,26 +2305,28 @@ def get_table_stream_type(
23172305
:return: a TableStreamViewType object if the table has streams enabled. If not, return None
23182306
"""
23192307
if not table_name_or_arn:
2320-
return
2308+
return None
23212309

23222310
table_name = table_name_or_arn.split(":table/")[-1]
23232311

23242312
is_kinesis = False
23252313
stream_view_type = None
23262314

2327-
if table_definition := get_store(account_id, region_name).table_definitions.get(table_name):
2328-
if table_definition.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
2315+
# To determine if stream to kinesis is enabled, we look for active kinesis destinations
2316+
destinations = get_store(account_id, region_name).streaming_destinations.get(table_name) or []
2317+
for destination in destinations:
2318+
if destination["DestinationStatus"] == DestinationStatus.ACTIVE:
23292319
is_kinesis = True
23302320

23312321
table_arn = arns.dynamodb_table_arn(table_name, account_id=account_id, region_name=region_name)
2332-
23332322
if (
23342323
stream := dynamodbstreams_api.get_stream_for_table(account_id, region_name, table_arn)
23352324
) and stream["StreamStatus"] in (StreamStatus.ENABLING, StreamStatus.ENABLED):
23362325
stream_view_type = stream["StreamViewType"]
23372326

23382327
if is_kinesis or stream_view_type:
23392328
return TableStreamType(stream_view_type, is_kinesis=is_kinesis)
2329+
return None
23402330

23412331

23422332
def get_updated_records(
@@ -2410,7 +2400,9 @@ def _add_record(item, comparison_set: ItemSet):
24102400
return {table_name: TableRecords(records=result, table_stream_type=table_stream_type)}
24112401

24122402

2413-
def create_dynamodb_stream(account_id: str, region_name: str, data, latest_stream_label):
2403+
def create_dynamodb_stream(
2404+
account_id: str, region_name: str, data: CreateTableInput, latest_stream_label: str | None
2405+
) -> None:
24142406
stream = data["StreamSpecification"]
24152407
enabled = stream.get("StreamEnabled")
24162408

@@ -2428,22 +2420,6 @@ def create_dynamodb_stream(account_id: str, region_name: str, data, latest_strea
24282420
)
24292421

24302422

2431-
def dynamodb_get_table_stream_specification(account_id: str, region_name: str, table_name: str):
2432-
try:
2433-
table_schema = SchemaExtractor.get_table_schema(
2434-
table_name, account_id=account_id, region_name=region_name
2435-
)
2436-
return table_schema["Table"].get("StreamSpecification")
2437-
except Exception as e:
2438-
LOG.info(
2439-
"Unable to get stream specification for table %s: %s %s",
2440-
table_name,
2441-
e,
2442-
traceback.format_exc(),
2443-
)
2444-
raise e
2445-
2446-
24472423
def find_item_for_keys_values_in_batch(
24482424
table_name: str, item_keys: dict, batch: BatchGetResponseMap
24492425
) -> AttributeMap | None:

0 commit comments

Comments
 (0)