fix(esm/kinesis): Always store NextShardIterator from GetRecords by gregfurman · Pull Request #12677 · localstack/localstack

fix(esm/kinesis): Always store NextShardIterator from GetRecords #12677

Merged
merged 1 commit on May 29, 2025
@@ -46,6 +46,7 @@
 # https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html
 class StreamPoller(Poller):
     # Mapping of shard id => shard iterator
+    # TODO: This mapping approach needs to be re-worked to instead store last processed sequence number.
     shards: dict[str, str]
     # Iterator for round-robin polling from different shards because a batch cannot contain events from different shards
     # This is a workaround for not handling shards in parallel.
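The TODO above hints at a more durable design: checkpoint the last processed sequence number rather than the iterator itself, since sequence numbers never expire and a fresh iterator can be derived from one at any time. A minimal sketch of that idea against a boto3 Kinesis client (the `checkpoints` store and `resume_iterator` helper are illustrative, not part of the LocalStack codebase):

```python
# Illustrative sketch, not LocalStack code: checkpoint sequence numbers
# (which never expire) instead of shard iterators (which do), and derive
# a fresh iterator from the checkpoint whenever polling resumes.
import boto3

kinesis = boto3.client("kinesis")

# shard_id -> last processed sequence number
checkpoints: dict[str, str] = {}


def resume_iterator(stream_name: str, shard_id: str) -> str:
    """Derive a usable shard iterator from the stored checkpoint, if any."""
    sequence_number = checkpoints.get(shard_id)
    if sequence_number is None:
        # No checkpoint yet: start from the oldest record still in the shard.
        response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
    else:
        # Resume directly after the last record that was processed.
        response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="AFTER_SEQUENCE_NUMBER",
            StartingSequenceNumber=sequence_number,
        )
    return response["ShardIterator"]
```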
@@ -189,14 +190,17 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
             # If the next shard iterator is None, we can assume the shard is closed or
             # has expired on the DynamoDB Local server, hence we should re-initialize.
             self.shards = self.initialize_shards()
+        else:
+            # We should always be storing the next_shard_iterator value, otherwise we risk an iterator expiring
+            # and all records being re-processed.
+            self.shards[shard_id] = next_shard_iterator
 
         # We cannot reliably back-off when no records found since an iterator
         # may have to move multiple times until records are returned.
         # See https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty
         # However, we still need to check if batcher should be triggered due to time-based batching.
         should_flush = self.shard_batcher[shard_id].add(records)
         if not should_flush:
-            self.shards[shard_id] = next_shard_iterator
             return
 
         # Retrieve and drain all events in batcher
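The rationale for the new else branch is the iterator lifetime: Kinesis shard iterators expire five minutes after they are issued, so any code path that returns without persisting NextShardIterator can leave a stale iterator behind, forcing a re-initialization and a replay of already-processed records. A standalone demonstration of the expiry, assuming a boto3 client and a placeholder stream name:

```python
# Standalone demonstration of the failure mode, assuming a real or
# LocalStack Kinesis endpoint; "my-stream" is a placeholder name.
import time

import boto3

kinesis = boto3.client("kinesis")

iterator = kinesis.get_shard_iterator(
    StreamName="my-stream",
    ShardId="shardId-000000000000",
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

kinesis.get_records(ShardIterator=iterator)  # fine within the TTL
time.sleep(6 * 60)  # exceed the 5 minute iterator lifetime
kinesis.get_records(ShardIterator=iterator)  # raises ExpiredIteratorException
```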
@@ -206,9 +210,9 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
         # This could potentially lead to data loss if forward_events_to_target raises an exception after a flush
         # which would otherwise be solved with checkpointing.
         # TODO: Implement checkpointing, leasing, etc. from https://docs.aws.amazon.com/streams/latest/dev/kcl-concepts.html
-        self.forward_events_to_target(shard_id, next_shard_iterator, batch)
+        self.forward_events_to_target(shard_id, batch)
 
-    def forward_events_to_target(self, shard_id, next_shard_iterator, records):
+    def forward_events_to_target(self, shard_id, records):
         polled_events = self.transform_into_events(records, shard_id)
         abort_condition = None
         # TODO: implement format detection behavior (e.g., for JSON body):
@@ -230,9 +234,8 @@ def forward_events_to_target(self, shard_id, next_shard_iterator, records):
         # TODO: implement MaximumBatchingWindowInSeconds flush condition (before or after filter?)
         # Don't trigger upon empty events
         if len(matching_events_post_filter) == 0:
-            # Update shard iterator if no records match the filter
-            self.shards[shard_id] = next_shard_iterator
             return
+
         events = self.add_source_metadata(matching_events_post_filter)
         LOG.debug("Polled %d events from %s in shard %s", len(events), self.source_arn, shard_id)
         # -> This could be tested by setting a high retry number, using a long pipe execution, and a relatively
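Taken together with the hunk below, the effect of this PR is that the single unconditional save right after GetRecords replaces three scattered ones (the no-flush path, the no-post-filter-matches path, and the DLQ path). A condensed paraphrase of the resulting flow, with simplified bodies and an illustrative drain call, not the PR's literal code:

```python
# Condensed paraphrase of the post-PR flow; method names follow the diff,
# bodies are simplified and the batcher drain call is illustrative.
def poll_events_from_shard(self, shard_id: str, shard_iterator: str) -> None:
    response = self.get_records(shard_iterator)
    records = response["Records"]
    next_shard_iterator = response.get("NextShardIterator")

    if not next_shard_iterator:
        # Shard closed, or iterator expired on DynamoDB Local: start over.
        self.shards = self.initialize_shards()
    else:
        # The one unconditional checkpoint every later early-return relies on.
        self.shards[shard_id] = next_shard_iterator

    if not self.shard_batcher[shard_id].add(records):
        return  # batching window not reached; iterator already saved above

    batch = self.shard_batcher[shard_id].flush()  # illustrative drain call
    self.forward_events_to_target(shard_id, batch)  # no iterator threading
```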
@@ -349,8 +352,6 @@ def forward_events_to_target(self, shard_id, next_shard_iterator, records):
                 partner_resource_arn=self.partner_resource_arn,
             )
             self.send_events_to_dlq(shard_id, events, context=failure_context)
-            # Update shard iterator if the execution failed but the events are sent to a DLQ
-            self.shards[shard_id] = next_shard_iterator
 
     def get_records(self, shard_iterator: str) -> dict:
         """Returns a GetRecordsOutput from the GetRecords endpoint of streaming services such as Kinesis or DynamoDB"""