10000 fix opensearch and kinesis test markers by alexrashed · Pull Request #11141 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content
8000

fix opensearch and kinesis test markers #11141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
8000 Loading
Diff view
Diff view
123 changes: 67 additions & 56 deletions tests/aws/services/kinesis/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from unittest.mock import patch

import pytest
import requests
from botocore.auth import SigV4Auth
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError
Expand All @@ -14,14 +13,11 @@
from cbor2._encoder import dumps as cbor2_dumps

from localstack import config, constants
from localstack.aws.api.kinesis import ShardIteratorType, SubscribeToShardInput
from localstack.aws.client import _patch_cbor2
from localstack.services.kinesis import provider as kinesis_provider
from localstack.testing.aws.util import is_aws_cloud
from localstack.testing.config import TEST_AWS_ACCESS_KEY_ID
from localstack.testing.pytest import markers
from localstack.utils.aws import resources
from localstack.utils.aws.request_context import mock_aws_request_headers
from localstack.utils.common import retry, select_attributes, short_uid
from localstack.utils.kinesis import kinesis_connector

Expand Down Expand Up @@ -51,6 +47,14 @@ def get_shard_iterator(stream_name, kinesis_client):


class TestKinesis:
@staticmethod
def _get_endpoint(account_id: str, region_name: str):
return (
f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com"
if is_aws_cloud()
else config.internal_service_url()
)

@markers.aws.validated
def test_create_stream_without_stream_name_raises(self, aws_client_factory):
boto_config = BotoConfig(parameter_validation=False)
Expand Down Expand Up @@ -219,17 +223,17 @@ def test_subscribe_to_shard_with_at_timestamp(
snapshot.match("Records", results)

@markers.aws.needs_fixing
# TODO validate test against AWS.
# - if is_aws_cloud():
# - Use proper URL to AWS instead of LocalStack
# - Properly sign / auth manually crafted CBOR request with real credentials
# TODO SubscribeToShard raises a 500 (Internal Server Error) against AWS
def test_subscribe_to_shard_cbor_at_timestamp(
self,
kinesis_create_stream,
wait_for_stream_ready,
aws_client,
kinesis_register_consumer,
aws_http_client_factory,
account_id,
region_name,
wait_for_kinesis_consumer_ready,
):
# create stream
stream_name = kinesis_create_stream(ShardCount=1)
Expand All @@ -245,28 +249,28 @@ def test_subscribe_to_shard_cbor_at_timestamp(
response["StreamDescription"]["StreamARN"], consumer_name
)
consumer_arn = response["Consumer"]["ConsumerARN"]
url = config.internal_service_url()
headers = mock_aws_request_headers(
"kinesis",
aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,
region_name=region_name,
)
headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
headers["X-Amz-Target"] = "Kinesis_20131202.SubscribeToShard"
wait_for_kinesis_consumer_ready(consumer_arn=consumer_arn)

kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth)
endpoint = self._get_endpoint(account_id, region_name)
found_record = False
data = cbor2_dumps(
SubscribeToShardInput(
ConsumerARN=consumer_arn,
ShardId=shard_id,
StartingPosition={
"Type": ShardIteratorType.AT_TIMESTAMP,
# manually set a UTC epoch with milliseconds
"Timestamp": "1718960048000",
{
"ConsumerARN": consumer_arn,
"ShardId": shard_id,
"StartingPosition": {
"Type": "AT_TIMESTAMP",
"Timestamp": datetime.now().astimezone(),
},
),
},
datetime_as_timestamp=True,
)
found_record = False
with requests.post(url, data, headers=headers, stream=True) as result:
headers = {
"Content-Type": constants.APPLICATION_AMZ_CBOR_1_1,
"X-Amz-Target": "Kinesis_20131202.SubscribeToShard",
"Host": endpoint,
}
with kinesis_http_client.post(endpoint, data=data, headers=headers, stream=True) as result:
assert 200 == result.status_code

# put records
Expand Down Expand Up @@ -347,16 +351,14 @@ def test_subscribe_to_shard_with_sequence_number_as_iterator(
results.sort(key=lambda k: k.get("Data"))
snapshot.match("Records", results)

@markers.aws.needs_fixing
# TODO validate test against AWS.
# - if is_aws_cloud():
# - Use proper URL to AWS instead of LocalStack
# - Properly sign / auth manually crafted CBOR request with real credentials
@markers.aws.validated
def test_get_records(
self,
kinesis_create_stream,
wait_for_stream_ready,
aws_client,
aws_http_client_factory,
account_id,
region_name,
kinesis_register_consumer,
snapshot,
Expand All @@ -379,16 +381,22 @@ def test_get_records(

# get records with CBOR encoding
iterator = get_shard_iterator(stream_name, aws_client.kinesis)
url = config.internal_service_url()
headers = mock_aws_request_headers(
"kinesis",
aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,
region_name=region_name,

kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth)
endpoint = (
f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com"
if is_aws_cloud()
else config.internal_service_url()
)
result = kinesis_http_client.post(
endpoint,
data=cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True),
headers={
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": "Kinesis_20131202.GetRecords",
"host": endpoint,
},
)
headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords"
data = cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True)
result = requests.post(url, data, headers=headers)
assert 200 == result.status_code
result = cbor2_loads(result.content)
attrs = ("Data", "EncryptionType", "PartitionKey", "SequenceNumber")
Expand All @@ -401,16 +409,14 @@ def test_get_records(
== result["Records"][0]["ApproximateArrivalTimestamp"]
)

@markers.aws.needs_fixing
# TODO validate test against AWS.
# - if is_aws_cloud():
# - Use proper URL to AWS instead of LocalStack
# - Properly sign / auth manually crafted CBOR request with real credentials
@markers.aws.validated
def test_get_records_empty_stream(
self,
kinesis_create_stream,
wait_for_stream_ready,
aws_client,
aws_http_client_factory,
account_id,
region_name,
):
stream_name = kinesis_create_stream(ShardCount=1)
Expand All @@ -423,16 +429,21 @@ def test_get_records_empty_stream(
assert 0 == len(json_records)

# empty get records with CBOR encoding
url = config.internal_service_url()
headers = mock_aws_request_headers(
"kinesis",
aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,
region_name=region_name,
kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth)
endpoint = (
f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com"
if is_aws_cloud()
else config.internal_service_url()
)
cbor_response = kinesis_http_client.post(
endpoint,
data=cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True),
headers={
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": "Kinesis_20131202.GetRecords",
"host": endpoint,
},
)
headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords"
data = cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True)
cbor_response = requests.post(url, data, headers=headers)
assert 200 == cbor_response.status_code
cbor_records_content = cbor2_loads(cbor_response.content)
8000 cbor_records = cbor_records_content.get("Records")
Expand Down Expand Up @@ -610,7 +621,7 @@ def test_subscribe_to_shard_with_at_timestamp_cbor(
endpoint,
data=cbor2_dumps({"StreamARN": stream_arn}, datetime_as_timestamp=True),
headers={
"content-type": "application/x-amz-cbor-1.1",
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": "Kinesis_20131202.DescribeStream",
"host": endpoint,
},
Expand Down Expand Up @@ -639,7 +650,7 @@ def test_subscribe_to_shard_with_at_timestamp_cbor(
datetime_as_timestamp=True,
),
headers={
"content-type": "application/x-amz-cbor-1.1",
"content-type": constants.APPLICATION_AMZ_CBOR_1_1,
"x-amz-target": "Kinesis_20131202.GetShardIterator",
"host": endpoint,
},
Expand Down
6 changes: 6 additions & 0 deletions tests/aws/services/kinesis/test_kinesis.validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_create_stream_without_stream_name_raises": {
"last_validated_date": "2024-06-10T09:38:19+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_get_records": {
"last_validated_date": "2024-07-04T14:57:38+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_get_records_empty_stream": {
"last_validated_date": "2024-07-04T14:59:11+00:00"
},
"tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_record_lifecycle_data_integrity": {
"last_validated_date": "2022-08-25T10:39:44+00:00"
},
Expand Down
Loading
Loading
0