Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions localstack-core/localstack/services/dynamodb/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@

from localstack.aws.api.dynamodb import (
AttributeMap,
BackupDetails,
ContinuousBackupsDescription,
GlobalTableDescription,
Key,
KinesisDataStreamDestination,
RegionName,
ReplicaDescription,
StreamViewType,
TableDescription,
TableName,
TimeToLiveSpecification,
)
Expand Down Expand Up @@ -91,9 +96,20 @@ class TableRecords(TypedDict):
RecordsMap = dict[TableName, TableRecords]


class TableProperties(TypedDict, total=False):
    """Per-table properties that LocalStack tracks itself.

    Used for table state that is not persisted by the upstream engine
    (see ``DynamoDBStore.table_properties``). ``total=False`` makes every
    key optional, so an empty dict is a valid value.
    """

    ContinuousBackupsDescription: ContinuousBackupsDescription

@dataclasses.dataclass
class Backup:
    """In-memory record of a single DynamoDB table backup."""

    # Backup metadata in the shape of the DynamoDB API's BackupDetails structure.
    details: BackupDetails
    # Location of the stored backup data; presumably a local file path — TODO confirm against writer.
    backup_file: str
    # Name of the source table this backup was taken from.
    table_name: str

class DynamoDBStore(BaseStore):
# maps global table names to configurations (for the legacy v.2017 tables)
GLOBAL_TABLES: dict[str, dict] = CrossRegionAttribute(default=dict)
GLOBAL_TABLES: dict[str, GlobalTableDescription] = CrossRegionAttribute(default=dict)

# Maps table name to the region they exist in on DDBLocal (for v.2019 global tables)
TABLE_REGION: dict[TableName, RegionName] = CrossRegionAttribute(default=dict)
Expand All @@ -104,19 +120,24 @@ class DynamoDBStore(BaseStore):
)

# cache table taggings - maps table ARN to tags dict
TABLE_TAGS: dict[str, dict] = CrossRegionAttribute(default=dict)
TABLE_TAGS: dict[str, dict[str, str]] = CrossRegionAttribute(default=dict)

# maps table names to cached table definitions
table_definitions: dict[str, dict] = LocalAttribute(default=dict)
table_definitions: dict[str, TableDescription] = LocalAttribute(default=dict)

# map table name to streaming destinations
streaming_destinations: dict[str, list[KinesisDataStreamDestination]] = LocalAttribute(
default=dict
)

# maps table names to additional table properties that are not stored upstream (e.g., ReplicaUpdates)
table_properties: dict[str, dict] = LocalAttribute(default=dict)
table_properties: dict[str, TableProperties] = LocalAttribute(default=dict)

# maps table names to TTL specifications
ttl_specifications: dict[str, TimeToLiveSpecification] = LocalAttribute(default=dict)

# maps backups
backups: dict[str, dict] = LocalAttribute(default=dict)
backups: dict[str, Backup] = LocalAttribute(default=dict)


dynamodb_stores = AccountRegionBundle("dynamodb", DynamoDBStore)
146 changes: 61 additions & 85 deletions localstack-core/localstack/services/dynamodb/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import re
import threading
import time
import traceback
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
Expand Down Expand Up @@ -64,6 +63,7 @@
GetItemOutput,
GlobalTableAlreadyExistsException,
GlobalTableNotFoundException,
KinesisDataStreamDestination,
KinesisStreamingDestinationOutput,
ListGlobalTablesOutput,
ListTablesInputLimit,
Expand Down Expand Up @@ -274,7 +274,12 @@ def forward_to_kinesis_stream(
table_arn = arns.dynamodb_table_arn(table_name, account_id, region_name)
records = table_records["records"]
table_def = store.table_definitions.get(table_name) or {}
stream_arn = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"]
destinations = store.streaming_destinations.get(table_name)
if not destinations:
LOG.debug("Table %s has no Kinesis streaming destinations enabled", table_name)
continue

stream_arn = destinations[-1]["StreamArn"]
for record in records:
kinesis_record = dict(
tableName=table_name,
Expand Down Expand Up @@ -1665,34 +1670,32 @@ def enable_kinesis_streaming_destination(
if not stream:
raise ValidationException("User does not have a permission to use kinesis stream")

table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
table_name, {}
)

dest_status = table_def.get("KinesisDataStreamDestinationStatus")
if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
raise ValidationException(
"Table is not in a valid state to enable Kinesis Streaming "
"Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
"to perform ENABLE operation."
)
store = get_store(context.account_id, context.region)
streaming_destinations = store.streaming_destinations.get(table_name) or []

table_def.setdefault("KinesisDataStreamDestinations", [])
destinations = [d for d in streaming_destinations if d["StreamArn"] == stream_arn]
if destinations:
status = destinations[0].get("DestinationStatus", None)
if status not in ["DISABLED", "ENABLED_FAILED", None]:
raise ValidationException(
"Table is not in a valid state to enable Kinesis Streaming "
"Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
"to perform ENABLE operation."
)

# remove the stream destination if already present
table_def["KinesisDataStreamDestinations"] = [
t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
store.streaming_destinations[table_name] = [
_d for _d in streaming_destinations if _d["StreamArn"] != stream_arn
]
# append the active stream destination at the end of the list
table_def["KinesisDataStreamDestinations"].append(
{
"DestinationStatus": DestinationStatus.ACTIVE,
"DestinationStatusDescription": "Stream is active",
"StreamArn": stream_arn,
"ApproximateCreationDateTimePrecision": ApproximateCreationDateTimePrecision.MILLISECOND,
}
store.streaming_destinations[table_name].append(
KinesisDataStreamDestination(
DestinationStatus=DestinationStatus.ACTIVE,
DestinationStatusDescription="Stream is active",
StreamArn=stream_arn,
ApproximateCreationDateTimePrecision=ApproximateCreationDateTimePrecision.MILLISECOND,
)
)
table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.ACTIVE
return KinesisStreamingDestinationOutput(
DestinationStatus=DestinationStatus.ENABLING,
StreamArn=stream_arn,
Expand All @@ -1715,34 +1718,25 @@ def disable_kinesis_streaming_destination(
error_message=f"Requested resource not found: Table: {table_name} not found",
)

# TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE

stream = self._event_forwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
if not stream:
raise ValidationException(
"User does not have a permission to use kinesis stream",
)

table_def = get_store(context.account_id, context.region).table_definitions.setdefault(
table_name, {}
)

stream_destinations = table_def.get("KinesisDataStreamDestinations")
if stream_destinations:
if table_def["KinesisDataStreamDestinationStatus"] == DestinationStatus.ACTIVE:
for dest in stream_destinations:
if (
dest["StreamArn"] == stream_arn
and dest["DestinationStatus"] == DestinationStatus.ACTIVE
):
dest["DestinationStatus"] = DestinationStatus.DISABLED
dest["DestinationStatusDescription"] = ("Stream is disabled",)
table_def["KinesisDataStreamDestinationStatus"] = DestinationStatus.DISABLED
return KinesisStreamingDestinationOutput(
DestinationStatus=DestinationStatus.DISABLING,
StreamArn=stream_arn,
TableName=table_name,
)
store = get_store(context.account_id, context.region)
streaming_destinations = store.streaming_destinations.get(table_name) or []

# Get the right destination based on the arn
destinations = [d for d in streaming_destinations if d["StreamArn"] == stream_arn]
if destinations:
destinations[0]["DestinationStatus"] = DestinationStatus.DISABLED
destinations[0]["DestinationStatusDescription"] = "Stream is disabled"
return KinesisStreamingDestinationOutput(
DestinationStatus=DestinationStatus.DISABLING,
StreamArn=stream_arn,
TableName=table_name,
)
raise ValidationException(
"Table is not in a valid state to disable Kinesis Streaming Destination:"
"DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
Expand All @@ -1753,12 +1747,9 @@ def describe_kinesis_streaming_destination(
) -> DescribeKinesisStreamingDestinationOutput:
self.ensure_table_exists(context.account_id, context.region, table_name)

table_def = (
get_store(context.account_id, context.region).table_definitions.get(table_name) or {}
)

stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
stream_destinations = copy.deepcopy(stream_destinations)
store = get_store(context.account_id, context.region)
table_destinations = store.streaming_destinations.get(table_name) or []
stream_destinations = copy.deepcopy(table_destinations)

for destination in stream_destinations:
destination.pop("ApproximateCreationDateTimePrecision", None)
Expand Down Expand Up @@ -1799,31 +1790,28 @@ def update_kinesis_streaming_destination(
)

store = get_store(context.account_id, context.region)
table_destinations = store.streaming_destinations.get(table_name) or []

table_def = store.table_definitions.get(table_name) or {}
table_def.setdefault("KinesisDataStreamDestinations", [])

table_id = table_def["TableId"]

destination = None
for stream in table_def["KinesisDataStreamDestinations"]:
if stream["StreamArn"] == stream_arn:
destination = stream

if destination is None:
# filter the right destination based on the stream ARN
destinations = [d for d in table_destinations if d["StreamArn"] == stream_arn]
if not destinations:
raise ValidationException(
"Table is not in a valid state to enable Kinesis Streaming Destination: "
f"No streaming destination with streamArn: {stream_arn} found for table with tableName: {table_name}"
)

destination = destinations[0]
table_def = store.table_definitions.get(table_name) or {}
table_def.setdefault("KinesisDataStreamDestinations", [])

table_id = store.table_definitions.get(table_name, {}).get("TableId")
if (
existing_precision := destination["ApproximateCreationDateTimePrecision"]
) == update_kinesis_streaming_configuration["ApproximateCreationDateTimePrecision"]:
raise ValidationException(
f"Invalid Request: Precision is already set to the desired value of {existing_precision} "
f"for tableId: {table_id}, kdsArn: {stream_arn}"
)

destination["ApproximateCreationDateTimePrecision"] = time_precision

return UpdateKinesisStreamingDestinationOutput(
Expand Down Expand Up @@ -2317,26 +2305,28 @@ def get_table_stream_type(
:return: a TableStreamViewType object if the table has streams enabled. If not, return None
"""
if not table_name_or_arn:
return
return None

table_name = table_name_or_arn.split(":table/")[-1]

is_kinesis = False
stream_view_type = None

if table_definition := get_store(account_id, region_name).table_definitions.get(table_name):
if table_definition.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
# To determine if stream to kinesis is enabled, we look for active kinesis destinations
destinations = get_store(account_id, region_name).streaming_destinations.get(table_name) or []
for destination in destinations:
if destination["DestinationStatus"] == DestinationStatus.ACTIVE:
is_kinesis = True

table_arn = arns.dynamodb_table_arn(table_name, account_id=account_id, region_name=region_name)

if (
stream := dynamodbstreams_api.get_stream_for_table(account_id, region_name, table_arn)
) and stream["StreamStatus"] in (StreamStatus.ENABLING, StreamStatus.ENABLED):
stream_view_type = stream["StreamViewType"]

if is_kinesis or stream_view_type:
return TableStreamType(stream_view_type, is_kinesis=is_kinesis)
return None


def get_updated_records(
Expand Down Expand Up @@ -2410,7 +2400,9 @@ def _add_record(item, comparison_set: ItemSet):
return {table_name: TableRecords(records=result, table_stream_type=table_stream_type)}


def create_dynamodb_stream(account_id: str, region_name: str, data, latest_stream_label):
def create_dynamodb_stream(
account_id: str, region_name: str, data: CreateTableInput, latest_stream_label: str | None
) -> None:
stream = data["StreamSpecification"]
enabled = stream.get("StreamEnabled")

Expand All @@ -2428,22 +2420,6 @@ def create_dynamodb_stream(account_id: str, region_name: str, data, latest_strea
)


def dynamodb_get_table_stream_specification(account_id: str, region_name: str, table_name: str):
    """Return the ``StreamSpecification`` of a table, or ``None`` if none is set.

    :param account_id: AWS account ID the table belongs to
    :param region_name: region the table lives in
    :param table_name: name of the table to look up
    :raises Exception: re-raises whatever the schema lookup raised, after logging it
    """
    try:
        table_schema = SchemaExtractor.get_table_schema(
            table_name, account_id=account_id, region_name=region_name
        )
        return table_schema["Table"].get("StreamSpecification")
    except Exception:
        # exc_info=True lets the logging framework attach the traceback lazily,
        # instead of eagerly building it with traceback.format_exc().
        LOG.info(
            "Unable to get stream specification for table %s",
            table_name,
            exc_info=True,
        )
        # Bare raise preserves the original traceback; `raise e` would re-anchor it here.
        raise


def find_item_for_keys_values_in_batch(
table_name: str, item_keys: dict, batch: BatchGetResponseMap
) -> AttributeMap | None:
Expand Down
Loading
Loading