From 51f55380dc6b8ebb6b12c5d8a5ac3742179b8aa9 Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Tue, 16 Jul 2024 22:48:26 +0100 Subject: [PATCH 1/4] Replace JSONRequestParser._parse_blob --- localstack-core/localstack/aws/protocol/parser.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/localstack-core/localstack/aws/protocol/parser.py b/localstack-core/localstack/aws/protocol/parser.py index 54713ecfb3dda..f810a3bd881ad 100644 --- a/localstack-core/localstack/aws/protocol/parser.py +++ b/localstack-core/localstack/aws/protocol/parser.py @@ -898,6 +898,15 @@ def _parse_timestamp( return self._convert_str_to_timestamp(node, self.CBOR_TIMESTAMP_FORMAT) return super()._parse_timestamp(request, shape, node, uri_params) + def _parse_blob( + self, request: Request, shape: Shape, node: Any, uri_params: Mapping[str, Any] = None + ) -> bytes: + if isinstance(node, bytes) and request.mimetype.startswith("application/x-amz-cbor"): + # CBOR carries blobs as raw byte strings (no base64); other nodes go to the parent + return bytes(node) + else: + return super()._parse_blob(request, shape, node, uri_params) + class JSONRequestParser(BaseJSONRequestParser): """ From ae6fa2836fac4a99f5875fd3e997bb101c6e3420 Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Tue, 16 Jul 2024 22:48:59 +0100 Subject: [PATCH 2/4] Replace test_json_cbor_blob_parsing body --- tests/unit/aws/protocol/test_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/aws/protocol/test_parser.py b/tests/unit/aws/protocol/test_parser.py index 8c2f1a4ef9be9..f2472184b00aa 100644 --- a/tests/unit/aws/protocol/test_parser.py +++ b/tests/unit/aws/protocol/test_parser.py @@ -642,7 +642,7 @@ def test_json_cbor_blob_parsing(): "X-Amz-Target": "Kinesis_20131202.PutRecord", "x-localstack-tgt-api": "kinesis", }, - "body": b"\xa3dDataTaGVsbG8sIHdvcmxkIQ==jStreamNamedtestlPartitionKeylpartitionkey", + "body": b"\xbfjStreamNamedtestdDataMhello, world!lPartitionKeylpartitionkey\xff", "url": "/", "context": {}, } From 
71e1cca18070a5fb4a1287a47a3e152be114f4bd Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Wed, 17 Jul 2024 21:17:03 +0100 Subject: [PATCH 3/4] Add kinesis http client and refactor existing http test We want to re-use the HTTP request generation in an upcoming test --- tests/aws/services/kinesis/test_kinesis.py | 109 +++++++++++------- .../kinesis/test_kinesis.validation.json | 2 +- 2 files changed, 69 insertions(+), 42 deletions(-) diff --git a/tests/aws/services/kinesis/test_kinesis.py b/tests/aws/services/kinesis/test_kinesis.py index 8842ffe0fa7d7..5edbd2825ef36 100644 --- a/tests/aws/services/kinesis/test_kinesis.py +++ b/tests/aws/services/kinesis/test_kinesis.py @@ -1,6 +1,7 @@ import logging import time from datetime import datetime, timedelta +from typing import Any from unittest.mock import patch import pytest @@ -29,6 +30,60 @@ _patch_cbor2() +class KinesisHTTPClient: + """ + Simple HTTP client for making Kinesis requests manually using the CBOR serialization type. + + This serialization type is not available via botocore. + """ + + def __init__( + self, + account_id: str, + region_name: str, + client_factory, + ): + self.account_id = account_id + self.region_name = region_name + self._client = client_factory("kinesis", region=self.region_name, signer_factory=SigV4Auth) + + def post(self, operation: str, payload: dict, datetime_as_timestamp: bool = True) -> Any: + """ + Perform a kinesis operation, encoding the request payload with CBOR and decoding the + response with CBOR. 
+ """ + response = self._client.post( + self.endpoint, + data=cbor2_dumps(payload, datetime_as_timestamp=datetime_as_timestamp), + headers=self._build_headers(operation), + ) + response_body = cbor2_loads(response.content) + if response.status_code != 200: + raise ValueError(f"Bad status: {response.status_code}, response body: {response_body}") + + return response_body + + def _build_headers(self, operation: str) -> dict: + return { + "content-type": constants.APPLICATION_AMZ_CBOR_1_1, + "x-amz-target": f"Kinesis_20131202.{operation}", + "host": self.endpoint, + } + + @property + def endpoint(self) -> str: + return ( + f"https://{self.account_id}.control-kinesis.{self.region_name}.amazonaws.com" + if is_aws_cloud() + else config.internal_service_url() + ) + + +@pytest.fixture +def kinesis_http_client(account_id, region_name, aws_http_client_factory): + return KinesisHTTPClient(account_id, region_name, client_factory=aws_http_client_factory) + + class TestKinesis: @staticmethod def _get_endpoint(account_id: str, region_name: str): @@ -577,12 +632,8 @@ def test_subscribe_to_shard_with_at_timestamp_cbor( self, kinesis_create_stream, wait_for_stream_ready, - kinesis_register_consumer, - wait_for_kinesis_consumer_ready, aws_client, - aws_http_client_factory, - account_id, - region_name, + kinesis_http_client, ): # create stream pre_create_timestamp = (datetime.now() - timedelta(hours=0, minutes=1)).astimezone() @@ -594,26 +645,13 @@ def test_subscribe_to_shard_with_at_timestamp_cbor( post_create_timestamp = (datetime.now() + timedelta(hours=0, minutes=1)).astimezone() # perform a raw DescribeStream request to test the datetime serialization by LocalStack - kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth) - endpoint = ( - f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com" - if is_aws_cloud() - else config.internal_service_url() - ) - describe_response = kinesis_http_client.post( - endpoint, - 
data=cbor2_dumps({"StreamARN": stream_arn}, datetime_as_timestamp=True), - headers={ - "content-type": constants.APPLICATION_AMZ_CBOR_1_1, - "x-amz-target": "Kinesis_20131202.DescribeStream", - "host": endpoint, - }, + describe_response_data = kinesis_http_client.post( + operation="DescribeStream", + payload={"StreamARN": stream_arn}, ) - # verify that the request can be properly parsed, and that the timestamp is within the boundaries - assert describe_response.status_code == 200 - describe_response_content = describe_response.content - describe_response_data = cbor2_loads(describe_response_content) + # verify that the request can be properly parsed, and that the timestamp is within the + # boundaries assert ( pre_create_timestamp <= describe_response_data["StreamDescription"]["StreamCreationTimestamp"] @@ -621,26 +659,15 @@ def test_subscribe_to_shard_with_at_timestamp_cbor( ) shard_id = describe_response_data["StreamDescription"]["Shards"][0]["ShardId"] - shard_iterator_response = kinesis_http_client.post( - endpoint, - data=cbor2_dumps( - { - "StreamARN": stream_arn, - "ShardId": shard_id, - "ShardIteratorType": "AT_TIMESTAMP", - "Timestamp": datetime.now().astimezone(), - }, - datetime_as_timestamp=True, - ), - headers={ - "content-type": constants.APPLICATION_AMZ_CBOR_1_1, - "x-amz-target": "Kinesis_20131202.GetShardIterator", - "host": endpoint, + shard_iterator_response_data = kinesis_http_client.post( + "GetShardIterator", + payload={ + "StreamARN": stream_arn, + "ShardId": shard_id, + "ShardIteratorType": "AT_TIMESTAMP", + "Timestamp": datetime.now().astimezone(), }, ) - assert shard_iterator_response.status_code == 200 - shard_iterator_content = shard_iterator_response.content - shard_iterator_response_data = cbor2_loads(shard_iterator_content) assert "ShardIterator" in shard_iterator_response_data diff --git a/tests/aws/services/kinesis/test_kinesis.validation.json b/tests/aws/services/kinesis/test_kinesis.validation.json index 
6eab02c8bcb1c..3a3054619f4cb 100644 --- a/tests/aws/services/kinesis/test_kinesis.validation.json +++ b/tests/aws/services/kinesis/test_kinesis.validation.json @@ -27,7 +27,7 @@ "last_validated_date": "2024-06-21T15:20:46+00:00" }, "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_at_timestamp_cbor": { - "last_validated_date": "2024-07-04T09:16:32+00:00" + "last_validated_date": "2024-07-17T20:16:42+00:00" }, "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_sequence_number_as_iterator": { "last_validated_date": "2022-08-26T07:29:21+00:00" From e380b77d17a494bb21226e1289c139c310297481 Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Wed, 17 Jul 2024 21:17:51 +0100 Subject: [PATCH 4/4] Add test for blob payload handling using HTTP client --- tests/aws/services/kinesis/test_kinesis.py | 42 +++++++++++++++++++ .../kinesis/test_kinesis.validation.json | 3 ++ 2 files changed, 45 insertions(+) diff --git a/tests/aws/services/kinesis/test_kinesis.py b/tests/aws/services/kinesis/test_kinesis.py index 5edbd2825ef36..7674de39fd6f8 100644 --- a/tests/aws/services/kinesis/test_kinesis.py +++ b/tests/aws/services/kinesis/test_kinesis.py @@ -670,6 +670,48 @@ def test_subscribe_to_shard_with_at_timestamp_cbor( ) assert "ShardIterator" in shard_iterator_response_data + @markers.aws.validated + def test_cbor_blob_handling( + self, + kinesis_create_stream, + wait_for_stream_ready, + aws_client, + kinesis_http_client, + ): + # create stream + stream_name = kinesis_create_stream(ShardCount=1) + wait_for_stream_ready(stream_name) + + test_data = f"hello world {short_uid()}" + + # put a record on to the stream + kinesis_http_client.post( + operation="PutRecord", + payload={ + "Data": test_data.encode("utf-8"), + "PartitionKey": f"key-{short_uid()}", + "StreamName": stream_name, + }, + ) + + # don't need to get shard iterator manually, so use the SDK + shard_iterator: str | None = get_shard_iterator(stream_name, 
aws_client.kinesis) + assert shard_iterator is not None + + def _get_record(): + # send get records request via the http client + get_records_response = kinesis_http_client.post( + operation="GetRecords", + payload={ + "ShardIterator": shard_iterator, + }, + ) + assert len(get_records_response["Records"]) == 1 + return get_records_response["Records"][0] + + record = retry(_get_record, sleep=1, retries=5) + assert record["Data"].decode("utf-8") == test_data + class TestKinesisPythonClient: @markers.skip_offline diff --git a/tests/aws/services/kinesis/test_kinesis.validation.json b/tests/aws/services/kinesis/test_kinesis.validation.json index 3a3054619f4cb..0d5a025a798d6 100644 --- a/tests/aws/services/kinesis/test_kinesis.validation.json +++ b/tests/aws/services/kinesis/test_kinesis.validation.json @@ -2,6 +2,9 @@ "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_add_tags_to_stream": { "last_validated_date": "2022-08-25T06:56:43+00:00" }, + "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_blob_handling": { + "last_validated_date": "2024-07-17T20:09:34+00:00" + }, "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_create_stream_without_shard_count": { "last_validated_date": "2022-08-26T07:30:59+00:00" },