8000 Kinesis: SubscribeToShard session cut after 5min by ackdav · Pull Request #6732 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

Kinesis: SubscribeToShard session cut after 5min #6732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 28, 2022
Prev Previous commit
Next Next commit
fixes an early timeout in kinesis subscribe_to_shard mentioned in #5830
…, adds a test for it
  • Loading branch information
ackdav authored and thrau committed Aug 27, 2022
commit 0ed3c41db61e466a2a25545eae444b1113b21100
7 changes: 4 additions & 3 deletions localstack/services/kinesis/kinesis_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
ACTION_PUT_RECORD = "%s.PutRecord" % ACTION_PREFIX
ACTION_PUT_RECORDS = "%s.PutRecords" % ACTION_PREFIX
ACTION_LIST_STREAMS = "%s.ListStreams" % ACTION_PREFIX

MAX_SUBSCRIPTION_SECONDS = 300

class KinesisBackend(RegionBackend):
def __init__(self):
Expand Down Expand Up @@ -296,8 +296,9 @@ def send_events():
yield convert_to_binary_event_payload("", event_type="initial-response")
iter = iterator
last_sequence_number = starting_sequence_number
# TODO: find better way to run loop up to max 5 minutes (until connection terminates)!
for i in range(5 * 60):
maximum_duration_subscription_timestamp = now_utc() + MAX_SUBSCRIPTION_SECONDS

while now_utc() < maximum_duration_subscription_timestamp:
result = None
try:
result = kinesis.get_records(ShardIterator=iter)
Expand Down
51 changes: 51 additions & 0 deletions tests/integration/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,57 @@ def test_record_lifecycle_data_integrity(
for response_record in response_records:
assert response_record.get("Data").decode("utf-8") in records_data

@pytest.mark.aws_validated
def test_subscribe_to_shard_timeout(self, kinesis_client, kinesis_create_stream, wait_for_stream_ready,
wait_for_consumer_ready, monkeypatch):

monkeypatch.setattr(kinesis_listener, "MAX_SUBSCRIPTION_SECONDS", 5)

stream_name = "test-%s" % short_uid()

# create stream and consumer
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
"StreamARN"
]
wait_for_stream_ready(stream_name)
# create consumer
result = kinesis_client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")[
"Consumer"
]
consumer_arn = result["ConsumerARN"]
wait_for_consumer_ready(consumer_arn=consumer_arn)

# subscribe to shard
response = kinesis_client.describe_stream(StreamName=stream_name)
shard_id = response.get("StreamDescription").get("Shards")[0].get("ShardId")
result = kinesis_client.subscribe_to_shard(
ConsumerARN=result["ConsumerARN"],
ShardId=shard_id,
StartingPosition={"Type": "TRIM_HORIZON"},
)
stream = result["EventStream"]

# letting the subscription run out
time.sleep(7)

# put records
msg = b"Hello world1"
kinesis_client.put_records(
StreamName=stream_name, Records=[{"Data": msg, "PartitionKey": "1"}]
)

# due to the subscription being timed out, we should not be able to read out results
results = []
for entry in stream:
records = entry["SubscribeToShardEvent"]["Records"]
results.extend(records)

assert len(results) == 0

# clean up
kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")

@pytest.mark.aws_validated
def test_add_tags_to_stream(self, kinesis_client, kinesis_create_stream, wait_for_stream_ready):
stream_name = "test-%s" % short_uid()
Expand Down
0