8000 refactor sns tests and fifo topic validations by bentsku · Pull Request #6586 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

refactor sns tests and fifo topic validations #6586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add validations and aws-validate tests
  • Loading branch information
bentsku committed Aug 3, 2022
commit 583716c9ef3ff7b9bb12d9d43f90780abd0a12ad
25 changes: 21 additions & 4 deletions localstack/services/sns/provider.py
8000
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import botocore.exceptions
import requests as requests
from flask import Response as FlaskResponse
from moto.sns import sns_backends as moto_sns_backends
from moto.sns.exceptions import DuplicateSnsEndpointError
from moto.sns.models import MAXIMUM_MESSAGE_LENGTH
from requests.models import Response
Expand Down Expand Up @@ -653,14 +654,30 @@ def publish(
if subject == "":
raise InvalidParameterException("Invalid parameter: Subject")
if not message or all(not m for m in message):
raise InvalidParameterException("Empty message")
raise InvalidParameterException("Invalid parameter: Empty message")

if len(message) > MAXIMUM_MESSAGE_LENGTH:
raise InvalidParameterException("Message too long")
raise InvalidParameterException("Invalid parameter: Message too long")

if topic_arn and ".fifo" in topic_arn and not message_group_id:
if topic_arn and ".fifo" in topic_arn:
if not message_group_id:
raise InvalidParameterException(
"The MessageGroupId parameter is required for FIFO topics",
)
moto_sns_backend = moto_sns_backends[context.region]
if moto_sns_backend.get_topic(arn=topic_arn).content_based_deduplication == "false":
if not message_deduplication_id:
raise InvalidParameterException(
"Invalid parameter: The topic should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly",
)
elif message_deduplication_id:
# this is the first one to raise if both are set while the topic is not fifo
raise InvalidParameterException(
"Invalid parameter: MessageDeduplicationId Reason: The request includes MessageDeduplicationId parameter that is not valid for this topic type"
)
elif message_group_id:
raise InvalidParameterException(
"The MessageGroupId parameter is required for FIFO topics",
"Invalid parameter: MessageGroupId Reason: The request includes MessageGroupId parameter that is not valid for this topic type"
)

sns_backend = SNSBackend.get()
Expand Down
151 changes: 86 additions & 65 deletions tests/integration/test_sns.py
Original file line number Diff line number Diff line change
8000 Expand Up @@ -18,7 +18,6 @@
from localstack.services.sns.provider import SNSBackend
from localstack.testing.aws.util import is_aws_cloud
from localstack.utils import testutil
from localstack.utils.aws import aws_stack
from localstack.utils.net import wait_for_port_closed, wait_for_port_open
from localstack.utils.strings import short_uid, to_str
from localstack.utils.sync import poll_condition, retry
Expand Down Expand Up @@ -475,8 +474,6 @@ def test_publish_non_existent_target(self, sns_client):

assert ex.value.response["Error"]["Code"] == "InvalidClientTokenId"

# todo: the message key is added to the error response body, but not in AWS
# check with serializer?
@pytest.mark.aws_validated
@pytest.mark.skip_snapshot_verify(paths=["$..message"])
def test_tags(self, sns_client, sns_create_topic, snapshot):
Expand Down Expand Up @@ -554,7 +551,6 @@ def check_subscription():
return subscription_attrs["PendingConfirmation"] == "false"

# SQS subscriptions are auto confirmed if they are from the user and in the same region
# todo check that
assert poll_condition(check_subscription, timeout=5)

@pytest.mark.aws_validated
Expand Down Expand Up @@ -906,48 +902,49 @@ def test_create_platform_endpoint_check_idempotency(self, sns_client):
sns_client.delete_endpoint(EndpointArn=endpoint_arn)
sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)

# todo check this later?
@pytest.mark.aws_validated
def test_publish_by_path_parameters(
self,
sns_create_topic,
sns_client,
sqs_client,
sqs_create_queue,
sqs_queue_arn,
sns_subscription,
sns_create_sqs_subscription,
aws_http_client_factory,
):
topic_name = f"topic-{short_uid()}"
queue_name = f"queue-{short_uid()}"

message = f"test message {short_uid()}"
topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]

base_url = config.get_edge_url()
path = "Action=Publish&Version=2010-03-31&TopicArn={}&Message={}".format(topic_arn, message)

queue_url = sqs_create_queue(QueueName=queue_name)
queue_arn = sqs_queue_arn(queue_url)
topic_arn = sns_create_topic()["TopicArn"]
queue_url = sqs_create_queue()
sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)

subscription_arn = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)[
"SubscriptionArn"
]
client = aws_http_client_factory("sns", region="us-east-1")

r = requests.post(
url="{}/?{}".format(base_url, path),
headers=aws_stack.mock_aws_request_headers("sns"),
if is_aws_cloud():
endpoint_url = "https://sns.us-east-1.amazonaws.com"
else:
endpoint_url = config.get_edge_url()

response = client.post(
endpoint_url,
params={
"Action": "Publish",
"Version": "2010-03-31",
"TopicArn": topic_arn,
"Message": message,
},
)
assert r.status_code == 200

def get_notification(q_url):
resp = sqs_client.receive_message(QueueUrl=q_url)
return json.loads(resp["Messages"][0]["Body"])
assert response.status_code == 200
assert b"<PublishResponse" in response.content

notification = retry(get_notification, retries=3, sleep=2, q_url=queue_url)
assert notification["TopicArn"] == topic_arn
a 8000 ssert notification["Message"] == message

sns_client.unsubscribe(SubscriptionArn=subscription_arn)
messages = sqs_client.receive_message(
QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5
)["Messages"]
msg_body = json.loads(messages[0]["Body"])
assert msg_body["TopicArn"] == topic_arn
assert msg_body["Message"] == message

@pytest.mark.only_localstack
def test_multiple_subscriptions_http_endpoint(
self, sns_client, sns_create_topic, sns_subscription
):
Expand Down Expand Up @@ -1180,6 +1177,7 @@ def get_messages():

retry(get_messages, retries=3, sleep=1)

@pytest.mark.aws_validated
def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(
self,
sns_client,
Expand Down Expand Up @@ -1292,6 +1290,7 @@ def get_messages(queue_url):
retry(get_messages, retries=5, sleep=1, queue_url=queue_url)
# todo add test for deduplication
# https://docs.aws.amazon.com/cli/latest/reference/sns/publish-batch.html
# https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html

@pytest.mark.aws_validated
def test_publish_batch_exceptions(
Expand Down Expand Up @@ -1364,17 +1363,13 @@ def test_publish_batch_exceptions(
== "Two or more batch entries in the request have the same Id."
)
assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400

# todo add test and implement behaviour for:
# Invalid parameter: The topic should either have ContentBasedDeduplication enabled
# or MessageDeduplicationId provided explicitly
# todo add test and implement behaviour for ContentBasedDeduplication or MessageDeduplicationId

def add_xray_header(self, request, **kwargs):
request.headers[
"X-Amzn-Trace-Id"
] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"

# todo check with Thomas?
def test_publish_sqs_from_sns_with_xray_propagation(
self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription
):
Expand Down Expand Up @@ -1457,51 +1452,54 @@ def check_subscription_deleted():
subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)
assert len(subscriptions_by_topic["Subscriptions"]) == 0

@pytest.mark.aws_validated
def test_message_to_fifo_sqs(
self,
sns_client,
sqs_client,
sns_create_topic,
sqs_create_queue,
sqs_queue_arn,
sns_subscription,
sns_create_sqs_subscription,
):
topic_name = f"topic-{short_uid()}.fifo"
queue_name = f"queue-{short_uid()}.fifo"

topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]
topic_arn = sns_create_topic(
Name=topic_name,
Attributes={
"FifoTopic": "true",
"ContentBasedDeduplication": "true", # todo: not enforced yet
},
)["TopicArn"]
queue_url = sqs_create_queue(
QueueName=queue_name,
Attributes={
"FifoQueue": "true",
"ContentBasedDeduplication": "true",
},
)
# todo check ContentBasedDedup or MessageDeduplicationId when implemented
# todo check both ContentBasedDeduplication and MessageDeduplicationId when implemented
# https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html

queue_arn = sqs_queue_arn(queue_url)

sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)

message = "Test"
sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())

def get_message():
received = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)[
"Messages"
][0]["Body"]
assert json.loads(received)["Message"] == message

retry(get_message, retries=10, sleep_before=0.15, sleep=1)
messages = sqs_client.receive_message(
QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=10
)["Messages"]
msg_body = messages[0]["Body"]
assert json.loads(msg_body)["Message"] == message

@pytest.mark.aws_validated
def test_validations_for_fifo(
self,
sns_client,
sqs_client,
sns_create_topic,
sqs_create_queue,
sqs_queue_arn,
sns_subscription,
sns_create_sqs_subscription,
):
topic_name = f"topic-{short_uid()}"
fifo_topic_name = f"topic-{short_uid()}.fifo"
Expand All @@ -1517,10 +1515,8 @@ def test_validations_for_fifo(
QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}
)

fifo_queue_arn = sqs_queue_arn(fifo_queue_url)

with pytest.raises(ClientError) as e:
sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=fifo_queue_arn)
sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=fifo_queue_url)

assert e.match("standard SNS topic")

3262 Expand All @@ -1529,18 +1525,41 @@ def test_validations_for_fifo(

assert e.match("MessageGroupId")

with pytest.raises(ClientError) as e:
sns_client.publish(TopicArn=fifo_topic_arn, Message="test", MessageGroupId=short_uid())
# if ContentBasedDeduplication is not set at the topic level, it needs MessageDeduplicationId for each msg
assert e.match("MessageDeduplicationId")
assert e.match("ContentBasedDeduplication")

with pytest.raises(ClientError) as e:
sns_client.publish(
TopicArn=topic_arn, Message="test", MessageDeduplicationId=short_uid()
)
assert e.match("MessageDeduplicationId")

with pytest.raises(ClientError) as e:
sns_client.publish(TopicArn=topic_arn, Message="test", MessageGroupId=short_uid())
assert e.match("MessageGroupId")

@pytest.mark.aws_validated
def test_empty_sns_message(
self, sns_client, sqs_client, sns_topic, sqs_queue, sqs_queue_arn, sns_subscription
self,
sns_client,
sqs_client,
sns_create_topic,
sqs_create_queue,
sns_create_sqs_subscription,
):
topic_arn = sns_topic["Attributes"]["TopicArn"]
queue_arn = sqs_queue_arn(sqs_queue)
sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
topic_arn = sns_create_topic()["TopicArn"]
queue_url = sqs_create_queue()
sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)

with pytest.raises(ClientError) as e:
sns_client.publish(Message="", TopicArn=topic_arn)
assert e.match("Empty message")
assert (
sqs_client.get_queue_attributes(
QueueUrl=sqs_queue, AttributeNames=["ApproximateNumberOfMessages"]
QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
)["Attributes"]["ApproximateNumberOfMessages"]
== "0"
)
Expand Down Expand Up @@ -1892,18 +1911,20 @@ def test_dlq_external_http_endpoint(
# AWS doesn't send to the DLQ if the UnsubscribeConfirmation fails to be delivered
assert "Messages" not in response

def test_publish_too_long_message(self, sns_client):
fake_arn = "arn:aws:sns:us-east-1:123456789012:i_dont_exist"
@pytest.mark.aws_validated
def test_publish_too_long_message(self, sns_client, sns_create_topic):
topic_arn = sns_create_topic()["TopicArn"]
# simulate payload over 256kb
message = "This is a test message" * 12000

with pytest.raises(ClientError) as e:
sns_client.publish(TopicArn=fake_arn, Message=message)
sns_client.publish(TopicArn=topic_arn, Message=message)

assert e.value.response["Error"]["Code"] == "InvalidParameter"
assert e.value.response["Error"]["Message"] == "Message too long"
assert e.value.response["Error"]["Message"] == "Invalid parameter: Message too long"
assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400

@pytest.mark.only_localstack # needs real credentials for GCM/FCM
def test_publish_to_gcm(self, sns_client):
key = "mock_server_key"
token = "mock_token"
Expand Down
0