8000 Kinesis: SubscribeToShard session cut after 5min by ackdav · Pull Request #6732 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

Kinesis: SubscribeToShard session cut after 5min #6732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 28, 2022
12 changes: 8 additions & 4 deletions localstack/services/kinesis/kinesis_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ACTION_PUT_RECORD = "%s.PutRecord" % ACTION_PREFIX
ACTION_PUT_RECORDS = "%s.PutRecords" % ACTION_PREFIX
ACTION_LIST_STREAMS = "%s.ListStreams" % ACTION_PREFIX
MAX_SUBSCRIPTION_SECONDS = 300


class KinesisBackend(RegionBackend):
Expand Down Expand Up @@ -296,8 +297,9 @@ def send_events():
yield convert_to_binary_event_payload("", event_type="initial-response")
iter = iterator
last_sequence_number = starting_sequence_number
# TODO: find better way to run loop up to max 5 minutes (until connection terminates)!
for i in range(5 * 60):
maximum_duration_subscription_timestamp = now_utc() + MAX_SUBSCRIPTION_SECONDS

while now_utc() < maximum_duration_subscription_timestamp:
result = None
try:
result = kinesis.get_records(ShardIterator=iter)
Expand All @@ -319,8 +321,10 @@ def send_events():
record["Data"] = to_str(base64.b64encode(record["Data"]))
last_sequence_number = record["SequenceNumber"]
if not records:
time.sleep(1)
continue
# On AWS there is *at least* 1 event every 5 seconds
# but this is not possible in this structure.
# In order to avoid a 5-second blocking call, we make the compromise of 3 seconds.
time.sleep(3)

response = {
"ChildShards": [],
Expand Down
33 changes: 33 additions & 0 deletions localstack/testing/snapshots/transformer_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,39 @@ def s3_api():
),
]

@staticmethod
def kinesis_api():
"""
:return: array with Transformers, for kinesis api.
"""
return [
JsonpathTransformer(
jsonpath="$..Records..SequenceNumber",
replacement="sequence_number",
replace_reference=True,
),
TransformerUtility.key_value("StartingSequenceNumber", "starting_sequence_number"),
TransformerUtility.key_value("ShardId", "shard_id"),
TransformerUtility.key_value(
"EndingHashKey", "ending_hash", reference_replacement=False
),
TransformerUtility.key_value(
"StartingHashKey", "starting_hash", reference_replacement=False
),
TransformerUtility.key_value(_resource_name_transformer, "ConsumerARN"),
RegexTransformer(
r"([a-zA-Z0-9-_.]*)?\/consumer:([0-9-_.]*)?",
replacement="<stream-consumer>",
),
RegexTransformer(
r"([a-zA-Z0-9-_.]*)?\/test-stream-([a-zA-Z0-9-_.]*)?",
replacement="<stream-name>",
),
TransformerUtility.key_value(
"ContinuationSequenceNumber", "<continuation_sequence_number>"
),
]

@staticmethod
def sqs_api():
"""
Expand Down
Loading
0