[ESM] Improve SQS batch collection and flushing by gregfurman · Pull Request #12002 · localstack/localstack · GitHub

[ESM] Improve SQS batch collection and flushing #12002


Merged · 29 commits merged into master · Feb 26, 2025

Conversation

@gregfurman (Contributor) commented on Dec 9, 2024

Motivation

This PR allows the ESM poller to collect and send batches of SQS messages based on the number of messages collected and the duration of the batching window (up to 300s).

The motivations were as follows:

  1. The feature request lambda event source mapping for non-FIFO SQS queue has invalid max batch size #5042 required LocalStack to support a batch size over 10, which in turn depends on MaximumBatchingWindowInSeconds being larger than 1. While offering CRUD support for this parameter, LocalStack did not actually collect a batch for the specified duration. These functional changes mean that we can now collect records until the window elapses.

  2. Under high volumes of requests, we expect the latency of LocalStack's gateway to degrade. By supporting window batching, we have also introduced the ability for ESM pollers to long-poll an SQS event source for batches of data, which should help alleviate some pressure on the gateway.

Changes

  • Adds long-polling for SQS ReceiveMessage that continuously polls an SQS queue until either the desired number of messages (BatchSize) has been collected or the batching window (MaximumBatchingWindowInSeconds) has elapsed.
  • Once collected, this batch is potentially further split into chunks of up to 2.5K records each, as a stand-in for proper flushing based on Lambda payload quotas (see docs). A rough sketch of the collect-and-flush flow is shown below.
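
A minimal sketch of the collect-and-flush flow described above, assuming a boto3-style SQS client; the helper names (collect_and_flush, flush) and the overall structure are illustrative, not the actual LocalStack implementation:

import time

MAX_RECORDS_PER_CHUNK = 2500  # heuristic chunk size used in this PR

def collect_and_flush(sqs_client, queue_url, batch_size, batch_window_seconds, flush):
    """Collect messages until BatchSize is reached or the batching window elapses, then flush in chunks."""
    batch = []
    deadline = time.monotonic() + batch_window_seconds
    while len(batch) < batch_size and (remaining := deadline - time.monotonic()) > 0:
        # Long-poll: ReceiveMessage returns as soon as messages are available
        # or WaitTimeSeconds elapses, instead of returning immediately on a miss.
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=min(batch_size - len(batch), 10),  # AWS API caps this at 10
            WaitTimeSeconds=min(int(remaining), 20),  # AWS API caps this at 20
        )
        batch.extend(response.get("Messages", []))

    # Split the collected batch into chunks of up to 2.5K records before invoking the target.
    for start in range(0, len(batch), MAX_RECORDS_PER_CHUNK):
        flush(batch[start : start + MAX_RECORDS_PER_CHUNK])

(The PR additionally uses LocalStack-internal header overrides to allow larger batch sizes and longer wait times than the AWS API permits; the caps in the sketch reflect the plain AWS API.)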

Testing

  • Unskips the test_sqs_event_source_mapping_batching_reserved_concurrency since we can now collect and send batches of more than 10 records at a time.

Benchmarking

Some benchmarks were run to assess how the long-polling changes would affect the latency of the LocalStack gateway. See the benchmarking results here.

tl;dr: Long polling saw fewer requests being made and, in some cases, improved latencies at higher percentiles (P95 and above).

Assumptions

  • A higher load on the gateway means fewer available threads for processing new requests. Therefore, we'd expect short-polling calls with many poll-misses to add unnecessary load to the gateway (due to each processing call requiring a worker) that equivalent long-polling calls should be able to circumvent.
  • We poll an event source at an interval of once per second. Each long-polling call will block a single gateway thread (there are 1K available) while waiting for a request to complete. Therefore, we expect that (in extremis) many ESM pollers all running long-polling calls will cause resource contention and degrade performance.
  • For most cases, however, we assume that long-polling calls should reduce the number of requests made to the gateway within a given duration, therefore improving the availability of workers to process other requests.

Results

  • Performance with and without long polling was fairly similar in terms of latency across all experiments, with differences of 50–300 ms between the two approaches.
  • Large batch sizes coupled with high batch windows saw the largest improvements in performance.
  • Long polling resulted in fewer requests (~7%) being processed within the 5m test interval, likely due to resource contention while long-polling calls were in flight. However, this did not hold when maximising Batch Size (10k) and Batch Window (300s) with long polling, which saw the highest throughput and the best latency improvements.

Conclusions

  • The effects of long polling were sometimes visible at higher latency percentiles, where long-polling runs had lower latency than short-polling ones. Overall, however, no notable difference in performance was observed between long and short polling.
  • Performance was optimized in both cases by using larger batch sizes and longer batch windows, with the combination of long polling + large batches + long windows showing the best overall efficiency.
  • Importantly, while introducing long polling did not dramatically affect performance (positively or negatively), it provides LocalStack with an avenue for further optimisation.

@gregfurman gregfurman self-assigned this Dec 9, 2024
@gregfurman gregfurman added semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) labels Dec 9, 2024
@gregfurman gregfurman added this to the Playground milestone Dec 9, 2024
github-actions bot commented Dec 9, 2024

LocalStack Community integration with Pro

2 files ±0 · 2 suites ±0 · 1h 30m 45s ⏱️ (−23m 17s)
3 110 tests (−993): 2 890 ✅ (−880) · 220 💤 (−113) · 0 ❌ (±0)
3 112 runs (−993): 2 890 ✅ (−880) · 222 💤 (−113) · 0 ❌ (±0)

Results for commit 23d6cb6. ± Comparison against base commit 6f32581.

This pull request removes 994 tests and adds 1 test. Note that renamed tests count towards both.
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_lambda_dynamodb
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_opensearch_crud
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_search_books
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_setup
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_destination_sns
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_infra
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_prefill_dynamodb_table
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input0-SUCCEEDED]
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input1-SUCCEEDED]
…
tests.aws.services.lambda_.event_source_mapping.test_lambda_integration_sqs.TestSQSEventSourceMapping ‑ test_sqs_event_source_mapping_batching_window_size_override

♻️ This comment has been updated with latest results.

@gregfurman gregfurman marked this pull request as ready for review December 9, 2024 11:43
@dfangl (Member) left a comment


This already looks quite good, but I still have a concern:

We do not have an upper limit for the sleep times. As per your table, a queue that gets 1 message/s, with a 300s batching window and a batch size of 1, would trigger only once every 150s on average (with no latency), which causes a massive queue buildup. If someone sets a 300s batch window and a burst of messages occurs, we should still process them in a timely manner. I think AWS would do so as well.

Also please add unit descriptions to the table, as it makes it clearer :)

@gregfurman (Contributor, Author) commented

@dfangl Thanks for the review here. I've removed the adaptive backoff since we're looking at providing an override header in the boto3 requests to circumvent the 10-message limit.

Let me know if the (simplified) changes are alright!

@joe4dev (Member) left a comment


The simplified message collection is much clearer 👏 Thank you for these refinements @gregfurman

I'm looking forward to supporting batching windows and batching by size 🙌
However, I'm concerned about flooding the LS gateway within the while loop (see ⚠️). How do you assess the scenario outlined in the comment?

@gregfurman gregfurman requested a review from joe4dev February 12, 2025 08:30
@joe4dev (Member) left a comment


Just sharing some minor documentation suggestions to encode the lessons learned.

The code changes look good to me 👏👏👏 🚀

Two things:

  • Do we have a successful ext run?
  • I wanna look into the performance test results (haven't gotten to it yet ...)

@@ -97,28 +108,47 @@ def event_source(self) -> str:
def poll_events(self) -> None:
# SQS pipe source: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
# "The 9 Ways an SQS Message can be Deleted": https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/
# TODO: implement batch window expires based on MaximumBatchingWindowInSeconds
# TODO: implement invocation payload size quota
# TODO: consider long-polling vs. short-polling trade-off. AWS uses long-polling:
Member


docs: I would love to see this TODO resolved and replaced with a paragraph explaining the trade-offs and lessons learned. Our future selves thank us 😃

Suggestion:

# We adopted long-polling for the SQS poll operation `ReceiveMessage` for improved performance.
# * PR (2025-02): https://github.com/localstack/localstack/pull/12002
# * ESM blog mentioning long-polling: https://aws.amazon.com/de/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
# * Amazon SQS short and long polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
# + Reduces latency because the `ReceiveMessage` call immediately returns once we reach the desired `BatchSize` or the `WaitTimeSeconds` elapses.
# + Matches the AWS behavior also using long-polling
# - Blocks a LocalStack gateway thread (default 1k) for every open connection, which could lead to resource contention if used at scale.
# * LocalStack shutdown works because the LocalStack gateway shuts down and terminates the open connection.
# * Our LS-internal optimizations using custom headers reduce the load on the LocalStack gateway by allowing for larger batch sizes and longer wait times than the AWS API.

LOG.debug("Polled %d events from %s", len(messages), self.source_arn)

messages = response.get("Messages", [])
LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
Member


It's probably good for debugging to be explicit here. I'm just wondering whether it's intentional to log empty polls as well?

Contributor Author


I was thinking it could be useful for debugging to log explicitly whether nothing was polled from the event source. Perhaps we can distinguish this better with a "Polled no events from %s" -- wdyt?

Member


My main thought is around avoiding log pollution (imagine 100 ESMs printing every second), but it's probably worth keeping for now. For example: it would help to identify whether jitter around the 1s interval is needed 💡 .

The format is fine, being consistent is good 👍

Member


I think we should be very careful about this - many people have DEBUG=1 by default, and this can be a lot. I agree it is not urgent to remove - but especially the message for no events could be removed in the future.


messages = response.get("Messages", [])
LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
# NOTE: Split up a batch into mini-batches of up to 2.5K records each. This is to prevent exceeding the 6MB size-limit
Member


nit: we could move the # TODO: implement invocation payload size quota here, clarifying that's only a heuristic and not a perfect parity implementation
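
For context on why this is only a heuristic: the fixed 2.5K-record chunking approximates the 6 MB synchronous invocation payload quota rather than measuring actual payload size. A size-based variant could look roughly like the sketch below (the chunk_by_payload_size helper is hypothetical and not part of this PR):

import json

MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # Lambda synchronous invocation payload quota

def chunk_by_payload_size(records, max_bytes=MAX_PAYLOAD_BYTES):
    # Yield sub-batches whose serialized size stays under the payload quota.
    batch, size = [], 0
    for record in records:
        record_size = len(json.dumps(record))
        if batch and size + record_size > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append(record)
        size += record_size
    if batch:
        yield batch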

@gregfurman gregfurman requested a review from joe4dev February 12, 2025 15:46
@joe4dev (Member) left a comment


Thank you for the unlimited persistence and going the extra mile @gregfurman to gain more confidence and learn more about potential limitations of this impactful change 🙇

The code changes look good, we have iterated on them and all suggestions are taken into consideration 👏👏👏 .

The benchmark results indicate that this change does not negatively impact the LocalStack gateway latency, but we should keep an 👂 open to potential user feedback in large (>100 ESMs) and high-performance (>100 users) environments.

I love the out-of-the-box thinking around custom headers to go beyond AWS API restrictions to reduce the number of requests on LocalStack while still keeping remote API compatibility 🧠 💯

Strictly speaking, we don't have explicit aws-validated tests for the new MaximumBatchingWindowInSeconds feature, but I think that's fine here given the cost/value trade-off. It would be hard to test against AWS because their internal behavior is somewhat unpredictable (depending on internal performance optimizations) and probably not guaranteed (i.e., max is an upper bound). It would likely require multiple iterations of time-consuming testing.

Good to 🚢 from my side 🚀

nit: Any idea where that weird line mismatch (995 vs 999) comes from in the failing Lambda test?
It's clearly unrelated to these changes.

@dfangl (Member) left a comment


I think this is good to go, let's get it in!
I agree with Joel's final comment wholeheartedly, great work and patience with this PR!


@@ -1617,17 +1608,72 @@ def test_sqs_event_source_mapping_batch_size_override(
cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
_await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

expected_invocations = -(batch_size // -2500) # converts floor division to ceil
Member


Couldn't we just add math.ceil here? If I have to explain what a line does in a comment, it is not really well readable :)

Contributor Author



Lol yeah. Lemme change this now...
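
For reference, the two expressions are equivalent; a quick standalone illustration (not the test code itself):

import math

batch_size, chunk = 10_000, 2_500

# Negated floor division is a common ceiling-division idiom in Python...
assert -(batch_size // -chunk) == 4

# ...but math.ceil reads far more clearly, as suggested above.
assert math.ceil(batch_size / chunk) == 4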

@joe4dev (Member) commented Feb 25, 2025

@gregfurman Do we know why test_sqs_event_source_mapping_batching_reserved_concurrency fails? Is it flaky?
https://app.circleci.com/pipelines/github/localstack/localstack/31294/workflows/85e2e6dd-0f6e-4014-9612-2bbad5b4e266/jobs/278937/tests

@gregfurman (Contributor, Author) commented

@joe4dev Just ran some tests for this locally. There's a chance that the 10s batch window elapses before all 30 records have been processed, so we end up flushing 3 times instead of twice. Making the window larger would mitigate this; alternatively, if we load up the SQS queue before polling begins, the flake should disappear.

@gregfurman gregfurman merged commit e509f9d into master Feb 26, 2025
31 checks passed
@gregfurman gregfurman deleted the fix/esm/batching branch February 26, 2025 09:27
Labels
aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases