[ESM] Handle polling of batches exceeding SQS message limits #12118
Conversation
return

# Allow override parameter to be greater than default and less than maximum batch size.
override = min(requested_count, self.sqs_queue_parameters["BatchSize"])
Useful for getting remaining records less than the batch size, i.e. we need 100 records but BatchSize is 1K.
I think nearly all comments like this can be great in-code comments :) If you want to tell a reviewer something, you most often also want to give this to anyone who wants to understand the code :)
"provide-client-params.sqs.ReceiveMessage", _handle_receive_message_override | ||
) | ||
event_system.register( | ||
"provide-client-params.sqs.DeleteMessageBatch", _handle_delete_batch_override |
Since we delete SQS messages after processing, this allows us to remove up to 10K entries at a time.
Same as above: I think this could be a code comment instead of a PR comment :)
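For illustration, here is a rough sketch of what acknowledging a processed batch could look like once the DeleteMessageBatch hook is registered. This is not the PR's actual poller code; the helper name is made up and a generic boto3 SQS client is assumed.

def delete_processed_messages(sqs_client, queue_url: str, messages: list[dict]) -> None:
    # Build one delete entry per received message.
    entries = [
        {"Id": str(i), "ReceiptHandle": message["ReceiptHandle"]}
        for i, message in enumerate(messages)
    ]
    # Without the override, this would have to be chunked into batches of 10;
    # with it, up to 10K entries can be removed in a single call.
    sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)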
LocalStack Community integration with Pro: 2 files ±0, 2 suites ±0, 1h 33m 46s ⏱️ -18m 28s. Results for commit 3e24b7c, compared against base commit 1edb594. This pull request removes 568 and adds 26 tests. Note that renamed tests count towards both.
This pull request removes 73 skipped tests and adds 6 skipped tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
Super interesting topic and nice fix to avoid ESM basically DDOSing SQS 😄
I posted a few questions, when these are answered, we should be good from my side! 💯
I think a test would be great, but if that's not possible yet, we could still proceed.
FYI @dfangl @viren-nadkarni: I think this is pretty interesting when looking at #7240. The "internal request parameters" here could not really be used, because it's pretty global, while for this service it needs to be specific to the request. I feel like this PR could be good input on how to adjust the internal request metadata handling for these use cases. Or maybe there is already a way to use this infra?
@pytest.mark.skip(
    reason="Flushing based on payload sizes not yet implemented so large payloads are causing issues."
)
question: Is there any way to test this? When is this skip marker going to be removed? Adding a test which is skipped by default is not optimal I would say...
To clarify, the blocker here is that we cannot do proper boundary testing since an event with 10K records surpasses the 6MB limit of what a Lambda function can synchronously ingest (see Invocation Payload Quotas).
Is there any way to test this?
Yeah -- we can reduce the size of the batch we are sending. I'll parametrize this test to illustrate the override working and just skip the 10K case.
When is this skip marker going to be removed?
The skip marker should be removed once #12002 is merged -- since size-based batching will be introduced and payloads should be split into chunks of up to 6MB each (that is, within Lambda quota bounds).
Adding a test which is skipped by default is not optimal I would say...
Agree! Are we OK with multiple test cases where just a single one is (temporarily) skipped?
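For illustration, a minimal sketch of what the parametrization could look like; the fixture and helper names here are hypothetical, not the PR's actual test code.

import pytest

@pytest.mark.parametrize(
    "batch_size",
    [
        20,
        1_000,
        pytest.param(
            10_000,
            marks=pytest.mark.skip(
                reason="Flushing based on payload sizes not yet implemented, "
                "so a 10K-record event exceeds the 6MB synchronous invocation limit."
            ),
        ),
    ],
)
def test_sqs_batch_size_override(batch_size, run_sqs_esm_scenario):
    # Hypothetical helper: sends `batch_size` messages to the source queue and
    # returns the record batches received by the Lambda function.
    batches = run_sqs_esm_scenario(batch_size=batch_size, message_count=batch_size)
    assert sum(len(batch) for batch in batches) == batch_size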
@@ -36,6 +38,7 @@ def __init__(
):
    super().__init__(source_arn, source_parameters, source_client, processor)
    self.queue_url = get_queue_url(self.source_arn)
    self._register_client_hooks()
question: It's a bit dangerous that the client hooks here are registered on any source client handed in here. What is the impact of the registered client hooks? Who else uses the client which is handed in here (e.g. due to caching in the client factories)? @bentsku Do you maybe have some input here as well?
What is the impact of the registered client hooks
The hooks will add an x-localstack-sqs-override-message-count header to all ReceiveMessage and DeleteMessageBatch calls made via this client. If this client were to be re-used outside of this SQS poller, the expected parameter verification would no longer apply.
who else uses the client which is handed in here (f.e. due to caching in the client factories)?
No other services should be using this customised source client -- it is created exclusively for the poller (outside of the caching mechanism).
It's a bit dangerous that the client hooks here is registered on any source client handed in here.
OK, I have a potential workaround. What if, instead of automatically registering these headers, we extend the client to take a custom parameter that is only ever used in the poller? This parameter can then be converted into a header via a hook.
That way, if the cached client is re-used, these headers are not added by default but instead require explicit setting. This approach will still require registering hooks but won't add this experimental value to every call.
Example usage:
response = self.source_client.receive_message(
QueueUrl=self.queue_url,
MessageAttributeNames=["All"],
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
x_max_number_of_messages=10_000, # where this param will be converted to a header via a hook
)
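For context, a minimal sketch of how such a parameter could be turned into a header with botocore's event system. The handler names and wiring below are illustrative only, not the PR's final code.

import boto3

HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT = "x-localstack-sqs-override-message-count"

sqs_client = boto3.client("sqs")
events = sqs_client.meta.events

def _pop_custom_param(params, context, **kwargs):
    # Remove the non-standard kwarg before botocore validates the request,
    # stashing its value in the per-request context.
    override = params.pop("x_max_number_of_messages", None)
    if override is not None:
        context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(override)

def _add_override_header(params, context, **kwargs):
    # Copy the stashed value onto the outgoing HTTP request as a header.
    override = context.get(HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT)
    if override is not None:
        params["headers"][HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = override

events.register("provide-client-params.sqs.ReceiveMessage", _pop_custom_param)
events.register("before-call.sqs.ReceiveMessage", _add_override_header)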
@bentsku Do you think we should add the header as a param (like the example above) or continue with the current approach?
IMHO, explicit parameter passing could greatly improve readability and flexibility (e.g., in cases where values differ by API and not by client). We could even consider some standardized parameters for adding HTTP headers if needed in other areas (e.g., EventStudio).
However, I don't want to delay getting this great approach in and potential standardization could follow after learning from this experience.
Agree. Let's get this in and iterate on it
Agreed as well, I like the parameter approach, as it also allows us to have a clearer way to add parameters, and be more explicit about it, I agree with Joel. But this works for now, and we can iterate on it. Sorry for the delay in reviewing! Thanks Joel 🙏
# Allow override parameter to be greater than default and less than maximum batch size.
override = min(requested_count, self.sqs_queue_parameters["BatchSize"])
context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(override)
params["MaxNumberOfMessages"] = min(requested_count, DEFAULT_MAX_RECEIVE_COUNT)
note (non-blocking): I think this min is not necessary due to this conditional in line 57:
if not requested_count or requested_count <= DEFAULT_MAX_RECEIVE_COUNT:
    return
This is necessary since we want to change the MaxNumberOfMessages param back to a value below 10 -- otherwise validations fail.
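Putting the quoted fragments together, the receive-side hook roughly looks like the following. This is a sketch assembled from this review thread, not a verbatim copy of the PR; the factory wrapper stands in for the poller's access to its queue parameters.

DEFAULT_MAX_RECEIVE_COUNT = 10
HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT = "x-localstack-sqs-override-message-count"

def make_receive_message_override(sqs_queue_parameters: dict):
    def _handle_receive_message_override(params, context, **kwargs):
        requested_count = params.get("MaxNumberOfMessages")
        if not requested_count or requested_count <= DEFAULT_MAX_RECEIVE_COUNT:
            return
        # Carry the real requested count in the override header, capped at the configured batch size.
        override = min(requested_count, sqs_queue_parameters["BatchSize"])
        context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(override)
        # Clamp the actual parameter back to at most 10 so standard validation still passes.
        params["MaxNumberOfMessages"] = min(requested_count, DEFAULT_MAX_RECEIVE_COUNT)

    return _handle_receive_message_override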
nice changes, @gregfurman! the new parameters seem very cleanly added at the right levels of abstraction.
just added a few minor suggestions
Nothing blocking a merge from my side (once CI is 🟢 again). Great work 👏👏👏
I'm just sharing my opinion about ideas around standardizing hook-based customization implementations (in the future).
Motivation
This PR allows SQS requests originating from event source mappings to circumvent message-count validations, allowing up to 10 000 messages to be returned (or deleted) at a time. This is required since polling calls can otherwise only be made in batches of 10, making the collection of large batch sizes cumbersome and costly (i.e. 10K messages require ~1K receive requests + ~1K delete requests).
Changes
- Adds an "x-localstack-sqs-override-message-count" header which the SQS provider uses to override how many messages can be processed in a single call.
- Registers client hooks in the SqsPoller, allowing ReceiveMessage and DeleteMessageBatch calls to return/delete up to 10 000 records at a time.
- Handles "x-localstack-sqs-override-message-count" headers in boto3 requests.
TODO
What's left to do: