8000 fix existing tests, add request-level kinesis cbor test · localstack/localstack@c0db4c5 · GitHub
[go: up one dir, main page]

Skip to content

Commit c0db4c5

Browse files
committed
fix existing tests, add request-level kinesis cbor test
1 parent b1be7b6 commit c0db4c5

File tree

4 files changed

+103
-13
lines changed

4 files changed

+103
-13
lines changed

tests/aws/services/kinesis/test_kinesis.py

Lines changed: 92 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
import logging
22
import time
3-
from datetime import datetime
3+
from datetime import datetime, timedelta
44
from unittest.mock import patch
55

6-
import cbor2
76
import pytest
87
import requests
8+
from botocore.auth import SigV4Auth
99
from botocore.config import Config as BotoConfig
1010
from botocore.exceptions import ClientError
1111

12+
# cbor2: explicitly load from private _encoder/_decoder module to avoid using the (non-patched) C-version
13+
from cbor2._decoder import loads as cbor2_loads
14+
from cbor2._encoder import dumps as cbor2_dumps
15+
1216
from localstack import config, constants
1317
from localstack.aws.api.kinesis import ShardIteratorType, SubscribeToShardInput
18+
from localstack.aws.client import _patch_cbor2
1419
from localstack.services.kinesis import provider as kinesis_provider
20+
from localstack.testing.aws.util import is_aws_cloud
1521
from localstack.testing.config import TEST_AWS_ACCESS_KEY_ID
1622
from localstack.testing.pytest import markers
1723
from localstack.utils.aws import resources
@@ -21,6 +27,10 @@
2127

2228
LOGGER = logging.getLogger(__name__)
2329

30+
# make sure cbor2 patches are applied
31+
# (for the test-data decoding, usually done as init hook in LocalStack)
32+
_patch_cbor2()
33+
2434

2535
def get_shard_iterator(stream_name, kinesis_client):
2636
response = kinesis_client.describe_stream(StreamName=stream_name)
@@ -243,7 +253,7 @@ def test_subscribe_to_shard_cbor_at_timestamp(
243253
)
244254
headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
245255
headers["X-Amz-Target"] = "Kinesis_20131202.SubscribeToShard"
246-
data = cbor2.dumps(
256+
data = cbor2_dumps(
247257
SubscribeToShardInput(
248258
ConsumerARN=consumer_arn,
249259
ShardId=shard_id,
@@ -252,7 +262,8 @@ def test_subscribe_to_shard_cbor_at_timestamp(
252262
# manually set a UTC epoch with milliseconds
253263
"Timestamp": "1718960048000",
254264
},
255-
)
265+
),
266+
datetime_as_timestamp=True,
256267
)
257268
found_record = False
258269
with requests.post(url, data, headers=headers, stream=True) as result:
@@ -376,17 +387,17 @@ def test_get_records(
376387
)
377388
headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
378389
headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords"
379-
data = cbor2.dumps({"ShardIterator": iterator})
390+
data = cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True)
380391
result = requests.post(url, data, headers=headers)
381392
assert 200 == result.status_code
382-
result = cbor2.loads(result.content)
393+
result = cbor2_loads(result.content)
383394
attrs = ("Data", "EncryptionType", "PartitionKey", "SequenceNumber")
384395
assert select_attributes(json_records[0], attrs) == select_attributes(
385396
result["Records"][0], attrs
386397
)
387-
# ensure that the CBOR datetime format is unix timestamp millis
398+
# ensure that the CBOR datetime format is parsed the same way
388399
assert (
389-
int(json_records[0]["ApproximateArrivalTimestamp"].timestamp() * 1000)
400+
json_records[0]["ApproximateArrivalTimestamp"]
390401
== result["Records"][0]["ApproximateArrivalTimestamp"]
391402
)
392403

@@ -420,10 +431,10 @@ def test_get_records_empty_stream(
420431
)
421432
headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
422433
headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords"
423-
data = cbor2.dumps({"ShardIterator": iterator})
434+
data = cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True)
424435
cbor_response = requests.post(url, data, headers=headers)
425436
assert 200 == cbor_response.status_code
426-
cbor_records_content = cbor2.loads(cbor_response.content)
437+
cbor_records_content = cbor2_loads(cbor_response.content)
427438
cbor_records = cbor_records_content.get("Records")
428439
assert 0 == len(cbor_records)
429440

@@ -567,6 +578,77 @@ def test_get_records_shard_iterator_with_surrounding_quotes(
567578

568579
assert aws_client.kinesis.get_records(ShardIterator=f'"{shard_iterator}"')["Records"]
569580

581+
@markers.aws.validated
582+
def test_subscribe_to_shard_with_at_timestamp_cbor(
583+
self,
584+
kinesis_create_stream,
585+
wait_for_stream_ready,
586+
kinesis_register_consumer,
587+
wait_for_kinesis_consumer_ready,
588+
aws_client,
589+
aws_http_client_factory,
590+
account_id,
591+
region_name,
592+
):
593+
# create stream
594+
pre_create_timestamp = (datetime.now() - timedelta(hours=0, minutes=1)).astimezone()
595+
stream_name = kinesis_create_stream(ShardCount=1)
596+
stream_arn = aws_client.kinesis.describe_stream(StreamName=stream_name)[
597+
"StreamDescription"
598+
]["StreamARN"]
599+
wait_for_stream_ready(stream_name)
600+
post_create_timestamp = (datetime.now() + timedelta(hours=0, minutes=1)).astimezone()
601+
602+
# perform a raw DescribeStream request to test the datetime serialization by LocalStack
603+
kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth)
604+
endpoint = (
605+
f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com"
606+
if is_aws_cloud()
607+
else config.internal_service_url()
608+
)
609+
describe_response = kinesis_http_client.post(
610+
f"{endpoint}",
611+
data=cbor2_dumps({"StreamARN": stream_arn}, datetime_as_timestamp=True),
612+
headers={
613+
"content-type": "application/x-amz-cbor-1.1",
614+
"x-amz-target": "Kinesis_20131202.DescribeStream",
615+
"host": endpoint,
616+
},
617+
)
618+
619+
# verify that the request can be properly parsed, and that the timestamp is within the boundaries
620+
assert describe_response.status_code == 200
621+
describe_response_content = describe_response.content
622+
describe_response_data = cbor2_loads(describe_response_content)
623+
assert (
624+
pre_create_timestamp
625+
<= describe_response_data["StreamDescription"]["StreamCreationTimestamp"]
626+
<= post_create_timestamp
627+
)
628+
629+
shard_id = describe_response_data["StreamDescription"]["Shards"][0]["ShardId"]
630+
shard_iterator_response = kinesis_http_client.post(
631+
f"{endpoint}",
632+
data=cbor2_dumps(
633+
{
634+
"StreamARN": stream_arn,
635+
"ShardId": shard_id,
636+
"ShardIteratorType": "AT_TIMESTAMP",
637+
"Timestamp": datetime.now().astimezone(),
638+
},
639+
datetime_as_timestamp=True,
640+
),
641+
headers={
642+
"content-type": "application/x-amz-cbor-1.1",
643+
"x-amz-target": "Kinesis_20131202.GetShardIterator",
644+
"host": endpoint,
645+
},
646+
)
647+
assert shard_iterator_response.status_code == 200
648+
shard_iterator_content = shard_iterator_response.content
649+
shard_iterator_response_data = cbor2_loads(shard_iterator_content)
650+
assert "ShardIterator" in shard_iterator_response_data
651+
570652

571653
class TestKinesisPythonClient:
572654
@markers.skip_offline

tests/aws/services/kinesis/test_kinesis.validation.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_at_timestamp": {
2121
"last_validated_date": "2024-06-21T15:20:46+00:00"
2222
},
23+
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_at_timestamp_cbor": {
24+
"last_validated_date": "2024-07-04T09:16:32+00:00"
25+
},
2326
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_subscribe_to_shard_with_sequence_number_as_iterator": {
2427
"last_validated_date": "2022-08-26T07:29:21+00:00"
28+
},
29+
"tests/aws/services/kinesis/test_kinesis.py::test_subscribe_to_shard_with_at_timestamp_cbor": {
30+
"last_validated_date": "2024-07-04T07:13:22+00:00"
2531
}
2632
}

tests/unit/aws/protocol/test_parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ def test_json_cbor_blob_parsing():
642642
"X-Amz-Target": "Kinesis_20131202.PutRecord",
643643
"x-localstack-tgt-api": "kinesis",
644644
},
645-
"body": b"\xbfjStreamNamedtestdDataMhello, world!lPartitionKeylpartitionkey\xff",
645+
"body": b"\xa3dDataTaGVsbG8sIHdvcmxkIQ==jStreamNamedtestlPartitionKeylpartitionkey",
646646
"url": "/",
647647
"context": {},
648648
}

tests/unit/aws/protocol/test_serializer.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
from typing import Any, Dict, Iterator, List, Optional
88
from xml.etree import ElementTree
99

10-
import cbor2
1110
import pytest
1211
from botocore.awsrequest import HeadersDict
1312
from botocore.endpoint import convert_to_response_dict
1413
from botocore.parsers import ResponseParser, create_parser
14+
15+
# cbor2: explicitly load from private _decoder module to avoid using the (non-patched) C-version
16+
from cbor2._decoder import loads as cbor2_loads
1517
from dateutil.tz import tzlocal, tzutc
1618
from requests.models import Response as RequestsResponse
1719
from urllib3 import HTTPResponse as UrlLibHttpResponse
@@ -1860,7 +1862,7 @@ def test_json_protocol_cbor_serialization(headers_dict):
18601862
assert result is not None
18611863
assert result.content_type is not None
18621864
assert result.content_type == "application/cbor"
1863-
parsed_data = cbor2.loads(result.data)
1865+
parsed_data = cbor2_loads(result.data)
18641866
assert parsed_data == response_data
18651867

18661868

0 commit comments

Comments
 (0)
0