diff --git a/localstack-core/localstack/testing/pytest/fixtures.py b/localstack-core/localstack/testing/pytest/fixtures.py index d526097aef1cb..66cc5c2f016eb 100644 --- a/localstack-core/localstack/testing/pytest/fixtures.py +++ b/localstack-core/localstack/testing/pytest/fixtures.py @@ -67,6 +67,38 @@ from mypy_boto3_sqs.type_defs import MessageTypeDef +@pytest.fixture(scope="session") +def aws_client_no_retry(aws_client_factory): + """ + This fixture can be used to obtain Boto clients with disabled retries for testing. + botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode + + Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500) + to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound. + + This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode: + https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode + General socket/connection errors: + * ConnectionError + * ConnectionClosedError + * ReadTimeoutError + * EndpointConnectionError + + Service-side throttling/limit errors and exceptions: + * Throttling + * ThrottlingException + * ThrottledException + * RequestThrottledException + * ProvisionedThroughputExceededException + + HTTP status codes: 429, 500, 502, 503, 504, and 509 + + Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm). + """ + no_retry_config = botocore.config.Config(retries={"max_attempts": 1}) + return aws_client_factory(config=no_retry_config) + + @pytest.fixture(scope="class") def aws_http_client_factory(aws_session): """ diff --git a/tests/aws/services/lambda_/functions/lambda_notifier.py b/tests/aws/services/lambda_/functions/lambda_notifier.py new file mode 100644 index 0000000000000..01b75c6fd64b9 --- /dev/null +++ b/tests/aws/services/lambda_/functions/lambda_notifier.py @@ -0,0 +1,40 @@ +import datetime +import json +import os +import time + +import boto3 + +sqs_client = boto3.client("sqs", endpoint_url=os.environ.get("AWS_ENDPOINT_URL")) + + +def handler(event, context): + """Example: Send a message to the queue_url provided in notify and then wait for 7 seconds. + The message includes the value of the environment variable called "FUNCTION_VARIANT". + aws_client.lambda_.invoke( + FunctionName=fn_arn, + InvocationType="Event", + Payload=json.dumps({"notify": queue_url, "env_var": "FUNCTION_VARIANT", "label": "01-sleep", "wait": 7}) + ) + + Parameters: + * `notify`: SQS queue URL to notify a message + * `env_var`: Name of the environment variable that should be included in the message + * `label`: Label to be included in the message + * `wait`: Time in seconds to sleep + """ + if queue_url := event.get("notify"): + message = { + "request_id": context.aws_request_id, + "timestamp": datetime.datetime.now(datetime.UTC).isoformat(), + } + if env_var := event.get("env_var"): + message[env_var] = os.environ[env_var] + if label := event.get("label"): + message["label"] = label + print(f"Notify message: {message}") + sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message)) + + if wait_time := event.get("wait"): + print(f"Sleeping for {wait_time} seconds ...") + time.sleep(wait_time) diff --git a/tests/aws/services/lambda_/test_lambda.py b/tests/aws/services/lambda_/test_lambda.py index 11b754d296fe1..cba8061226134 100644 --- a/tests/aws/services/lambda_/test_lambda.py +++ b/tests/aws/services/lambda_/test_lambda.py @@ -30,7 +30,10 @@ get_invoke_init_type, update_done, ) -from localstack.testing.aws.util import create_client_with_keys, is_aws_cloud +from localstack.testing.aws.util import ( + create_client_with_keys, + is_aws_cloud, +) from localstack.testing.pytest import markers from localstack.testing.snapshots.transformer_utility import PATTERN_UUID from localstack.utils import files, platform, testutil @@ -123,6 +126,7 @@ TEST_LAMBDA_PYTHON_MULTIPLE_HANDLERS = os.path.join( THIS_FOLDER, "functions/lambda_multiple_handlers.py" ) +TEST_LAMBDA_NOTIFIER = os.path.join(THIS_FOLDER, "functions/lambda_notifier.py") PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED["python"] NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED["nodejs"] @@ -2614,18 +2618,37 @@ def _invoke_lambda(): assert not errored @markers.aws.validated - @pytest.mark.skip(reason="flaky") - def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot, aws_client): + def test_reserved_concurrency_async_queue( + self, + create_lambda_function, + sqs_create_queue, + sqs_collect_messages, + snapshot, + aws_client, + aws_client_no_retry, + ): + """Test async/event invoke retry behavior due to limited reserved concurrency. + Timeline: + 1) Set ReservedConcurrentExecutions=1 + 2) sync_invoke_warm_up => ok + 3) async_invoke_one => ok + 4) async_invoke_two => gets retried + 5) sync invoke => fails with TooManyRequestsException + 6) Set ReservedConcurrentExecutions=3 + 7) sync_invoke_final => ok + """ min_concurrent_executions = 10 + 3 check_concurrency_quota(aws_client, min_concurrent_executions) + queue_url = sqs_create_queue() + func_name = f"test_lambda_{short_uid()}" create_lambda_function( func_name=func_name, - handler_file=TEST_LAMBDA_INTROSPECT_PYTHON, + handler_file=TEST_LAMBDA_NOTIFIER, runtime=Runtime.python3_12, client=aws_client.lambda_, - timeout=20, + timeout=30, ) fn = aws_client.lambda_.get_function_configuration( @@ -2641,24 +2664,46 @@ def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot snapshot.match("put_fn_concurrency", put_fn_concurrency) # warm up the Lambda function to mitigate flakiness due to cold start - aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse") + sync_invoke_warm_up = aws_client.lambda_.invoke( + FunctionName=fn_arn, InvocationType="RequestResponse" + ) + assert "FunctionError" not in sync_invoke_warm_up - # simultaneously queue two event invocations - # The first event invoke gets executed immediately - aws_client.lambda_.invoke( - FunctionName=fn_arn, InvocationType="Event", Payload=json.dumps({"wait": 15}) + # Immediately queue two event invocations: + # 1) The first event invoke gets executed immediately + async_invoke_one = aws_client.lambda_.invoke( + FunctionName=fn_arn, + InvocationType="Event", + Payload=json.dumps({"notify": queue_url, "wait": 15}), ) - # The second event invoke gets throttled and re-scheduled with an internal retry - aws_client.lambda_.invoke( - FunctionName=fn_arn, InvocationType="Event", Payload=json.dumps({"wait": 10}) + assert "FunctionError" not in async_invoke_one + # 2) The second event invoke gets throttled and re-scheduled with an internal retry + async_invoke_two = aws_client.lambda_.invoke( + FunctionName=fn_arn, + InvocationType="Event", + Payload=json.dumps({"notify": queue_url}), ) + assert "FunctionError" not in async_invoke_two - # Ensure one event invocation is being executed and the other one is in the queue. - time.sleep(5) + # Wait until the first async invoke is being executed while the second async invoke is in the queue. + messages = sqs_collect_messages( + queue_url, + expected=1, + timeout=15, + ) + async_invoke_one_notification = json.loads(messages[0]["Body"]) + assert ( + async_invoke_one_notification["request_id"] + == async_invoke_one["ResponseMetadata"]["RequestId"] + ) # Synchronous invocations raise an exception because insufficient reserved concurrency is available + # It is important to disable botocore retries because the concurrency limit is time-bound because it only + # triggers as long as the first async invoke is processing! with pytest.raises(aws_client.lambda_.exceptions.TooManyRequestsException) as e: - aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse") + aws_client_no_retry.lambda_.invoke( + FunctionName=fn_arn, InvocationType="RequestResponse" + ) snapshot.match("too_many_requests_exc", e.value.response) # ReservedConcurrentExecutions=2 is insufficient because the throttled async event invoke might be @@ -2666,21 +2711,28 @@ def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot aws_client.lambda_.put_function_concurrency( FunctionName=func_name, ReservedConcurrentExecutions=3 ) - aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse") - - def assert_events(): - log_events = aws_client.logs.filter_log_events( - logGroupName=f"/aws/lambda/{func_name}", - )["events"] - invocation_count = len( - [event["message"] for event in log_events if event["message"].startswith("REPORT")] - ) - assert invocation_count == 4 - - retry(assert_events, retries=120, sleep=2) + # Invocations succeed after raising reserved concurrency + sync_invoke_final = aws_client.lambda_.invoke( + FunctionName=fn_arn, + InvocationType="RequestResponse", + Payload=json.dumps({"notify": queue_url}), + ) + assert "FunctionError" not in sync_invoke_final - # TODO: snapshot logs & request ID for correlation after request id gets propagated - # https://github.com/localstack/localstack/pull/7874 + # Contains the re-queued `async_invoke_two` and the `sync_invoke_final`, but the order might differ + # depending on whether invoke_two gets re-schedule before or after the final invoke. + # AWS docs: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async-error-handling.html + # "The retry interval increases exponentially from 1 second after the first attempt to a maximum of 5 minutes." + final_messages = sqs_collect_messages( + queue_url, + expected=2, + timeout=20, + ) + invoked_request_ids = {json.loads(msg["Body"])["request_id"] for msg in final_messages} + assert { + async_invoke_two["ResponseMetadata"]["RequestId"], + sync_invoke_final["ResponseMetadata"]["RequestId"], + } == invoked_request_ids @markers.snapshot.skip_snapshot_verify(paths=["$..Attributes.AWSTraceHeader"]) @markers.aws.validated diff --git a/tests/aws/services/lambda_/test_lambda.snapshot.json b/tests/aws/services/lambda_/test_lambda.snapshot.json index cb24e3154abd6..85733509934a2 100644 --- a/tests/aws/services/lambda_/test_lambda.snapshot.json +++ b/tests/aws/services/lambda_/test_lambda.snapshot.json @@ -2982,7 +2982,7 @@ } }, "tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_concurrency_async_queue": { - "recorded-date": "08-04-2024, 17:07:59", + "recorded-date": "26-03-2025, 10:53:54", "recorded-content": { "fn": { "Architectures": [ @@ -3019,7 +3019,7 @@ "OptimizationStatus": "Off" }, "State": "Active", - "Timeout": 20, + "Timeout": 30, "TracingConfig": { "Mode": "PassThrough" }, diff --git a/tests/aws/services/lambda_/test_lambda.validation.json b/tests/aws/services/lambda_/test_lambda.validation.json index 49d07c303273f..adc3a699f0367 100644 --- a/tests/aws/services/lambda_/test_lambda.validation.json +++ b/tests/aws/services/lambda_/test_lambda.validation.json @@ -78,7 +78,7 @@ "last_validated_date": "2024-04-08T17:08:10+00:00" }, "tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_concurrency_async_queue": { - "last_validated_date": "2024-04-08T17:07:56+00:00" + "last_validated_date": "2025-03-26T10:54:29+00:00" }, "tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_provisioned_overlap": { "last_validated_date": "2024-04-08T17:10:36+00:00"