From 9a72897013345c1995917db11df9d080302697df Mon Sep 17 00:00:00 2001 From: Benjamin Simon Date: Tue, 2 Aug 2022 16:05:48 +0200 Subject: [PATCH 1/4] refactor tests --- localstack/services/sns/provider.py | 13 +- tests/integration/test_sns.py | 395 ++++++++++++++++------------ 2 files changed, 237 insertions(+), 171 deletions(-) diff --git a/localstack/services/sns/provider.py b/localstack/services/sns/provider.py index eb38f85297475..1b6db6b235a4b 100644 --- a/localstack/services/sns/provider.py +++ b/localstack/services/sns/provider.py @@ -448,19 +448,19 @@ def publish_batch( ) -> PublishBatchResponse: if len(publish_batch_request_entries) > 10: raise TooManyEntriesInBatchRequestException( - "The batch request contains more entries than permissible" + "The batch request contains more entries than permissible." ) ids = [entry["Id"] for entry in publish_batch_request_entries] if len(set(ids)) != len(publish_batch_request_entries): raise BatchEntryIdsNotDistinctException( - "Two or more batch entries in the request have the same Id" + "Two or more batch entries in the request have the same Id." ) if topic_arn and ".fifo" in topic_arn: if not all(["MessageGroupId" in entry for entry in publish_batch_request_entries]): raise InvalidParameterException( - "The MessageGroupId parameter is required for FIFO topics" + "Invalid parameter: The MessageGroupId parameter is required for FIFO topics" ) response = {"Successful": [], "Failed": []} for entry in publish_batch_request_entries: @@ -651,7 +651,7 @@ def publish( ) -> PublishResponse: # We do not want the request to be forwarded to SNS backend if subject == "": - raise InvalidParameterException("Empty string for subject is not supported") + raise InvalidParameterException("Invalid parameter: Subject") if not message or all(not m for m in message): raise InvalidParameterException("Empty message") @@ -821,7 +821,9 @@ def create_topic( topic_arn = moto_response["TopicArn"] tag_resource_success = extract_tags(topic_arn, tags, True, sns_backend) if not tag_resource_success: - raise InvalidParameterException("Topic already exists with different tags") + raise InvalidParameterException( + "Invalid parameter: Tags Reason: Topic already exists with different tags" + ) if tags: self.tag_resource(context=context, resource_arn=topic_arn, tags=tags) sns_backend.sns_subscriptions[topic_arn] = ( @@ -845,6 +847,7 @@ def message_to_subscribers( skip_checks=False, message_attributes=None, ): + # AWS allows using TargetArn to publish to a topic, for backward compatibility if not topic_arn: topic_arn = req_data.get("TargetArn") sns_backend = SNSBackend.get() diff --git a/tests/integration/test_sns.py b/tests/integration/test_sns.py index 5dc1ffeadcf51..3e4c649eada9e 100644 --- a/tests/integration/test_sns.py +++ b/tests/integration/test_sns.py @@ -16,6 +16,7 @@ from localstack.aws.accounts import get_aws_account_id from localstack.services.install import SQS_BACKEND_IMPL from localstack.services.sns.provider import SNSBackend +from localstack.testing.aws.util import is_aws_cloud from localstack.utils import testutil from localstack.utils.aws import aws_stack from localstack.utils.net import wait_for_port_closed, wait_for_port_open @@ -25,7 +26,6 @@ from .awslambda.functions import lambda_integration from .awslambda.test_lambda import ( - LAMBDA_RUNTIME_PYTHON36, LAMBDA_RUNTIME_PYTHON37, TEST_LAMBDA_FUNCTION_PREFIX, TEST_LAMBDA_LIBS, @@ -515,6 +515,7 @@ def test_tags(self, sns_client, sns_create_topic, snapshot): tags = 
sns_client.list_tags_for_resource(ResourceArn=topic_arn) snapshot.match("list-after-update-tags", tags) + @pytest.mark.only_localstack def test_topic_subscription(self, sns_client, sns_create_topic, sns_subscription): topic_arn = sns_create_topic()["TopicArn"] subscription = sns_subscription( @@ -535,67 +536,100 @@ def check_subscription(): retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT) + @pytest.mark.aws_validated def test_sqs_topic_subscription_confirmation( - self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription + self, sns_client, sns_create_topic, sqs_create_queue, sns_create_sqs_subscription ): topic_arn = sns_create_topic()["TopicArn"] - queue_arn = sqs_queue_arn(sqs_create_queue()) - subscription = sns_subscription( - TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, ReturnSubscriptionArn=True - ) + queue_url = sqs_create_queue() + subscription_attrs = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url) def check_subscription(): - subscription_arn = subscription["SubscriptionArn"] - subscription_attrs = sns_client.get_subscription_attributes( - SubscriptionArn=subscription_arn - ) - assert subscription_attrs["Attributes"]["PendingConfirmation"] == "false" + nonlocal subscription_attrs + if not subscription_attrs["PendingConfirmation"] == "false": + subscription_arn = subscription_attrs["SubscriptionArn"] + subscription_attrs = sns_client.get_subscription_attributes( + SubscriptionArn=subscription_arn + )["Attributes"] + return subscription_attrs["PendingConfirmation"] == "false" + + # SQS subscriptions are auto confirmed if they are from the user and in the same region + # todo check that + assert poll_condition(check_subscription, timeout=5) - retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT) - - def test_dead_letter_queue( + @pytest.mark.aws_validated + def test_sns_topic_as_lambda_dead_letter_queue( self, sns_client, sqs_client, + lambda_client, + lambda_su_role, + create_lambda_function, sns_create_topic, sqs_create_queue, - sqs_queue_arn, - create_lambda_function, sns_subscription, + sns_create_sqs_subscription, ): - lambda_name = f"test-{short_uid()}" - lambda_arn = aws_stack.lambda_function_arn(lambda_name) - topic_arn = sns_create_topic()["TopicArn"] - queue_name = f"test-{short_uid()}" - queue_url = sqs_create_queue(QueueName=queue_name) - queue_arn = sqs_queue_arn(queue_url) + # create an SNS topic that will be used as a DLQ by the lambda + dlq_topic_arn = sns_create_topic()["TopicArn"] + queue_url = sqs_create_queue() - create_lambda_function( - func_name=lambda_name, + # sqs_subscription + sns_create_sqs_subscription(topic_arn=dlq_topic_arn, queue_url=queue_url) + + # create an SNS topic that will be used to invoke the lambda + lambda_topic_arn = sns_create_topic()["TopicArn"] + + function_name = f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}" + lambda_creation_response = create_lambda_function( + func_name=function_name, handler_file=TEST_LAMBDA_PYTHON, - libs=TEST_LAMBDA_LIBS, - runtime=LAMBDA_RUNTIME_PYTHON36, - DeadLetterConfig={"TargetArn": queue_arn}, + runtime=LAMBDA_RUNTIME_PYTHON37, + role=lambda_su_role, + DeadLetterConfig={"TargetArn": dlq_topic_arn}, + ) + lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"] + + # allow the SNS topic to invoke the lambda + permission_id = f"test-statement-{short_uid()}" + lambda_client.add_permission( + FunctionName=function_name, + StatementId=permission_id, + 
Action="lambda:InvokeFunction", + Principal="sns.amazonaws.com", + SourceArn=lambda_topic_arn, + ) + + # subscribe the lambda to the SNS topic: lambda_subscription + sns_subscription( + TopicArn=lambda_topic_arn, + Protocol="lambda", + Endpoint=lambda_arn, ) - sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn) payload = { lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1, } - sns_client.publish(TopicArn=topic_arn, Message=json.dumps(payload)) + sns_client.publish(TopicArn=lambda_topic_arn, Message=json.dumps(payload)) def receive_dlq(): result = sqs_client.receive_message( QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0 ) - msg_attrs = result["Messages"][0]["MessageAttributes"] assert len(result["Messages"]) > 0 + msg_body = json.loads(result["Messages"][0]["Body"]) + msg_attrs = msg_body["MessageAttributes"] assert "RequestID" in msg_attrs assert "ErrorCode" in msg_attrs assert "ErrorMessage" in msg_attrs - retry(receive_dlq, retries=8, sleep=2) + # check that the SQS queue subscribed to the SNS topic used as DLQ received the error from the lambda + # on AWS, event retries can be quite delayed, so we have to wait up to 6 minutes here + # reduced retries when using localstack to avoid tests flaking + retries = 120 if is_aws_cloud() else 3 + retry(receive_dlq, retries=retries, sleep=3) + @pytest.mark.only_localstack def test_redrive_policy_http_subscription( self, sns_client, @@ -605,7 +639,6 @@ def test_redrive_policy_http_subscription( sqs_queue_arn, sns_subscription, ): - # self.unsubscribe_all_from_sns() dlq_name = f"dlq-{short_uid()}" dlq_url = sqs_create_queue(QueueName=dlq_name) dlq_arn = sqs_queue_arn(dlq_url) @@ -649,28 +682,34 @@ def test_redrive_policy_http_subscription( assert message["Type"] == "Notification" assert json.loads(message["Message"])["message"] == "test_redrive_policy" + @pytest.mark.aws_validated def test_redrive_policy_lambda_subscription( self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, + lambda_client, create_lambda_function, + lambda_su_role, sqs_client, sns_subscription, + sns_allow_topic_sqs_queue, ): - # self.unsubscribe_all_from_sns() - dlq_name = f"dlq-{short_uid()}" - dlq_url = sqs_create_queue(QueueName=dlq_name) + dlq_url = sqs_create_queue() dlq_arn = sqs_queue_arn(dlq_url) topic_arn = sns_create_topic()["TopicArn"] + sns_allow_topic_sqs_queue( + sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn + ) lambda_name = f"test-{short_uid()}" lambda_arn = create_lambda_function( func_name=lambda_name, libs=TEST_LAMBDA_LIBS, handler_file=TEST_LAMBDA_PYTHON, - runtime=LAMBDA_RUNTIME_PYTHON36, + runtime=LAMBDA_RUNTIME_PYTHON37, + role=lambda_su_role, )["CreateFunctionResponse"]["FunctionArn"] subscription = sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn) @@ -680,7 +719,7 @@ def test_redrive_policy_lambda_subscription( AttributeName="RedrivePolicy", AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}), ) - testutil.delete_lambda_function(lambda_name) + lambda_client.delete_function(FunctionName=lambda_name) sns_client.publish( TopicArn=topic_arn, @@ -695,45 +734,7 @@ def test_redrive_policy_lambda_subscription( assert message["Type"] == "Notification" assert json.loads(message["Message"])["message"] == "test_redrive_policy" - def test_redrive_policy_queue_subscription( - self, - sns_client, - sns_create_topic, - sqs_create_queue, - sqs_queue_arn, - sqs_client, - sns_subscription, - ): - # self.unsubscribe_all_from_sns() - dlq_name = 
f"dlq-{short_uid()}" - dlq_url = sqs_create_queue(QueueName=dlq_name) - dlq_arn = sqs_queue_arn(dlq_url) - - topic_arn = sns_create_topic()["TopicArn"] - invalid_queue_arn = aws_stack.sqs_queue_arn("invalid_queue") - # subscribe with an invalid queue ARN, to trigger event on DLQ below - subscription = sns_subscription( - TopicArn=topic_arn, Protocol="sqs", Endpoint=invalid_queue_arn - ) - - sns_client.set_subscription_attributes( - SubscriptionArn=subscription["SubscriptionArn"], - AttributeName="RedrivePolicy", - AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}), - ) - - sns_client.publish( - TopicArn=topic_arn, Message=json.dumps({"message": "test_redrive_policy"}) - ) - - response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10) - assert ( - len(response["Messages"]) == 1 - ), f"invalid number of messages in DLQ response {response}" - message = json.loads(response["Messages"][0]["Body"]) - assert message["Type"] == "Notification" - assert json.loads(message["Message"])["message"] == "test_redrive_policy" - + @pytest.mark.aws_validated def test_publish_with_empty_subject(self, sns_client, sns_create_topic): topic_arn = sns_create_topic()["TopicArn"] @@ -749,7 +750,9 @@ def test_publish_with_empty_subject(self, sns_client, sns_create_topic): ) assert e.value.response["Error"]["Code"] == "InvalidParameter" + assert e.value.response["Error"]["Message"] == "Invalid parameter: Subject" + # todo test with snapshot to aws validate it def test_create_topic_test_arn(self, sns_create_topic, sns_client): topic_name = f"topic-{short_uid()}" response = sns_create_topic(Name=topic_name) @@ -758,120 +761,122 @@ def test_create_topic_test_arn(self, sns_create_topic, sns_client): assert topic_arn_params[4] == get_aws_account_id() assert topic_arn_params[5] == topic_name + @pytest.mark.aws_validated def test_publish_message_by_target_arn( - self, sns_client, sns_create_topic, create_lambda_function, sns_subscription + self, + sns_client, + sqs_client, + sns_create_topic, + sqs_create_queue, + sns_create_sqs_subscription, ): - # self.unsubscribe_all_from_sns() - - func_name = f"lambda-{short_uid()}" + # using an SQS subscription to test TopicArn/TargetArn as it is easier to check against AWS topic_arn = sns_create_topic()["TopicArn"] + queue_url = sqs_create_queue() + sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url) - lambda_arn = create_lambda_function( - handler_file=TEST_LAMBDA_PYTHON_ECHO, - func_name=func_name, - runtime=LAMBDA_RUNTIME_PYTHON36, - )["CreateFunctionResponse"]["FunctionArn"] - subscription_arn = sns_subscription( - TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn - )["SubscriptionArn"] - - sns_client.publish(TopicArn=topic_arn, Message="test_message_1", Subject="test subject") + sns_client.publish(TopicArn=topic_arn, Message="test-msg-1") - # Lambda invoked 1 time - events = retry( - check_expected_lambda_log_events_length, - retries=3, - sleep=1, - function_name=func_name, - expected_length=1, + response = sqs_client.receive_message( + QueueUrl=queue_url, + MessageAttributeNames=["All"], + VisibilityTimeout=0, + WaitTimeSeconds=4, ) - message = events[0]["Records"][0] - assert message["EventSubscriptionArn"] == subscription_arn + assert len(response["Messages"]) == 1 + message = response["Messages"][0] + msg_body = json.loads(message["Body"]) + assert msg_body["Message"] == "test-msg-1" - sns_client.publish(TargetArn=topic_arn, Message="test_message_2", Subject="test subject") + sqs_client.delete_message(QueueUrl=queue_url, 
ReceiptHandle=message["ReceiptHandle"]) - events = retry( - check_expected_lambda_log_events_length, - retries=3, - sleep=1, - function_name=func_name, - expected_length=2, - ) - # Lambda invoked 1 more time - assert len(events) == 2 + # publish with TargetArn instead of TopicArn + sns_client.publish(TargetArn=topic_arn, Message="test-msg-2") - for event in events: - message = event["Records"][0] - assert message["EventSubscriptionArn"] == subscription_arn + response = sqs_client.receive_message( + QueueUrl=queue_url, + MessageAttributeNames=["All"], + VisibilityTimeout=0, + WaitTimeSeconds=4, + ) + assert len(response["Messages"]) == 1 + message = response["Messages"][0] + msg_body = json.loads(message["Body"]) + assert msg_body["Message"] == "test-msg-2" - def test_publish_message_after_subscribe_topic( + @pytest.mark.aws_validated + def test_publish_message_before_subscribe_topic( self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, - sqs_queue_arn, sns_subscription, + sns_create_sqs_subscription, ): - # self.unsubscribe_all_from_sns() - topic_arn = sns_create_topic()["TopicArn"] - queue_url = sqs_create_queue() - queue_arn = sqs_queue_arn(queue_url) rs = sns_client.publish( TopicArn=topic_arn, Subject="test subject", Message="test_message_1" ) assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200 - sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn) + sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url) message_subject = "sqs subject" message_body = "test_message_2" rs = sns_client.publish(TopicArn=topic_arn, Subject=message_subject, Message=message_body) - # time.sleep(100) assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200 message_id = rs["MessageId"] - def get_message(q_url): - resp = sqs_client.receive_message(QueueUrl=q_url, VisibilityTimeout=0) - return json.loads(resp["Messages"][0]["Body"]) - - message = retry(get_message, retries=3, sleep=2, q_url=queue_url) + response = sqs_client.receive_message( + QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5 + ) + # nothing was subscribing to the topic, so the first message is lost + assert len(response["Messages"]) == 1 + message = json.loads(response["Messages"][0]["Body"]) assert message["MessageId"] == message_id assert message["Subject"] == message_subject assert message["Message"] == message_body + @pytest.mark.aws_validated def test_create_duplicate_topic_with_more_tags(self, sns_client, sns_create_topic): topic_name = f"test-{short_uid()}" sns_create_topic(Name=topic_name) with pytest.raises(ClientError) as e: - sns_client.create_topic(Name=topic_name, Tags=[{"Key": "456", "Value": "pqr"}]) + sns_client.create_topic(Name=topic_name, Tags=[{"Key": "key1", "Value": "value1"}]) assert e.value.response["Error"]["Code"] == "InvalidParameter" - assert e.value.response["Error"]["Message"] == "Topic already exists with different tags" + assert ( + e.value.response["Error"]["Message"] + == "Invalid parameter: Tags Reason: Topic already exists with different tags" + ) assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 + @pytest.mark.aws_validated def test_create_duplicate_topic_check_idempotency(self, sns_create_topic): topic_name = f"test-{short_uid()}" tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}] kwargs = [ - {"Tags": tags}, # to create topic with two tags {"Tags": tags}, # to create the same topic again with same tags {"Tags": [tags[0]]}, # to create the same topic again with one of the tags from above {"Tags": []}, # to create the 
same topic again with no tags ] - responses = [] + + # create topic with two tags + response = sns_create_topic(Name=topic_name, Tags=tags) + topic_arn = response["TopicArn"] + for arg in kwargs: - responses.append(sns_create_topic(Name=topic_name, **arg)) - # assert TopicArn is returned by all the above create_topic calls - for i in range(len(responses)): - assert "TopicArn" in responses[i] + response = sns_create_topic(Name=topic_name, **arg) + # assert TopicArn returned by all the above create_topic calls is the same as the original + assert response["TopicArn"] == topic_arn + @pytest.mark.only_localstack @pytest.mark.skip( reason="Idempotency not supported in Moto backend. See bug https://github.com/spulec/moto/issues/2333" ) @@ -901,6 +906,7 @@ def test_create_platform_endpoint_check_idempotency(self, sns_client): sns_client.delete_endpoint(EndpointArn=endpoint_arn) sns_client.delete_platform_application(PlatformApplicationArn=platform_arn) + # todo check this later? def test_publish_by_path_parameters( self, sns_create_topic, @@ -988,6 +994,7 @@ def handler(request): for server in servers: server.stop() + @pytest.mark.only_localstack def test_publish_sms_endpoint(self, sns_client, sns_create_topic, sns_subscription): list_of_contacts = [ f"+{random.randint(100000000, 9999999999)}", @@ -1174,12 +1181,24 @@ def get_messages(): retry(get_messages, retries=3, sleep=1) def test_publish_batch_messages_from_fifo_topic_to_fifo_queue( - self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription + self, + sns_client, + sns_create_topic, + sqs_client, + sqs_create_queue, + sns_create_sqs_subscription, ): topic_name = f"topic-{short_uid()}.fifo" queue_name = f"queue-{short_uid()}.fifo" - topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"] + topic_arn = sns_create_topic( + Name=topic_name, + Attributes={ + "FifoTopic": "true", + "ContentBasedDeduplication": "true", + }, + )["TopicArn"] + queue_url = sqs_create_queue( QueueName=queue_name, Attributes={ @@ -1188,12 +1207,15 @@ def test_publish_batch_messages_from_fifo_topic_to_fifo_queue( }, ) - sns_subscription( - TopicArn=topic_arn, - Protocol="sqs", - Endpoint=queue_url, - Attributes={"RawMessageDelivery": "true"}, + subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url) + subscription_arn = subscription["SubscriptionArn"] + + sns_client.set_subscription_attributes( + SubscriptionArn=subscription_arn, + AttributeName="RawMessageDelivery", + AttributeValue="true", ) + message_group_id = "complexMessageGroupId" publish_batch_response = sns_client.publish_batch( TopicArn=topic_arn, @@ -1239,11 +1261,12 @@ def get_messages(queue_url): MaxNumberOfMessages=10, ) assert len(response["Messages"]) == 3 - for message in response["Messages"]: + for msg_index, message in enumerate(response["Messages"]): assert "Body" in message assert message["Attributes"]["MessageGroupId"] == message_group_id if message["Body"] == "Test Message with two attributes": + assert msg_index == 0 assert len(message["MessageAttributes"]) == 2 assert message["MessageAttributes"]["attr1"] == { "StringValue": "99.12", @@ -1255,6 +1278,7 @@ def get_messages(queue_url): } elif message["Body"] == "Test Message with one attribute": + assert msg_index == 1 assert len(message["MessageAttributes"]) == 1 assert message["MessageAttributes"]["attr1"] == { "StringValue": "19.12", @@ -1262,12 +1286,21 @@ def get_messages(queue_url): } elif message["Body"] == "Test Message without attribute": + assert 
msg_index == 2 assert message.get("MessageAttributes") is None retry(get_messages, retries=5, sleep=1, queue_url=queue_url) + # todo add test for deduplication + # https://docs.aws.amazon.com/cli/latest/reference/sns/publish-batch.html + @pytest.mark.aws_validated def test_publish_batch_exceptions( - self, sns_client, sqs_client, sns_create_topic, sqs_create_queue, sns_subscription + self, + sns_client, + sqs_client, + sns_create_topic, + sqs_create_queue, + sns_create_sqs_subscription, ): topic_name = f"topic-{short_uid()}.fifo" queue_name = f"queue-{short_uid()}.fifo" @@ -1278,13 +1311,13 @@ def test_publish_batch_exceptions( Attributes={"FifoQueue": "true"}, ) - queue_arn = aws_stack.sqs_queue_arn(queue_url) + subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url) + subscription_arn = subscription["SubscriptionArn"] - sns_subscription( - TopicArn=topic_arn, - Protocol="sqs", - Endpoint=queue_arn, - Attributes={"RawMessageDelivery": "true"}, + sns_client.set_subscription_attributes( + SubscriptionArn=subscription_arn, + AttributeName="RawMessageDelivery", + AttributeValue="true", ) with pytest.raises(ClientError) as e: @@ -1293,38 +1326,55 @@ def test_publish_batch_exceptions( PublishBatchRequestEntries=[ { "Id": "1", - "Message": "Test Message with two attributes", + "Message": "Test message without Group ID", } ], ) assert e.value.response["Error"]["Code"] == "InvalidParameter" + assert ( + e.value.response["Error"]["Message"] + == "Invalid parameter: The MessageGroupId parameter is required for FIFO topics" + ) assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 with pytest.raises(ClientError) as e: sns_client.publish_batch( TopicArn=topic_arn, PublishBatchRequestEntries=[ - {"Id": f"Id_{i}", "Message": f"message_{i}"} for i in range(11) + {"Id": f"Id_{i}", "Message": "Too many messages"} for i in range(11) ], ) assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest" + assert ( + e.value.response["Error"]["Message"] + == "The batch request contains more entries than permissible." + ) assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 with pytest.raises(ClientError) as e: sns_client.publish_batch( TopicArn=topic_arn, PublishBatchRequestEntries=[ - {"Id": "1", "Message": f"message_{i}"} for i in range(2) + {"Id": "1", "Message": "Messages with the same ID"} for i in range(2) ], ) assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct" + assert ( + e.value.response["Error"]["Message"] + == "Two or more batch entries in the request have the same Id." + ) assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 + # todo add test and implement behaviour for: + # Invalid parameter: The topic should either have ContentBasedDeduplication enabled + # or MessageDeduplicationId provided explicitly + def add_xray_header(self, request, **kwargs): request.headers[ "X-Amzn-Trace-Id" ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1" + # todo check with Thomas? 
def test_publish_sqs_from_sns_with_xray_propagation( self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription ): @@ -1359,6 +1409,7 @@ def test_publish_sqs_from_sns_with_xray_propagation( == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1" ) + @pytest.mark.aws_validated def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_client): topic_name = f"test-{short_uid()}" topic = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}]) @@ -1367,33 +1418,44 @@ def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_cli topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}]) assert topic["TopicArn"] == topic1["TopicArn"] - def test_not_found_error_on_get_subscription_attributes( + @pytest.mark.aws_validated + def test_not_found_error_on_set_subscription_attributes( self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription ): topic_arn = sns_create_topic()["TopicArn"] queue_url = sqs_create_queue() - queue_arn = sqs_queue_arn(queue_url) - subscription = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn) + subscription_arn = subscription["SubscriptionArn"] subscription_attributes = sns_client.get_subscription_attributes( - SubscriptionArn=subscription["SubscriptionArn"] - ) + SubscriptionArn=subscription_arn + )["Attributes"] - assert ( - subscription_attributes.get("Attributes").get("SubscriptionArn") - == subscription["SubscriptionArn"] - ) + assert subscription_attributes["SubscriptionArn"] == subscription_arn - sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"]) + subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn) + assert len(subscriptions_by_topic["Subscriptions"]) == 1 - with pytest.raises(ClientError) as e: - sns_client.get_subscription_attributes(SubscriptionArn=subscription["SubscriptionArn"]) + sns_client.unsubscribe(SubscriptionArn=subscription_arn) - assert e.value.response["Error"]["Code"] == "NotFound" - assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 + def check_subscription_deleted(): + try: + # AWS doesn't give NotFound error on GetSubscriptionAttributes for a while, might be cached + sns_client.set_subscription_attributes( + SubscriptionArn=subscription_arn, + AttributeName="RawMessageDelivery", + AttributeValue="true", + ) + raise Exception("Subscription is not deleted") + except ClientError as e: + assert e.response["Error"]["Code"] == "NotFound" + assert e.response["ResponseMetadata"]["HTTPStatusCode"] == 404 + + retry(check_subscription_deleted, retries=10, sleep_before=0.2, sleep=3) + subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn) + assert len(subscriptions_by_topic["Subscriptions"]) == 0 def test_message_to_fifo_sqs( self, @@ -1415,6 +1477,7 @@ def test_message_to_fifo_sqs( "ContentBasedDeduplication": "true", }, ) + # todo check ContentBasedDedup or MessageDeduplicationId when implemented queue_arn = sqs_queue_arn(queue_url) @@ -1484,7 +1547,7 @@ def test_empty_sns_message( @pytest.mark.parametrize("raw_message_delivery", [True, False]) @pytest.mark.aws_validated - def test_dead_letter_queue_with_deleted_sqs_queue( + def test_redrive_policy_sqs_queue_subscription( self, sns_client, sqs_client, From 583716c9ef3ff7b9bb12d9d43f90780abd0a12ad Mon Sep 17 00:00:00 2001 From: Benjamin Simon Date: Wed, 3 Aug 2022 15:35:59 +0200 Subject: [PATCH 2/4] add validations and aws-validate 
tests --- localstack/services/sns/provider.py | 25 ++++- tests/integration/test_sns.py | 151 ++++++++++++++++------------ 2 files changed, 107 insertions(+), 69 deletions(-) diff --git a/localstack/services/sns/provider.py b/localstack/services/sns/provider.py index 1b6db6b235a4b..7964ddb4dc7a6 100644 --- a/localstack/services/sns/provider.py +++ b/localstack/services/sns/provider.py @@ -12,6 +12,7 @@ import botocore.exceptions import requests as requests from flask import Response as FlaskResponse +from moto.sns import sns_backends as moto_sns_backends from moto.sns.exceptions import DuplicateSnsEndpointError from moto.sns.models import MAXIMUM_MESSAGE_LENGTH from requests.models import Response @@ -653,14 +654,30 @@ def publish( if subject == "": raise InvalidParameterException("Invalid parameter: Subject") if not message or all(not m for m in message): - raise InvalidParameterException("Empty message") + raise InvalidParameterException("Invalid parameter: Empty message") if len(message) > MAXIMUM_MESSAGE_LENGTH: - raise InvalidParameterException("Message too long") + raise InvalidParameterException("Invalid parameter: Message too long") - if topic_arn and ".fifo" in topic_arn and not message_group_id: + if topic_arn and ".fifo" in topic_arn: + if not message_group_id: + raise InvalidParameterException( + "The MessageGroupId parameter is required for FIFO topics", + ) + moto_sns_backend = moto_sns_backends[context.region] + if moto_sns_backend.get_topic(arn=topic_arn).content_based_deduplication == "false": + if not message_deduplication_id: + raise InvalidParameterException( + "Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly", + ) + elif message_deduplication_id: + # this is the first one to raise if both are set while the topic is not fifo + raise InvalidParameterException( + "Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type" + ) + elif message_group_id: raise InvalidParameterException( - "The MessageGroupId parameter is required for FIFO topics", + "Invalid parameter: MessageGroupId Reason: The request includes MessageGroupId parameter that is not valid for this topic type" ) sns_backend = SNSBackend.get() diff --git a/tests/integration/test_sns.py b/tests/integration/test_sns.py index 3e4c649eada9e..1fc86e077ccf9 100644 --- a/tests/integration/test_sns.py +++ b/tests/integration/test_sns.py @@ -18,7 +18,6 @@ from localstack.services.sns.provider import SNSBackend from localstack.testing.aws.util import is_aws_cloud from localstack.utils import testutil -from localstack.utils.aws import aws_stack from localstack.utils.net import wait_for_port_closed, wait_for_port_open from localstack.utils.strings import short_uid, to_str from localstack.utils.sync import poll_condition, retry @@ -475,8 +474,6 @@ def test_publish_non_existent_target(self, sns_client): assert ex.value.response["Error"]["Code"] == "InvalidClientTokenId" - # todo: the message key is added to the error response body, but not in AWS - # check with serializer? 
@pytest.mark.aws_validated @pytest.mark.skip_snapshot_verify(paths=["$..message"]) def test_tags(self, sns_client, sns_create_topic, snapshot): @@ -554,7 +551,6 @@ def check_subscription(): return subscription_attrs["PendingConfirmation"] == "false" # SQS subscriptions are auto confirmed if they are from the user and in the same region - # todo check that assert poll_condition(check_subscription, timeout=5) @pytest.mark.aws_validated @@ -906,48 +902,49 @@ def test_create_platform_endpoint_check_idempotency(self, sns_client): sns_client.delete_endpoint(EndpointArn=endpoint_arn) sns_client.delete_platform_application(PlatformApplicationArn=platform_arn) - # todo check this later? + @pytest.mark.aws_validated def test_publish_by_path_parameters( self, sns_create_topic, sns_client, sqs_client, sqs_create_queue, - sqs_queue_arn, - sns_subscription, + sns_create_sqs_subscription, + aws_http_client_factory, ): - topic_name = f"topic-{short_uid()}" - queue_name = f"queue-{short_uid()}" - message = f"test message {short_uid()}" - topic_arn = sns_create_topic(Name=topic_name)["TopicArn"] - - base_url = config.get_edge_url() - path = "Action=Publish&Version=2010-03-31&TopicArn={}&Message={}".format(topic_arn, message) - - queue_url = sqs_create_queue(QueueName=queue_name) - queue_arn = sqs_queue_arn(queue_url) + topic_arn = sns_create_topic()["TopicArn"] + queue_url = sqs_create_queue() + sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url) - subscription_arn = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)[ - "SubscriptionArn" - ] + client = aws_http_client_factory("sns", region="us-east-1") - r = requests.post( - url="{}/?{}".format(base_url, path), - headers=aws_stack.mock_aws_request_headers("sns"), + if is_aws_cloud(): + endpoint_url = "https://sns.us-east-1.amazonaws.com" + else: + endpoint_url = config.get_edge_url() + + response = client.post( + endpoint_url, + params={ + "Action": "Publish", + "Version": "2010-03-31", + "TopicArn": topic_arn, + "Message": message, + }, ) - assert r.status_code == 200 - def get_notification(q_url): - resp = sqs_client.receive_message(QueueUrl=q_url) - return json.loads(resp["Messages"][0]["Body"]) + assert response.status_code == 200 + assert b" Date: Wed, 3 Aug 2022 16:33:56 +0200 Subject: [PATCH 3/4] fix unsubscribe not sync with moto --- localstack/services/sns/provider.py | 1 + 1 file changed, 1 insertion(+) diff --git a/localstack/services/sns/provider.py b/localstack/services/sns/provider.py index 7964ddb4dc7a6..57b100a285c90 100644 --- a/localstack/services/sns/provider.py +++ b/localstack/services/sns/provider.py @@ -576,6 +576,7 @@ def create_platform_endpoint( return CreateEndpointResponse(**result) def unsubscribe(self, context: RequestContext, subscription_arn: subscriptionARN) -> None: + call_moto(context) sns_backend = SNSBackend.get() def should_be_kept(current_subscription, target_subscription_arn): From 3970eb81601f8ec9baffa2aac469ac45964d2acf Mon Sep 17 00:00:00 2001 From: Benjamin Simon Date: Wed, 3 Aug 2022 16:40:08 +0200 Subject: [PATCH 4/4] recreate snapshots --- tests/integration/test_sns.py | 8 ++-- tests/integration/test_sns.snapshot.json | 58 ++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_sns.py b/tests/integration/test_sns.py index 1fc86e077ccf9..e048360b530ab 100644 --- a/tests/integration/test_sns.py +++ b/tests/integration/test_sns.py @@ -1582,16 +1582,14 @@ def test_redrive_policy_sqs_queue_subscription( 
snapshot.add_transformer(snapshot.transform.sqs_api()) # Need to skip the MD5OfBody/Signature, because it contains a timestamp snapshot.add_transformer( - snapshot.transform.jsonpath( - "$.json_encoded_delivery..Body.Signature", + snapshot.transform.key_value( + "Signature", "", reference_replacement=False, ) ) snapshot.add_transformer( - snapshot.transform.jsonpath( - "$.json_encoded_delivery..MD5OfBody", "", reference_replacement=False - ) + snapshot.transform.key_value("MD5OfBody", "", reference_replacement=False) ) topic_arn = sns_create_topic()["TopicArn"] diff --git a/tests/integration/test_sns.snapshot.json b/tests/integration/test_sns.snapshot.json index 3dae389aaaa4d..6ba02813eb1aa 100644 --- a/tests/integration/test_sns.snapshot.json +++ b/tests/integration/test_sns.snapshot.json @@ -112,5 +112,63 @@ } } } + }, + "tests/integration/test_sns.py::TestSNSProvider::test_redrive_policy_sqs_queue_subscription[True]": { + "recorded-date": "03-08-2022, 16:38:45", + "recorded-content": { + "raw_message_delivery": { + "Messages": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": "test_dlq_after_sqs_endpoint_deleted" + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } + }, + "tests/integration/test_sns.py::TestSNSProvider::test_redrive_policy_sqs_queue_subscription[False]": { + "recorded-date": "03-08-2022, 16:38:48", + "recorded-content": { + "json_encoded_delivery": { + "Messages": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "Type": "Notification", + "MessageId": "", + "TopicArn": "arn:aws:sns::111111111111:", + "Message": "test_dlq_after_sqs_endpoint_deleted", + "Timestamp": "date", + "SignatureVersion": "1", + "Signature": "", + "SigningCertURL": "https://sns..amazonaws.com/SimpleNotificationService-.pem", + "UnsubscribeURL": "/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns::111111111111::", + "MessageAttributes": { + "attr2": { + "Type": "Binary", + "Value": "AgME" + }, + "attr1": { + "Type": "Number", + "Value": "111" + } + } + } + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } } }
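
---

For reference, below is a minimal sketch of the client-side behaviour the new `publish` validations in `provider.py` (patch 2) are meant to reproduce: MessageGroupId required on FIFO topics, MessageDeduplicationId required when ContentBasedDeduplication is disabled, and both parameters rejected on standard topics. The endpoint URL, region, and topic names are assumptions for illustration only and are not part of the patch; the error code matches what the updated tests assert.

```python
# Sketch only: exercises the FIFO publish validations from a boto3 client.
# Endpoint URL, region and topic names are assumed for illustration.
import boto3
from botocore.exceptions import ClientError

sns = boto3.client(
    "sns",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",  # assumed LocalStack edge endpoint
)


def expect_invalid_parameter(**publish_kwargs):
    """Helper: publish and verify the call is rejected with InvalidParameter."""
    try:
        sns.publish(**publish_kwargs)
    except ClientError as e:
        assert e.response["Error"]["Code"] == "InvalidParameter"
        print(e.response["Error"]["Message"])
        return
    raise AssertionError("expected an InvalidParameter error")


# FIFO topic with ContentBasedDeduplication disabled
fifo_arn = sns.create_topic(
    Name="example-topic.fifo",
    Attributes={"FifoTopic": "true", "ContentBasedDeduplication": "false"},
)["TopicArn"]

# 1) Missing MessageGroupId on a FIFO topic -> rejected
expect_invalid_parameter(TopicArn=fifo_arn, Message="msg")

# 2) MessageGroupId present, but no MessageDeduplicationId while
#    ContentBasedDeduplication is disabled -> rejected
expect_invalid_parameter(TopicArn=fifo_arn, Message="msg", MessageGroupId="group-1")

# 3) Both parameters provided -> accepted
sns.publish(
    TopicArn=fifo_arn,
    Message="msg",
    MessageGroupId="group-1",
    MessageDeduplicationId="dedup-1",
)

# 4) MessageGroupId on a standard (non-FIFO) topic -> rejected
standard_arn = sns.create_topic(Name="example-standard-topic")["TopicArn"]
expect_invalid_parameter(TopicArn=standard_arn, Message="msg", MessageGroupId="group-1")
```

The printed messages should mirror the `Invalid parameter: ...` strings asserted in the aws-validated tests added by this series.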