Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 288 additions & 17 deletions localstack-core/localstack/aws/protocol/parser.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Nice! This will hopefully also help us in the future to remove the dependency on and patching of cbor2 🤩

Large diffs are not rendered by default.

371 changes: 352 additions & 19 deletions localstack-core/localstack/aws/protocol/serializer.py

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions tests/aws/services/kinesis/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from localstack.testing.aws.util import is_aws_cloud
from localstack.testing.pytest import markers
from localstack.utils.aws import resources
from localstack.utils.aws.arns import kinesis_stream_arn
from localstack.utils.common import retry, select_attributes, short_uid
from localstack.utils.files import load_file
from localstack.utils.kinesis import kinesis_connector
Expand Down Expand Up @@ -723,6 +724,36 @@ def _get_record():
record = retry(_get_record, sleep=1, retries=5)
assert record["Data"].decode("utf-8") == test_data

@markers.aws.validated
@markers.snapshot.skip_snapshot_verify(
# error message is wrong in Kinesis (returns the full ARN)
paths=["$..message"],
)
def test_cbor_exceptions(
self,
kinesis_create_stream,
wait_for_stream_ready,
aws_client,
kinesis_http_client,
region_name,
account_id,
snapshot,
):
fake_name = "wrong-stream-name"
fake_stream_arn = kinesis_stream_arn(
account_id=account_id, region_name=region_name, stream_name=fake_name
)
describe_response_raw = kinesis_http_client.post_raw(
operation="DescribeStream",
payload={"StreamARN": fake_stream_arn},
)
assert describe_response_raw.status_code == 400
cbor_content = describe_response_raw.content
describe_response_data = cbor2_loads(cbor_content)
snapshot.match("cbor-error", describe_response_data)
assert describe_response_data["__type"] == "ResourceNotFoundException"
# TODO: add manual assertion on CBOR body?


class TestKinesisJavaSDK:
# the lambda function is stored in the lambda common functions folder to re-use existing caching in CI
Expand Down
9 changes: 9 additions & 0 deletions tests/aws/services/kinesis/test_kinesis.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,5 +226,14 @@
}
]
}
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_exceptions": {
"recorded-date": "04-09-2025, 16:59:26",
"recorded-content": {
"cbor-error": {
"__type": "ResourceNotFoundException",
"message": "Stream wrong-stream-name under account 111111111111 not found."
}
}
}
}
9 changes: 9 additions & 0 deletions tests/aws/services/kinesis/test_kinesis.validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_blob_handling": {
"last_validated_date": "2024-07-31T11:17:28+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_exceptions": {
"last_validated_date": "2025-09-04T16:59:26+00:00",
"durations_in_seconds": {
"setup": 0.5,
"call": 0.47,
"teardown": 0.0,
"total": 0.97
}
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_create_stream_without_shard_count": {
"last_validated_date": "2022-08-26T07:30:59+00:00"
},
Expand Down
16 changes: 12 additions & 4 deletions tests/unit/aws/protocol/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from botocore.serialize import create_serializer

from localstack.aws.protocol.parser import (
CBORRequestParser,
OperationNotFoundParserError,
ProtocolParserError,
QueryRequestParser,
Expand Down Expand Up @@ -626,7 +627,10 @@ def test_json_parser_cognito_with_botocore():
)


def test_json_cbor_blob_parsing():
# TODO: once Kinesis supports multi protocols (json/cbor), update this test to select the protocol instead when
# creating the parser
@pytest.mark.parametrize("parser_factory", [CBORRequestParser, create_parser])
def test_json_cbor_blob_parsing(parser_factory):
serialized_request = {
"url_path": "/",
"query_string": "",
Expand Down Expand Up @@ -655,7 +659,7 @@ def test_json_cbor_blob_parsing():
# Load the appropriate service
service = load_service("kinesis")
operation_model = service.operation_model("PutRecord")
parser = create_parser(service)
parser = parser_factory(service)
parsed_operation_model, parsed_request = parser.parse(
HttpRequest(
method=serialized_request.get("method") or "GET",
Expand All @@ -678,7 +682,10 @@ def test_json_cbor_blob_parsing():
assert parsed_request["PartitionKey"] == "partitionkey"


def test_json_cbor_blob_parsing_w_timestamp(snapshot):
# TODO: once Kinesis supports multi protocols (json/cbor), update this test to select the protocol instead when
# creating the parser
@pytest.mark.parametrize("parser_factory", [CBORRequestParser, create_parser])
def test_json_cbor_blob_parsing_w_timestamp(snapshot, parser_factory):
serialized_request = {
"url_path": "/",
"query_string": "",
Expand Down Expand Up @@ -707,7 +714,7 @@ def test_json_cbor_blob_parsing_w_timestamp(snapshot):
# Load the appropriate service
service = load_service("kinesis")
operation_model = service.operation_model("SubscribeToShard")
parser = create_parser(service)
parser = parser_factory(service)
parsed_operation_model, parsed_request = parser.parse(
HttpRequest(
method=serialized_request.get("method"),
Expand All @@ -721,6 +728,7 @@ def test_json_cbor_blob_parsing_w_timestamp(snapshot):

# Check if the determined operation_model is correct
assert parsed_operation_model == operation_model
assert isinstance(parsed_request["StartingPosition"]["Timestamp"], datetime)
snapshot.match("parsed_request", parsed_request)


Expand Down
17 changes: 15 additions & 2 deletions tests/unit/aws/protocol/test_parser.snapshot.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
{
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp": {
"recorded-date": "21-06-2024, 13:58:29",
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[CBORRequestParser]": {
"recorded-date": "04-09-2025, 18:32:21",
"recorded-content": {
"parsed_request": {
"ConsumerARN": "<test-consumer-arn>",
"ShardId": "<test-shard-id>",
"StartingPosition": {
"Timestamp": "2024-06-21 08:54:08.123000",
"Type": "AT_TIMESTAMP"
}
}
}
},
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[create_parser]": {
"recorded-date": "04-09-2025, 18:32:21",
"recorded-content": {
"parsed_request": {
"ConsumerARN": "<test-consumer-arn>",
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/aws/protocol/test_parser.validation.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
{
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp": {
"last_validated_date": "2024-06-21T13:58:29+00:00"
},
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[CBORRequestParser]": {
"last_validated_date": "2025-09-04T18:32:21+00:00",
"durations_in_seconds": {
"setup": 0.0,
"call": 0.01,
"teardown": 0.0,
"total": 0.01
}
},
"tests/unit/aws/protocol/test_parser.py::test_json_cbor_blob_parsing_w_timestamp[create_parser]": {
"last_validated_date": "2025-09-04T18:32:21+00:00",
"durations_in_seconds": {
"setup": 0.0,
"call": 0.0,
"teardown": 0.0,
"total": 0.0
}
}
}
11 changes: 9 additions & 2 deletions tests/unit/aws/protocol/test_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)
from localstack.aws.api.sts import Credentials, GetSessionTokenResponse
from localstack.aws.protocol.serializer import (
CBORResponseSerializer,
ProtocolSerializerError,
QueryResponseSerializer,
UnknownSerializerError,
Expand Down Expand Up @@ -248,6 +249,10 @@ def _botocore_event_streaming_test(
assert actual_events == expected_events


def _cbor_serializer_factory(service):
return CBORResponseSerializer()


def test_rest_xml_serializer_cloudfront_with_botocore():
parameters = {
"TestResult": {
Expand Down Expand Up @@ -1844,9 +1849,11 @@ def test_query_protocol_json_serialization(headers_dict):
"headers_dict",
[{"Content-Type": "application/cbor"}, {"Accept": "application/cbor"}],
)
def test_json_protocol_cbor_serialization(headers_dict):
@pytest.mark.parametrize("serializer_factory", [create_serializer, _cbor_serializer_factory])
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Nice way to test the serialization for both ways! 💯

def test_json_protocol_cbor_serialization(headers_dict, serializer_factory):
# TODO: test datetime serialization format for Kinesis manually
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why does this have to be tested manually? Can we maybe extend this test to use an operation which has a datetime in the spec?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think because right now, Botocore is unable to parse/serialize CBOR data for Kinesis, so I would need to write a manual test somehow to verify what AWS is returning "raw" and compare it. Sorry, when I meant manually, I meant we need to verify against AWS the raw response and put it in a unit test, not "fully manual"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context: Kinesis' CBOR is a different wire format than Smithy RPC v2 CBOR, and you're accurate in botocore not supporting it. The AWS SDK for Java has a working implementation to reference if you want to pursue it.

Copy link
Copy Markdown
Contributor Author

@bentsku bentsku Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I think we will park Kinesis CBOR support as its "own" protocol for now, as we rely on botocore internals, and will continue to treat it as a sub-part/different serialization format of the json protocol. We might revisit in the future. Thanks for the tips about Java support!

service = load_service("kinesis")
response_serializer = create_serializer(service)
response_serializer = serializer_factory(service)
headers = Headers(headers_dict)
response_data = GetRecordsOutput(
Records=[
Expand Down
Loading