8000 Make event invoke function update test more predicatable by joe4dev · Pull Request #12444 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

Make event invoke function update test more predicatable #12444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 81 additions & 42 deletions tests/aws/services/lambda_/test_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
from localstack.utils.strings import short_uid, to_bytes, to_str
from localstack.utils.sync import retry, wait_until
from localstack.utils.testutil import create_lambda_archive
from tests.aws.services.lambda_.utils import get_s3_keys

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -3051,65 +3050,99 @@ def _update_function():
thread.join()
assert not errored

# TODO: Fix first invoke getting retried and ending up being executed against the new variant because the
# update stops the running function version. We should let running executions finish for $LATEST in this case.
# MAYBE: consider validating whether a code update behaves differently than a configuration update
@markers.snapshot.skip_snapshot_verify(
# TODO: Fix first invoke getting retried and ending up being executed against the new variant because the
# update stops the running function version. We should let running executions finish for $LATEST in this case.
paths=[
"$..01-sleep",
]
)
@markers.aws.validated
def test_async_invoke_queue_upon_function_update(
self, aws_client, create_lambda_function, s3_create_bucket, snapshot
self, aws_client, create_lambda_function, sqs_create_queue, sqs_collect_messages, snapshot
):
"""Test what happens with queued async invokes (i.e., event invokes) when updating a function.
We are using a combination of reserved concurrency and sleeps to design this test case predictable.
Observation: If we don't wait after sending the first invoke, some queued invokes can still be handled by an
old variant in some non-deterministic way.
Timeline:
1) Set ReservedConcurrentExecutions=1
2) sync_invoke_warm_up => variant-one
3) async_invoke_one => variant-one
4) Queue 5 async invokes => variant-two because they are executed after the update
5) UpdateFunction
6) Queue 5 async invokes => variant-two
"""
# HACK: workaround to ignore `$..async_invoke_history_sorted[0]` because indices don't work in the ignore list
snapshot.add_transformer(
snapshot.transform.regex("01-sleep--variant-2", "01-sleep--variant-1")
)
bucket_name = f"lambda-target-bucket-{short_uid()}"
s3_create_bucket(Bucket=bucket_name)
# Queue for tracking invoked versions
queue_url = sqs_create_queue()

function_name = f"test-function-{short_uid()}"
environment_v1 = {
"Variables": {"S3_BUCKET_NAME": bucket_name, "FUNCTION_VARIANT": "variant-1"}
}
environment_v1 = {"Variables": {"FUNCTION_VARIANT": "variant-one"}}
create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_S3_INTEGRATION_FUNCTION_VERSION,
handler_file=TEST_LAMBDA_NOTIFIER,
runtime=Runtime.python3_12,
Environment=environment_v1,
)
# Add reserved concurrency limits the throughput and makes it easier to cause event invokes to queue up.
# Add reserved concurrency to limit the throughput, which makes it easier to queue up event invokes
reserved_concurrency_response = aws_client.lambda_.put_function_concurrency(
FunctionName=function_name,
ReservedConcurrentExecutions=1,
)
assert reserved_concurrency_response["ResponseMetadata"]["HTTPStatusCode"] == 200

payload = {"request_prefix": f"{1:02}-sleep", "sleep_seconds": 22}
aws_client.lambda_.invoke(
# Warm up the Lambda function to mitigate flakiness due to cold start
sync_invoke_warm_up = aws_client.lambda_.invoke(
FunctionName=function_name, InvocationType="RequestResponse"
)
assert "FunctionError" not in sync_invoke_warm_up

async_invoke_one = aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
Payload=json.dumps(payload),
# Wait long enough until the function gets updated to variant-two
Payload=json.dumps(
{
"notify": queue_url,
"env_var": "FUNCTION_VARIANT",
"label": "01-sleep",
"wait": 20,
}
),
)
# Make it very likely that the invocation is being processed because the Lambda poller should pick up queued
# async invokes quickly using long polling.
time.sleep(2)

# Send async invocation, which should queue up before we update the function
num_invocations_before = 9
# Send async invocations, which should queue up before we update the function
num_invocations_before = 5
for index in range(num_invocations_before):
payload = {"request_prefix": f"{index + 2:02}-before"}
aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
Payload=json.dumps(payload),
Payload=json.dumps(
{
"notify": queue_url,
"env_var": "FUNCTION_VARIANT",
"label": f"{index + 2:02}-before-update",
}
),
)

# Wait until the first async invoke is being executed before updating the function
messages_invoke_one = sqs_collect_messages(
queue_url,
expected=1,
timeout=15,
)
async_invoke_one_notification = json.loads(messages_invoke_one[0]["Body"])
snapshot.match(
async_invoke_one_notification["label"],
async_invoke_one_notification["FUNCTION_VARIANT"],
)
assert (
async_invoke_one_notification["request_id"]
== async_invoke_one["ResponseMetadata"]["RequestId"]
)

# Update the function variant while still having invokes in the async invoke queue
environment_v2 = environment_v1.copy()
environment_v2["Variables"]["FUNCTION_VARIANT"] = "variant-2"
environment_v2["Variables"]["FUNCTION_VARIANT"] = "variant-two"
aws_client.lambda_.update_function_configuration(
FunctionName=function_name, Environment=environment_v2
)
Expand All @@ -3119,24 +3152,30 @@ def test_async_invoke_queue_upon_function_update(
# Send further async invocations after the update succeeded
num_invocations_after = 5
for index in range(num_invocations_after):
payload = {"request_prefix": f"{index + num_invocations_before + 2:02}-after"}
aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
Payload=json.dumps(payload),
Payload=json.dumps(
{
"notify": queue_url,
"env_var": "FUNCTION_VARIANT",
"label": f"{index + num_invocations_before + 2:02}-after-update",
}
),
)

# +1 for the first sleep invocation
total_invocations = 1 + num_invocations_before + num_invocations_after

def assert_s3_objects():
s3_keys_output = get_s3_keys(aws_client, bucket_name)
assert len(s3_keys_output) >= total_invocations
return s3_keys_output

s3_keys = retry(assert_s3_objects, retries=20, sleep=5)
s3_keys_sorted = sorted(s3_keys)
snapshot.match("async_invoke_history_sorted", s3_keys_sorted)
# Collect event invokes before and after update without invoke_one (already deleted)
messages = sqs_collect_messages(
queue_url,
expected=num_invocations_before + num_invocations_after,
timeout=90,
)
for message in messages:
body_json = json.loads(message["Body"])
assert body_json["label"] != "01-sleep", (
"The message with the label `01-sleep` should have been deleted from the queue."
)
snapshot.match(body_json["label"], body_json["FUNCTION_VARIANT"])


# TODO: test if routing is static for a single invocation:
Expand Down
30 changes: 12 additions & 18 deletions tests/aws/services/lambda_/test_lambda.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -4263,25 +4263,19 @@
}
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaVersions::test_async_invoke_queue_upon_function_update": {
"recorded-date": "15-05-2024, 17:38:05",
"recorded-date": "26-03-2025, 15:48:55",
"recorded-content": {
"async_invoke_history_sorted": [
"01-sleep--variant-1",
"02-before--variant-2",
"03-before--variant-2",
"04-before--variant-2",
"05-before--variant-2",
"06-before--variant-2",
"07-before--variant-2",
"08-before--variant-2",
"09-before--variant-2",
"10-before--variant-2",
"11-after--variant-2",
"12-after--variant-2",
"13-after--variant-2",
"14-after--variant-2",
"15-after--variant-2"
]
"01-sleep": "variant-one",
"02-before-update": "variant-two",
"03-before-update": "variant-two",
"04-before-update": "variant-two",
"05-before-update": "variant-two",
"06-before-update": "variant-two",
"07-after-update": "variant-two",
"08-after-update": "variant-two",
"09-after-update": "variant-two",
"10-after-update": "variant-two",
"11-after-update": "variant-two"
}
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaVersions::test_function_update_during_invoke": {
Expand Down
2 changes: 1 addition & 1 deletion tests/aws/services/lambda_/test_lambda.validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@
"last_validated_date": "2024-08-05T13:57:02+00:00"
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaVersions::test_async_invoke_queue_upon_function_update": {
"last_validated_date": "2024-05-15T18:29:38+00:00"
"last_validated_date": "2025-03-26T16:13:15+00:00"
},
"tests/aws/services/lambda_/test_lambda.py::TestLambdaVersions::test_function_update_during_invoke": {
"last_validated_date": "2024-05-15T19:05:04+00:00"
Expand Down
Loading
0