8000 Integrate with new SQS changes · localstack/localstack@b49705f · GitHub
[go: up one dir, main page]

Skip to content

Commit b49705f

Browse files
committed
Integrate with new SQS changes
1 parent fa1a10a commit b49705f

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ def handle_message_count_override(params, context, **kwargs):
8080
context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(requested_count)
8181

8282
def handle_message_wait_time_seconds_override(params, context, **kwargs):
83-
requested_count = params.pop("sqs_override_wait_time_seconds", None)
84-
if not requested_count or requested_count <= DEFAULT_MAX_WAIT_TIME_SECONDS:
83+
requested_wait = params.pop("sqs_override_wait_time_seconds", None)
84+
if not requested_wait or requested_wait <= DEFAULT_MAX_WAIT_TIME_SECONDS:
8585
return
8686

87-
context[HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = str(requested_count)
87+
context[HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS] = str(requested_wait)
8888

8989
def handle_inject_headers(params, context, **kwargs):
9090
if override_message_count := context.pop(
@@ -104,6 +104,9 @@ def handle_inject_headers(params, context, **kwargs):
104104
event_system.register(
105105
"provide-client-params.sqs.ReceiveMessage", handle_message_count_override
106106
)
107+
event_system.register(
108+
"provide-client-params.sqs.ReceiveMessage", handle_message_wait_time_seconds_override
109+
)
107110
# Since we delete SQS messages after processing, this allows us to remove up to 10K entries at a time.
108111
event_system.register(
109112
"provide-client-params.sqs.DeleteMessageBatch", handle_message_count_override
@@ -141,6 +144,7 @@ def poll_events(self) -> None:
141144
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
142145
# Override how many messages we can receive per call
143146
sqs_override_max_message_count=self.batch_size,
147+
# Override how long to wait until batching conditions are met
144148
sqs_override_wait_time_seconds=self.maximum_batching_window,
145149
)
146150

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1619,8 +1619,64 @@ def test_sqs_event_source_mapping_batch_size_override(
16191619
)
16201620

16211621
assert sum(len(event.get("Records", [])) for event in events) == batch_size
1622+
assert len(events[0].get("Records", [])) == batch_size
16221623

1623-
rs = aws_client.sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=1)
1624+
rs = aws_client.sqs.receive_message(QueueUrl=queue_url)
1625+
assert rs.get("Messages", []) == []
1626+
1627+
@markers.aws.only_localstack
1628+
def test_sqs_event_source_mapping_batching_window_size_override(
1629+
self,
1630+
create_lambda_function,
1631+
sqs_create_queue,
1632+
sqs_get_queue_arn,
1633+
lambda_su_role,
1634+
cleanups,
1635+
aws_client,
1636+
):
1637+
function_name = f"lambda_func-{short_uid()}"
1638+
queue_name = f"queue-{short_uid()}"
1639+
mapping_uuid = None
1640+
1641+
create_lambda_function(
1642+
func_name=function_name,
1643+
handler_file=TEST_LAMBDA_PYTHON_ECHO,
1644+
runtime=Runtime.python3_12,
1645+
role=lambda_su_role,
1646+
)
1647+
queue_url = sqs_create_queue(QueueName=queue_name)
1648+
queue_arn = sqs_get_queue_arn(queue_url)
1649+
1650+
create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
1651+
EventSourceArn=queue_arn,
1652+
FunctionName=function_name,
1653+
MaximumBatchingWindowInSeconds=30,
1654+
BatchSize=10_000,
1655+
)
1656+
mapping_uuid = create_event_source_mapping_response["UUID"]
1657+
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
1658+
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)
1659+
1660+
# Send 4 messages and delay their arrival by 5, 10, 15, and 25 seconds respectively
1661+
for s in [5, 10, 15, 25]:
1662+
aws_client.sqs.send_message(
1663+
QueueUrl=queue_url,
1664+
MessageBody=json.dumps({"delayed": f"{s}"}),
1665+
)
1666+
1667+
events = retry(
1668+
check_expected_lambda_log_events_length,
1669+
retries=60,
1670+
sleep=1,
1671+
function_name=function_name,
1672+
expected_length=1,
1673+
logs_client=aws_client.logs,
1674+
)
1675+
1676+
assert len(events) == 1
1677+
assert len(events[0].get("Records", [])) == 4
1678+
1679+
rs = aws_client.sqs.receive_message(QueueUrl=queue_url)
16241680
assert rs.get("Messages", []) == []
16251681

16261682

0 commit comments

Comments
 (0)
0