diff --git a/tests/aws/services/kinesis/test_kinesis.py b/tests/aws/services/kinesis/test_kinesis.py index 4cbd2bf6815dc..696906adf3b2c 100644 --- a/tests/aws/services/kinesis/test_kinesis.py +++ b/tests/aws/services/kinesis/test_kinesis.py @@ -4,7 +4,6 @@ from unittest.mock import patch import pytest -import requests from botocore.auth import SigV4Auth from botocore.config import Config as BotoConfig from botocore.exceptions import ClientError @@ -14,14 +13,11 @@ from cbor2._encoder import dumps as cbor2_dumps from localstack import config, constants -from localstack.aws.api.kinesis import ShardIteratorType, SubscribeToShardInput from localstack.aws.client import _patch_cbor2 from localstack.services.kinesis import provider as kinesis_provider from localstack.testing.aws.util import is_aws_cloud -from localstack.testing.config import TEST_AWS_ACCESS_KEY_ID from localstack.testing.pytest import markers from localstack.utils.aws import resources -from localstack.utils.aws.request_context import mock_aws_request_headers from localstack.utils.common import retry, select_attributes, short_uid from localstack.utils.kinesis import kinesis_connector @@ -51,6 +47,14 @@ def get_shard_iterator(stream_name, kinesis_client): class TestKinesis: + @staticmethod + def _get_endpoint(account_id: str, region_name: str): + return ( + f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com" + if is_aws_cloud() + else config.internal_service_url() + ) + @markers.aws.validated def test_create_stream_without_stream_name_raises(self, aws_client_factory): boto_config = BotoConfig(parameter_validation=False) @@ -219,17 +223,17 @@ def test_subscribe_to_shard_with_at_timestamp( snapshot.match("Records", results) @markers.aws.needs_fixing - # TODO validate test against AWS. - # - if is_aws_cloud(): - # - Use proper URL to AWS instead of LocalStack - # - Properly sign / auth manually crafted CBOR request with real credentials + # TODO SubscribeToShard raises a 500 (Internal Server Error) against AWS def test_subscribe_to_shard_cbor_at_timestamp( self, kinesis_create_stream, wait_for_stream_ready, aws_client, kinesis_register_consumer, + aws_http_client_factory, + account_id, region_name, + wait_for_kinesis_consumer_ready, ): # create stream stream_name = kinesis_create_stream(ShardCount=1) @@ -245,28 +249,28 @@ def test_subscribe_to_shard_cbor_at_timestamp( response["StreamDescription"]["StreamARN"], consumer_name ) consumer_arn = response["Consumer"]["ConsumerARN"] - url = config.internal_service_url() - headers = mock_aws_request_headers( - "kinesis", - aws_access_key_id=TEST_AWS_ACCESS_KEY_ID, - region_name=region_name, - ) - headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1 - headers["X-Amz-Target"] = "Kinesis_20131202.SubscribeToShard" + wait_for_kinesis_consumer_ready(consumer_arn=consumer_arn) + + kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth) + endpoint = self._get_endpoint(account_id, region_name) + found_record = False data = cbor2_dumps( - SubscribeToShardInput( - ConsumerARN=consumer_arn, - ShardId=shard_id, - StartingPosition={ - "Type": ShardIteratorType.AT_TIMESTAMP, - # manually set a UTC epoch with milliseconds - "Timestamp": "1718960048000", + { + "ConsumerARN": consumer_arn, + "ShardId": shard_id, + "StartingPosition": { + "Type": "AT_TIMESTAMP", + "Timestamp": datetime.now().astimezone(), }, - ), + }, datetime_as_timestamp=True, ) - found_record = False - with requests.post(url, data, headers=headers, stream=True) as result: + headers = { + "Content-Type": constants.APPLICATION_AMZ_CBOR_1_1, + "X-Amz-Target": "Kinesis_20131202.SubscribeToShard", + "Host": endpoint, + } + with kinesis_http_client.post(endpoint, data=data, headers=headers, stream=True) as result: assert 200 == result.status_code # put records @@ -347,16 +351,14 @@ def test_subscribe_to_shard_with_sequence_number_as_iterator( results.sort(key=lambda k: k.get("Data")) snapshot.match("Records", results) - @markers.aws.needs_fixing - # TODO validate test against AWS. - # - if is_aws_cloud(): - # - Use proper URL to AWS instead of LocalStack - # - Properly sign / auth manually crafted CBOR request with real credentials + @markers.aws.validated def test_get_records( self, kinesis_create_stream, wait_for_stream_ready, aws_client, + aws_http_client_factory, + account_id, region_name, kinesis_register_consumer, snapshot, @@ -379,16 +381,22 @@ def test_get_records( # get records with CBOR encoding iterator = get_shard_iterator(stream_name, aws_client.kinesis) - url = config.internal_service_url() - headers = mock_aws_request_headers( - "kinesis", - aws_access_key_id=TEST_AWS_ACCESS_KEY_ID, - region_name=region_name, + + kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth) + endpoint = ( + f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com" + if is_aws_cloud() + else config.internal_service_url() + ) + result = kinesis_http_client.post( + endpoint, + data=cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True), + headers={ + "content-type": constants.APPLICATION_AMZ_CBOR_1_1, + "x-amz-target": "Kinesis_20131202.GetRecords", + "host": endpoint, + }, ) - headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1 - headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords" - data = cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True) - result = requests.post(url, data, headers=headers) assert 200 == result.status_code result = cbor2_loads(result.content) attrs = ("Data", "EncryptionType", "PartitionKey", "SequenceNumber") @@ -401,16 +409,14 @@ def test_get_records( == result["Records"][0]["ApproximateArrivalTimestamp"] ) - @markers.aws.needs_fixing - # TODO validate test against AWS. - # - if is_aws_cloud(): - # - Use proper URL to AWS instead of LocalStack - # - Properly sign / auth manually crafted CBOR request with real credentials + @markers.aws.validated def test_get_records_empty_stream( self, kinesis_create_stream, wait_for_stream_ready, aws_client, + aws_http_client_factory, + account_id, region_name, ): stream_name = kinesis_create_stream(ShardCount=1) @@ -423,16 +429,21 @@ def test_get_records_empty_stream( assert 0 == len(json_records) # empty get records with CBOR encoding - url = config.internal_service_url() - headers = mock_aws_request_headers( - "kinesis", - aws_access_key_id=TEST_AWS_ACCESS_KEY_ID, - region_name=region_name, + kinesis_http_client = aws_http_client_factory("kinesis", signer_factory=SigV4Auth) + endpoint = ( + f"https://{account_id}.control-kinesis.{region_name}.amazonaws.com" + if is_aws_cloud() + else config.internal_service_url() + ) + cbor_response = kinesis_http_client.post( + endpoint, + data=cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True), + headers={ + "content-type": constants.APPLICATION_AMZ_CBOR_1_1, + "x-amz-target": "Kinesis_20131202.GetRecords", + "host": endpoint, + }, ) - headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1 - headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords" - data = cbor2_dumps({"ShardIterator": iterator}, datetime_as_timestamp=True) - cbor_response = requests.post(url, data, headers=headers) assert 200 == cbor_response.status_code cbor_records_content = cbor2_loads(cbor_response.content) cbor_records = cbor_records_content.get("Records") @@ -610,7 +621,7 @@ def test_subscribe_to_shard_with_at_timestamp_cbor( endpoint, data=cbor2_dumps({"StreamARN": stream_arn}, datetime_as_timestamp=True), headers={ - "content-type": "application/x-amz-cbor-1.1", + "content-type": constants.APPLICATION_AMZ_CBOR_1_1, "x-amz-target": "Kinesis_20131202.DescribeStream", "host": endpoint, }, @@ -639,7 +650,7 @@ def test_subscribe_to_shard_with_at_timestamp_cbor( datetime_as_timestamp=True, ), headers={ - "content-type": "application/x-amz-cbor-1.1", + "content-type": constants.APPLICATION_AMZ_CBOR_1_1, "x-amz-target": "Kinesis_20131202.GetShardIterator", "host": endpoint, }, diff --git a/tests/aws/services/kinesis/test_kinesis.validation.json b/tests/aws/services/kinesis/test_kinesis.validation.json index fad051c9cb54b..6eab02c8bcb1c 100644 --- a/tests/aws/services/kinesis/test_kinesis.validation.json +++ b/tests/aws/services/kinesis/test_kinesis.validation.json @@ -8,6 +8,12 @@ "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_create_stream_without_stream_name_raises": { "last_validated_date": "2024-06-10T09:38:19+00:00" }, + "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_get_records": { + "last_validated_date": "2024-07-04T14:57:38+00:00" + }, + "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_get_records_empty_stream": { + "last_validated_date": "2024-07-04T14:59:11+00:00" + }, "tests/aws/services/kinesis/test_kinesis.py::TestKinesis::test_record_lifecycle_data_integrity": { "last_validated_date": "2022-08-25T10:39:44+00:00" }, diff --git a/tests/aws/services/opensearch/test_opensearch.py b/tests/aws/services/opensearch/test_opensearch.py index 2ede494e0b7e2..e0a82e1af43b9 100644 --- a/tests/aws/services/opensearch/test_opensearch.py +++ b/tests/aws/services/opensearch/test_opensearch.py @@ -94,7 +94,7 @@ class TestOpensearchProvider: OPENSEARCH_MULTI_CLUSTER=True, regardless of changes in the config value. """ - @markers.aws.unknown + @markers.aws.validated def test_list_versions(self, aws_client): response = aws_client.opensearch.list_versions() @@ -130,7 +130,7 @@ def test_list_versions(self, aws_client): for expected_version in expected_versions: assert expected_version in versions - @markers.aws.unknown + @markers.aws.needs_fixing def test_get_compatible_versions(self, aws_client): response = aws_client.opensearch.get_compatible_versions() @@ -331,7 +331,7 @@ def test_get_compatible_versions(self, aws_client): for expected_compatible_version in expected_compatible_versions: assert expected_compatible_version in compatible_versions - @markers.aws.unknown + @markers.aws.needs_fixing def test_get_compatible_version_for_domain(self, opensearch_create_domain, aws_client): opensearch_domain = opensearch_create_domain(EngineVersion=ELASTICSEARCH_DEFAULT_VERSION) response = aws_client.opensearch.get_compatible_versions(DomainName=opensearch_domain) @@ -344,7 +344,7 @@ def test_get_compatible_version_for_domain(self, opensearch_create_domain, aws_c # Just check if 1.1 is contained (not equality) to avoid breaking the test if new versions are supported assert "OpenSearch_1.3" in compatibility["TargetVersions"] - @markers.aws.unknown + @markers.aws.needs_fixing def test_create_domain(self, opensearch_wait_for_cluster, aws_client): domain_name = f"opensearch-domain-{short_uid()}" try: @@ -474,7 +474,7 @@ def test_create_domain_with_invalid_name(self, aws_client): aws_client.opensearch.create_domain(DomainName="abc#") # no special characters allowed assert e.value.response["Error"]["Code"] == "ValidationException" - @markers.aws.unknown + @markers.aws.needs_fixing def test_create_domain_with_invalid_custom_endpoint(self, aws_client): with pytest.raises(botocore.exceptions.ClientError) as e: aws_client.opensearch.create_domain( @@ -505,7 +505,7 @@ def test_exception_header_field(self, aws_client): == "ValidationException" ) - @markers.aws.unknown + @markers.aws.needs_fixing def test_create_existing_domain_causes_exception(self, opensearch_wait_for_cluster, aws_client): domain_name = f"opensearch-domain-{short_uid()}" try: @@ -517,13 +517,13 @@ def test_create_existing_domain_causes_exception(self, opensearch_wait_for_clust finally: aws_client.opensearch.delete_domain(DomainName=domain_name) - @markers.aws.unknown + @markers.aws.needs_fixing def test_describe_domains(self, opensearch_domain, aws_client): response = aws_client.opensearch.describe_domains(DomainNames=[opensearch_domain]) assert len(response["DomainStatusList"]) == 1 assert response["DomainStatusList"][0]["DomainName"] == opensearch_domain - @markers.aws.unknown + @markers.aws.needs_fixing def test_gzip_responses(self, opensearch_endpoint, monkeypatch): def send_plain_request(method, url): """ @@ -556,7 +556,7 @@ def send_plain_request(method, url): # force the gzip decoding here (which would raise an exception if it's not actually gzip) assert gzip.decompress(raw_gzip_data) - @markers.aws.unknown + @markers.aws.needs_fixing def test_domain_version(self, opensearch_domain, opensearch_create_domain, aws_client): response = aws_client.opensearch.describe_domain(DomainName=opensearch_domain) assert "DomainStatus" in response @@ -564,7 +564,7 @@ def test_domain_version(self, opensearch_domain, opensearch_create_domain, aws_c assert "EngineVersion" in status assert status["EngineVersion"] == OPENSEARCH_DEFAULT_VERSION - @markers.aws.unknown + @markers.aws.needs_fixing def test_update_domain_config(self, opensearch_domain, aws_client): initial_response = aws_client.opensearch.describe_domain_config( DomainName=opensearch_domain @@ -587,7 +587,7 @@ def test_update_domain_config(self, opensearch_domain, aws_client): == final_response["DomainConfig"]["ClusterConfig"]["Options"]["InstanceType"] ) - @markers.aws.unknown + @markers.aws.needs_fixing def test_create_indices(self, opensearch_endpoint): indices = ["index1", "index2"] for index_name in indices: @@ -627,14 +627,14 @@ def test_create_indices(self, opensearch_endpoint): get = requests.get(document_path) assert get.status_code == 200 - @markers.aws.unknown + @markers.aws.needs_fixing def test_get_document(self, opensearch_document_path): response = requests.get(opensearch_document_path) assert ( "I'm just a simple man" in response.text ), f"document not found({response.status_code}): {response.text}" - @markers.aws.unknown + @markers.aws.needs_fixing def test_search(self, opensearch_endpoint, opensearch_document_path): index = "/".join(opensearch_document_path.split("/")[:-2]) # force the refresh of the index after the document was added, so it can appear in search @@ -648,7 +648,7 @@ def test_search(self, opensearch_endpoint, opensearch_document_path): "I'm just a simple man" in response.text ), f"search unsuccessful({response.status_code}): {response.text}" - @markers.aws.unknown + @markers.aws.only_localstack def test_endpoint_strategy_path(self, monkeypatch, opensearch_create_domain, aws_client): monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "path") @@ -661,7 +661,7 @@ def test_endpoint_strategy_path(self, monkeypatch, opensearch_create_domain, aws endpoint = status["Endpoint"] assert endpoint.endswith(f"/{domain_name}") - @markers.aws.unknown + @markers.aws.only_localstack def test_endpoint_strategy_port(self, monkeypatch, opensearch_create_domain, aws_client): monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port") @@ -679,7 +679,7 @@ def test_endpoint_strategy_port(self, monkeypatch, opensearch_create_domain, aws ) # testing CloudFormation deployment here to make sure OpenSearch is installed - @markers.aws.unknown + @markers.aws.needs_fixing def test_cloudformation_deployment(self, deploy_cfn_template, aws_client): domain_name = f"domain-{short_uid()}" deploy_cfn_template( @@ -696,7 +696,7 @@ def test_cloudformation_deployment(self, deploy_cfn_template, aws_client): @markers.skip_offline class TestEdgeProxiedOpensearchCluster: - @markers.aws.unknown + @markers.aws.only_localstack def test_route_through_edge(self): cluster_id = f"domain-{short_uid()}" cluster_url = f"{config.internal_service_url()}/{cluster_id}" @@ -727,7 +727,7 @@ def test_route_through_edge(self): lambda: not cluster.is_up(), timeout=240 ), "gave up waiting for cluster to shut down" - @markers.aws.unknown + @markers.aws.only_localstack def test_custom_endpoint( self, opensearch_wait_for_cluster, opensearch_create_domain, aws_client ): @@ -759,7 +759,7 @@ def test_custom_endpoint( assert response.ok assert response.status_code == 200 - @markers.aws.unknown + @markers.aws.only_localstack def test_custom_endpoint_disabled( self, opensearch_wait_for_cluster, opensearch_create_domain, aws_client ): @@ -793,7 +793,7 @@ def test_custom_endpoint_disabled( @markers.skip_offline class TestMultiClusterManager: - @markers.aws.unknown + @markers.aws.only_localstack def test_multi_cluster(self, account_id, monkeypatch): monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain") monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", True) @@ -841,7 +841,7 @@ def test_multi_cluster(self, account_id, monkeypatch): @markers.skip_offline class TestMultiplexingClusterManager: - @markers.aws.unknown + @markers.aws.only_localstack def test_multiplexing_cluster(self, account_id, monkeypatch): monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain") monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False) @@ -889,7 +889,7 @@ def test_multiplexing_cluster(self, account_id, monkeypatch): @markers.skip_offline class TestSingletonClusterManager: - @markers.aws.unknown + @markers.aws.only_localstack def test_endpoint_strategy_port_singleton_cluster(self, account_id, monkeypatch): monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port") monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False) @@ -935,7 +935,7 @@ def test_endpoint_strategy_port_singleton_cluster(self, account_id, monkeypatch) @markers.skip_offline class TestCustomBackendManager: - @markers.aws.unknown + @markers.aws.only_localstack def test_custom_backend(self, account_id, httpserver, monkeypatch): monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain") monkeypatch.setattr(config, "OPENSEARCH_CUSTOM_BACKEND", httpserver.url_for("/")) @@ -1001,7 +1001,7 @@ def test_custom_backend(self, account_id, httpserver, monkeypatch): httpserver.check() - @markers.aws.unknown + @markers.aws.only_localstack def test_custom_backend_with_custom_endpoint( self, httpserver, diff --git a/tests/aws/services/opensearch/test_opensearch.validation.json b/tests/aws/services/opensearch/test_opensearch.validation.json new file mode 100644 index 0000000000000..d4104bd7d5022 --- /dev/null +++ b/tests/aws/services/opensearch/test_opensearch.validation.json @@ -0,0 +1,5 @@ +{ + "tests/aws/services/opensearch/test_opensearch.py::TestOpensearchProvider::test_list_versions": { + "last_validated_date": "2024-07-04T15:26:07+00:00" + } +}