Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit 381be84

Browse files
committed
Move stream destinations in a separate field
1 parent 1f6de08 commit 381be84

3 files changed

Lines changed: 109 additions & 145 deletions

File tree

localstack-core/localstack/services/dynamodb/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
ContinuousBackupsDescription,
88
GlobalTableDescription,
99
Key,
10+
KinesisDataStreamDestination,
1011
RegionName,
1112
ReplicaDescription,
1213
StreamViewType,
@@ -124,6 +125,11 @@ class DynamoDBStore(BaseStore):
124125
# maps table names to cached table definitions
125126
table_definitions: dict[str, TableDescription] = LocalAttribute(default=dict)
126127

128+
# map table name to streaming destinations
129+
streaming_destinations: dict[str, list[KinesisDataStreamDestination]] = LocalAttribute(
130+
default=dict
131+
)
132+
127133
# maps table names to additional table properties that are not stored upstream (e.g., ReplicaUpdates)
128134
table_properties: dict[str, TableProperties] = LocalAttribute(default=dict)
129135

localstack-core/localstack/services/dynamodb/provider.py

Lines changed: 57 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import re
77
import threading
88
import time
9-
import traceback
109
from collections import defaultdict
1110
from concurrent.futures import ThreadPoolExecutor
1211
from contextlib import contextmanager
@@ -64,6 +63,7 @@
6463
GetItemOutput,
6564
GlobalTableAlreadyExistsException,
6665
GlobalTableNotFoundException,
66+
KinesisDataStreamDestination,
6767
KinesisStreamingDestinationOutput,
6868
ListGlobalTablesOutput,
6969
ListTablesInputLimit,
@@ -274,7 +274,8 @@ def forward_to_kinesis_stream(
274274
table_arn = arns.dynamodb_table_arn(table_name, account_id, region_name)
275275
records = table_records["records"]
276276
table_def = store.table_definitions.get(table_name) or {}
277-
stream_arn = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"]
277+
destinations = store.streaming_destinations.get(table_name) or []
278+
stream_arn = destinations[-1]["StreamArn"]
278279
for record in records:
279280
kinesis_record = dict(
280281
tableName=table_name,
@@ -1665,34 +1666,32 @@ def enable_kinesis_streaming_destination(
16651666
if not stream:
16661667
raise ValidationException("User does not have a permission to use kinesis stream")
16671668

1668-
table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1669-
table_name, {}
1670-
)
1671-
1672-
dest_status = table_def.get("KinesisDataStreamDestinationStatus")
1673-
if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
1674-
raise ValidationException(
1675-
"Table is not in a valid state to enable Kinesis Streaming "
1676-
"Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1677-
"to perform ENABLE operation."
1678-
)
1669+
store = get_store(context.account_id, context.region)
1670+
streaming_destinations = store.streaming_destinations.get(table_name) or []
16791671

1680-
table_def.setdefault("KinesisDataStreamDestinations", [])
1672+
destinations = [d for d in streaming_destinations if d["StreamArn"] == stream_arn]
1673+
if destinations:
1674+
status = destinations[0].get("DestinationStatus", None)
1675+
if status not in ["DISABLED", "ENABLE_FAILED", None]:
1676+
raise ValidationException(
1677+
"Table is not in a valid state to enable Kinesis Streaming "
1678+
"Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1679+
"to perform ENABLE operation."
1680+
)
16811681

16821682
# remove the stream destination if already present
1683-
table_def["KinesisDataStreamDestinations"] = [
1684-
t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
1683+
store.streaming_destinations[table_name] = [
1684+
_d for _d in streaming_destinations if _d["StreamArn"] != stream_arn
16851685
]
16861686
# append the active stream destination at the end of the list
1687-
table_def["KinesisDataStreamDestinations"].append(
1688-
{
1689-
"DestinationStatus": DestinationStatus.ACTIVE,
1690-
"DestinationStatusDescription": "Stream is active",
1691-
"StreamArn": stream_arn,
1692-
"ApproximateCreationDateTimePrecision": ApproximateCreationDateTimePrecision.MILLISECOND,
1693-
}
1687+
store.streaming_destinations[table_name].append(
1688+
KinesisDataStreamDestination(
1689+
DestinationStatus=DestinationStatus.ACTIVE,
1690+
DestinationStatusDescription="Stream is active",
1691+
StreamArn=stream_arn,
1692+
ApproximateCreationDateTimePrecision=ApproximateCreationDateTimePrecision.MILLISECOND,
1693+
)
16941694
)
1695-
table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.ACTIVE
16961695
return KinesisStreamingDestinationOutput(
16971696
DestinationStatus=DestinationStatus.ENABLING,
16981697
StreamArn=stream_arn,
@@ -1715,34 +1714,25 @@ def disable_kinesis_streaming_destination(
17151714
error_message=f"Requested resource not found: Table: {table_name} not found",
17161715
)
17171716

1718-
# TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE
1719-
17201717
stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
17211718
if not stream:
17221719
raise ValidationException(
17231720
"User does not have a permission to use kinesis stream",
17241721
)
17251722

1726-
table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
1727-
table_name, {}
1728-
)
1729-
1730-
stream_destinations = table_def.get("KinesisDataStreamDestinations")
1731-
if stream_destinations:
1732-
if table_def["KinesisDataStreamDestinationStatus"] == DestinationStatus.ACTIVE:
1733-
for dest in stream_destinations:
1734-
if (
1735-
dest["StreamArn"] == stream_arn
1736-
and dest["DestinationStatus"] == DestinationStatus.ACTIVE
1737-
):
1738-
dest["DestinationStatus"] = DestinationStatus.DISABLED
1739-
dest["DestinationStatusDescription"] = ("Stream is disabled",)
1740-
table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.DISABLED
1741-
return KinesisStreamingDestinationOutput(
1742-
DestinationStatus=DestinationStatus.DISABLING,
1743-
StreamArn=stream_arn,
1744-
TableName=table_name,
1745-
)
1723+
store = get_store(context.account_id, context.region)
1724+
streaming_destinations = store.streaming_destinations.get(table_name) or []
1725+
1726+
# Get the right destination based on the arn
1727+
destinations = [d for d in streaming_destinations if d["StreamArn"] == stream_arn]
1728+
if destinations:
1729+
destinations[0]["DestinationStatus"] = DestinationStatus.DISABLED
1730+
destinations[0]["DestinationStatusDescription"] = "Stream is disabled"
1731+
return KinesisStreamingDestinationOutput(
1732+
DestinationStatus=DestinationStatus.DISABLING,
1733+
StreamArn=stream_arn,
1734+
TableName=table_name,
1735+
)
17461736
raise ValidationException(
17471737
"Table is not in a valid state to disable Kinesis Streaming Destination:"
17481738
"DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
@@ -1753,12 +1743,9 @@ def describe_kinesis_streaming_destination(
17531743
) -> DescribeKinesisStreamingDestinationOutput:
17541744
self.ensure_table_exists(context.account_id, context.region, table_name)
17551745

1756-
table_def = (
1757-
get_store(context.account_id, context.region).table_definitions.get(table_name) or {}
1758-
)
1759-
1760-
stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
1761-
stream_destinations = copy.deepcopy(stream_destinations)
1746+
store = get_store(context.account_id, context.region)
1747+
table_destinations = store.streaming_destinations.get(table_name) or []
1748+
stream_destinations = copy.deepcopy(table_destinations)
17621749

17631750
for destination in stream_destinations:
17641751
destination.pop("ApproximateCreationDateTimePrecision", None)
@@ -1799,31 +1786,28 @@ def update_kinesis_streaming_destination(
17991786
)
18001787

18011788
store = get_store(context.account_id, context.region)
1789+
table_destinations = store.streaming_destinations.get(table_name) or []
18021790

1803-
table_def = store.table_definitions.get(table_name) or {}
1804-
table_def.setdefault("KinesisDataStreamDestinations", [])
1805-
1806-
table_id = table_def["TableId"]
1807-
1808-
destination = None
1809-
for stream in table_def["KinesisDataStreamDestinations"]:
1810-
if stream["StreamArn"] == stream_arn:
1811-
destination = stream
1812-
1813-
if destination is None:
1791+
# filter the right destination based on the stream ARN
1792+
destinations = [d for d in table_destinations if d["StreamArn"] == stream_arn]
1793+
if not destinations:
18141794
raise ValidationException(
18151795
"Table is not in a valid state to enable Kinesis Streaming Destination: "
18161796
f"No streaming destination with streamArn: {stream_arn} found for table with tableName: {table_name}"
18171797
)
18181798

1799+
destination = destinations[0]
1800+
table_def = store.table_definitions.get(table_name) or {}
1801+
table_def.setdefault("KinesisDataStreamDestinations", [])
1802+
1803+
table_id = store.table_definitions.get(table_name, {}).get("TableId")
18191804
if (
18201805
existing_precision := destination["ApproximateCreationDateTimePrecision"]
18211806
) == update_kinesis_streaming_configuration["ApproximateCreationDateTimePrecision"]:
18221807
raise ValidationException(
18231808
f"Invalid Request: Precision is already set to the desired value of {existing_precision} "
18241809
f"for tableId: {table_id}, kdsArn: {stream_arn}"
18251810
)
1826-
18271811
destination["ApproximateCreationDateTimePrecision"] = time_precision
18281812

18291813
return UpdateKinesisStreamingDestinationOutput(
@@ -2317,26 +2301,28 @@ def get_table_stream_type(
23172301
:return: a TableStreamViewType object if the table has streams enabled. If not, return None
23182302
"""
23192303
if not table_name_or_arn:
2320-
return
2304+
return None
23212305

23222306
table_name = table_name_or_arn.split(":table/")[-1]
23232307

23242308
is_kinesis = False
23252309
stream_view_type = None
23262310

2327-
if table_definition := get_store(account_id, region_name).table_definitions.get(table_name):
2328-
if table_definition.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
2311+
# To determine if stream to kinesis is enabled, we look for active kinesis destinations
2312+
destinations = get_store(account_id, region_name).streaming_destinations.get(table_name) or []
2313+
for destination in destinations:
2314+
if destination["DestinationStatus"] == DestinationStatus.ACTIVE:
23292315
is_kinesis = True
23302316

23312317
table_arn = arns.dynamodb_table_arn(table_name, account_id=account_id, region_name=region_name)
2332-
23332318
if (
23342319
stream := dynamodbstreams_api.get_stream_for_table(account_id, region_name, table_arn)
23352320
) and stream["StreamStatus"] in (StreamStatus.ENABLING, StreamStatus.ENABLED):
23362321
stream_view_type = stream["StreamViewType"]
23372322

23382323
if is_kinesis or stream_view_type:
23392324
return TableStreamType(stream_view_type, is_kinesis=is_kinesis)
2325+
return None
23402326

23412327

23422328
def get_updated_records(
@@ -2410,7 +2396,9 @@ def _add_record(item, comparison_set: ItemSet):
24102396
return {table_name: TableRecords(records=result, table_stream_type=table_stream_type)}
24112397

24122398

2413-
def create_dynamodb_stream(account_id: str, region_name: str, data, latest_stream_label):
2399+
def create_dynamodb_stream(
2400+
account_id: str, region_name: str, data: CreateTableInput, latest_stream_label: str | None
2401+
) -> None:
24142402
stream = data["StreamSpecification"]
24152403
enabled = stream.get("StreamEnabled")
24162404

@@ -2428,22 +2416,6 @@ def create_dynamodb_stream(account_id: str, region_name: str, data, latest_strea
24282416
)
24292417

24302418

2431-
def dynamodb_get_table_stream_specification(account_id: str, region_name: str, table_name: str):
2432-
try:
2433-
table_schema = SchemaExtractor.get_table_schema(
2434-
table_name, account_id=account_id, region_name=region_name
2435-
)
2436-
return table_schema["Table"].get("StreamSpecification")
2437-
except Exception as e:
2438-
LOG.info(
2439-
"Unable to get stream specification for table %s: %s %s",
2440-
table_name,
2441-
e,
2442-
traceback.format_exc(),
2443-
)
2444-
raise e
2445-
2446-
24472419
def find_item_for_keys_values_in_batch(
24482420
table_name: str, item_keys: dict, batch: BatchGetResponseMap
24492421
) -> AttributeMap | None:

0 commit comments

Comments
 (0)