8000 fix SNS subscription deleted when SQS endpoint was deleted (#6645) · localstack/localstack@615b51d · GitHub
[go: up one dir, main page]

Skip to content

Commit 615b51d

Browse files
authored
fix SNS subscription deleted when SQS endpoint was deleted (#6645)
1 parent ccdddb3 commit 615b51d

File tree

3 files changed

+220
-6
lines changed

3 files changed

+220
-6
lines changed

localstack/services/sns/provider.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -991,12 +991,10 @@ async def message_to_subscriber(
991991
store_delivery_log(subscriber, False, message, message_id)
992992
sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))
993993
if "NonExistentQueue" in str(exc):
994-
LOG.info(
995-
'Removing non-existent queue "%s" subscribed to topic "%s"',
996-
queue_url,
997-
topic_arn,
998-
)
999-
subscriptions.remove(subscriber)
994+
LOG.debug("The SQS queue endpoint does not exist anymore")
995+
# todo: if the queue got deleted, even if we recreate a queue with the same name/url
996+
# AWS won't send to it anymore. Would need to unsub/resub.
997+
# We should mark this subscription as "broken"
1000998
return
1001999

10021000
elif subscriber["Protocol"] == "lambda":

tests/integration/test_sns.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
import queue
44
import random
5+
import time
56
from operator import itemgetter
67

78
import pytest
@@ -2113,3 +2114,91 @@ def test_publish_to_gcm(self, sns_client):
21132114

21142115
sns_client.delete_endpoint(EndpointArn=endpoint_arn)
21152116
sns_client.delete_platform_application(PlatformApplicationArn=platform_app_arn)
2117+
2118+
@pytest.mark.aws_validated
2119+
@pytest.mark.skip_snapshot_verify(
2120+
paths=[
2121+
"$..Attributes.Owner",
2122+
"$..Attributes.ConfirmationWasAuthenticated",
2123+
"$..Attributes.RawMessageDelivery",
2124+
"$..Attributes.sqs_queue_url",
2125+
"$..Subscriptions..Owner",
2126+
]
2127+
)
2128+
def test_subscription_after_failure_to_deliver(
2129+
self,
2130+
sns_client,
2131+
sqs_client,
2132+
sns_create_topic,
2133+
sqs_create_queue,
2134+
sqs_queue_arn,
2135+
sqs_queue_exists,
2136+
sns_create_sqs_subscription,
2137+
sns_allow_topic_sqs_queue,
2138+
snapshot,
2139+
):
2140+
topic_arn = sns_create_topic()["TopicArn"]
2141+
queue_name = f"test-queue-{short_uid()}"
2142+
queue_url = sqs_create_queue(QueueName=queue_name)
2143+
2144+
subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)
2145+
subscription_arn = subscription["SubscriptionArn"]
2146+
2147+
dlq_url = sqs_create_queue()
2148+
dlq_arn = sqs_queue_arn(dlq_url)
2149+
2150+
sns_allow_topic_sqs_queue(
2151+
sqs_queue_url=dlq_url,
2152+
sqs_queue_arn=dlq_arn,
2153+
sns_topic_arn=topic_arn,
2154+
)
2155+
2156+
sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)
2157+
snapshot.match("subscriptions-attrs", sub_attrs)
2158+
2159+
message = "test_dlq_before_sqs_endpoint_deleted"
2160+
sns_client.publish(TopicArn=topic_arn, Message=message)
2161+
response = sqs_client.receive_message(
2162+
QueueUrl=queue_url, WaitTimeSeconds=10, MaxNumberOfMessages=4
2163+
)
2164+
snapshot.match("messages-before-delete", response)
2165+
sqs_client.delete_message(
2166+
QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]
2167+
)
2168+
2169+
sqs_client.delete_queue(QueueUrl=queue_url)
2170+
# try to send a message before setting a DLQ
2171+
message = "test_dlq_after_sqs_endpoint_deleted"
2172+
sns_client.publish(TopicArn=topic_arn, Message=message)
2173+
# to avoid race condition, publish is async and the redrive policy can be in effect before the actual publish
2174+
time.sleep(1)
2175+
2176+
# check the subscription is still there after we deleted the queue
2177+
subscriptions = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)
2178+
snapshot.match("subscriptions", subscriptions)
2179+
2180+
sns_client.set_subscription_attributes(
2181+
SubscriptionArn=subscription_arn,
2182+
AttributeName="RedrivePolicy",
2183+
AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),
2184+
)
2185+
2186+
sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)
2187+
snapshot.match("subscriptions-attrs-with-redrive", sub_attrs)
2188+
2189+
# AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly
2190+
assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)
2191+
2192+
# test sending and receiving multiple messages
2193+
for i in range(2):
2194+
message = f"test_dlq_after_sqs_endpoint_deleted_{i}"
2195+
2196+
sns_client.publish(TopicArn=topic_arn, Message=message)
2197+
response = sqs_client.receive_message(
2198+
QueueUrl=dlq_url, WaitTimeSeconds=10, MaxNumberOfMessages=4
2199+
)
2200+
sqs_client.delete_message(
2201+
QueueUrl=dlq_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]
2202+
)
2203+
2204+
snapshot.match(f"message-{i}-after-delete", response)

tests/integration/test_sns.snapshot.json

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,5 +1668,132 @@
16681668
"tests/integration/test_sns.py::TestSNSProvider::test_publish_sqs_from_sns_with_xray_propagation": {
16691669
"recorded-date": "09-08-2022, 11:35:46",
16701670
"recorded-content": {}
1671+
},
1672+
"tests/integration/test_sns.py::TestSNSProvider::test_subscription_after_failure_to_deliver": {
1673+
"recorded-date": "10-08-2022, 17:04:52",
1674+
"recorded-content": {
1675+
"subscriptions-attrs": {
1676+
"Attributes": {
1677+
"ConfirmationWasAuthenticated": "true",
1678+
"Endpoint": "arn:aws:sqs:<region>:111111111111:<resource:1>",
1679+
"Owner": "111111111111",
1680+
"PendingConfirmation": "false",
1681+
"Protocol": "sqs",
1682+
"RawMessageDelivery": "false",
1683+
"SubscriptionArn": "arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>",
1684+
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>"
1685+
},
1686+
"ResponseMetadata": {
1687+
"HTTPHeaders": {},
1688+
"HTTPStatusCode": 200
1689+
}
1690+
},
1691+
"messages-before-delete": {
1692+
"Messages": [
1693+
{
1694+
"Body": {
1695+
"Type": "Notification",
1696+
"MessageId": "<uuid:1>",
1697+
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>",
1698+
"Message": "test_dlq_before_sqs_endpoint_deleted",
1699+
"Timestamp": "date",
1700+
"SignatureVersion": "1",
1701+
"Signature": "<signature>",
1702+
"SigningCertURL": "https://sns.<region>.amazonaws.com/SimpleNotificationService-<signing-cert-file:1>",
1703+
"UnsubscribeURL": "<unsubscribe-domain>/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>"
1704+
},
1705+
"MD5OfBody": "<md5-hash>",
1706+
"MessageId": "<uuid:2>",
1707+
"ReceiptHandle": "<receipt-handle:1>"
1708+
}
1709+
],
1710+
"ResponseMetadata": {
1711+
"HTTPHeaders": {},
1712+
"HTTPStatusCode": 200
1713+
}
1714+
},
1715+
"subscriptions": {
1716+
"ResponseMetadata": {
1717+
"HTTPHeaders": {},
1718+
"HTTPStatusCode": 200
1719+
},
1720+
"Subscriptions": [
1721+
{
1722+
"Endpoint": "arn:aws:sqs:<region>:111111111111:<resource:1>",
1723+
"Owner": "111111111111",
1724+
"Protocol": "sqs",
1725+
"SubscriptionArn": "arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>",
1726+
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>"
1727+
}
1728+
]
1729+
},
1730+
"subscriptions-attrs-with-redrive": {
1731+
"Attributes": {
1732+
"ConfirmationWasAuthenticated": "true",
1733+
"Endpoint": "arn:aws:sqs:<region>:111111111111:<resource:1>",
1734+
"Owner": "111111111111",
1735+
"PendingConfirmation": "false",
1736+
"Protocol": "sqs",
1737+
"RawMessageDelivery": "false",
1738+
"RedrivePolicy": {
1739+
"deadLetterTargetArn": "arn:aws:sqs:<region>:111111111111:<resource:4>"
1740+
},
1741+
"SubscriptionArn": "arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>",
1742+
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>"
1743+
},
1744+
F987 "ResponseMetadata": {
1745+
"HTTPHeaders": {},
1746+
"HTTPStatusCode": 200
1747+
}
1748+
},
1749+
"message-0-after-delete": {
1750+
"Messages": [
1751+
{
1752+
"Body": {
1753+
"Type": "Notification",
1754+
"MessageId": "<uuid:3>",
1755+
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>",
1756+
"Message": "test_dlq_after_sqs_endpoint_deleted_0",
1757+
"Timestamp": "date",
1758+
"SignatureVersion": "1",
1759+
"Signature": "<signature>",
1760+
"SigningCertURL": "https://sns.<region>.amazonaws.com/SimpleNotificationService-<signing-cert-file:1>",
1761+
"UnsubscribeURL": "<unsubscribe-domain>/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>"
1762+
},
1763+
"MD5OfBody": "<md5-hash>",
1764+
"MessageId": "<uuid:4>",
1765+
"ReceiptHandle": "<receipt-handle:2>"
1766+
}
1767+
],
1768+
"ResponseMetadata": {
1769+
"HTTPHeaders": {},
1770+
"HTTPStatusCode": 200
1771+
}
1772+
},
1773+
"message-1-after-delete": {
1774+
"Messages": [
1775+
{
1776+
"Body": {
1777+
"Type": "Notification",
1778+
"MessageId": "<uuid:5>",
1779+
"TopicArn": "arn:aws:sns:<region>:111111111111:<resource:3>",
1780+
"Message": "test_dlq_after_sqs_endpoint_deleted_1",
1781+
"Timestamp": "date",
1782+
"SignatureVersion": "1",
1783+
"Signature": "<signature>",
1784+
"SigningCertURL": "https://sns.<region>.amazonaws.com/SimpleNotificationService-<signing-cert-file:1>",
1785+
"UnsubscribeURL": "<unsubscribe-domain>/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:<region>:111111111111:<resource:3>:<resource:2>"
1786+
},
1787+
"MD5OfBody": "<md5-hash>",
1788+
"MessageId": "<uuid:6>",
1789+
"ReceiptHandle": "<receipt-handle:3>"
1790+
}
1791+
],
1792+
"ResponseMetadata": {
1793+
"HTTPHeaders": {},
1794+
"HTTPStatusCode": 200
1795+
}
1796+
}
1797+
}
16711798
}
16721799
}

0 commit comments

Comments
 (0)
0