8000 migrate Kinesis to ASF by alexrashed · Pull Request #6166 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

migrate Kinesis to ASF #6166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add tests for content-negotiation in ASF serializer
  • Loading branch information
alexrashed committed Aug 30, 2022
commit 06716c55e3e00a93f8b68f6dbf289fbcf8790163
2 changes: 1 addition & 1 deletion localstack/aws/protocol/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def _create_default_response(
def _get_mime_type(self, headers: Optional[Dict | Headers]) -> str:
"""
Extracts the accepted mime type from the request headers and returns a matching, supported mime type for the
serializer.
serializer or the default mime type of the service if there is no match.
:param headers: to extract the "Accept" header from
:return: preferred mime type to be used by the serializer (if it is not accepted by the client,
an error is logged)
Expand Down
130 changes: 128 additions & 2 deletions tests/unit/aws/protocol/test_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
from typing import Any, Dict, Iterator, List, Optional
from xml.etree import ElementTree

import cbor2
import pytest
from botocore.awsrequest import HeadersDict
from botocore.endpoint import convert_to_response_dict
from botocore.parsers import ResponseParser, create_parser
from dateutil.tz import tzlocal, tzutc
from requests.models import Response as RequestsResponse
from urllib3 import HTTPResponse as UrlLibHttpResponse
from werkzeug.datastructures import Headers
from werkzeug.wrappers import ResponseStream

from localstack.aws.api import CommonServiceException, ServiceException
Expand All @@ -22,6 +24,7 @@
CancellationReason,
TransactionCanceledException,
)
from localstack.aws.api.kinesis import GetRecordsOutput, Record
from localstack.aws.api.lambda_ import ResourceNotFoundException
from localstack.aws.api.route53 import NoSuchHostedZone
from localstack.aws.api.sns import VerificationException
Expand All @@ -30,13 +33,15 @@
ReceiptHandleIsInvalid,
UnsupportedOperation,
)
from localstack.aws.api.sts import Credentials, GetSessionTokenResponse
from localstack.aws.protocol.serializer import (
ProtocolSerializerError,
QueryResponseSerializer,
UnknownSerializerError,
create_serializer,
)
from localstack.aws.spec import load_service
from localstack.constants import APPLICATION_AMZ_CBOR_1_1
from localstack.http.response import Response
from localstack.utils.common import to_str

Expand Down Expand Up @@ -112,15 +117,15 @@ def _botocore_error_serializer_integration_test(
status_code: int,
message: Optional[str],
is_sender_fault: bool = False,
**additional_error_fields: Dict[str, Any]
**additional_error_fields: Dict[str, Any],
) -> dict:
"""
Performs an integration test for the error serialization using botocore as parser.
It executes the following steps:
- Load the given service (f.e. "sqs")
- Serialize the _error_ response with the appropriate serializer from the AWS Serivce Framework
- Parse the serialized error response using the botocore parser
- Checks the the metadata is correct (status code, requestID,...)
- Checks if the the metadata is correct (status code, requestID,...)
- Checks if the parsed error response content is correct

:param service: to load the correct service specification, serializer, and parser
Expand Down Expand Up @@ -1684,3 +1689,124 @@ def test_restjson_streaming_payload(payload):
"Payload": payload,
},
)


@pytest.mark.parametrize(
"service,accept_header,content_type_header,expected_mime_type",
[
# Test default S3
("s3", None, None, "text/xml"),
# Test default STS
("sts", None, None, "text/xml"),
# Test STS for "any" Accept header
("sts", "*/*", None, "text/xml"),
# Test STS for "any" Accept header and xml content
("sts", "*/*", "text/xml", "text/xml"),
# Test STS without Accept and xml content
("sts", None, "text/xml", "text/xml"),
# Test STS without Accept and JSON content
("sts", None, "application/json", "application/json"),
# Test STS with JSON Accept and XML content
("sts", "application/json", "text/xml", "application/json"),
# Test default Kinesis
("kinesis", None, None, "application/json"),
# Test Kinesis for "any" Accept header
("kinesis", "*/*", None, "application/json"),
# Test Kinesis for "any" Accept header and JSON content
("kinesis", "*/*", "application/json", "application/json"),
# Test Kinesis without Accept and CBOR content
("kinesis", None, "application/cbor", "application/cbor"),
# Test Kinesis without Accept and CBOR content
("kinesis", None, "application/cbor", "application/cbor"),
# Test Kinesis with JSON Accept and CBOR content
("kinesis", "application/json", "application/cbor", "application/json"),
# Test Kinesis with CBOR Accept and JSON content
("kinesis", "application/cbor", "application/json", "application/cbor"),
# Test Kinesis with CBOR 1.1 Accept and JSON content
("kinesis", APPLICATION_AMZ_CBOR_1_1, "application/json", APPLICATION_AMZ_CBOR_1_1),
# Test Kinesis with non-supported Accept header and without Content-Type
("kinesis", "unknown/content-type", None, "application/json"),
# Test Kinesis with non-supported Accept header and CBOR Content-Type
("kinesis", "unknown/content-type", "application/cbor", "application/json"),
# Test Kinesis with non-supported Content-Type
("kinesis", None, "unknown/content-type", "application/json"),
],
)
def test_accept_header_detection(
service: str,
accept_header: Optional[str],
content_type_header: Optional[str],
expected_mime_type: str,
):
service_model = load_service(service)
response_serializer = create_serializer(service_model)
headers = Headers()
if accept_header:
headers["Accept"] = accept_header
if content_type_header:
headers["Content-Type"] = content_type_header
mime_type = response_serializer._get_mime_type(headers)
assert (
mime_type == expected_mime_type
), f"Detected mime type ({mime_type}) was not as expected ({expected_mime_type})"


@pytest.mark.parametrize(
"headers_dict",
[{"Content-Type": "application/json"}, {"Accept": "application/json"}],
)
def test_query_protocol_json_serialization(headers_dict):
service = load_service("sts")
response_serializer = create_serializer(service)
headers = Headers(headers_dict)
utc_timestamp = 1661255665.123
response_data = GetSessionTokenResponse(
Credentials=Credentials(
AccessKeyId="accessKeyId",
SecretAccessKey="secretAccessKey",
SessionToken="sessionToken",
Expiration=datetime.utcfromtimestamp(utc_timestamp),
)
)
result: Response = response_serializer.serialize_to_response(
response_data, service.operation_model("GetSessionToken"), headers
)
assert result is not None
assert result.content_type is not None
assert result.content_type == "application/json"
parsed_data = json.loads(result.data)
# Ensure the structure is the same as for query-xml (f.e. with "SOAP"-like root element), but just JSON encoded
assert "GetSessionTokenResponse" in parsed_data
assert "ResponseMetadata" in parsed_data["GetSessionTokenResponse"]
assert "GetSessionTokenResult" in parsed_data["GetSessionTokenResponse"]
# Make sure the timestamp is formatted as str(int(utc float))
assert parsed_data["GetSessionTokenResponse"]["GetSessionTokenResult"].get(
"Credentials", {}
).get("Expiration") == str(int(utc_timestamp))


@pytest.mark.parametrize(
"headers_dict",
[{"Content-Type": "application/cbor"}, {"Accept": "application/cbor"}],
)
def test_json_protocol_cbor_serialization(headers_dict):
service = load_service("kinesis")
response_serializer = create_serializer(service)
headers = Headers(headers_dict)
response_data = GetRecordsOutput(
Records=[
Record(
SequenceNumber="test_sequence_number",
Data=b"test_data",
PartitionKey="test_partition_key",
)
]
)
result: Response = response_serializer.serialize_to_response(
response_data, service.operation_model("GetRecords"), headers
)
assert result is not None
assert result.content_type is not None
assert result.content_type == "application/cbor"
parsed_data = cbor2.loads(result.data)
assert parsed_data == response_data
0