8000 fix: kinesis CBOR blob handling by simonrw · Pull Request #11220 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

fix: kinesis CBOR blob handling #11220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
8000 Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions localstack-core/localstack/aws/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,15 @@ def _parse_timestamp(
return self._convert_str_to_timestamp(node, self.CBOR_TIMESTAMP_FORMAT)
return super()._parse_timestamp(request, shape, node, uri_params)

def _parse_blob(
self, request: Request, shape: Shape, node: bool, uri_params: Mapping[str, Any] = None
) -> bytes:
if isinstance(node, bytes) and request.mimetype.startswith("application/x-amz-cbor"):
# CBOR does not base64 encode binary data
return bytes(node)
else:
return super()._parse_blob(request, shape, node, uri_params)


class JSONRequestParser(BaseJSONRequestParser):
"""
Expand Down
151 changes: 110 additions & 41 deletions tests/aws/services/kinesis/test_kinesis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import time
from datetime import datetime, timedelta
from typing import Any
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -29,6 +30,60 @@
_patch_cbor2()


class KinesisHTTPClient:
"""
Simple HTTP client for making Kinesis requests manually using the CBOR serialization type.

This serialization type is not available via botocore.
"""

def __init__(
self,
account_id: str,
region_name: str,
client_factory,
):
self.account_id = account_id
self.region_name = region_name
self._client = client_factory("kinesis", region=self.region_name, signer_factory=SigV4Auth)

def post(self, operation: str, payload: dict, datetime_as_timestamp: bool = True) -> Any:
"""
Perform a kinesis operation, encoding the request payload with CBOR and decoding the
response with CBOR.
"""
response = self._client.post(
self.endpoint,
data=cbor2_dumps(payload, datetime_as_timestamp=datetime_as_timestamp),
headers=self._build_headers(operation),
)
response_body = cbor2_loads(response.content)
if response.status_code != 200:
raise ValueError(f"Bad status: {response.status_code}, response body: {response_body}")

return response_body

def _build_headers(self, operation: str) -> dict:
return {
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": f"Kinesis_20131202.{operation}",
"host": self.endpoint,
}

@property
def endpoint(self) -> str:
return (
f"https://{self.account_id}.control-kinesis.{self.region_name}.amazonaws.com"
if is_aws_cloud()
else config.internal_service_url()
)


@pytest.fixture
def kinesis_http_client(account_id, region_name, aws_http_client_factory):
return KinesisHTTPClient(account_id, region_name, client_factory=aws_http_client_factory)


class TestKinesis:
@staticmethod
def _get_endpoint(account_id: str, region_name: str):
Expand Down Expand Up @@ -577,12 +632,8 @@ def test_subscribe_to_shard_with_at_timestamp_cbor(
self,
kinesis_create_stream,
wait_for_stream_ready,
kinesis_register_consumer,
wait_for_kinesis_consumer_ready,
aws_client,
aws_http_client_factory,
account_id,
region_name,
kinesis_http_client,
):
# create stream
pre_create_timestamp = (datetime.now() - timedelta(hours=0, minutes=1)).astimezone()
Expand All @@ -594,55 +645,73 @@ def test_subscribe_to_shard_with_at_timestamp_cbor(
post_create_timestamp = (datetime.now() + timedelta(hours=0, minutes=1)).astimezone()

# perform a raw DescribeStream request to test the datetime serialization by LocalStack
kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth)
endpoint = (
f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com"
if is_aws_cloud()
else config.internal_service_url()
)
describe_response = kinesis_http_client.post(
endpoint,
data=cbor2_dumps({"StreamARN": stream_arn}, datetime_as_timestamp=True),
headers={
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": "Kinesis_20131202.DescribeStream",
"host": endpoint,
},
describe_response_data = kinesis_http_client.post(
operation="DescribeStream",
payload={"StreamARN": stream_arn},
)

# verify that the request can be properly parsed, and that the timestamp is within the boundaries
assert describe_response.status_code == 200
describe_response_content = describe_response.content
describe_response_data = cbor2_loads(describe_response_content)
# verify that the request can be properly parsed, and that the timestamp is within the
# boundaries
assert (
pre_create_timestamp
<= describe_response_data["StreamDescription"]["StreamCreationTimestamp"]
<= post_create_timestamp
)

shard_id = describe_response_data["StreamDescription"]["Shards"][0]["ShardId"]
shard_iterator_response = kinesis_http_client.post(
endpoint,
data=cbor2_dumps(
{
"StreamARN": stream_arn,
"ShardId": shard_id,
"ShardIteratorType": "AT_TIMESTAMP",
"Timestamp": datetime.now().astimezone(),
},
datetime_as_timestamp=True,
),
headers={
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": "Kinesis_20131202.GetShardIterator",
"host": endpoint,
shard_iterator_response_data = kinesis_http_client.post(
"GetShardIterator",
payload={
"StreamARN": stream_arn,
"ShardId": shard_id,
"ShardIteratorType": "AT_TIMESTAMP",
"Timestamp": datetime.now().astimezone(),
},
)
assert shard_iterator_response.status_code == 200
shard_iterator_content = shard_iterator_response.content
shard_iterator_response_data = cbor2_loads(shard_iterator_content)
assert "ShardIterator" in shard_iterator_response_data

@markers.aws.validated
def test_cbor_blob_handling(
self,
kinesis_create_stream,
wait_for_stream_ready,
aws_client,
kinesis_http_client,
):
# create stream
stream_name = kinesis_create_stream(ShardCount=1)
wait_for_stream_ready(stream_name)

test_data = f"hello world {short_uid()}"

# put a record on to the stream
kinesis_http_client.post(
operation="PutRecord",
payload={
"Data": test_data.encode("utf-8"),
"PartitionKey": f"key-{short_uid()}",
"StreamName": stream_name,
},
)

# don't need to get shard iterator manually, so use the SDK
shard_iterator: str | None = get_shard_iterator(stream_name, aws_client.kinesis)
assert shard_iterator is not None

def _get_record():
# send get records request via the http client
get_records_response = kinesis_http_client.post(
operation="GetRecords",
payload={
"ShardIterator": shard_iterator,
},
)
assert len(get_records_response["Records"]) == 1
return get_records_response["Records"][0]

record = retry(_get_record, sleep=1, retries=5)
assert record["Data"].decode("utf-8") == test_data


class TestKinesisPythonClient:
@markers.skip_offline
Expand Down
5 changes: 4 additions & 1 deletion tests/aws/services/kinesis/test_kinesis.validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_add_tags_to_stream": {
"last_validated_date": "2022-08-25T06:56:43+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_cbor_blob_handling": {
"last_validated_date": "2024-07-17T20:09:34+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_create_stream_without_shard_count": {
"last_validated_date": "2022-08-26T07:30:59+00:00"
},
Expand All @@ -27,7 +30,7 @@
"last_validated_date": "2024-06-21T15:20:46+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_at_timestamp_cbor": {
"last_validated_date": "2024-07-04T09:16:32+00:00"
"last_validated_date": "2024-07-17T20:16:42+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_sequence_number_as_iterator": {
"last_validated_date": "2022-08-26T07:29:21+00:00"
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/aws/protocol/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def test_json_cbor_blob_parsing():
"X-Amz-Target": "Kinesis_20131202.PutRecord",
"x-localstack-tgt-api": "kinesis",
},
"body": b"\xa3dDataTaGVsbG8sIHdvcmxkIQ==jStreamNamedtestlPartitionKeylpartitionkey",
"body": b"\xbfjStreamNamedtestdDataMhello, world!lPartitionKeylpartitionkey\xff",
"url": "/",
"context": {},
}
Expand Down
Loading
0