66import re
77import threading
88import time
9- import traceback
109from collections import defaultdict
1110from concurrent .futures import ThreadPoolExecutor
1211from contextlib import contextmanager
6463 GetItemOutput ,
6564 GlobalTableAlreadyExistsException ,
6665 GlobalTableNotFoundException ,
66+ KinesisDataStreamDestination ,
6767 KinesisStreamingDestinationOutput ,
6868 ListGlobalTablesOutput ,
6969 ListTablesInputLimit ,
@@ -274,7 +274,12 @@ def forward_to_kinesis_stream(
274274 table_arn = arns .dynamodb_table_arn (table_name , account_id , region_name )
275275 records = table_records ["records" ]
276276 table_def = store .table_definitions .get (table_name ) or {}
277- stream_arn = table_def ["KinesisDataStreamDestinations" ][- 1 ]["StreamArn" ]
277+ destinations = store .streaming_destinations .get (table_name )
278+ if not destinations :
279+ LOG .debug ("Table %s has no Kinesis streaming destinations enabled" , table_name )
280+ continue
281+
282+ stream_arn = destinations [- 1 ]["StreamArn" ]
278283 for record in records :
279284 kinesis_record = dict (
280285 tableName = table_name ,
@@ -1665,34 +1670,32 @@ def enable_kinesis_streaming_destination(
16651670 if not stream :
16661671 raise ValidationException ("User does not have a permission to use kinesis stream" )
16671672
1668- table_def = get_store (context .account_id , context .region ).table_definitions .setdefault (
1669- table_name , {}
1670- )
1671-
1672- dest_status = table_def .get ("KinesisDataStreamDestinationStatus" )
1673- if dest_status not in ["DISABLED" , "ENABLE_FAILED" , None ]:
1674- raise ValidationException (
1675- "Table is not in a valid state to enable Kinesis Streaming "
1676- "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1677- "to perform ENABLE operation."
1678- )
1673+ store = get_store (context .account_id , context .region )
1674+ streaming_destinations = store .streaming_destinations .get (table_name ) or []
16791675
1680- table_def .setdefault ("KinesisDataStreamDestinations" , [])
1676+ destinations = [d for d in streaming_destinations if d ["StreamArn" ] == stream_arn ]
1677+ if destinations :
1678+ status = destinations [0 ].get ("DestinationStatus" , None )
1679+ if status not in ["DISABLED" , "ENABLED_FAILED" , None ]:
1680+ raise ValidationException (
1681+ "Table is not in a valid state to enable Kinesis Streaming "
1682+ "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1683+ "to perform ENABLE operation."
1684+ )
16811685
16821686 # remove the stream destination if already present
1683- table_def [ "KinesisDataStreamDestinations" ] = [
1684- t for t in table_def [ "KinesisDataStreamDestinations" ] if t ["StreamArn" ] != stream_arn
1687+ store . streaming_destinations [ table_name ] = [
1688+ _d for _d in streaming_destinations if _d ["StreamArn" ] != stream_arn
16851689 ]
16861690 # append the active stream destination at the end of the list
1687- table_def [ "KinesisDataStreamDestinations" ].append (
1688- {
1689- " DestinationStatus" : DestinationStatus .ACTIVE ,
1690- " DestinationStatusDescription" : "Stream is active" ,
1691- " StreamArn" : stream_arn ,
1692- " ApproximateCreationDateTimePrecision" : ApproximateCreationDateTimePrecision .MILLISECOND ,
1693- }
1691+ store . streaming_destinations [ table_name ].append (
1692+ KinesisDataStreamDestination (
1693+ DestinationStatus = DestinationStatus .ACTIVE ,
1694+ DestinationStatusDescription = "Stream is active" ,
1695+ StreamArn = stream_arn ,
1696+ ApproximateCreationDateTimePrecision = ApproximateCreationDateTimePrecision .MILLISECOND ,
1697+ )
16941698 )
1695- table_def ["KinesisDataStreamDestinationStatus" ] = DestinationStatus .ACTIVE
16961699 return KinesisStreamingDestinationOutput (
16971700 DestinationStatus = DestinationStatus .ENABLING ,
16981701 StreamArn = stream_arn ,
@@ -1715,34 +1718,25 @@ def disable_kinesis_streaming_destination(
17151718 error_message = f"Requested resource not found: Table: { table_name } not found" ,
17161719 )
17171720
1718- # TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE
1719-
17201721 stream = self ._event_forwarder .is_kinesis_stream_exists (stream_arn = stream_arn )
17211722 if not stream :
17221723 raise ValidationException (
17231724 "User does not have a permission to use kinesis stream" ,
17241725 )
17251726
1726- table_def = get_store (context .account_id , context .region ).table_definitions .setdefault (
1727- table_name , {}
1728- )
1729-
1730- stream_destinations = table_def .get ("KinesisDataStreamDestinations" )
1731- if stream_destinations :
1732- if table_def ["KinesisDataStreamDestinationStatus" ] == DestinationStatus .ACTIVE :
1733- for dest in stream_destinations :
1734- if (
1735- dest ["StreamArn" ] == stream_arn
1736- and dest ["DestinationStatus" ] == DestinationStatus .ACTIVE
1737- ):
1738- dest ["DestinationStatus" ] = DestinationStatus .DISABLED
1739- dest ["DestinationStatusDescription" ] = ("Stream is disabled" ,)
1740- table_def ["KinesisDataStreamDestinationStatus" ] = DestinationStatus .DISABLED
1741- return KinesisStreamingDestinationOutput (
1742- DestinationStatus = DestinationStatus .DISABLING ,
1743- StreamArn = stream_arn ,
1744- TableName = table_name ,
1745- )
1727+ store = get_store (context .account_id , context .region )
1728+ streaming_destinations = store .streaming_destinations .get (table_name ) or []
1729+
1730+ # Get the right destination based on the arn
1731+ destinations = [d for d in streaming_destinations if d ["StreamArn" ] == stream_arn ]
1732+ if destinations :
1733+ destinations [0 ]["DestinationStatus" ] = DestinationStatus .DISABLED
1734+ destinations [0 ]["DestinationStatusDescription" ] = "Stream is disabled"
1735+ return KinesisStreamingDestinationOutput (
1736+ DestinationStatus = DestinationStatus .DISABLING ,
1737+ StreamArn = stream_arn ,
1738+ TableName = table_name ,
1739+ )
17461740 raise ValidationException (
17471741 "Table is not in a valid state to disable Kinesis Streaming Destination:"
17481742 "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
@@ -1753,12 +1747,9 @@ def describe_kinesis_streaming_destination(
17531747 ) -> DescribeKinesisStreamingDestinationOutput :
17541748 self .ensure_table_exists (context .account_id , context .region , table_name )
17551749
1756- table_def = (
1757- get_store (context .account_id , context .region ).table_definitions .get (table_name ) or {}
1758- )
1759-
1760- stream_destinations = table_def .get ("KinesisDataStreamDestinations" ) or []
1761- stream_destinations = copy .deepcopy (stream_destinations )
1750+ store = get_store (context .account_id , context .region )
1751+ table_destinations = store .streaming_destinations .get (table_name ) or []
1752+ stream_destinations = copy .deepcopy (table_destinations )
17621753
17631754 for destination in stream_destinations :
17641755 destination .pop ("ApproximateCreationDateTimePrecision" , None )
@@ -1799,31 +1790,28 @@ def update_kinesis_streaming_destination(
17991790 )
18001791
18011792 store = get_store (context .account_id , context .region )
1793+ table_destinations = store .streaming_destinations .get (table_name ) or []
18021794
1803- table_def = store .table_definitions .get (table_name ) or {}
1804- table_def .setdefault ("KinesisDataStreamDestinations" , [])
1805-
1806- table_id = table_def ["TableId" ]
1807-
1808- destination = None
1809- for stream in table_def ["KinesisDataStreamDestinations" ]:
1810- if stream ["StreamArn" ] == stream_arn :
1811- destination = stream
1812-
1813- if destination is None :
1795+ # filter the right destination based on the stream ARN
1796+ destinations = [d for d in table_destinations if d ["StreamArn" ] == stream_arn ]
1797+ if not destinations :
18141798 raise ValidationException (
18151799 "Table is not in a valid state to enable Kinesis Streaming Destination: "
18161800 f"No streaming destination with streamArn: { stream_arn } found for table with tableName: { table_name } "
18171801 )
18181802
1803+ destination = destinations [0 ]
1804+ table_def = store .table_definitions .get (table_name ) or {}
1805+ table_def .setdefault ("KinesisDataStreamDestinations" , [])
1806+
1807+ table_id = store .table_definitions .get (table_name , {}).get ("TableId" )
18191808 if (
18201809 existing_precision := destination ["ApproximateCreationDateTimePrecision" ]
18211810 ) == update_kinesis_streaming_configuration ["ApproximateCreationDateTimePrecision" ]:
18221811 raise ValidationException (
18231812 f"Invalid Request: Precision is already set to the desired value of { existing_precision } "
18241813 f"for tableId: { table_id } , kdsArn: { stream_arn } "
18251814 )
1826-
18271815 destination ["ApproximateCreationDateTimePrecision" ] = time_precision
18281816
18291817 return UpdateKinesisStreamingDestinationOutput (
@@ -2317,26 +2305,28 @@ def get_table_stream_type(
23172305 :return: a TableStreamViewType object if the table has streams enabled. If not, return None
23182306 """
23192307 if not table_name_or_arn :
2320- return
2308+ return None
23212309
23222310 table_name = table_name_or_arn .split (":table/" )[- 1 ]
23232311
23242312 is_kinesis = False
23252313 stream_view_type = None
23262314
2327- if table_definition := get_store (account_id , region_name ).table_definitions .get (table_name ):
2328- if table_definition .get ("KinesisDataStreamDestinationStatus" ) == "ACTIVE" :
2315+ # To determine if stream to kinesis is enabled, we look for active kinesis destinations
2316+ destinations = get_store (account_id , region_name ).streaming_destinations .get (table_name ) or []
2317+ for destination in destinations :
2318+ if destination ["DestinationStatus" ] == DestinationStatus .ACTIVE :
23292319 is_kinesis = True
23302320
23312321 table_arn = arns .dynamodb_table_arn (table_name , account_id = account_id , region_name = region_name )
2332-
23332322 if (
23342323 stream := dynamodbstreams_api .get_stream_for_table (account_id , region_name , table_arn )
23352324 ) and stream ["StreamStatus" ] in (StreamStatus .ENABLING , StreamStatus .ENABLED ):
23362325 stream_view_type = stream ["StreamViewType" ]
23372326
23382327 if is_kinesis or stream_view_type :
23392328 return TableStreamType (stream_view_type , is_kinesis = is_kinesis )
2329+ return None
23402330
23412331
23422332def get_updated_records (
@@ -2410,7 +2400,9 @@ def _add_record(item, comparison_set: ItemSet):
24102400 return {table_name : TableRecords (records = result , table_stream_type = table_stream_type )}
24112401
24122402
2413- def create_dynamodb_stream (account_id : str , region_name : str , data , latest_stream_label ):
2403+ def create_dynamodb_stream (
2404+ account_id : str , region_name : str , data : CreateTableInput , latest_stream_label : str | None
2405+ ) -> None :
24142406 stream = data ["StreamSpecification" ]
24152407 enabled = stream .get ("StreamEnabled" )
24162408
@@ -2428,22 +2420,6 @@ def create_dynamodb_stream(account_id: str, region_name: str, data, latest_strea
24282420 )
24292421
24302422
2431- def dynamodb_get_table_stream_specification (account_id : str , region_name : str , table_name : str ):
2432- try :
2433- table_schema = SchemaExtractor .get_table_schema (
2434- table_name , account_id = account_id , region_name = region_name
2435- )
2436- return table_schema ["Table" ].get ("StreamSpecification" )
2437- except Exception as e :
2438- LOG .info (
2439- "Unable to get stream specification for table %s: %s %s" ,
2440- table_name ,
2441- e ,
2442- traceback .format_exc (),
2443- )
2444- raise e
2445-
2446-
24472423def find_item_for_keys_values_in_batch (
24482424 table_name : str , item_keys : dict , batch : BatchGetResponseMap
24492425) -> AttributeMap | None :
0 commit comments