Fix flaky lambda test event retry reserved concurrency by joe4dev · Pull Request #12441 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

Fix flaky lambda test event retry reserved concurrency #12441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions localstack-core/localstack/testing/pytest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,38 @@
from mypy_boto3_sqs.type_defs import MessageTypeDef


@pytest.fixture(scope="session")
def aws_client_no_retry(aws_client_factory):
    """
    This fixture can be used to obtain Boto clients with disabled retries for testing.
    botocore docs: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#configuring-a-retry-mode

    Use this client when testing exceptions (i.e., with pytest.raises(...)) or expected errors (e.g., status code 500)
    to avoid unnecessary retries and mitigate test flakiness if the tested error condition is time-bound.

    This client is needed for the following errors, exceptions, and HTTP status codes defined by the legacy retry mode:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode
    General socket/connection errors:
    * ConnectionError
    * ConnectionClosedError
    * ReadTimeoutError
    * EndpointConnectionError

    Service-side throttling/limit errors and exceptions:
    * Throttling
    * ThrottlingException
    * ThrottledException
    * RequestThrottledException
    * ProvisionedThroughputExceededException

    HTTP status codes: 429, 500, 502, 503, 504, and 509

    Hence, this client is not needed for a `ResourceNotFound` error (but it doesn't harm).

    :param aws_client_factory: fixture that builds a set of Boto clients from keyword arguments such as `config`
    :return: the clients created by `aws_client_factory`, configured to never retry a request
    """
    # `total_max_attempts` includes the initial request, so a value of 1 means that no request is ever retried.
    # The `max_attempts` key of a `Config` object instead counts the retries made *after* the initial request,
    # which means `{"max_attempts": 1}` would still allow one retry.
    no_retry_config = botocore.config.Config(retries={"total_max_attempts": 1})
    return aws_client_factory(config=no_retry_config)


@pytest.fixture(scope="class")
def aws_http_client_factory(aws_session):
"""
Expand Down
40 changes: 40 additions & 0 deletions tests/aws/services/lambda_/functions/lambda_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import datetime
import json
import os
import time

import boto3

sqs_client = boto3.client("sqs", endpoint_url=os.environ.get("AWS_ENDPOINT_URL"))


def handler(event, context):
    """Optionally notify an SQS queue about this invocation and then optionally sleep.

    Example: Send a message to the queue_url provided in notify and then wait for 7 seconds.
    The message includes the value of the environment variable called "FUNCTION_VARIANT".
        aws_client.lambda_.invoke(
            FunctionName=fn_arn,
            InvocationType="Event",
            Payload=json.dumps({"notify": queue_url, "env_var": "FUNCTION_VARIANT", "label": "01-sleep", "wait": 7})
        )

    Event parameters (all optional; a missing or falsy value disables the corresponding step):
    * `notify`: SQS queue URL the notification message is sent to
    * `env_var`: Name of the environment variable whose value is added to the message under that same name
      (raises a KeyError if the variable is not set)
    * `label`: Label to be included in the message
    * `wait`: Time in seconds to sleep after the (optional) notification

    The notification is only built and sent if `notify` is given; `env_var` and `label` are ignored otherwise.
    """
    queue_url = event.get("notify")
    if queue_url:
        # The request id and timestamp allow the receiver to correlate the message with one specific invocation.
        message = {
            "request_id": context.aws_request_id,
            "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
        }

        env_var = event.get("env_var")
        if env_var:
            message[env_var] = os.environ[env_var]

        label = event.get("label")
        if label:
            message["label"] = label

        print(f"Notify message: {message}")
        sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))

    # Sleep only after the notification was sent, so the receiver learns about the invocation while it is running.
    wait_time = event.get("wait")
    if wait_time:
        print(f"Sleeping for {wait_time} seconds ...")
        time.sleep(wait_time)
112 changes: 82 additions & 30 deletions tests/aws/services/lambda_/test_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
get_invoke_init_type,
update_done,
)
from localstack.testing.aws.util import create_client_with_keys, is_aws_cloud
from localstack.testing.aws.util import (
create_client_with_keys,
is_aws_cloud,
)
from localstack.testing.pytest import markers
from localstack.testing.snapshots.transformer_utility import PATTERN_UUID
from localstack.utils import files, platform, testutil
Expand Down Expand Up @@ -123,6 +126,7 @@
TEST_LAMBDA_PYTHON_MULTIPLE_HANDLERS = os.path.join(
THIS_FOLDER, "functions/lambda_multiple_handlers.py"
)
TEST_LAMBDA_NOTIFIER = os.path.join(THIS_FOLDER, "functions/lambda_notifier.py")

PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED["python"]
NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED["nodejs"]
Expand Down Expand Up @@ -2614,18 +2618,37 @@ def _invoke_lambda():
assert not errored

@markers.aws.validated
@pytest.mark.skip(reason="flaky")
def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot, aws_client):
def test_reserved_concurrency_async_queue(
self,
create_lambda_function,
sqs_create_queue,
sqs_collect_messages,
snapshot,
aws_client,
aws_client_no_retry,
):
"""Test async/event invoke retry behavior due to limited reserved concurrency.
Timeline:
1) Set ReservedConcurrentExecutions=1
2) sync_invoke_warm_up => ok
3) async_invoke_one => ok
4) async_invoke_two => gets retried
5) sync invoke => fails with TooManyRequestsException
6) Set ReservedConcurrentExecutions=3
7) sync_invoke_final => ok
"""
min_concurrent_executions = 10 + 3
check_concurrency_quota(aws_client, min_concurrent_executions)

queue_url = sqs_create_queue()

func_name = f"test_lambda_{short_uid()}"
create_lambda_function(
func_name=func_name,
handler_file=TEST_LAMBDA_INTROSPECT_PYTHON,
handler_file=TEST_LAMBDA_NOTIFIER,
runtime=Runtime.python3_12,
client=aws_client.lambda_,
timeout=20,
timeout=30,
)

fn = aws_client.lambda_.get_function_configuration(
Expand All @@ -2641,46 +2664,75 @@ def test_reserved_concurrency_async_queue(self, create_lambda_function, snapshot
snapshot.match("put_fn_concurrency", put_fn_concurrency)

# warm up the Lambda function to mitigate flakiness due to cold start
aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse")
sync_invoke_warm_up = aws_client.lambda_.invoke(
FunctionName=fn_arn, InvocationType="RequestResponse"
)
assert "FunctionError" not in sync_invoke_warm_up

# simultaneously queue two event invocations
# The first event invoke gets executed immediately
aws_client.lambda_.invoke(
FunctionName=fn_arn, InvocationType="Event", Payload=json.dumps({"wait": 15})
# Immediately queue two event invocations:
# 1) The first event invoke gets executed immediately
async_invoke_one = aws_client.lambda_.invoke(
FunctionName=fn_arn,
InvocationType="Event",
Payload=json.dumps({"notify": queue_url, "wait": 15}),
)
# The second event invoke gets throttled and re-scheduled with an internal retry
aws_client.lambda_.invoke(
FunctionName=fn_arn, InvocationType="Event", Payload=json.dumps({"wait": 10})
assert "FunctionError" not in async_invoke_one
# 2) The second event invoke gets throttled and re-scheduled with an internal retry
async_invoke_two = aws_client.lambda_.invoke(
FunctionName=fn_arn,
InvocationType="Event",
Payload=json.dumps({"notify": queue_url}),
)
assert "FunctionError" not in async_invoke_two

# Ensure one event invocation is being executed and the other one is in the queue.
time.sleep(5)
# Wait until the first async invoke is being executed while the second async invoke is in the queue.
messages = sqs_collect_messages(
queue_url,
expected=1,
timeout=15,
)
async_invoke_one_notification = json.loads(messages[0]["Body"])
assert (
async_invoke_one_notification["request_id"]
== async_invoke_one["ResponseMetadata"]["RequestId"]
)

# Synchronous invocations raise an exception because insufficient reserved concurrency is available
# It is important to disable botocore retries here: the concurrency limit is time-bound and only
# applies while the first async invoke is still processing!
with pytest.raises(aws_client.lambda_.exceptions.TooManyRequestsException) as e:
aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse")
aws_client_no_retry.lambda_.invoke(
FunctionName=fn_arn, InvocationType="RequestResponse"
)
snapshot.match("too_many_requests_exc", e.value.response)

# ReservedConcurrentExecutions=2 is insufficient because the throttled async event invoke might be
# re-scheduled before the synchronous invoke while the first async invoke is still running.
aws_client.lambda_.put_function_concurrency(
FunctionName=func_name, ReservedConcurrentExecutions=3
)
aws_client.lambda_.invoke(FunctionName=fn_arn, InvocationType="RequestResponse")

def assert_events():
log_events = aws_client.logs.filter_log_events(
logGroupName=f"/aws/lambda/{func_name}",
)["events"]
invocation_count = len(
[event["message"] for event in log_events if event["message"].startswith("REPORT")]
)
assert invocation_count == 4

retry(assert_events, retries=120, sleep=2)
# Invocations succeed after raising reserved concurrency
sync_invoke_final = aws_client.lambda_.invoke(
FunctionName=fn_arn,
InvocationType="RequestResponse",
Payload=json.dumps({"notify": queue_url}),
)
assert "FunctionError" not in sync_invoke_final

# TODO: snapshot logs & request ID for correlation after request id gets propagated
# https://github.com/localstack/localstack/pull/7874
# Contains the re-queued `async_invoke_two` and the `sync_invoke_final`, but the order might differ
# depending on whether `async_invoke_two` gets re-scheduled before or after the final invoke.
# AWS docs: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async-error-handling.html
# "The retry interval increases exponentially from 1 second after the first attempt to a maximum of 5 minutes."
final_messages = sqs_collect_messages(
queue_url,
expected=2,
timeout=20,
)
invoked_request_ids = {json.loads(msg["Body"])["request_id"] for msg in final_messages}
assert {
async_invoke_two["ResponseMetadata"]["RequestId"],
sync_invoke_final["ResponseMetadata"]["RequestId"],
} == invoked_request_ids

@markers.snapshot.skip_snapshot_verify(paths=["$..Attributes.AWSTraceHeader"])
@markers.aws.validated
Expand Down
4 changes: 2 additions & 2 deletions tests/aws/services/lambda_/test_lambda.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -2982,7 +2982,7 @@
}
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_concurrency_async_queue": {
"recorded-date": "08-04-2024, 17:07:59",
"recorded-date": "26-03-2025, 10:53:54",
"recorded-content": {
"fn": {
"Architectures": [
Expand Down Expand Up @@ -3019,7 +3019,7 @@
"OptimizationStatus": "Off"
},
"State": "Active",
"Timeout": 20,
"Timeout": 30,
"TracingConfig": {
"Mode": "PassThrough"
},
Expand Down
2 changes: 1 addition & 1 deletion tests/aws/services/lambda_/test_lambda.validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
"last_validated_date": "2024-04-08T17:08:10+00:00"
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_concurrency_async_queue": {
"last_validated_date": "2024-04-08T17:07:56+00:00"
"last_validated_date": "2025-03-26T10:54:29+00:00"
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaConcurrency::test_reserved_provisioned_overlap": {
"last_validated_date": "2024-04-08T17:10:36+00:00"
Expand Down
Loading
0