8000 ESM-v2: Introduce new EventSourceMappingArn and add empty bootstrap test · localstack/localstack@0a6579e · GitHub
[go: up one dir, main page]

Skip to content

Commit 0a6579e

Browse files
committed
ESM-v2: Introduce new EventSourceMappingArn and add empty bootstrap test
1 parent 67f5698 commit 0a6579e

File tree

11 files changed

+243
-8
lines changed

11 files changed

+243
-8
lines changed

localstack-core/localstack/services/lambda_/event_source_mapping/esm_config_factory.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,40 +5,50 @@
55
DestinationConfig,
66
EventSourceMappingConfiguration,
77
EventSourcePosition,
8+
RequestContext,
89
)
910
from localstack.services.lambda_ import hooks as lambda_hooks
1011
from localstack.services.lambda_.event_source_mapping.esm_worker import EsmState, EsmStateReason
1112
from localstack.services.lambda_.event_source_mapping.pipe_utils import (
1213
get_standardized_service_name,
1314
)
14-
from localstack.utils.aws.arns import parse_arn
15+
from localstack.utils.aws.arns import lambda_event_source_mapping_arn, parse_arn
1516
from localstack.utils.collections import merge_recursive
1617
from localstack.utils.strings import long_uid
1718

1819

1920
class EsmConfigFactory:
2021
request: CreateEventSourceMappingRequest
22+
context: RequestContext
2123
function_arn: str
2224

23-
def __init__(self, request: CreateEventSourceMappingRequest, function_arn: str):
25+
def __init__(
26+
self, request: CreateEventSourceMappingRequest, context: RequestContext, function_arn: str
27+
):
2428
self.request = request
2529
self.function_arn = function_arn
30+
self.context = context
2631

2732
def get_esm_config(self) -> EventSourceMappingConfiguration:
2833
"""Creates an Event Source Mapping (ESM) configuration based on a create ESM request.
2934
* CreateEventSourceMapping API: https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html
3035
* CreatePipe API: https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_CreatePipe.html
3136
The CreatePipe API covers largely the same parameters, but is better structured using hierarchical parameters.
3237
"""
33-
3438
service = ""
3539
if source_arn := self.request.get("EventSourceArn"):
3640
parsed_arn = parse_arn(source_arn)
3741
service = get_standardized_service_name(parsed_arn["service"])
3842

43+
uuid = long_uid()
44+
3945
default_source_parameters = {}
40-
default_source_parameters["UUID"] = long_uid()
46+
default_source_parameters["UUID"] = uuid
47+
default_source_parameters["EventSourceMappingArn"] = lambda_event_source_mapping_arn(
48+
uuid, self.context.account_id, self.context.region
49+
)
4150
default_source_parameters["StateTransitionReason"] = EsmStateReason.USER_ACTION
51+
4252
if service == "sqs":
4353
default_source_parameters["BatchSize"] = 10
4454
default_source_parameters["MaximumBatchingWindowInSeconds"] = 0

localstack-core/localstack/services/lambda_/provider.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,10 @@
220220
from localstack.services.lambda_.urlrouter import FunctionUrlRouter
221221
from localstack.services.plugins import ServiceLifecycleHook
222222
from localstack.state import StateVisitor
223-
from localstack.utils.aws.arns import extract_service_from_arn, get_partition
223+
from localstack.utils.aws.arns import (
224+
extract_service_from_arn,
225+
get_partition,
226+
)
224227
from localstack.utils.bootstrap import is_api_enabled
225228
from localstack.utils.collections import PaginatedList
226229
from localstack.utils.files import load_file
@@ -1859,7 +1862,7 @@ def create_event_source_mapping_v2(
18591862
# Validations
18601863
function_arn, function_name, state = self.validate_event_source_mapping(context, request)
18611864

1862-
esm_config = EsmConfigFactory(request, function_arn).get_esm_config()
1865+
esm_config = EsmConfigFactory(request, context, function_arn).get_esm_config()
18631866

18641867
# Copy esm_config to avoid a race condition with potential async update in the store
18651868
state.event_source_mappings[esm_config["UUID"]] = esm_config.copy()

localstack-core/localstack/utils/aws/arns.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,11 @@ def lambda_code_signing_arn(code_signing_id: str, account_id: str, region_name:
267267
return _resource_arn(code_signing_id, pattern, account_id=account_id, region_name=region_name)
268268

269269

270+
def lambda_event_source_mapping_arn(uuid: str, account_id: str, region_name: str) -> str:
271+
pattern = "arn:%s:lambda:%s:%s:event-source-mapping:%s"
272+
return _resource_arn(uuid, pattern, account_id=account_id, region_name=region_name)
273+
274+
270275
def lambda_function_or_layer_arn(
271276
type: str,
272277
entity_name: str,

tests/aws/services/cloudformation/resources/test_lambda.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ def _assert_single_lambda_call():
5858

5959

6060
@markers.snapshot.skip_snapshot_verify(
61-
["$..EventSourceMappings..FunctionArn", "$..EventSourceMappings..LastProcessingResult"]
61+
[
62+
"$..EventSourceMappings..FunctionArn",
63+
"$..EventSourceMappings..LastProcessingResult",
64+
"$..EventSourceMappings..EventSourceMappingArn",
65+
]
6266
)
6367
@markers.aws.validated
6468
def test_lambda_w_dynamodb_event_filter_update(deploy_cfn_template, snapshot, aws_client):
@@ -777,6 +781,7 @@ def _send_events():
777781
"$..BisectBatchOnFunctionError",
778782
"$..DestinationConfig",
779783
"$..LastProcessingResult",
784+
"$..EventSourceMappingArn",
780785
"$..MaximumRecordAgeInSeconds",
781786
"$..TumblingWindowInSeconds",
782787
]
@@ -904,6 +909,7 @@ def wait_logs():
904909
"$..BisectBatchOnFunctionError",
905910
"$..DestinationConfig",
906911
"$..LastProcessingResult",
912+
"$..EventSourceMappingArn",
907913
"$..MaximumRecordAgeInSeconds",
908914
"$..Configuration.CodeSize",
909915
"$..Tags",

tests/aws/services/lambda_/event_source_mapping/conftest.py

Lines changed: 1 addition & 1 deletion
A851
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ def snapshot(request, _snapshot_session: SnapshotSession, account_id, region_nam
2121
RegexTransformer(f"arn:{get_partition(region_name)}:", "arn:<partition>:"), priority=2
2222
)
2323

24-
_snapshot_session.add_transformer(SNAPSHOT_BASIC_TRANSFORMER_NEW, priority=2)
24+
_snapshot_session.add_transformer(SNAPSHOT_BASIC_TRANSFORMER_NEW, priority=0)
2525

2626
return _snapshot_session

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def _get_lambda_logs_event(function_name, expected_num_events, retries=30):
8282
"$..TableDescription.TableStatus",
8383
"$..Records..dynamodb.SizeBytes",
8484
"$..Records..eventVersion",
85+
"$..EventSourceMappingArn",
8586
],
8687
)
8788
class TestDynamoDBEventSourceMapping:

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
TEST_LAMBDA_KINESIS_BATCH_ITEM_FAILURE = (
3636
FUNCTIONS_PATH / "lambda_report_batch_item_failures_kinesis.py"
3737
)
38+
TEST_LAMBDA_PROVIDED_BOOTSTRAP_EMPTY = FUNCTIONS_PATH / "provided_bootstrap_empty"
3839

3940

4041
@pytest.fixture(autouse=True)
@@ -64,12 +65,14 @@ def _snapshot_transformers(snapshot):
6465
"$..BisectBatchOnFunctionError",
6566
"$..DestinationConfig",
6667
"$..LastProcessingResult",
68+
"$..EventSourceMappingArn",
6769
"$..MaximumBatchingWindowInSeconds",
6870
"$..MaximumRecordAgeInSeconds",
6971
"$..ResponseMetadata.HTTPStatusCode",
7072
"$..State",
7173
"$..Topics",
7274
"$..TumblingWindowInSeconds",
75+
"$..EventSourceMappingArn",
7376
],
7477
)
7578
class TestKinesisSource:
@@ -893,6 +896,7 @@ def verify_failure_received():
893896
"set_lambda_response",
894897
[
895898
# Successes
899+
"",
896900
[],
897901
None,
898902
{},
@@ -901,6 +905,7 @@ def verify_failure_received():
901905
],
902906
ids=[
903907
# Successes
908+
"empty_string_success",
904909
"empty_list_success",
905910
"null_success",
906911
"empty_dict_success",
@@ -970,6 +975,71 @@ def _verify_messages_received():
970975
invocation_events = retry(_verify_messages_received, retries=30, sleep=5)
971976
snapshot.match("kinesis_events", invocation_events)
972977

978+
@markers.aws.validated
979+
@markers.snapshot.skip_snapshot_verify(
980+
paths=[
981+
"$..Messages..Body.KinesisBatchInfo.shardId",
982+
],
983+
)
984+
def test_kinesis_empty_provided(
985+
self,
986+
create_lambda_function,
987+
kinesis_create_stream,
988+
lambda_su_role,
989+
wait_for_stream_ready,
990+
cleanups,
991+
snapshot,
992+
aws_client,
993+
):
994+
function_name = f"lambda_func-{short_uid()}"
995+
stream_name = f"test-foobar-{short_uid()}"
996+
record_data = "hello"
997+
998+
create_lambda_function(
999+
handler_file=TEST_LAMBDA_PROVIDED_BOOTSTRAP_EMPTY,
1000+
func_name=function_name,
1001+
runtime=Runtime.provided_al2023,
1002+
role=lambda_su_role,
1003+
)
1004+
1005+
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
1006+
wait_for_stream_ready(stream_name=stream_name)
1007+
stream_summary = aws_client.kinesis.describe_stream_summary(StreamName=stream_name)
1008+
assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
1009+
stream_arn = aws_client.kinesis.describe_stream(StreamName=stream_name)[
1010+
"StreamDescription"
1011+
]["StreamARN"]
1012+
1013+
create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
1014+
EventSourceArn=stream_arn,
1015+
FunctionName=function_name,
1016+
StartingPosition="TRIM_HORIZON",
1017+
BatchSize=1,
1018+
MaximumBatchingWindowInSeconds=1,
1019+
MaximumRetryAttempts=2,
1020+
)
1021+
snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)
1022+
uuid = create_event_source_mapping_response["UUID"]
1023+
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=uuid))
1024+
_await_event_source_mapping_enabled(aws_client.lambda_, uuid)
1025+
1026+
aws_client.kinesis.put_record(
1027+
Data=record_data,
1028+
PartitionKey="test",
1029+
StreamName=stream_name,
1030+
)
1031+
1032+
def _verify_invoke():
1033+
log_events = aws_client.logs.filter_log_events(
1034+
logGroupName=f"/aws/lambda/{function_name}",
1035+
)["events"]
1036+
assert len([e["message"] for e in log_events if e["message"].startswith("REPORT")]) == 1
1037+
1038+
retry(_verify_invoke, retries=30, sleep=5)
1039+
1040+
get_esm_result = aws_client.lambda_.get_event_source_mapping(UUID=uuid)
1041+
snapshot.match("get_esm_result", get_esm_result)
1042+
9731043

9741044
# TODO: add tests for different edge cases in filtering (e.g. message isn't json => needs to be dropped)
9751045
# https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-kinesis
@@ -985,6 +1055,7 @@ class TestKinesisEventFiltering:
9851055
paths=[
9861056
"$..Messages..Body.KinesisBatchInfo.shardId",
9871057
"$..Messages..Body.KinesisBatchInfo.streamArn",
1058+
"$..EventSourceMappingArn",
9881059
],
9891060
)
9901061
@markers.aws.validated

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2944,5 +2944,117 @@
29442944
}
29452945
]
29462946
}
2947+
},
2948+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_report_batch_item_success_scenarios[empty_string_success]": {
2949+
"recorded-date": "11-10-2024, 12:38:15",
2950+
"recorded-content": {
2951+
"create_event_source_mapping_response": {
2952+
"BatchSize": 1,
2953+
"BisectBatchOnFunctionError": false,
2954+
"DestinationConfig": {
2955+
"OnFailure": {}
2956+
},
2957+
"EventSourceArn": "arn:<partition>:kinesis:<region>:111111111111:stream/<resource:1>",
2958+
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<uuid:1>",
2959+
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:2>",
2960+
"FunctionResponseTypes": [
2961+
"ReportBatchItemFailures"
2962+
],
2963+
"LastModified": "<datetime>",
2964+
"LastProcessingResult": "No records processed",
2965+
"MaximumBatchingWindowInSeconds": 1,
2966+
"MaximumRecordAgeInSeconds": -1,
2967+
"MaximumRetryAttempts": 2,
2968+
"ParallelizationFactor": 1,
2969+
"StartingPosition": "TRIM_HORIZON",
2970+
"State": "Creating",
2971+
"StateTransitionReason": "User action",
2972+
"TumblingWindowInSeconds": 0,
2973+
"UUID": "<uuid:1>",
2974+
"ResponseMetadata": {
2975+
"HTTPHeaders": {},
2976+
"HTTPStatusCode": 202
2977+
}
2978+
},
2979+
"kinesis_events": [
2980+
{
2981+
"Records": [
2982+
{
2983+
"kinesis": {
2984+
"kinesisSchemaVersion": "1.0",
2985+
"partitionKey": "test",
2986+
"sequenceNumber": "<sequence-number:1>",
2987+
"data": "aGVsbG8=",
2988+
"approximateArrivalTimestamp": "<approximate-arrival-timestamp>"
2989+
},
2990+
"eventSource": "aws:kinesis",
2991+
"eventVersion": "1.0",
2992+
"eventID": "shardId-000000000000:<sequence-number:1>",
2993+
"eventName": "aws:kinesis:record",
2994+
"invokeIdentityArn": "arn:<partition>:iam::111111111111:role/<resource:3>",
2995+
"awsRegion": "<region>",
2996+
"eventSourceARN": "arn:<partition>:kinesis:<region>:111111111111:stream/<resource:1>"
2997+
}
2998+
]
2999+
}
3000+
]
3001+
}
3002+
},
3003+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_empty_provided": {
3004+
"recorded-date": "11-10-2024, 11:04:55",
3005+
"recorded-content": {
3006+
"create_event_source_mapping_response": {
3007+
"BatchSize": 1,
3008+
"BisectBatchOnFunctionError": false,
3009+
"DestinationConfig": {
3010+
"OnFailure": {}
3011+
},
3012+
"EventSourceArn": "arn:<partition>:kinesis:<region>:111111111111:stream/<resource:1>",
3013+
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<uuid:1>",
3014+
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:2>",
3015+
"FunctionResponseTypes": [],
3016+
"LastModified": "<datetime>",
3017+
"LastProcessingResult": "No records processed",
3018+
"MaximumBatchingWindowInSeconds": 1,
3019+
"MaximumRecordAgeInSeconds": -1,
3020+
"MaximumRetryAttempts": 2,
3021+
"ParallelizationFactor": 1,
3022+
"StartingPosition": "TRIM_HORIZON",
3023+
"State": "Creating",
3024+
"StateTransitionReason": "User action",
3025+
"TumblingWindowInSeconds": 0,
3026+
"UUID": "<uuid:1>",
3027+
"ResponseMetadata": {
3028+
"HTTPHeaders": {},
3029+
"HTTPStatusCode": 202
3030+
}
3031+
},
3032+
"get_esm_result": {
3033+
"BatchSize": 1,
3034+
"BisectBatchOnFunctionError": false,
3035+
"DestinationConfig": {
3036+
"OnFailure": {}
3037+
},
3038+
"EventSourceArn": "arn:<partition>:kinesis:<region>:111111111111:stream/<resource:1>",
3039+
"EventSourceMappingArn": "arn:<partition>:lambda:<region>:111111111111:event-source-mapping:<uuid:1>",
3040+
"FunctionArn": "arn:<partition>:lambda:<region>:111111111111:function:<resource:2>",
3041+
"FunctionResponseTypes": [],
3042+
"LastModified": "<datetime>",
3043+
"LastProcessingResult": "OK",
3044+
"MaximumBatchingWindowInSeconds": 1,
3045+
"MaximumRecordAgeInSeconds": -1,
3046+
"MaximumRetryAttempts": 2,
3047+
"ParallelizationFactor": 1,
3048+
"StartingPosition": "TRIM_HORIZON",
3049+
"State": "Enabled",
3050+
"StateTransitionReason": "User action",
3051+
"TumblingWindowInSeconds": 0,
3052+
"UUID": "<uuid:1>",
3053+
"ResponseMetadata": {
3054+
"HTTPHeaders": {},
3055+
"HTTPStatusCode": 200
3056+
}
3057+
}
3058+
}
29473059
}
29483060
}

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_duplicate_event_source_mappings": {
1818
"last_validated_date": "2024-01-04T23:37:20+00:00"
1919
},
20+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_empty_provided": {
21+
"last_validated_date": "2024-10-11T11:04:52+00:00"
22+
},
2023
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_event_source_mapping_with_async_invocation": {
2124
"last_validated_date": "2023-02-27T15:55:08+00:00"
2225
},
@@ -59,6 +62,9 @@
5962
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_report_batch_item_success_scenarios[empty_list_success]": {
6063
"last_validated_date": "2024-09-11T17:42:39+00:00"
6164
},
65+
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_report_batch_item_success_scenarios[empty_string_success]": {
66+
"last_validated_date": "2024-10-11T12:38:13+00:00"
67+
},
6268
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_report_batch_item_success_scenarios[null_batch_item_failure_success]": {
6369
"last_validated_date": "2024-09-11T17:48:35+00:00"
6470
},

0 commit comments

Comments
 (0)
0