Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit 396f80e

Browse files
authored
ASF: implement CBOR parser and serializer (#13103)
1 parent f04ea14 commit 396f80e

9 files changed

Lines changed: 743 additions & 44 deletions

File tree

localstack-core/localstack/aws/protocol/parser.py

Lines changed: 288 additions & 17 deletions
Large diffs are not rendered by default.

localstack-core/localstack/aws/protocol/serializer.py

Lines changed: 352 additions & 19 deletions
Large diffs are not rendered by default.

tests/aws/services/kinesis/test_kinesis.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from localstack.testing.aws.util import is_aws_cloud
2525
from localstack.testing.pytest import markers
2626
from localstack.utils.aws import resources
27+
from localstack.utils.aws.arns import kinesis_stream_arn
2728
from localstack.utils.common import retry, select_attributes, short_uid
2829
from localstack.utils.files import load_file
2930
from localstack.utils.kinesis import kinesis_connector
@@ -723,6 +724,36 @@ def _get_record():
723724
record = retry(_get_record, sleep=1, retries=5)
724725
assert record["Data"].decode("utf-8") == test_data
725726

727+
@markers.aws.validated
728+
@markers.snapshot.skip_snapshot_verify(
729+
# error message is wrong in Kinesis (returns the full ARN)
730+
paths=["$..message"],
731+
)
732+
def test_cbor_exceptions(
733+
self,
734+
kinesis_create_stream,
735+
wait_for_stream_ready,
736+
aws_client,
737+
kinesis_http_client,
738+
region_name,
739+
account_id,
740+
snapshot,
741+
):
742+
fake_name = "wrong-stream-name"
743+
fake_stream_arn = kinesis_stream_arn(
744+
account_id=account_id, region_name=region_name, stream_name=fake_name
745+
)
746+
describe_response_raw = kinesis_http_client.post_raw(
747+
operation="DescribeStream",
748+
payload={"StreamARN": fake_stream_arn},
749+
)
750+
assert describe_response_raw.status_code == 400
751+
cbor_content = describe_response_raw.content
752+
describe_response_data = cbor2_loads(cbor_content)
753+
snapshot.match("cbor-error", describe_response_data)
754+
assert describe_response_data["__type"] == "ResourceNotFoundException"
755+
# TODO: add manual assertion on CBOR body?
756+
726757

727758
class TestKinesisJavaSDK:
728759
# the lambda function is stored in the lambda common functions folder to re-use existing caching in CI

tests/aws/services/kinesis/test_kinesis.snapshot.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,5 +226,14 @@
226226
}
227227
]
228228
}
229+
},
230+
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_exceptions": {
231+
"recorded-date": "04-09-2025, 16:59:26",
232+
"recorded-content": {
233+
"cbor-error": {
234+
"__type": "ResourceNotFoundException",
235+
"message": "Stream wrong-stream-name under account 111111111111 not found."
236+
}
237+
}
229238
}
230239
}

tests/aws/services/kinesis/test_kinesis.validation.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@
55
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_blob_handling": {
66
"last_validated_date": "2024-07-31T11:17:28+00:00"
77
},
8+
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_exceptions": {
9+
"last_validated_date": "2025-09-04T16:59:26+00:00",
10+
"durations_in_seconds": {
11+
"setup": 0.5,
12+
"call": 0.47,
13+
"teardown": 0.0,
14+
"total": 0.97
15+
}
16+
},
817
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_create_stream_without_shard_count": {
918
"last_validated_date": "2022-08-26T07:30:59+00:00"
1019
},

tests/unit/aws/protocol/test_parser.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from botocore.serialize import create_serializer
88

99
from localstack.aws.protocol.parser import (
10+
CBORRequestParser,
1011
OperationNotFoundParserError,
1112
ProtocolParserError,
1213
QueryRequestParser,
@@ -626,7 +627,10 @@ def test_json_parser_cognito_with_botocore():
626627
)
627628

628629

629-
def test_json_cbor_blob_parsing():
630+
# TODO: once Kinesis supports multi protocols (json/cbor), update this test to select the protocol instead when
631+
# creating the parser
632+
@pytest.mark.parametrize("parser_factory", [CBORRequestParser, create_parser])
633+
def test_json_cbor_blob_parsing(parser_factory):
630634
serialized_request = {
631635
"url_path": "/",
632636
"query_string": "",
@@ -655,7 +659,7 @@ def test_json_cbor_blob_parsing():
655659
# Load the appropriate service
656660
service = load_service("kinesis")
657661
operation_model = service.operation_model("PutRecord")
658-
parser = create_parser(service)
662+
parser = parser_factory(service)
659663
parsed_operation_model, parsed_request = parser.parse(
660664
HttpRequest(
661665
method=serialized_request.get("method") or "GET",
@@ -678,7 +682,10 @@ def test_json_cbor_blob_parsing():
678682
assert parsed_request["PartitionKey"] == "partitionkey"
679683

680684

681-
def test_json_cbor_blob_parsing_w_timestamp(snapshot):
685+
# TODO: once Kinesis supports multi protocols (json/cbor), update this test to select the protocol instead when
686+
# creating the parser
687+
@pytest.mark.parametrize("parser_factory", [CBORRequestParser, create_parser])
688+
def test_json_cbor_blob_parsing_w_timestamp(snapshot, parser_factory):
682689
serialized_request = {
683690
"url_path": "/",
684691
"query_string": "",
@@ -707,7 +714,7 @@ def test_json_cbor_blob_parsing_w_timestamp(snapshot):
707714
# Load the appropriate service
708715
service = load_service("kinesis")
709716
operation_model = service.operation_model("SubscribeToShard")
710-
parser = create_parser(service)
717+
parser = parser_factory(service)
711718
parsed_operation_model, parsed_request = parser.parse(
712719
HttpRequest(
713720
method=serialized_request.get("method"),
@@ -721,6 +728,7 @@ def test_json_cbor_blob_parsing_w_timestamp(snapshot):
721728

722729
# Check if the determined operation_model is correct
723730
assert parsed_operation_model == operation_model
731+
assert isinstance(parsed_request["StartingPosition"]["Timestamp"], datetime)
724732
snapshot.match("parsed_request", parsed_request)
725733

726734

tests/unit/aws/protocol/test_parser.snapshot.json

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
{
2-
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp": {
3-
"recorded-date": "21-06-2024, 13:58:29",
2+
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[CBORRequestParser]": {
3+
"recorded-date": "04-09-2025, 18:32:21",
4+
"recorded-content": {
5+
"parsed_request": {
6+
"ConsumerARN": "<test-consumer-arn>",
7+
"ShardId": "<test-shard-id>",
8+
"StartingPosition": {
9+
"Timestamp": "2024-06-21 08:54:08.123000",
10+
"Type": "AT_TIMESTAMP"
11+
}
12+
}
13+
}
14+
},
15+
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[create_parser]": {
16+
"recorded-date": "04-09-2025, 18:32:21",
417
"recorded-content": {
518
"parsed_request": {
619
"ConsumerARN": "<test-consumer-arn>",
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
11
{
22
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp": {
33
"last_validated_date": "2024-06-21T13:58:29+00:00"
4+
},
5+
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[CBORRequestParser]": {
6+
"last_validated_date": "2025-09-04T18:32:21+00:00",
7+
"durations_in_seconds": {
8+
"setup": 0.0,
9+
"call": 0.01,
10+
"teardown": 0.0,
11+
"total": 0.01
12+
}
13+
},
14+
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[create_parser]": {
15+
"last_validated_date": "2025-09-04T18:32:21+00:00",
16+
"durations_in_seconds": {
17+
"setup": 0.0,
18+
"call": 0.0,
19+
"teardown": 0.0,
20+
"total": 0.0
21+
}
422
}
523
}

tests/unit/aws/protocol/test_serializer.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
)
3939
from localstack.aws.api.sts import Credentials, GetSessionTokenResponse
4040
from localstack.aws.protocol.serializer import (
41+
CBORResponseSerializer,
4142
ProtocolSerializerError,
4243
QueryResponseSerializer,
4344
UnknownSerializerError,
@@ -248,6 +249,10 @@ def _botocore_event_streaming_test(
248249
assert actual_events == expected_events
249250

250251

252+
def _cbor_serializer_factory(service):
253+
return CBORResponseSerializer()
254+
255+
251256
def test_rest_xml_serializer_cloudfront_with_botocore():
252257
parameters = {
253258
"TestResult": {
@@ -1844,9 +1849,11 @@ def test_query_protocol_json_serialization(headers_dict):
18441849
"headers_dict",
18451850
[{"Content-Type": "application/cbor"}, {"Accept": "application/cbor"}],
18461851
)
1847-
def test_json_protocol_cbor_serialization(headers_dict):
1852+
@pytest.mark.parametrize("serializer_factory", [create_serializer, _cbor_serializer_factory])
1853+
def test_json_protocol_cbor_serialization(headers_dict, serializer_factory):
1854+
# TODO: test datetime serialization format for Kinesis manually
18481855
service = load_service("kinesis")
1849-
response_serializer = create_serializer(service)
1856+
response_serializer = serializer_factory(service)
18501857
headers = Headers(headers_dict)
18511858
response_data = GetRecordsOutput(
18521859
Records=[

0 commit comments

Comments
 (0)