[ESM] Improve SQS batch collection and flushing #12002
Conversation
LocalStack Community integration with Pro: 2 files ±0, 2 suites ±0, 1h 30m 45s ⏱️ (-23m 17s). Results for commit 23d6cb6, compared against base commit 6f32581. This pull request removes 994 tests and adds 1 test. Note that renamed tests count towards both.
Force-pushed from d7a668c to 3e04203.
This already looks quite good, but I still have a concern:
We do not have an upper limit for the sleep times. As per your table, a queue that receives 1 message/s, with a batching window of 300s and a batch size of 1, triggers only once every 150s (on average, with no latency), which causes a massive queue buildup. If someone sets a 300s batch window and a burst of messages occurs, we should still process them in a timely manner. I think AWS would do so as well.
Also, please add unit descriptions to the table, as it makes things clearer :)
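To make the concern concrete, here is a minimal sketch (hypothetical names, not the PR's actual implementation) of a flush predicate that covers both cases: a burst fills the batch and flushes immediately, while a slow queue flushes only once the window elapses:

```python
import time


def should_flush(batch: list, batch_size: int, window_started_at: float, max_window_s: float) -> bool:
    # Flush early when the batch is full (handles bursts of messages) ...
    if len(batch) >= batch_size:
        return True
    # ... otherwise flush only once the configured batching window has elapsed.
    return (time.monotonic() - window_started_at) >= max_window_s
```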
Force-pushed from aab6e03 to 0237cab.
@dfangl Thanks for the review here. I've removed the adaptive backoff since we're looking at providing an overwrite header in the boto3 requests to circumvent the 10-message limit. Let me know if the (simplified) changes are alright!
The simplified message collection is much clearer 👏 Thank you for these refinements @gregfurman
I'm looking forward to supporting batching windows and batching by size 🙌
However, I'm concerned about flooding the LS gateway within the while loop (see the resolved comments on localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py).
Force-pushed from b49705f to 275f942.
Just sharing some minor documentation suggestions to encode the lessons learned.
The code changes look good to me 👏👏👏 🚀
Two things:
- Do we have a successful ext run?
- I wanna look into the performance test results (haven't gotten to it yet ...)
@@ -97,28 +108,47 @@ def event_source(self) -> str:
    def poll_events(self) -> None:
        # SQS pipe source: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
        # "The 9 Ways an SQS Message can be Deleted": https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/
        # TODO: implement batch window expires based on MaximumBatchingWindowInSeconds
        # TODO: implement invocation payload size quota
        # TODO: consider long-polling vs. short-polling trade-off. AWS uses long-polling:
docs: I would love to see this TODO resolved and replaced with a paragraph explaining the trade-offs and lessons learned. Our future selves thank us 😃
Suggestion:
# We adopted long-polling for the SQS poll operation `ReceiveMessage` for improved performance.
# * PR (2025-02): https://github.com/localstack/localstack/pull/12002
# * ESM blog mentioning long-polling: https://aws.amazon.com/de/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/
# * Amazon SQS short and long polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
# + Reduces latency because the `ReceiveMessage` call immediately returns once we reach the desired `BatchSize` or the `WaitTimeSeconds` elapses.
# + Matches the AWS behavior also using long-polling
# - Blocks a LocalStack gateway thread (default 1k) for every open connection, which could lead to resource contention if used at scale.
# * LocalStack shutdown works because the LocalStack gateway shuts down and terminates the open connection.
# * Our LS-internal optimizations using custom headers reduce the load on the LocalStack gateway by allowing for larger batch sizes and longer wait times than the AWS API.
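For illustration, a minimal long-polling `ReceiveMessage` call along the lines described above; the endpoint, credentials, queue URL, and parameter values are assumptions for the sketch, not taken from the PR:

```python
import boto3

# Hypothetical LocalStack endpoint and test credentials.
sqs = boto3.client(
    "sqs",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# Long-polling: the call returns as soon as messages are available (up to
# MaxNumberOfMessages) or once WaitTimeSeconds elapses, instead of returning
# immediately with an empty response as short-polling would.
response = sqs.receive_message(
    QueueUrl="http://localhost:4566/000000000000/example-queue",  # hypothetical queue
    MaxNumberOfMessages=10,  # public AWS API maximum per call
    WaitTimeSeconds=20,      # public AWS API maximum wait
)
messages = response.get("Messages", [])
```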
messages = response.get("Messages", [])
LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
It's probably good for debugging to be explicit here. I'm just wondering whether it's intentional to log empty polls as well?
I was thinking it could be useful for debugging to log explicitly when nothing was polled from the event source. Perhaps we can distinguish this better with a "Polled no events from %s" -- wdyt?
My main thought is around avoiding log pollution (imagine 100 ESMs printing every second), but it's probably worth keeping for now. For example: it would help to identify whether jitter around the 1s interval is needed 💡 .
The format is fine, being consistent is good 👍
I think we should be very careful about this - many people have DEBUG=1 by default, and this can be a lot. I agree it is not urgent to remove - but especially the message for no events could be removed in the future.
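If the empty-poll message ever becomes too noisy, one possible (hypothetical) variant is to emit the DEBUG line only when something was actually received, for example:

```python
import logging

LOG = logging.getLogger(__name__)


def log_poll_result(messages: list, source_arn: str) -> None:
    # Stay silent on empty polls so that many ESMs polling empty queues every
    # second do not flood the logs when DEBUG=1 is enabled.
    if messages:
        LOG.debug("Polled %d events from %s", len(messages), source_arn)
```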
messages = response.get("Messages", [])
LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
# NOTE: Split up a batch into mini-batches of up to 2.5K records each. This is to prevent exceeding the 6MB size-limit
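As a rough sketch of the record-count heuristic mentioned in the NOTE (helper name and chunking logic are assumptions, not the poller's actual code):

```python
from itertools import islice
from typing import Iterator


def split_into_mini_batches(records: list, max_records: int = 2500) -> Iterator[list]:
    # Heuristic only: capping each mini-batch at `max_records` records keeps the
    # resulting invocation payload safely below the 6MB synchronous Lambda limit,
    # without inspecting the actual serialized payload size.
    iterator = iter(records)
    while chunk := list(islice(iterator, max_records)):
        yield chunk
```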
nit: we could move the # TODO: implement invocation payload size quota here, clarifying that it's only a heuristic and not a perfect parity implementation.
Thank you for the unlimited persistence and going the extra mile @gregfurman to gain more confidence and learn more about potential limitations of this impactful change 🙇
The code changes look good, we have iterated on them and all suggestions are taken into consideration 👏👏👏 .
The benchmark results indicate that this change does not negatively impact the LocalStack gateway latency, but we should keep an 👂 open to potential user feedback in large (>100 ESMs) and high-performance (>100 users) environments.
I love the out-of-the-box thinking around custom headers to go beyond AWS API restrictions to reduce the number of requests on LocalStack while still keeping remote API compatibility 🧠 💯
Strictly speaking, we don't have explicit aws-validated tests for the new MaximumBatchingWindowInSeconds feature, but I think that's fine here given the cost/value trade-off. It would be hard to test against AWS because their internal behavior is somewhat unpredictable (depending on internal performance optimizations) and probably not guaranteed (i.e., max is an upper bound). It would likely require multiple iterations of time-consuming testing.
Good to 🚢 from my side 🚀
nit: Any idea where that weird line mismatch (995 vs 999) comes from in the failing Lambda test? It's clearly unrelated to these changes.
I think this is good to go, let's get it in!
I agree with Joel's final comment wholeheartedly. Great work and patience with this PR!
@@ -1617,17 +1608,72 @@ def test_sqs_event_source_mapping_batch_size_override(
    cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
    _await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

    expected_invocations = -(batch_size // -2500)  # converts floor division to ceil
Couldn't we just add math.ceil here? If I have to explain what a line does in a comment, it is not really readable :)
Lol yeah. Lemme change this now...
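For reference, a quick (hypothetical) check that the negative floor division and math.ceil produce the same result:

```python
import math

# The two spellings of ceiling division agree for the batch sizes of interest.
for batch_size in (1, 2500, 6001, 10_000):
    assert -(batch_size // -2500) == math.ceil(batch_size / 2500)
```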
@gregfurman Do we know why …
@joe4dev Just ran some tests for this locally. There's a chance that the batch window of 10s expires before all 30 records have been processed, and we end up flushing 3 times instead of twice. If we make the window larger, this can be mitigated; alternatively, if we load up the SQS queue before polling begins, the flake should disappear.
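A sketch of the second mitigation, assuming the fixture and helper names of the surrounding test file (aws_client, queue_url, queue_arn, function_name, and cleanups are assumptions here): enqueue the full backlog before the ESM is created, so the batching window is not racing message production:

```python
# Pre-load all 30 records before the event source mapping exists ...
for i in range(30):
    aws_client.sqs.send_message(QueueUrl=queue_url, MessageBody=f"message-{i}")

# ... and only then create the mapping, so the batching window starts with the
# whole backlog already available and the number of flushes stays deterministic.
create_response = aws_client.lambda_.create_event_source_mapping(
    EventSourceArn=queue_arn,
    FunctionName=function_name,
    MaximumBatchingWindowInSeconds=10,
)
cleanups.append(
    lambda: aws_client.lambda_.delete_event_source_mapping(UUID=create_response["UUID"])
)
```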
Motivation
This PR allows the ESM poller to collect and send batches of SQS messages based on both the number of messages collected and the duration of the batching window (up to 300s).
The motivations were as follows:
- The feature request lambda event source mapping for non-FIFO SQS queue has invalid max batch size #5042 required LocalStack to support batch sizes over 10, which in turn depends on MaximumBatchingWindowInSeconds being larger than 1. While offering CRUD support for this parameter, LocalStack did not actually collect a batch for the specified duration. Hence, these functional changes mean that we can now collect records until the window elapses (see the sketch below).
- Under high volumes of requests, we expect the latency of LocalStack's gateway to degrade. By supporting window batching, we have also introduced the ability for ESM pollers to long-poll an SQS event source for batches of data -- which should help alleviate some pressure on the gateway.
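A simplified sketch of the collect-and-flush behavior described above, assuming standard SQS API limits (the LS-internal header optimization mentioned in the review, which allows larger per-call batches, is not shown); names are illustrative, not the PR's actual code:

```python
import time


def collect_batch(sqs_client, queue_url: str, batch_size: int, max_window_s: int) -> list:
    """Collect up to `batch_size` messages, or fewer if `max_window_s` (<= 300s) elapses first."""
    batch: list = []
    deadline = time.monotonic() + max_window_s
    while len(batch) < batch_size and time.monotonic() < deadline:
        remaining_s = deadline - time.monotonic()
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            # Public API caps: at most 10 messages per call, at most a 20s long-poll wait.
            MaxNumberOfMessages=min(10, batch_size - len(batch)),
            WaitTimeSeconds=min(20, max(1, int(remaining_s))),
        )
        batch.extend(response.get("Messages", []))
    return batch
```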
Changes
Testing
- test_sqs_event_source_mapping_batching_reserved_concurrency, since we can now collect and send batches of more than 10 records at a time.
Benchmarking
Some benchmarks were run to assess how the long-polling changes would affect the latency of the LocalStack gateway. See the benchmarking results here.
tl;dr Long polling saw fewer requests being made and, in some cases, had improved latencies at higher percentiles P(>95).
Assumptions
Results
With long-polling, fewer messages (~7%) were processed within the 5m test interval - likely due to high resource contention while long-polling calls were being made. However, this did not hold true for maximising Batch Size (10k) and Batch Window (300s) with long-polling, which saw the highest throughput and best latency improvements.
Conclusions