-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
[ESM] Re-initialize shards when NextShardIterator value is empty #12483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
LocalStack Community integration with Pro 2 files ± 0 2 suites ±0 1h 32m 52s ⏱️ - 21m 52s Results for commit cdc35b3. ± Comparison against base commit 8999cc4. This pull request removes 1175 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch and excellent link 👍
General limitation (unrelated to this change):
Do we clarify in the code and docs that resharding might lead to undefined/untested behavior?
I guess we could lose data with LATEST
and potentially re-process records with AT_TIMESTAMP
and TRIM_RECORDS
here 😬
@@ -185,7 +185,10 @@ def poll_events(self): | |||
def poll_events_from_shard(self, shard_id: str, shard_iterator: str): | |||
get_records_response = self.get_records(shard_iterator) | |||
records: list[dict] = get_records_response.get("Records", []) | |||
next_shard_iterator = get_records_response["NextShardIterator"] | |||
if not (next_shard_iterator := get_records_response.get("NextShardIterator")): | |||
# If the next shard iterator is None, we can assume the shard is closed or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add that great docs link here: https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty
Motivation
We do not currently cater for the case of a next shard iterator value not being present in a
GetRecords
response. According to the AWS doc:In addition, DynamoDB Local can occasionally not return a
NextShardIterator
in its response. This seems to occur when trying to read from a closed shard or when using an expired iterator. Hence, an exception is raised since the StreamPoller always expects this field in aGetRecords
response -- raising the belowKeyError
exception:Changes
NextShardIterator
is not returned or isNone
in aGetRecords
response.