[ESM] Handle polling of batches exceeding SQS message limits by gregfurman · Pull Request #12118 · localstack/localstack · GitHub

[ESM] Handle polling of batches exceeding SQS message limits #12118

Merged
merged 4 commits into from
Jan 20, 2025

Conversation

gregfurman
Contributor

Motivation

This PR allows for SQS requests originating from event source mappings to circumvent message count validations -- allowing for up to 10 000 messages to be returned (or deleted) at a time.

This is required since polling calls can only be made in batches of 10, making the collection of large batch sizes cumbersome and costly (i.e., 10K messages require ~1K receive requests + ~1K delete requests).
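
The request arithmetic above can be checked with a short sketch. It assumes the best case, where every receive call returns a full batch; the function name and constant are illustrative and not taken from the PR.

```python
import math

# Standard per-call limit for ReceiveMessage and DeleteMessageBatch.
SQS_MAX_BATCH = 10


def requests_needed(total_messages: int, per_call: int = SQS_MAX_BATCH) -> int:
    """Return receive calls plus delete calls needed to drain `total_messages`.

    Assumes every receive call returns a full batch, which is the best case.
    """
    calls = math.ceil(total_messages / per_call)
    return 2 * calls  # one delete call per receive call


print(requests_needed(10_000))          # with the standard limit of 10
print(requests_needed(10_000, 10_000))  # with the limit raised to 10 000
```

With the standard limit this comes to 2000 calls in total; with the limit raised to 10 000 it drops to one receive and one delete.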

Changes

  • Add custom "x-localstack-sqs-override-message-count" header which the SQS provider uses to override how many messages can be processed in a single call.
  • Register event handlers to the AWS client in SqsPoller, allowing ReceiveMessage and DeleteMessageBatch calls to return/delete up to 10 000 records at a time.
  • Extend SQS provider to handle custom "x-localstack-sqs-override-message-count" headers in boto3 requests.
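
To make the provider-side change concrete, here is a minimal sketch of how a header-based override could be resolved. Only the header name and the 10 / 10 000 limits come from this PR; the function name, the fallback behaviour for invalid values, and the case-insensitive lookup are assumptions for illustration, not the actual LocalStack provider code.

```python
OVERRIDE_HEADER = "x-localstack-sqs-override-message-count"
DEFAULT_MAX_MESSAGES = 10       # regular SQS per-call limit
OVERRIDE_MAX_MESSAGES = 10_000  # upper bound mentioned in the PR description


def resolve_message_limit(headers: dict[str, str]) -> int:
    """Return the message-count limit that applies to a single request.

    Hypothetical helper: falls back to the default limit when the header
    is absent, not an integer, or outside the range 1..10 000.
    """
    # HTTP header names are case-insensitive, so normalise before lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    raw = normalized.get(OVERRIDE_HEADER)
    if raw is None:
        return DEFAULT_MAX_MESSAGES
    try:
        override = int(raw)
    except ValueError:
        return DEFAULT_MAX_MESSAGES
    if 0 < override <= OVERRIDE_MAX_MESSAGES:
        return override
    return DEFAULT_MAX_MESSAGES
```

Requests without the header keep the regular validation, so only callers that set it explicitly (here, the ESM poller) are affected.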

TODO

What's left to do:

@gregfurman gregfurman added aws:sqs Amazon Simple Queue Service semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) labels Jan 9, 2025
@gregfurman gregfurman self-assigned this Jan 9, 2025
return

# Allow the override parameter to be greater than the default and less than the maximum batch size.
override = min(requested_count, self.sqs_queue_parameters["BatchSize"])
Contributor Author

Useful for getting remaining records when fewer than the batch size are needed, i.e., we need 100 records but BatchSize is 1K.

Member

I think nearly all comments like this can be great in-code comments :) If you want to tell a reviewer something, you most often also want to give this to anyone who wants to understand the code :)

"provide-client-params.sqs.ReceiveMessage", _handle_receive_message_override
)
event_system.register(
"provide-client-params.sqs.DeleteMessageBatch", _handle_delete_batch_override
Contributor Author

Since we delete SQS messages after processing, this allows us to remove up to 10K entries at a time.

Member

Same as above: I think this could be a code comment instead of a PR comment :)
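
As a rough illustration of the delete step discussed in this thread, the sketch below builds `Entries` lists for DeleteMessageBatch from received messages. The `Id` / `ReceiptHandle` entry shape follows the SQS API; the helper name and the use of a running index as `Id` are assumptions and not the poller's actual code.

```python
def build_delete_batches(messages: list[dict], max_entries: int) -> list[list[dict]]:
    """Group received messages into DeleteMessageBatch `Entries` lists.

    Hypothetical helper: each entry needs an `Id` that is unique within
    its request, so the message's position in the input list is used.
    """
    batches = []
    for start in range(0, len(messages), max_entries):
        chunk = messages[start:start + max_entries]
        batches.append(
            [
                {"Id": str(start + offset), "ReceiptHandle": message["ReceiptHandle"]}
                for offset, message in enumerate(chunk)
            ]
        )
    return batches


# 25 processed messages: three calls under the regular limit, one with the override.
processed = [{"ReceiptHandle": f"handle-{i}"} for i in range(25)]
print(len(build_delete_batches(processed, 10)))
print(len(build_delete_batches(processed, 10_000)))
```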

github-actions bot commented Jan 9, 2025

S3 Image Test Results (AMD64 / ARM64)

  2 files  ±0    2 suites  ±0   5m 32s ⏱️ + 1m 38s
441 tests ±0  389 ✅ ±0   52 💤 ±0  0 ❌ ±0 
882 runs  ±0  778 ✅ ±0  104 💤 ±0  0 ❌ ±0 

Results for commit 99192e9. ± Comparison against base commit 4832016.

♻️ This comment has been updated with latest results.

github-actions bot commented Jan 9, 2025

LocalStack Community integration with Pro

    2 files  ±  0      2 suites  ±0   1h 33m 46s ⏱️ - 18m 28s
3 410 tests  - 542  3 167 ✅  - 475  243 💤  - 67  0 ❌ ±0 
3 412 runs   - 542  3 167 ✅  - 475  245 💤  - 67  0 ❌ ±0 

Results for commit 3e24b7c. ± Comparison against base commit 1edb594.

This pull request removes 568 and adds 26 tests. Note that renamed tests count towards both.
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_lambda_dynamodb
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_opensearch_crud
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_search_books
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_setup
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_destination_sns
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_infra
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_prefill_dynamodb_table
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input0-SUCCEEDED]
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input1-SUCCEEDED]
…
tests.aws.services.lambda_.event_source_mapping.test_lambda_integration_sqs.TestSQSEventSourceMapping ‑ test_sqs_event_source_mapping_batch_size_override[10000]
tests.aws.services.lambda_.event_source_mapping.test_lambda_integration_sqs.TestSQSEventSourceMapping ‑ test_sqs_event_source_mapping_batch_size_override[1000]
tests.aws.services.lambda_.event_source_mapping.test_lambda_integration_sqs.TestSQSEventSourceMapping ‑ test_sqs_event_source_mapping_batch_size_override[100]
tests.aws.services.lambda_.event_source_mapping.test_lambda_integration_sqs.TestSQSEventSourceMapping ‑ test_sqs_event_source_mapping_batch_size_override[20]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp[NANOSECONDS]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp[SECONDS]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_invalid[INVALID_DATE]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_invalid[INVALID_ISO]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_invalid[INVALID_TIME]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_invalid[JSONATA]
…
This pull request removes 73 skipped tests and adds 6 skipped tests. Note that renamed tests count towards both.
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input4-FAILED]
tests.aws.scenario.mythical_mysfits.test_mythical_misfits.TestMythicalMisfitsScenario ‑ test_deployed_infra_state
tests.aws.scenario.mythical_mysfits.test_mythical_misfits.TestMythicalMisfitsScenario ‑ test_populate_data
tests.aws.scenario.mythical_mysfits.test_mythical_misfits.TestMythicalMisfitsScenario ‑ test_user_clicks_are_stored
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_api_exceptions
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_create_exceptions
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_create_invalid_desiredstate
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_double_create_with_client_token
tests.aws.services.cloudcontrol.test_cloudcontrol_api.TestCloudControlResourceApi ‑ test_lifecycle
…
tests.aws.services.lambda_.event_source_mapping.test_lambda_integration_sqs.TestSQSEventSourceMapping ‑ test_sqs_event_source_mapping_batch_size_override[10000]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_jsonata[INVALID_DATE]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_jsonata[INVALID_ISO]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_jsonata[INVALID_TIME]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_jsonata[NO_T]
tests.aws.services.stepfunctions.v2.scenarios.test_base_scenarios.TestBaseScenarios ‑ test_wait_timestamp_jsonata[NO_Z]

♻️ This comment has been updated with latest results.

@gregfurman gregfurman marked this pull request as ready for review January 10, 2025 08:11
Member
@alexrashed alexrashed left a comment

Super interesting topic and nice fix to avoid ESM basically DDOSing SQS 😄
I posted a few questions, when these are answered, we should be good from my side! 💯
I think a test would be great, but if that's not possible yet, we could still proceed.

FYI @dfangl @viren-nadkarni: I think this is pretty interesting when looking at #7240. The "internal request parameters" here could not really be used, because it's pretty global while for this service it needs to be specific to the request. I feel like this PR could be good input on how to adjust the internal request metadata handling for these use cases. Or maybe there is already a way to use this infra?

Comment on lines 1563 to 1577
@pytest.mark.skip(
reason="Flushing based on payload sizes not yet implemented so large payloads are causing issues."
)
Member

question: Is there any way to test this? When is this skip marker going to be removed? Adding a test which is skipped by default is not optimal I would say...

Contributor Author

To clarify, the blocker here is that we cannot do proper boundary testing since an event with 10K records surpasses the 6MB limit of what a Lambda function can synchronously ingest (see Invocation Payload Quotas).

> Is there any way to test this?

Yeah -- we can reduce the size of the batch we are sending. I'll parametrize this test to illustrate the override working and just skip the 10K case.

> When is this skip marker going to be removed?

The skip marker should be removed once #12002 is merged -- since size based batching will be introduced and payloads should be split into chunks of up to 6MB each (that is, within Lambda quota bounds).

> Adding a test which is skipped by default is not optimal I would say...

Agree! Are we OK with multiple test cases where just a single one is (temporarily) skipped?
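
For context on the size-based batching mentioned above, here is a minimal sketch of greedy chunking by serialized size. It is not the implementation from #12002; the function name is hypothetical, the limit is assumed to be 6 * 1024 * 1024 bytes, and the overhead of the surrounding event envelope is ignored.

```python
import json

# Assumed byte value for the 6MB synchronous invocation payload limit.
LAMBDA_SYNC_PAYLOAD_LIMIT = 6 * 1024 * 1024


def split_by_payload_size(
    records: list[dict], max_bytes: int = LAMBDA_SYNC_PAYLOAD_LIMIT
) -> list[list[dict]]:
    """Greedily group records so each chunk's summed JSON size fits `max_bytes`.

    A single record larger than `max_bytes` still gets a chunk of its own.
    """
    chunks: list[list[dict]] = []
    current: list[dict] = []
    current_size = 0
    for record in records:
        size = len(json.dumps(record).encode("utf-8"))
        # Start a new chunk when adding this record would exceed the limit.
        if current and current_size + size > max_bytes:
            chunks.append(current)
            current, current_size = [], 0
        current.append(record)
        current_size += size
    if current:
        chunks.append(current)
    return chunks
```

Each resulting chunk could then be sent as a separate invocation, which is what would make the 10K boundary case testable.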

@@ -36,6 +38,7 @@ def __init__(
    ):
        super().__init__(source_arn, source_parameters, source_client, processor)
        self.queue_url = get_queue_url(self.source_arn)
        self._register_client_hooks()
Member

question: It's a bit dangerous that the client hooks here are registered on any source client handed in here. What is the impact of the registered client hooks / who else uses the client which is handed in here (e.g. due to caching in the client factories)? @bentsku Do you maybe have some input here as well?

Contributor Author

> What is the impact of the registered client hooks

The hooks will add an x-localstack-sqs-override-message-count header to all ReceiveMessage and DeleteMessageBatch calls made via this client. If this client were to be re-used outside of this SQS poller, the expected parameter validation would no longer be applied.

> who else uses the client which is handed in here (e.g. due to caching in the client factories)?

No other services should be using this customised source client, since it is created exclusively for the poller (outside of the caching mechanism).

> It's a bit dangerous that the client hooks here are registered on any source client handed in here.

OK, I have a potential workaround. What about, instead of automatically registering these headers, we extend the client to take in a custom parameter that is only ever used in the poller? This parameter can then be converted into a header via a hook.

That way, if the cached client is re-used, these headers are not added by default but instead require explicit setting. This approach will still require registering hooks but won't add this experimental value to every call.

Example usage:

        response = self.source_client.receive_message(
            QueueUrl=self.queue_url,
            MessageAttributeNames=["All"],
            MessageSystemAttributeNames=[MessageSystemAttributeName.All],
            x_max_number_of_messages=10_000, # where this param will be converted to a header via a hook
        )
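
A minimal sketch of how such a parameter-to-header conversion might look, written as two plain handler functions that are called by hand here. The parameter name `x_max_number_of_messages` comes from the example above; the handler names, the queue URL, and the two-step split are assumptions. With a real boto3 client the handlers would presumably be registered via `client.meta.events.register(...)` on the `provide-client-params.sqs.ReceiveMessage` and `before-call.sqs.ReceiveMessage` events.

```python
OVERRIDE_HEADER = "x-localstack-sqs-override-message-count"
CUSTOM_PARAM = "x_max_number_of_messages"  # custom kwarg from the example above


def move_custom_param_to_context(params: dict, context: dict, **kwargs) -> None:
    """Handler in the style of `provide-client-params`.

    Removes the custom kwarg before parameter validation would see it
    and remembers its value in the per-request context.
    """
    value = params.pop(CUSTOM_PARAM, None)
    if value is not None:
        context[OVERRIDE_HEADER] = str(value)


def add_override_header(params: dict, context: dict, **kwargs) -> None:
    """Handler in the style of `before-call`.

    Here `params` is the serialized request dict, which carries the
    HTTP headers under the "headers" key.
    """
    value = context.get(OVERRIDE_HEADER)
    if value is not None:
        params.setdefault("headers", {})[OVERRIDE_HEADER] = value


# Simulate one call by invoking the handlers manually (no AWS client needed).
api_params = {
    "QueueUrl": "http://localhost:4566/000000000000/example-queue",  # hypothetical
    CUSTOM_PARAM: 10_000,
}
context: dict = {}
move_custom_param_to_context(params=api_params, context=context)
request = {"headers": {}}
add_override_header(params=request, context=context)
print(request["headers"])
```

Because the header is only set when the caller passes the custom parameter, a re-used client behaves normally for every other call.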

Contributor Author

@bentsku Do you think we should add the header as a param (like the example above) or continue with the current approach?

Member

IMHO, explicit parameter passing could greatly improve readability and flexibility (e.g., in cases where values differ by API and not by client). We could even consider some standardized parameters for adding HTTP headers if needed in other areas (e.g., EventStudio).

However, I don't want to delay getting this great approach in and potential standardization could follow after learning from this experience.

Contributor Author

Agree. Let's get this in and iterate on it

Contributor

Agreed as well. I like the parameter approach, and I agree with Joel that it gives us a clearer, more explicit way to add parameters. But this works for now, and we can iterate on it. Sorry for the delay in reviewing! Thanks Joel 🙏

# Allow the override parameter to be greater than the default and less than the maximum batch size.
override = min(requested_count, self.sqs_queue_parameters["BatchSize"])
context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(override)
params["MaxNumberOfMessages"] = min(requested_count, DEFAULT_MAX_RECEIVE_COUNT)
Member

note (non-blocking): I think this min is not necessary due to this conditional in line 57:

if not requested_count or requested_count <= DEFAULT_MAX_RECEIVE_COUNT:
    return

Contributor Author

This is necessary since we want to change the MaxNumberOfMessages param back to a value no greater than 10 -- otherwise validations fail.
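
The interplay of the guard and the two `min` calls discussed in this thread can be sketched as a standalone function. The statements mirror the diff lines quoted above; reading `requested_count` from `params["MaxNumberOfMessages"]`, passing `batch_size` as an argument instead of `self.sqs_queue_parameters["BatchSize"]`, and the value of the header constant are assumptions.

```python
HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT = "x-localstack-sqs-override-message-count"
DEFAULT_MAX_RECEIVE_COUNT = 10


def handle_receive_message_override(params: dict, context: dict, batch_size: int) -> None:
    """Sketch of the receive hook: record the override, clamp the API param."""
    requested_count = params.get("MaxNumberOfMessages")  # assumed source of the count
    if not requested_count or requested_count <= DEFAULT_MAX_RECEIVE_COUNT:
        return  # nothing to override; the regular limits apply

    # The override may exceed the default but never the configured batch size.
    override = min(requested_count, batch_size)
    context[HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT] = str(override)
    # Clamp the real API parameter so client-side validation still passes.
    params["MaxNumberOfMessages"] = min(requested_count, DEFAULT_MAX_RECEIVE_COUNT)


params = {"MaxNumberOfMessages": 100}
context: dict = {}
handle_receive_message_override(params, context, batch_size=1000)
print(params, context)
```

For a request of 100 messages with a BatchSize of 1000, the header carries "100" while the parameter sent to the API is clamped to 10.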

@gregfurman gregfurman force-pushed the add/sqs/message-count-override branch from 89ce06d to 99192e9 Compare January 17, 2025 16:33
Member
@thrau thrau left a comment

nice changes, @gregfurman! the new parameters seem very cleanly added at the right levels of abstraction.

just added a few minor suggestions

Member
@joe4dev joe4dev left a comment

Nothing blocking a merge from my side (once CI is 🟢 again). Great work 👏👏👏
I'm just sharing my opinion about ideas around standardizing hook-based customization implementations (in the future).

@gregfurman gregfurman merged commit bd8bfa6 into master Jan 20, 2025
31 checks passed
@gregfurman gregfurman deleted the add/sqs/message-count-override branch January 20, 2025 15:56
Labels
aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) aws:sqs Amazon Simple Queue Service semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases

5 participants