1
1
import logging
2
2
import time
3
- from datetime import datetime
3
+ from datetime import datetime , timedelta
4
4
from unittest .mock import patch
5
5
6
- import cbor2
7
6
import pytest
8
7
import requests
8
+ from botocore .auth import SigV4Auth
9
9
from botocore .config import Config as BotoConfig
10
10
from botocore .exceptions import ClientError
11
11
12
+ # cbor2: explicitly load from private _encoder/_decoder module to avoid using the (non-patched) C-version
13
+ from cbor2 ._decoder import loads as cbor2_loads
14
+ from cbor2 ._encoder import dumps as cbor2_dumps
15
+
12
16
from localstack import config , constants
13
17
from localstack .aws .api .kinesis import ShardIteratorType , SubscribeToShardInput
18
+ from localstack .aws .client import _patch_cbor2
14
19
from localstack .services .kinesis import provider as kinesis_provider
20
+ from localstack .testing .aws .util import is_aws_cloud
15
21
from localstack .testing .config import TEST_AWS_ACCESS_KEY_ID
16
22
from localstack .testing .pytest import markers
17
23
from localstack .utils .aws import resources
21
27
22
28
LOGGER = logging .getLogger (__name__ )
23
29
30
+ # make sure cbor2 patches are applied
31
+ # (for the test-data decoding, usually done as init hook in LocalStack)
32
+ _patch_cbor2 ()
33
+
24
34
25
35
def get_shard_iterator (stream_name , kinesis_client ):
26
36
response = kinesis_client .describe_stream (StreamName = stream_name )
@@ -243,7 +253,7 @@ def test_subscribe_to_shard_cbor_at_timestamp(
243
253
)
244
254
headers ["Content-Type" ] = constants .APPLICATION_AMZ_CBOR_1_1
245
255
headers ["X-Amz-Target" ] = "Kinesis_20131202.SubscribeToShard"
246
- data = cbor2 . dumps (
256
+ data = cbor2_dumps (
247
257
SubscribeToShardInput (
248
258
ConsumerARN = consumer_arn ,
249
259
ShardId = shard_id ,
@@ -252,7 +262,8 @@ def test_subscribe_to_shard_cbor_at_timestamp(
252
262
# manually set a UTC epoch with milliseconds
253
263
"Timestamp" : "1718960048000" ,
254
264
},
255
- )
265
+ ),
266
+ datetime_as_timestamp = True ,
256
267
)
257
268
found_record = False
258
269
with requests .post (url , data , headers = headers , stream = True ) as result :
@@ -376,17 +387,17 @@ def test_get_records(
376
387
)
377
388
headers ["Content-Type" ] = constants .APPLICATION_AMZ_CBOR_1_1
378
389
headers ["X-Amz-Target" ] = "Kinesis_20131202.GetRecords"
379
- data = cbor2 . dumps ({"ShardIterator" : iterator })
390
+ data = cbor2_dumps ({"ShardIterator" : iterator }, datetime_as_timestamp = True )
380
391
result = requests .post (url , data , headers = headers )
381
392
assert 200 == result .status_code
382
- result = cbor2 . loads (result .content )
393
+ result = cbor2_loads (result .content )
383
394
attrs = ("Data" , "EncryptionType" , "PartitionKey" , "SequenceNumber" )
384
395
assert select_attributes (json_records [0 ], attrs ) == select_attributes (
385
396
result ["Records" ][0 ], attrs
386
397
)
387
- # ensure that the CBOR datetime format is unix timestamp millis
398
+ # ensure that the CBOR datetime format is parsed the same way
388
399
assert (
389
- int ( json_records [0 ]["ApproximateArrivalTimestamp" ]. timestamp () * 1000 )
400
+ json_records [0 ]["ApproximateArrivalTimestamp" ]
390
401
== result ["Records" ][0 ]["ApproximateArrivalTimestamp" ]
391
402
)
392
403
@@ -420,10 +431,10 @@ def test_get_records_empty_stream(
420
431
)
421
432
headers ["Content-Type" ] = constants .APPLICATION_AMZ_CBOR_1_1
422
433
headers ["X-Amz-Target" ] = "Kinesis_20131202.GetRecords"
423
- data = cbor2 . dumps ({"ShardIterator" : iterator })
434
+ data = cbor2_dumps ({"ShardIterator" : iterator }, datetime_as_timestamp = True )
424
435
cbor_response = requests .post (url , data , headers = headers )
425
436
assert 200 == cbor_response .status_code
426
- cbor_records_content = cbor2 . loads (cbor_response .content )
437
+ cbor_records_content = cbor2_loads (cbor_response .content )
427
438
cbor_records = cbor_records_content .get ("Records" )
428
439
assert 0 == len (cbor_records )
429
440
@@ -567,6 +578,77 @@ def test_get_records_shard_iterator_with_surrounding_quotes(
567
578
568
579
assert aws_client .kinesis .get_records (ShardIterator = f'"{ shard_iterator } "' )["Records" ]
569
580
581
+ @markers .aws .validated
582
+ def test_subscribe_to_shard_with_at_timestamp_cbor (
583
+ self ,
584
+ kinesis_create_stream ,
585
+ wait_for_stream_ready ,
586
+ kinesis_register_consumer ,
587
+ wait_for_kinesis_consumer_ready ,
588
+ aws_client ,
589
+ aws_http_client_factory ,
590
+ account_id ,
591
+ region_name ,
592
+ ):
593
+ # create stream
594
+ pre_create_timestamp = (datetime .now () - timedelta (hours = 0 , minutes = 1 )).astimezone ()
595
+ stream_name = kinesis_create_stream (ShardCount = 1 )
596
+ stream_arn = aws_client .kinesis .describe_stream (StreamName = stream_name )[
597
+ "StreamDescription"
598
+ ]["StreamARN" ]
599
+ wait_for_stream_ready (stream_name )
600
+ post_create_timestamp = (datetime .now () + timedelta (hours = 0 , minutes = 1 )).astimezone ()
601
+
602
+ # perform a raw DescribeStream request to test the datetime serialization by LocalStack
603
+ kinesis_http_client = aws_http_client_factory ("kinesis" , signer_factory = SigV4Auth )
604
+ endpoint = (
605
+ f"https://{ account_id } .control-kinesis.{ region_name } .amazonaws.com"
606
+ if is_aws_cloud ()
607
+ else config .internal_service_url ()
608
+ )
609
+ describe_response = kinesis_http_client .post (
610
+ f"{ endpoint } " ,
611
+ data = cbor2_dumps ({"StreamARN" : stream_arn }, datetime_as_timestamp = True ),
612
+ headers = {
613
+ "content-type" : "application/x-amz-cbor-1.1" ,
614
+ "x-amz-target" : "Kinesis_20131202.DescribeStream" ,
615
+ "host" : endpoint ,
616
+ },
617
+ )
618
+
619
+ # verify that the request can be properly parsed, and that the timestamp is within the boundaries
620
+ assert describe_response .status_code == 200
621
+ describe_response_content = describe_response .content
622
+ describe_response_data = cbor2_loads (describe_response_content )
623
+ assert (
624
+ pre_create_timestamp
625
+ <= describe_response_data ["StreamDescription" ]["StreamCreationTimestamp" ]
626
+ <= post_create_timestamp
627
+ )
628
+
629
+ shard_id = describe_response_data ["StreamDescription" ]["Shards" ][0 ]["ShardId" ]
630
+ shard_iterator_response = kinesis_http_client .post (
631
+ f"{ endpoint } " ,
632
+ data = cbor2_dumps (
633
+ {
634
+ "StreamARN" : stream_arn ,
635
+ "ShardId" : shard_id ,
636
+ "ShardIteratorType" : "AT_TIMESTAMP" ,
637
+ "Timestamp" : datetime .now ().astimezone (),
638
+ },
639
+ datetime_as_timestamp = True ,
640
+ ),
641
+ headers = {
642
+ "content-type" : "application/x-amz-cbor-1.1" ,
643
+ "x-amz-target" : "Kinesis_20131202.GetShardIterator" ,
644
+ "host" : endpoint ,
645
+ },
646
+ )
647
+ assert shard_iterator_response .status_code == 200
648
+ shard_iterator_content = shard_iterator_response .content
649
+ shard_iterator_response_data = cbor2_loads (shard_iterator_content )
650
+ assert "ShardIterator" in shard_iterator_response_data
651
+
570
652
571
653
class TestKinesisPythonClient :
572
654
@markers .skip_offline
0 commit comments