From 4d2095ce604e1819ba9f8853047be568a29ee850 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Fri, 5 Aug 2022 23:30:00 +0200 Subject: [PATCH 1/4] remove code that manually places SQS messages into a DLQ --- .../sqs_event_source_listener.py | 11 +++---- .../services/awslambda/lambda_executors.py | 24 ++++---------- localstack/utils/aws/dead_letter_queue.py | 32 ++----------------- localstack/utils/aws/message_forwarding.py | 2 +- 4 files changed, 15 insertions(+), 54 deletions(-) diff --git a/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py b/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py index 213170f96b4a4..2319fbf653135 100644 --- a/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py +++ b/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py @@ -101,11 +101,11 @@ def _process_messages_for_event_source(self, source, messages): return res def _send_event_to_lambda(self, queue_arn, queue_url, lambda_arn, messages, region): - def delete_messages(result, func_arn, event, error=None, dlq_sent=None, **kwargs): - if error and not dlq_sent: - # Skip deleting messages from the queue in case of processing errors AND if - # the message has not yet been sent to a dead letter queue (DLQ). - # We'll pick them up and retry next time they become available on the queue. + def delete_messages(result, func_arn, event, error=None, **kwargs): + if error: + # Skip deleting messages from the queue in case of processing errors. We'll pick them up and retry + # next time they become visible in the queue. Redrive policies will be handled automatically by SQS + # on the next polling attempt. return region_name = queue_arn.split(":")[3] @@ -142,7 +142,6 @@ def delete_messages(result, func_arn, event, error=None, dlq_sent=None, **kwargs event = {"Records": records} - # TODO implement retries, based on "RedrivePolicy.maxReceiveCount" in the queue settings res = run_lambda( func_arn=lambda_arn, event=event, diff --git a/localstack/services/awslambda/lambda_executors.py b/localstack/services/awslambda/lambda_executors.py index 6a09fe7e9fe45..4f00c6354e980 100644 --- a/localstack/services/awslambda/lambda_executors.py +++ b/localstack/services/awslambda/lambda_executors.py @@ -25,7 +25,6 @@ LAMBDA_RUNTIME_PROVIDED, get_container_network_for_lambda, get_main_endpoint_from_container, - get_record_from_event, is_java_lambda, is_nodejs_runtime, rm_docker_container, @@ -34,10 +33,7 @@ from localstack.services.install import GO_LAMBDA_RUNTIME, INSTALL_PATH_LOCALSTACK_FAT_JAR from localstack.utils.aws import aws_stack from localstack.utils.aws.aws_models import LambdaFunction -from localstack.utils.aws.dead_letter_queue import ( - lambda_error_to_dead_letter_queue, - sqs_error_to_dead_letter_queue, -) +from localstack.utils.aws.dead_letter_queue import lambda_error_to_dead_letter_queue from localstack.utils.cloudwatch.cloudwatch_util import cloudwatched from localstack.utils.collections import select_attributes from localstack.utils.common import ( @@ -354,14 +350,7 @@ def handle_error( lambda_function: LambdaFunction, event: Dict, error: Exception, asynchronous: bool = False ): if asynchronous: - if get_record_from_event(event, "eventSource") == EVENT_SOURCE_SQS: - sqs_queue_arn = get_record_from_event(event, "eventSourceARN") - if sqs_queue_arn: - # event source is SQS, send event back to dead letter queue - return sqs_error_to_dead_letter_queue(sqs_queue_arn, event, error) - else: - # event source 
is not SQS, send back to lambda dead letter queue - lambda_error_to_dead_letter_queue(lambda_function, event, error) + lambda_error_to_dead_letter_queue(lambda_function, event, error) class LambdaAsyncLocks: @@ -425,7 +414,6 @@ def _run(func_arn=None): # start the execution raised_error = None result = None - dlq_sent = None invocation_type = "Event" if asynchronous else "RequestResponse" inv_context = InvocationContext( lambda_function, @@ -438,13 +426,13 @@ def _run(func_arn=None): result = self._execute(lambda_function, inv_context) except Exception as e: raised_error = e - dlq_sent = handle_error(lambda_function, event, e, asynchronous) + handle_error(lambda_function, event, e, asynchronous) raise e finally: self.function_invoke_times[func_arn] = invocation_time - callback and callback( - result, func_arn, event, error=raised_error, dlq_sent=dlq_sent - ) + if callback: + callback(result, func_arn, event, error=raised_error) + lambda_result_to_destination( lambda_function, event, result, asynchronous, raised_error ) diff --git a/localstack/utils/aws/dead_letter_queue.py b/localstack/utils/aws/dead_letter_queue.py index db2719cd8bd95..ea90d5d0bce42 100644 --- a/localstack/utils/aws/dead_letter_queue.py +++ b/localstack/utils/aws/dead_letter_queue.py @@ -1,7 +1,6 @@ import json import logging import uuid -from json import JSONDecodeError from typing import Dict, List, Union from localstack.utils.aws import aws_stack @@ -11,47 +10,22 @@ LOG = logging.getLogger(__name__) -def sqs_error_to_dead_letter_queue(queue_arn: str, event: Dict, error): - client = aws_stack.connect_to_service("sqs") - queue_url = aws_stack.get_sqs_queue_url(queue_arn) - attrs = client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["RedrivePolicy"]) - attrs = attrs.get("Attributes", {}) - try: - policy = json.loads(attrs.get("RedrivePolicy") or "{}") - except JSONDecodeError: - LOG.warning( - "Parsing RedrivePolicy {} failed, Queue: {}".format( - attrs.get("RedrivePolicy"), queue_arn - ) - ) - return - - target_arn = policy.get("deadLetterTargetArn") - if not target_arn: - return - return _send_to_dead_letter_queue("SQS", queue_arn, target_arn, event, error) - - def sns_error_to_dead_letter_queue(sns_subscriber: dict, event: str, error): # event should be of type str if coming from SNS, as it represents the message body being passed down policy = json.loads(sns_subscriber.get("RedrivePolicy") or "{}") target_arn = policy.get("deadLetterTargetArn") if not target_arn: return - return _send_to_dead_letter_queue( - "SNS", sns_subscriber["SubscriptionArn"], target_arn, event, error - ) + return _send_to_dead_letter_queue(sns_subscriber["SubscriptionArn"], target_arn, event, error) def lambda_error_to_dead_letter_queue(func_details: LambdaFunction, event: Dict, error): dlq_arn = (func_details.dead_letter_config or {}).get("TargetArn") source_arn = func_details.id - return _send_to_dead_letter_queue("Lambda", source_arn, dlq_arn, event, error) + return _send_to_dead_letter_queue(source_arn, dlq_arn, event, error) -def _send_to_dead_letter_queue( - source_type: str, source_arn: str, dlq_arn: str, event: Union[Dict, str], error -): +def _send_to_dead_letter_queue(source_arn: str, dlq_arn: str, event: Union[Dict, str], error): if not dlq_arn: return LOG.info("Sending failed execution %s to dead letter queue %s", source_arn, dlq_arn) diff --git a/localstack/utils/aws/message_forwarding.py b/localstack/utils/aws/message_forwarding.py index 0851f009b04d9..eb93d071a2cb6 100644 --- 
a/localstack/utils/aws/message_forwarding.py +++ b/localstack/utils/aws/message_forwarding.py @@ -34,7 +34,7 @@ def lambda_result_to_destination( event: Dict, result: InvocationResult, is_async: bool, - error: InvocationException, + error: Optional[InvocationException], ): if not func_details.destination_enabled(): return From 4e8b0818056595b8e9a8f7dc7296186ecaab922a Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Sat, 6 Aug 2022 21:27:01 +0200 Subject: [PATCH 2/4] add two snapshot tests for lambda sqs event source mapping --- .../sqs_event_source_listener.py | 31 +-- .../testing/snapshots/transformer_utility.py | 8 + .../functions/lambda_sqs_integration.py | 55 ++++ .../awslambda/test_lambda_integration.py | 2 + .../awslambda/test_lambda_sqs_integration.py | 242 ++++++++++++++++++ .../test_lambda_sqs_integration.snapshot.json | 202 +++++++++++++++ tests/integration/test_sqs.py | 50 ---- 7 files changed, 525 insertions(+), 65 deletions(-) create mode 100644 tests/integration/awslambda/functions/lambda_sqs_integration.py create mode 100644 tests/integration/awslambda/test_lambda_sqs_integration.py create mode 100644 tests/integration/awslambda/test_lambda_sqs_integration.snapshot.json diff --git a/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py b/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py index 2319fbf653135..bde935fbc860e 100644 --- a/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py +++ b/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py @@ -124,21 +124,22 @@ def delete_messages(result, func_arn, event, error=None, **kwargs): records = [] for msg in messages: message_attrs = message_attributes_to_lower(msg.get("MessageAttributes")) - records.append( - { - "body": msg.get("Body", "MessageBody"), - "receiptHandle": msg.get("ReceiptHandle"), - "md5OfBody": msg.get("MD5OfBody") or msg.get("MD5OfMessageBody"), - "eventSourceARN": queue_arn, - "eventSource": lambda_executors.EVENT_SOURCE_SQS, - "awsRegion": region, - "messageId": msg["MessageId"], - "attributes": msg.get("Attributes", {}), - "messageAttributes": message_attrs, - "md5OfMessageAttributes": msg.get("MD5OfMessageAttributes"), - "sqs": True, - } - ) + record = { + "body": msg.get("Body", "MessageBody"), + "receiptHandle": msg.get("ReceiptHandle"), + "md5OfBody": msg.get("MD5OfBody") or msg.get("MD5OfMessageBody"), + "eventSourceARN": queue_arn, + "eventSource": lambda_executors.EVENT_SOURCE_SQS, + "awsRegion": region, + "messageId": msg["MessageId"], + "attributes": msg.get("Attributes", {}), + "messageAttributes": message_attrs, + } + + if md5OfMessageAttributes := msg.get("MD5OfMessageAttributes"): + record["md5OfMessageAttributes"] = md5OfMessageAttributes + + records.append(record) event = {"Records": records} diff --git a/localstack/testing/snapshots/transformer_utility.py b/localstack/testing/snapshots/transformer_utility.py index 2a104f742d72d..77d8892347f59 100644 --- a/localstack/testing/snapshots/transformer_utility.py +++ b/localstack/testing/snapshots/transformer_utility.py @@ -50,6 +50,14 @@ def key_value( replace_reference=reference_replacement, ) + @staticmethod + def resource_name(replacement_name: str = "resource"): + """Creates a new KeyValueBasedTransformer for the resource name. 
+
+        :return: KeyValueBasedTransformer
+        """
+        return KeyValueBasedTransformer(_resource_name_transformer, replacement_name)
+
     @staticmethod
     def jsonpath(jsonpath: str, value_replacement: str, reference_replacement: bool = True):
         """Creates a new JsonpathTransformer. If the jsonpath matches, the value will be replaced.
diff --git a/tests/integration/awslambda/functions/lambda_sqs_integration.py b/tests/integration/awslambda/functions/lambda_sqs_integration.py
new file mode 100644
index 0000000000000..9b4f1a761aa74
--- /dev/null
+++ b/tests/integration/awslambda/functions/lambda_sqs_integration.py
@@ -0,0 +1,55 @@
+"""This lambda is used for lambda/sqs integration tests. Since SQS event source mappings don't allow
+DestinationConfigurations that send lambda results to other sources (like SQS queues) that could be used to verify
+invocations, this lambda does so manually. You can pass in an event that looks like this::
+
+    {
+        "destination": "",
+        "fail_attempts": 2
+    }
+
+Which will cause the lambda to fail twice (by comparing "fail_attempts" with the "ApproximateReceiveCount" of the
+SQS event triggering the lambda), and send either an error or success result to the SQS queue passed in the
+destination key.
+"""
+import json
+import os
+
+import boto3
+
+
+def handler(event, context):
+    # this lambda expects inputs from an SQS event source mapping
+    if not event.get("Records"):
+        raise ValueError("no records passed to event")
+
+    # it expects exactly one record where the message body is '{"destination": ""}' that mimics a
+    # DestinationConfig (which is not possible with SQS event source mappings).
+    record = event["Records"][0]
+    message = json.loads(record["body"])
+
+    if not message.get("destination"):
+        raise ValueError("no destination for the event given")
+
+    error = None
+    try:
+        if message["fail_attempts"] >= int(record["attributes"]["ApproximateReceiveCount"]):
+            raise ValueError("failed attempt")
+    except Exception as e:
+        error = e
+        raise
+    finally:
+        # we then send a message to the destination queue
+        result = {"error": None if not error else str(error), "event": event}
+        sqs = create_external_boto_client("sqs")
+        sqs.send_message(QueueUrl=message.get("destination"), MessageBody=json.dumps(result))
+
+
+def create_external_boto_client(service):
+    endpoint_url = None
+    if os.environ.get("LOCALSTACK_HOSTNAME"):
+        endpoint_url = (
+            f"http://{os.environ['LOCALSTACK_HOSTNAME']}:{os.environ.get('EDGE_PORT', 4566)}"
+        )
+    region_name = (
+        os.environ.get("AWS_DEFAULT_REGION") or os.environ.get("AWS_REGION") or "us-east-1"
+    )
+    return boto3.client(service, endpoint_url=endpoint_url, region_name=region_name)
diff --git a/tests/integration/awslambda/test_lambda_integration.py b/tests/integration/awslambda/test_lambda_integration.py
index 2f6150dac841b..7bf1f8c3bd8b0 100644
--- a/tests/integration/awslambda/test_lambda_integration.py
+++ b/tests/integration/awslambda/test_lambda_integration.py
@@ -79,6 +79,8 @@ class TestSQSEventSourceMapping:
+    # FIXME: refactor and move to test_lambda_sqs_integration
+
     @pytest.mark.skip_snapshot_verify
     def test_event_source_mapping_default_batch_size(
         self,
diff --git a/tests/integration/awslambda/test_lambda_sqs_integration.py b/tests/integration/awslambda/test_lambda_sqs_integration.py
new file mode 100644
index 0000000000000..479b0ce3ee776
--- /dev/null
+++ b/tests/integration/awslambda/test_lambda_sqs_integration.py
@@ -0,0 +1,242 @@
+import json
+import os
+import time
+
+import pytest
+
+from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON38
+from localstack.utils.strings import short_uid +from localstack.utils.sync import retry + +THIS_FOLDER = os.path.dirname(os.path.realpath(__file__)) +LAMBDA_SQS_INTEGRATION_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_sqs_integration.py") + + +def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30): + def assert_mapping_enabled(): + assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled" + + retry(assert_mapping_enabled, sleep_before=2, retries=retries) + + +@pytest.fixture(autouse=True) +def _snapshot_transformers(snapshot): + # manual transformers since we are passing SQS attributes through lambdas and back again + snapshot.add_transformer(snapshot.transform.key_value("QueueUrl")) + snapshot.add_transformer(snapshot.transform.key_value("ReceiptHandle")) + snapshot.add_transformer(snapshot.transform.key_value("SenderId", reference_replacement=False)) + snapshot.add_transformer(snapshot.transform.resource_name()) + # body contains dynamic attributes so md5 hash changes + snapshot.add_transformer(snapshot.transform.key_value("MD5OfBody")) + # lower-case for when messages are rendered in lambdas + snapshot.add_transformer(snapshot.transform.key_value("receiptHandle")) + snapshot.add_transformer(snapshot.transform.key_value("md5OfBody")) + + +@pytest.mark.skip_snapshot_verify( + paths=[ + # FIXME: this is most of the event source mapping unfortunately + "$..ParallelizationFactor", + "$..LastProcessingResult", + "$..Topics", + "$..MaximumRetryAttempts", + "$..MaximumBatchingWindowInSeconds", + "$..FunctionResponseTypes", + "$..StartingPosition", + "$..StateTransitionReason", + ] +) +def test_failing_lambda_retries_after_visibility_timeout( + create_lambda_function, + lambda_client, + sqs_client, + sqs_create_queue, + sqs_queue_arn, + lambda_su_role, + snapshot, + cleanups, +): + """This test verifies a basic SQS retry scenario. 
The lambda uses an SQS queue as event source, and we are
+    testing whether the lambda automatically retries after the visibility timeout expires and, after the retry,
+    properly deletes the message from the queue."""
+
+    # create queue used in the lambda to send events to (to verify lambda was invoked)
+    destination_queue_name = f"destination-queue-{short_uid()}"
+    destination_url = sqs_create_queue(QueueName=destination_queue_name)
+    snapshot.match(
+        "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)
+    )
+
+    retry_timeout = (
+        2  # timeout in seconds, used for both the lambda and the queue visibility timeout
+    )
+
+    # set up lambda function
+    function_name = f"failing-lambda-{short_uid()}"
+    create_lambda_function(
+        func_name=function_name,
+        handler_file=LAMBDA_SQS_INTEGRATION_FILE,
+        runtime=LAMBDA_RUNTIME_PYTHON38,
+        role=lambda_su_role,
+        timeout=retry_timeout,  # the lambda timeout needs to be <= the queue's visibility timeout
+    )
+
+    # create event source queue
+    event_source_url = sqs_create_queue(
+        QueueName=f"source-queue-{short_uid()}",
+        Attributes={
+            # the visibility timeout is implicitly also the time between retries
+            "VisibilityTimeout": str(retry_timeout),
+        },
+    )
+    event_source_arn = sqs_queue_arn(event_source_url)
+
+    # wire everything with the event source mapping
+    response = lambda_client.create_event_source_mapping(
+        EventSourceArn=event_source_arn,
+        FunctionName=function_name,
+        BatchSize=1,
+    )
+    mapping_uuid = response["UUID"]
+    cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))
+    _await_event_source_mapping_enabled(lambda_client, mapping_uuid)
+    response = lambda_client.get_event_source_mapping(UUID=mapping_uuid)
+    snapshot.match("event_source_mapping", response)
+
+    # trigger lambda with a message and pass the result destination url. the event format is expected by the
+    # lambda_sqs_integration.py lambda.
+    event = {"destination": destination_url, "fail_attempts": 1}
+    sqs_client.send_message(
+        QueueUrl=event_source_url,
+        MessageBody=json.dumps(event),
+    )
+
+    # now wait for the first invocation result which is expected to fail
+    then = time.time()
+    first_response = sqs_client.receive_message(
+        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
+    )
+    assert "Messages" in first_response
+    snapshot.match("first_attempt", first_response)
+
+    # and then, after a few seconds (at least the visibility timeout), we expect the second invocation result, which should succeed
+    second_response = sqs_client.receive_message(
+        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
+    )
+    assert "Messages" in second_response
+    snapshot.match("second_attempt", second_response)
+
+    # check that it took at least the retry timeout between the first and second attempt
+    assert time.time() >= then + retry_timeout
+
+    # assert message is removed from the queue
+    assert "Messages" not in sqs_client.receive_message(
+        QueueUrl=destination_url, WaitTimeSeconds=retry_timeout + 1, MaxNumberOfMessages=1
+    )
+
+
+@pytest.mark.skip_snapshot_verify(
+    paths=[
+        # FIXME: this is most of the event source mapping unfortunately
+        "$..ParallelizationFactor",
+        "$..LastProcessingResult",
+        "$..Topics",
+        "$..MaximumRetryAttempts",
+        "$..MaximumBatchingWindowInSeconds",
+        "$..FunctionResponseTypes",
+        "$..StartingPosition",
+        "$..StateTransitionReason",
+    ]
+)
+def test_redrive_policy_with_failing_lambda(
+    create_lambda_function,
+    lambda_client,
+    sqs_client,
+    sqs_create_queue,
+    sqs_queue_arn,
+    lambda_su_role,
+    snapshot,
+    cleanups,
+):
+    """This test verifies a that SQS moves a message that is passed to a failing lambda to a DLQ according to the
+    redrive policy, and the lambda is invoked the correct number of times. The test retries twice and the event
+    source mapping should then automatically move the message to the DQL, but not earlier (see
+    https://github.com/localstack/localstack/issues/5283)"""
+
+    # create queue used in the lambda to send events to (to verify lambda was invoked)
+    destination_queue_name = f"destination-queue-{short_uid()}"
+    destination_url = sqs_create_queue(QueueName=destination_queue_name)
+    snapshot.match(
+        "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)
+    )
+
+    retry_timeout = (
+        2  # timeout in seconds, used for both the lambda and the queue visibility timeout
+    )
+    retries = 2
+
+    # set up lambda function
+    function_name = f"failing-lambda-{short_uid()}"
+    create_lambda_function(
+        func_name=function_name,
+        handler_file=LAMBDA_SQS_INTEGRATION_FILE,
+        runtime=LAMBDA_RUNTIME_PYTHON38,
+        role=lambda_su_role,
+        timeout=retry_timeout,  # the lambda timeout needs to be <= the queue's visibility timeout
+    )
+
+    # create dlq for event source queue
+    event_dlq_url = sqs_create_queue(QueueName=f"event-dlq-{short_uid()}")
+    event_dlq_arn = sqs_queue_arn(event_dlq_url)
+
+    # create event source queue
+    event_source_url = sqs_create_queue(
+        QueueName=f"source-queue-{short_uid()}",
+        Attributes={
+            # the visibility timeout is implicitly also the time between retries
+            "VisibilityTimeout": str(retry_timeout),
+            "RedrivePolicy": json.dumps(
+                {"deadLetterTargetArn": event_dlq_arn, "maxReceiveCount": retries}
+            ),
+        },
+    )
+    event_source_arn = sqs_queue_arn(event_source_url)
+
+    # wire everything with the event source mapping
+    mapping_uuid = lambda_client.create_event_source_mapping(
+        EventSourceArn=event_source_arn,
+        FunctionName=function_name,
+        BatchSize=1,
+    )["UUID"]
+    cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))
+    _await_event_source_mapping_enabled(lambda_client, mapping_uuid)
+
+    # trigger lambda with a message and pass the result destination url. the event format is expected by the
+    # lambda_sqs_integration.py lambda.
+    event = {"destination": destination_url, "fail_attempts": retries}
+    sqs_client.send_message(
+        QueueUrl=event_source_url,
+        MessageBody=json.dumps(event),
+    )
+
+    # now wait for the first invocation result which is expected to fail
+    first_response = sqs_client.receive_message(
+        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
+    )
+    assert "Messages" in first_response
+    snapshot.match("first_attempt", first_response)
+
+    # check that the DLQ is empty
+    assert "Messages" not in sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=1)
+
+    # the second is also expected to fail, and then the message moves into the DLQ
+    second_response = sqs_client.receive_message(
+        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
+    )
+    assert "Messages" in second_response
+    snapshot.match("second_attempt", second_response)
+
+    # now check that the event message was placed in the DLQ
+    dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=15)
+    assert "Messages" in dlq_response
+    snapshot.match("dlq_response", dlq_response)
diff --git a/tests/integration/awslambda/test_lambda_sqs_integration.snapshot.json b/tests/integration/awslambda/test_lambda_sqs_integration.snapshot.json
new file mode 100644
index 0000000000000..0179c84b5f7b3
--- /dev/null
+++ b/tests/integration/awslambda/test_lambda_sqs_integration.snapshot.json
@@ -0,0 +1,202 @@
+{
+  "tests/integration/awslambda/test_lambda_sqs_integration.py::test_failing_lambda_retries_after_visibility_timeout": {
+    "recorded-date": "06-08-2022, 20:38:01",
+    "recorded-content": {
+      "get_destination_queue_url": {
+        "QueueUrl": "",
+        "ResponseMetadata": {
+          "HTTPHeaders": {},
+          "HTTPStatusCode": 200
+        }
+      },
+      "event_source_mapping": {
+        "ResponseMetadata": {
+          "HTTPHeaders": {},
+          "HTTPStatusCode": 200
+        },
+        "UUID": "",
+        "BatchSize": 1,
+        "MaximumBatchingWindowInSeconds": 0,
+        "EventSourceArn": "arn:aws:sqs::111111111111:",
+        "FunctionArn": "arn:aws:lambda::111111111111:function:",
+        "LastModified": "datetime",
+        "State": "Enabled",
+        "StateTransitionReason": "USER_INITIATED",
+        "FunctionResponseTypes": []
+      },
+      "first_attempt": {
+        "Messages": [
+          {
+            "MessageId": "",
+            "ReceiptHandle": "",
+            "MD5OfBody": "",
+            "Body": {
+              "error": "failed attempt",
+              "event": {
+                "Records": [
+                  {
+                    "messageId": "",
+                    "receiptHandle": "",
+                    "body": "{\"destination\": \"\", \"fail_attempts\": 1}",
+                    "attributes": {
+                      "ApproximateReceiveCount": "1",
+                      "SentTimestamp": "timestamp",
+                      "SenderId": "sender-id",
+                      "ApproximateFirstReceiveTimestamp": "timestamp"
+                    },
+                    "messageAttributes": {},
+                    "md5OfBody": "",
+                    "eventSource": "aws:sqs",
+                    "eventSourceARN": "arn:aws:sqs::111111111111:",
+                    "awsRegion": ""
+                  }
+                ]
+              }
+            }
+          }
+        ],
+        "ResponseMetadata": {
+          "HTTPHeaders": {},
+          "HTTPStatusCode": 200
+        }
+      },
+      "second_attempt": {
+        "Messages": [
+          {
+            "MessageId": "",
+            "ReceiptHandle": "",
+            "MD5OfBody": "",
+            "Body": {
+              "error": null,
+              "event": {
+                "Records": [
+                  {
+                    "messageId": "",
+                    "receiptHandle": "",
+                    "body": "{\"destination\": \"\", \"fail_attempts\": 1}",
+                    "attributes": {
+                      "ApproximateReceiveCount": "2",
+                      "SentTimestamp": "timestamp",
+                      "SenderId": "sender-id",
+                      "ApproximateFirstReceiveTimestamp": "timestamp"
+                    },
+                    "messageAttributes": {},
+                    "md5OfBody": "",
+                    "eventSource": "aws:sqs",
+                    "eventSourceARN": "arn:aws:sqs::111111111111:",
+                    "awsRegion": ""
+                  }
+                ]
+              }
+            }
+          }
+        ],
+        "ResponseMetadata": {
+          "HTTPHeaders": {},
+          "HTTPStatusCode": 200
+        }
+      }
+    }
+  },
+  
"tests/integration/awslambda/test_lambda_sqs_integration.py::test_redrive_policy_with_failing_lambda": { + "recorded-date": "06-08-2022, 21:14:45", + "recorded-content": { + "get_destination_queue_url": { + "QueueUrl": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "first_attempt": { + "Messages": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "error": "failed attempt", + "event": { + "Records": [ + { + "messageId": "", + "receiptHandle": "", + "body": "{\"destination\": \"\", \"fail_attempts\": 2}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "timestamp", + "SenderId": "sender-id", + "ApproximateFirstReceiveTimestamp": "timestamp" + }, + "messageAttributes": {}, + "md5OfBody": "", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs::111111111111:", + "awsRegion": "" + } + ] + } + } + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "second_attempt": { + "Messages": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "error": "failed attempt", + "event": { + "Records": [ + { + "messageId": "", + "receiptHandle": "", + "body": "{\"destination\": \"\", \"fail_attempts\": 2}", + "attributes": { + "ApproximateReceiveCount": "2", + "SentTimestamp": "timestamp", + "SenderId": "sender-id", + "ApproximateFirstReceiveTimestamp": "timestamp" + }, + "messageAttributes": {}, + "md5OfBody": "", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs::111111111111:", + "awsRegion": "" + } + ] + } + } + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "dlq_response": { + "Messages": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "destination": "", + "fail_attempts": 2 + } + } + ], + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + } + } + } +} diff --git a/tests/integration/test_sqs.py b/tests/integration/test_sqs.py index 2c59ab68a8aa8..e2efb7967b8d6 100644 --- a/tests/integration/test_sqs.py +++ b/tests/integration/test_sqs.py @@ -1515,56 +1515,6 @@ def test_dead_letter_queue_list_sources(self, sqs_client, sqs_create_queue): assert queue_url_1 in source_urls["queueUrls"] assert queue_url_2 in source_urls["queueUrls"] - def test_dead_letter_queue_execution( - self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function - ): - - # TODO: lambda creation does not work when testing against AWS - queue_name = f"queue-{short_uid()}" - dead_letter_queue_name = f"dl-queue-{short_uid()}" - dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name) - - # create arn - url_parts = dl_queue_url.split("/") - region = os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION - dl_target_arn = "arn:aws:sqs:{}:{}:{}".format( - region, url_parts[len(url_parts) - 2], url_parts[-1] - ) - - policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1} - queue_url = sqs_create_queue( - QueueName=queue_name, Attributes={"RedrivePolicy": json.dumps(policy)} - ) - - lambda_name = f"lambda-{short_uid()}" - create_lambda_function( - func_name=lambda_name, - libs=TEST_LAMBDA_LIBS, - handler_file=TEST_LAMBDA_PYTHON, - runtime=LAMBDA_RUNTIME_PYTHON36, - ) - # create arn - url_parts = queue_url.split("/") - queue_arn = "arn:aws:sqs:{}:{}:{}".format( - region, url_parts[len(url_parts) - 2], url_parts[-1] - ) - lambda_client.create_event_source_mapping( - EventSourceArn=queue_arn, FunctionName=lambda_name - ) - - # add message to SQS, which 
will trigger the Lambda, resulting in an error - payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1} - sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload)) - - assert poll_condition( - lambda: "Messages" - in sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0), - 10.0, - 1.0, - ) - result_recv = sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0) - assert result_recv["Messages"][0]["Body"] == json.dumps(payload) - @pytest.mark.aws_validated def test_dead_letter_queue_with_fifo_and_content_based_deduplication( self, sqs_client, sqs_create_queue, sqs_queue_arn From be4914e89002ce53e0068d08a1df520733bcb415 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Sun, 7 Aug 2022 14:13:29 +0200 Subject: [PATCH 3/4] remove more unused code --- .../sqs_event_source_listener.py | 28 ++++++++----------- tests/integration/test_sqs.py | 11 ++------ 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py b/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py index bde935fbc860e..15633d4b3459b 100644 --- a/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py +++ b/localstack/services/awslambda/event_source_listeners/sqs_event_source_listener.py @@ -49,8 +49,6 @@ def _listener_loop(self, *args): self.SQS_LISTENER_THREAD.pop("_thread_") return - unprocessed_messages = {} - for source in sources: queue_arn = source["EventSourceArn"] region_name = queue_arn.split(":")[3] @@ -59,21 +57,17 @@ def _listener_loop(self, *args): try: queue_url = aws_stack.sqs_queue_url_for_arn(queue_arn) - messages = unprocessed_messages.pop(queue_arn, None) + result = sqs_client.receive_message( + QueueUrl=queue_url, + AttributeNames=["All"], + MessageAttributeNames=["All"], + MaxNumberOfMessages=batch_size, + ) + messages = result.get("Messages") if not messages: - result = sqs_client.receive_message( - QueueUrl=queue_url, - AttributeNames=["All"], - MessageAttributeNames=["All"], - MaxNumberOfMessages=batch_size, - ) - messages = result.get("Messages") - if not messages: - continue - - res = self._process_messages_for_event_source(source, messages) - if not res: - unprocessed_messages[queue_arn] = messages + continue + + self._process_messages_for_event_source(source, messages) except Exception as e: if "NonExistentQueue" not in str(e): @@ -100,7 +94,7 @@ def _process_messages_for_event_source(self, source, messages): ) return res - def _send_event_to_lambda(self, queue_arn, queue_url, lambda_arn, messages, region): + def _send_event_to_lambda(self, queue_arn, queue_url, lambda_arn, messages, region) -> bool: def delete_messages(result, func_arn, event, error=None, **kwargs): if error: # Skip deleting messages from the queue in case of processing errors. 
We'll pick them up and retry diff --git a/tests/integration/test_sqs.py b/tests/integration/test_sqs.py index e2efb7967b8d6..50272c7ba3148 100644 --- a/tests/integration/test_sqs.py +++ b/tests/integration/test_sqs.py @@ -2382,20 +2382,13 @@ def test_delete_message_with_deleted_receipt_handle(self, sqs_client, sqs_queue) sqs_client.delete_message(QueueUrl=sqs_queue, ReceiptHandle=handle) sqs_client.delete_message(QueueUrl=sqs_queue, ReceiptHandle=handle) + # TODO: test message attributes and message system attributes + def get_region(): return os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION -# TODO: test visibility timeout (with various ways to set them: queue attributes, receive parameter, update call) -# TODO: test message attributes and message system attributes - - -class TestSqsLambdaIntegration: - pass - # TODO: move tests here - - @pytest.fixture() def sqs_http_client(aws_http_client_factory): yield aws_http_client_factory("sqs") From 818d70d8bd695a2ed7bcdaf36f6fdc9cf66155c2 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Mon, 8 Aug 2022 21:38:16 +0200 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Dominik Schubert --- .../awslambda/functions/lambda_sqs_integration.py | 9 +++------ .../integration/awslambda/test_lambda_sqs_integration.py | 5 ++--- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/integration/awslambda/functions/lambda_sqs_integration.py b/tests/integration/awslambda/functions/lambda_sqs_integration.py index 9b4f1a761aa74..6a4404c5e522f 100644 --- a/tests/integration/awslambda/functions/lambda_sqs_integration.py +++ b/tests/integration/awslambda/functions/lambda_sqs_integration.py @@ -18,8 +18,8 @@ def handler(event, context): # this lambda expects inputs from an SQS event source mapping - if not event.get("Records"): - raise ValueError("no records passed to event") + if len(event.get("Records", [])) != 1: + raise ValueError("the payload must consist of exactly one record") # it expects exactly one record where the message body is '{"destination": ""}' that mimics a # DestinationConfig (which is not possible with SQS event source mappings). @@ -49,7 +49,4 @@ def create_external_boto_client(service): endpoint_url = ( f"http://{os.environ['LOCALSTACK_HOSTNAME']}:{os.environ.get('EDGE_PORT', 4566)}" ) - region_name = ( - os.environ.get("AWS_DEFAULT_REGION") or os.environ.get("AWS_REGION") or "us-east-1" - ) - return boto3.client(service, endpoint_url=endpoint_url, region_name=region_name) + return boto3.client(service, endpoint_url=endpoint_url) diff --git a/tests/integration/awslambda/test_lambda_sqs_integration.py b/tests/integration/awslambda/test_lambda_sqs_integration.py index 479b0ce3ee776..0570d22205d78 100644 --- a/tests/integration/awslambda/test_lambda_sqs_integration.py +++ b/tests/integration/awslambda/test_lambda_sqs_integration.py @@ -137,7 +137,6 @@ def test_failing_lambda_retries_after_visibility_timeout( @pytest.mark.skip_snapshot_verify( paths=[ - # FIXME: this is most of the event source mapping unfortunately "$..ParallelizationFactor", "$..LastProcessingResult", "$..Topics", @@ -158,9 +157,9 @@ def test_redrive_policy_with_failing_lambda( snapshot, cleanups, ): - """This test verifies a that SQS moves a message that is passed to a failing lambda to a DLQ according to the + """This test verifies that SQS moves a message that is passed to a failing lambda to a DLQ according to the redrive policy, and the lambda is invoked the correct number of times. 
The test retries twice and the event - source mapping should then automatically move the message to the DQL, but not earlier (see + source mapping should then automatically move the message to the DLQ, but not earlier (see https://github.com/localstack/localstack/issues/5283)""" # create queue used in the lambda to send events to (to verify lambda was invoked)