66import re
77import threading
88import time
9- import traceback
109from collections import defaultdict
1110from concurrent .futures import ThreadPoolExecutor
1211from contextlib import contextmanager
6463 GetItemOutput ,
6564 GlobalTableAlreadyExistsException ,
6665 GlobalTableNotFoundException ,
66+ KinesisDataStreamDestination ,
6767 KinesisStreamingDestinationOutput ,
6868 ListGlobalTablesOutput ,
6969 ListTablesInputLimit ,
@@ -274,7 +274,8 @@ def forward_to_kinesis_stream(
274274 table_arn = arns .dynamodb_table_arn (table_name , account_id , region_name )
275275 records = table_records ["records" ]
276276 table_def = store .table_definitions .get (table_name ) or {}
277- stream_arn = table_def ["KinesisDataStreamDestinations" ][- 1 ]["StreamArn" ]
277+ destinations = store .streaming_destinations .get (table_name ) or []
278+ stream_arn = destinations [- 1 ]["StreamArn" ]
278279 for record in records :
279280 kinesis_record = dict (
280281 tableName = table_name ,
@@ -1665,34 +1666,32 @@ def enable_kinesis_streaming_destination(
16651666 if not stream :
16661667 raise ValidationException ("User does not have a permission to use kinesis stream" )
16671668
1668- table_def = get_store (context .account_id , context .region ).table_definitions .setdefault (
1669- table_name , {}
1670- )
1671-
1672- dest_status = table_def .get ("KinesisDataStreamDestinationStatus" )
1673- if dest_status not in ["DISABLED" , "ENABLE_FAILED" , None ]:
1674- raise ValidationException (
1675- "Table is not in a valid state to enable Kinesis Streaming "
1676- "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1677- "to perform ENABLE operation."
1678- )
1669+ store = get_store (context .account_id , context .region )
1670+ streaming_destinations = store .streaming_destinations .get (table_name ) or []
16791671
1680- table_def .setdefault ("KinesisDataStreamDestinations" , [])
1672+ destinations = [d for d in streaming_destinations if d ["StreamArn" ] == stream_arn ]
1673+ if destinations :
1674+ status = destinations [0 ].get ("DestinationStatus" , None )
1675+ if status not in ["DISABLED" , "ENABLED_FAILED" , None ]:
1676+ raise ValidationException (
1677+ "Table is not in a valid state to enable Kinesis Streaming "
1678+ "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
1679+ "to perform ENABLE operation."
1680+ )
16811681
16821682 # remove the stream destination if already present
1683- table_def [ "KinesisDataStreamDestinations" ] = [
1684- t for t in table_def [ "KinesisDataStreamDestinations" ] if t ["StreamArn" ] != stream_arn
1683+ store . streaming_destinations [ table_name ] = [
1684+ _d for _d in streaming_destinations if _d ["StreamArn" ] != stream_arn
16851685 ]
16861686 # append the active stream destination at the end of the list
1687- table_def [ "KinesisDataStreamDestinations" ].append (
1688- {
1689- " DestinationStatus" : DestinationStatus .ACTIVE ,
1690- " DestinationStatusDescription" : "Stream is active" ,
1691- " StreamArn" : stream_arn ,
1692- " ApproximateCreationDateTimePrecision" : ApproximateCreationDateTimePrecision .MILLISECOND ,
1693- }
1687+ store . streaming_destinations [ table_name ].append (
1688+ KinesisDataStreamDestination (
1689+ DestinationStatus = DestinationStatus .ACTIVE ,
1690+ DestinationStatusDescription = "Stream is active" ,
1691+ StreamArn = stream_arn ,
1692+ ApproximateCreationDateTimePrecision = ApproximateCreationDateTimePrecision .MILLISECOND ,
1693+ )
16941694 )
1695- table_def ["KinesisDataStreamDestinationStatus" ] = DestinationStatus .ACTIVE
16961695 return KinesisStreamingDestinationOutput (
16971696 DestinationStatus = DestinationStatus .ENABLING ,
16981697 StreamArn = stream_arn ,
@@ -1715,34 +1714,25 @@ def disable_kinesis_streaming_destination(
17151714 error_message = f"Requested resource not found: Table: { table_name } not found" ,
17161715 )
17171716
1718- # TODO: Must raise if invoked before KinesisStreamingDestination is ACTIVE
1719-
17201717 stream = self ._event_forwarder .is_kinesis_stream_exists (stream_arn = stream_arn )
17211718 if not stream :
17221719 raise ValidationException (
17231720 "User does not have a permission to use kinesis stream" ,
17241721 )
17251722
1726- table_def = get_store (context .account_id , context .region ).table_definitions .setdefault (
1727- table_name , {}
1728- )
1729-
1730- stream_destinations = table_def .get ("KinesisDataStreamDestinations" )
1731- if stream_destinations :
1732- if table_def ["KinesisDataStreamDestinationStatus" ] == DestinationStatus .ACTIVE :
1733- for dest in stream_destinations :
1734- if (
1735- dest ["StreamArn" ] == stream_arn
1736- and dest ["DestinationStatus" ] == DestinationStatus .ACTIVE
1737- ):
1738- dest ["DestinationStatus" ] = DestinationStatus .DISABLED
1739- dest ["DestinationStatusDescription" ] = ("Stream is disabled" ,)
1740- table_def ["KinesisDataStreamDestinationStatus" ] = DestinationStatus .DISABLED
1741- return KinesisStreamingDestinationOutput (
1742- DestinationStatus = DestinationStatus .DISABLING ,
1743- StreamArn = stream_arn ,
1744- TableName = table_name ,
1745- )
1723+ store = get_store (context .account_id , context .region )
1724+ streaming_destinations = store .streaming_destinations .get (table_name ) or []
1725+
1726+ # Get the right destination based on the arn
1727+ destinations = [d for d in streaming_destinations if d ["StreamArn" ] == stream_arn ]
1728+ if destinations :
1729+ destinations [0 ]["DestinationStatus" ] = DestinationStatus .DISABLED
1730+ destinations [0 ]["DestinationStatusDescription" ] = "Stream is disabled"
1731+ return KinesisStreamingDestinationOutput (
1732+ DestinationStatus = DestinationStatus .DISABLING ,
1733+ StreamArn = stream_arn ,
1734+ TableName = table_name ,
1735+ )
17461736 raise ValidationException (
17471737 "Table is not in a valid state to disable Kinesis Streaming Destination:"
17481738 "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
@@ -1753,12 +1743,9 @@ def describe_kinesis_streaming_destination(
17531743 ) -> DescribeKinesisStreamingDestinationOutput :
17541744 self .ensure_table_exists (context .account_id , context .region , table_name )
17551745
1756- table_def = (
1757- get_store (context .account_id , context .region ).table_definitions .get (table_name ) or {}
1758- )
1759-
1760- stream_destinations = table_def .get ("KinesisDataStreamDestinations" ) or []
1761- stream_destinations = copy .deepcopy (stream_destinations )
1746+ store = get_store (context .account_id , context .region )
1747+ table_destinations = store .streaming_destinations .get (table_name ) or []
1748+ stream_destinations = copy .deepcopy (table_destinations )
17621749
17631750 for destination in stream_destinations :
17641751 destination .pop ("ApproximateCreationDateTimePrecision" , None )
@@ -1799,31 +1786,28 @@ def update_kinesis_streaming_destination(
17991786 )
18001787
18011788 store = get_store (context .account_id , context .region )
1789+ table_destinations = store .streaming_destinations .get (table_name ) or []
18021790
1803- table_def = store .table_definitions .get (table_name ) or {}
1804- table_def .setdefault ("KinesisDataStreamDestinations" , [])
1805-
1806- table_id = table_def ["TableId" ]
1807-
1808- destination = None
1809- for stream in table_def ["KinesisDataStreamDestinations" ]:
1810- if stream ["StreamArn" ] == stream_arn :
1811- destination = stream
1812-
1813- if destination is None :
1791+ # filter the right destination based on the stream ARN
1792+ destinations = [d for d in table_destinations if d ["StreamArn" ] == stream_arn ]
1793+ if not destinations :
18141794 raise ValidationException (
18151795 "Table is not in a valid state to enable Kinesis Streaming Destination: "
18161796 f"No streaming destination with streamArn: { stream_arn } found for table with tableName: { table_name } "
18171797 )
18181798
1799+ destination = destinations [0 ]
1800+ table_def = store .table_definitions .get (table_name ) or {}
1801+ table_def .setdefault ("KinesisDataStreamDestinations" , [])
1802+
1803+ table_id = store .table_definitions .get (table_name , {}).get ("TableId" )
18191804 if (
18201805 existing_precision := destination ["ApproximateCreationDateTimePrecision" ]
18211806 ) == update_kinesis_streaming_configuration ["ApproximateCreationDateTimePrecision" ]:
18221807 raise ValidationException (
18231808 f"Invalid Request: Precision is already set to the desired value of { existing_precision } "
18241809 f"for tableId: { table_id } , kdsArn: { stream_arn } "
18251810 )
1826-
18271811 destination ["ApproximateCreationDateTimePrecision" ] = time_precision
18281812
18291813 return UpdateKinesisStreamingDestinationOutput (
@@ -2317,26 +2301,28 @@ def get_table_stream_type(
23172301 :return: a TableStreamViewType object if the table has streams enabled. If not, return None
23182302 """
23192303 if not table_name_or_arn :
2320- return
2304+ return None
23212305
23222306 table_name = table_name_or_arn .split (":table/" )[- 1 ]
23232307
23242308 is_kinesis = False
23252309 stream_view_type = None
23262310
2327- if table_definition := get_store (account_id , region_name ).table_definitions .get (table_name ):
2328- if table_definition .get ("KinesisDataStreamDestinationStatus" ) == "ACTIVE" :
2311+ # To determine if stream to kinesis is enabled, we look for active kinesis destinations
2312+ destinations = get_store (account_id , region_name ).streaming_destinations .get (table_name ) or []
2313+ for destination in destinations :
2314+ if destination ["DestinationStatus" ] == DestinationStatus .ACTIVE :
23292315 is_kinesis = True
23302316
23312317 table_arn = arns .dynamodb_table_arn (table_name , account_id = account_id , region_name = region_name )
2332-
23332318 if (
23342319 stream := dynamodbstreams_api .get_stream_for_table (account_id , region_name , table_arn )
23352320 ) and stream ["StreamStatus" ] in (StreamStatus .ENABLING , StreamStatus .ENABLED ):
23362321 stream_view_type = stream ["StreamViewType" ]
23372322
23382323 if is_kinesis or stream_view_type :
23392324 return TableStreamType (stream_view_type , is_kinesis = is_kinesis )
2325+ return None
23402326
23412327
23422328def get_updated_records (
@@ -2410,7 +2396,9 @@ def _add_record(item, comparison_set: ItemSet):
24102396 return {table_name : TableRecords (records = result , table_stream_type = table_stream_type )}
24112397
24122398
2413- def create_dynamodb_stream (account_id : str , region_name : str , data , latest_stream_label ):
2399+ def create_dynamodb_stream (
2400+ account_id : str , region_name : str , data : CreateTableInput , latest_stream_label : str | None
2401+ ) -> None :
24142402 stream = data ["StreamSpecification" ]
24152403 enabled = stream .get ("StreamEnabled" )
24162404
@@ -2428,22 +2416,6 @@ def create_dynamodb_stream(account_id: str, region_name: str, data, latest_strea
24282416 )
24292417
24302418
2431- def dynamodb_get_table_stream_specification (account_id : str , region_name : str , table_name : str ):
2432- try :
2433- table_schema = SchemaExtractor .get_table_schema (
2434- table_name , account_id = account_id , region_name = region_name
2435- )
2436- return table_schema ["Table" ].get ("StreamSpecification" )
2437- except Exception as e :
2438- LOG .info (
2439- "Unable to get stream specification for table %s: %s %s" ,
2440- table_name ,
2441- e ,
2442- traceback .format_exc (),
2443- )
2444- raise e
2445-
2446-
24472419def find_item_for_keys_values_in_batch (
24482420 table_name : str , item_keys : dict , batch : BatchGetResponseMap
24492421) -> AttributeMap | None :
0 commit comments