|
6 | 6 |
|
7 | 7 | from localstack import config |
8 | 8 | from localstack.aws.api import RequestContext |
9 | | -from localstack.aws.api.dynamodbstreams import StreamStatus, StreamViewType, TableName |
| 9 | +from localstack.aws.api.dynamodbstreams import ( |
| 10 | + StreamDescription, |
| 11 | + StreamStatus, |
| 12 | + StreamViewType, |
| 13 | + TableName, |
| 14 | +) |
10 | 15 | from localstack.aws.connect import connect_to |
11 | 16 | from localstack.services.dynamodb.v2.provider import DynamoDBProvider |
12 | | -from localstack.services.dynamodbstreams.models import DynamoDbStreamsStore, dynamodbstreams_stores |
| 17 | +from localstack.services.dynamodbstreams.models import ( |
| 18 | + DynamoDbStreamsStore, |
| 19 | + Stream, |
| 20 | + dynamodbstreams_stores, |
| 21 | +) |
13 | 22 | from localstack.utils.aws import arns, resources |
14 | 23 | from localstack.utils.common import now_utc |
15 | 24 | from localstack.utils.threads import FuncThread |
@@ -65,28 +74,32 @@ def add_dynamodb_stream( |
65 | 74 | stream_name=stream_name, |
66 | 75 | ) |
67 | 76 | latest_stream_label = latest_stream_label or "latest" |
68 | | - stream = { |
69 | | - "StreamArn": arns.dynamodb_stream_arn( |
70 | | - table_name=table_name, |
71 | | - latest_stream_label=latest_stream_label, |
72 | | - account_id=account_id, |
73 | | - region_name=region_name, |
74 | | - ), |
75 | | - "TableName": table_name, |
76 | | - "StreamLabel": latest_stream_label, |
77 | | - "StreamStatus": StreamStatus.ENABLING, |
78 | | - "KeySchema": [], |
79 | | - "Shards": [], |
80 | | - "StreamViewType": view_type, |
81 | | - "shards_id_map": {}, |
82 | | - } |
83 | | - store.ddb_streams[table_name] = stream |
84 | | - |
85 | | - |
86 | | -def get_stream_for_table(account_id: str, region_name: str, table_arn: str) -> dict: |
| 77 | + stream_arn = arns.dynamodb_stream_arn( |
| 78 | + table_name=table_name, |
| 79 | + latest_stream_label=latest_stream_label, |
| 80 | + account_id=account_id, |
| 81 | + region_name=region_name, |
| 82 | + ) |
| 83 | + stream = StreamDescription( |
| 84 | + TableName=table_name, |
| 85 | + StreamArn=stream_arn, |
| 86 | + StreamLabel=latest_stream_label, |
| 87 | + StreamStatus=StreamStatus.ENABLING, |
| 88 | + KeySchema=[], |
| 89 | + Shards=[], |
| 90 | + StreamViewType=view_type, |
| 91 | + ) |
| 92 | + store.ddb_streams[table_name] = Stream(StreamDescription=stream) |
| 93 | + |
| 94 | + |
| 95 | +def get_stream_for_table( |
| 96 | + account_id: str, region_name: str, table_arn: str |
| 97 | +) -> StreamDescription | None: |
87 | 98 | store = get_dynamodbstreams_store(account_id, region_name) |
88 | 99 | table_name = table_name_from_stream_arn(table_arn) |
89 | | - return store.ddb_streams.get(table_name) |
| 100 | + if stream := store.ddb_streams.get(table_name): |
| 101 | + return stream.StreamDescription |
| 102 | + return None |
90 | 103 |
|
91 | 104 |
|
92 | 105 | def _process_forwarded_records( |
@@ -206,11 +219,11 @@ def kinesis_shard_id(dynamodbstream_shard_id: str) -> str: |
206 | 219 | return f"{shard_params[0]}-{shard_params[-1]}" |
207 | 220 |
|
208 | 221 |
|
209 | | -def get_shard_id(stream: dict, kinesis_shard_id: str) -> str: |
210 | | - ddb_stream_shard_id = stream.get("shards_id_map", {}).get(kinesis_shard_id) |
| 222 | +def get_shard_id(stream: Stream, kinesis_shard_id: str) -> str: |
| 223 | + ddb_stream_shard_id = stream.shards_id_map.get(kinesis_shard_id) |
211 | 224 | if not ddb_stream_shard_id: |
212 | 225 | ddb_stream_shard_id = shard_id(kinesis_shard_id) |
213 | | - stream["shards_id_map"][kinesis_shard_id] = ddb_stream_shard_id |
| 226 | + stream.shards_id_map[kinesis_shard_id] = ddb_stream_shard_id |
214 | 227 |
|
215 | 228 | return ddb_stream_shard_id |
216 | 229 |
|
|
0 commit comments