Step Functions: Improve Nested Map Run Stability #12343
LocalStack Community integration with Pro: 2 files, 2 suites, 31m 3s. Results for commit 232a967.
LGTM functionally, I just added a clarification question.
Feel free to merge :)
def _map_run(self, env: Environment) -> None:

def _map_run(
    self, env: Environment, eval_input: DistributedIterationComponentEvalInput
Nice conversion to statelessness by passing eval_input as a parameter 👍
docs: Would it make sense to add a comment about the statelessness here for our future selves?
Yes, I'll leave a comment in the top-level class for iterations.
elif isinstance(self.iteration_component, InlineItemProcessor):
    eval_input = InlineItemProcessorEvalInput(
env.map_run_record_pool_manager.add(map_run_record)
if isinstance(self.iteration_component, DistributedIterator):
docs: It might be helpful to explain the difference between DistributedIteratorEvalInput and DistributedItemProcessorEvalInput in a clarifying comment here.
}
}
},
"MaxConcurrency": 9
Can you help me understand why the test case aims to cover no_max_concurrency, but we have two MaxConcurrency configurations in the state machine definition here?
This might be worth clarifying in a comment as well.
The goal of the test is to avoid limiting concurrency to 1, which is what the test suite otherwise always does (MaxConcurrency: 1). We still bound the concurrency with an upper limit instead of removing the MaxConcurrency setting entirely, which would fall back to the default maximum.
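For illustration, a nested distributed Map definition with a bounded MaxConcurrency at both levels could be built as in the sketch below. Only the value 9 comes from the fragment quoted above; the state names (OuterMap, InnerMap, Echo), the helper build_nested_map_definition, and the choice of 9 for both levels are assumptions, since the full test definition is not shown here.

```python
import json


def build_nested_map_definition(outer_max: int = 9, inner_max: int = 9) -> dict:
    """Build an illustrative ASL definition with one distributed Map nested in another.

    State names and default values are hypothetical, not taken from the test suite.
    """
    distributed = {"Mode": "DISTRIBUTED", "ExecutionType": "STANDARD"}
    inner_map = {
        "Type": "Map",
        "ItemProcessor": {
            "ProcessorConfig": dict(distributed),
            "StartAt": "Echo",
            "States": {"Echo": {"Type": "Pass", "End": True}},
        },
        # Bounded, but greater than the suite-wide MaxConcurrency of 1.
        "MaxConcurrency": inner_max,
        "End": True,
    }
    outer_map = {
        "Type": "Map",
        "ItemProcessor": {
            "ProcessorConfig": dict(distributed),
            "StartAt": "InnerMap",
            "States": {"InnerMap": inner_map},
        },
        "MaxConcurrency": outer_max,
        "End": True,
    }
    return {"StartAt": "OuterMap", "States": {"OuterMap": outer_map}}


definition = build_nested_map_definition()
print(json.dumps(definition, indent=2))
```

Both Map states carry their own MaxConcurrency, which would match the two configurations the reviewer noticed.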
Motivation
Currently, the Step Functions v2 interpreter encounters issues when evaluating nested distributed Map states, where one Map Run launches further Map Runs (#12335). There are two causes: certain map run components inappropriately use member variables to share state, and worker-count management is shared across multiple map runs, so no workers start for any job beyond the first in nested scenarios.
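The worker-count problem can be pictured with a toy model. This is not LocalStack's implementation: SharedWorkerBudget and per_run_workers_to_start are hypothetical names, and the sketch only assumes that a single counter of started workers was consulted by every map run.

```python
from dataclasses import dataclass


@dataclass
class SharedWorkerBudget:
    """Problematic pattern: one counter of started workers shared by every map run."""

    max_concurrency: int
    started: int = 0

    def workers_to_start(self, job_count: int) -> int:
        available = self.max_concurrency - self.started
        count = max(0, min(job_count, available))
        self.started += count
        return count


def per_run_workers_to_start(max_concurrency: int, job_count: int) -> int:
    """Per-run pattern: each map run computes its worker count independently."""
    return min(job_count, max_concurrency)


# Shared counter: the outer run uses up the budget, the nested run gets nothing.
shared = SharedWorkerBudget(max_concurrency=2)
outer_shared = shared.workers_to_start(job_count=2)
nested_shared = shared.workers_to_start(job_count=2)

# Independent accounting: the nested run starts its own workers.
outer_fixed = per_run_workers_to_start(max_concurrency=2, job_count=2)
nested_fixed = per_run_workers_to_start(max_concurrency=2, job_count=2)

print(outer_shared, nested_shared, outer_fixed, nested_fixed)
```

In this model the nested run starts zero workers under the shared counter, which mirrors the symptom described above.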
Changes
This pull request addresses the anti-pattern of stateful objects in the map run logic by making them stateless. Additionally, it corrects the creation of workers so they are appropriately initiated for all nested map runs.
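A minimal sketch of the stateless pattern, assuming per-run data is passed in as an eval_input argument as in the reviewed _map_run signature. EvalInput, StatefulComponent, and StatelessComponent are hypothetical stand-ins, not the classes changed in this pull request.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EvalInput:
    """Hypothetical stand-in for a per-run input object."""

    map_run_id: str
    items: tuple


class StatefulComponent:
    """Anti-pattern: per-run data is stored on the shared component instance."""

    def prepare(self, map_run_id: str, items: tuple) -> None:
        self.map_run_id = map_run_id
        self.items = items

    def run(self) -> list:
        return [f"{self.map_run_id}:{item}" for item in self.items]


class StatelessComponent:
    """Per-run data travels with the call, so nested runs cannot clobber each other."""

    def run(self, eval_input: EvalInput) -> list:
        return [f"{eval_input.map_run_id}:{item}" for item in eval_input.items]


# A nested run re-prepares the same instance before the outer run evaluates.
stateful = StatefulComponent()
stateful.prepare("outer", ("a",))
stateful.prepare("inner", ("b",))
clobbered = stateful.run()  # the outer run now sees the inner run's data

stateless = StatelessComponent()
outer_result = stateless.run(EvalInput("outer", ("a",)))
inner_result = stateless.run(EvalInput("inner", ("b",)))

print(clobbered, outer_result, inner_result)
```

Passing an immutable input object keeps the component reusable across concurrent and nested map runs without locking.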
Testing
Added a relevant snapshot test to verify the correct functioning of distributed nested map runs without maximum concurrency settings.