8000 Kinesis: SubscribeToShard session cut after 5min by ackdav · Pull Request #6732 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

Kinesis: SubscribeToShard session cut after 5min #6732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 28, 2022
Prev Previous commit
Next Next commit
converted 3 tests to snapshots, streamlined streamname creating acros…
…s tests, reduced patched timeout from PR comments
  • Loading branch information
ackdav authored and thrau committed Aug 27, 2022
commit 252005283a1ba092646feda289cd227c1b27fb16
23 changes: 23 additions & 0 deletions localstack/testing/snapshots/transformer_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,29 @@ def s3_api():
),
]

@staticmethod
def kinesis_api():
"""
:return: array with Transformers, for s3 api.
"""
return [
JsonpathTransformer(
jsonpath="$..Records..SequenceNumber",
replacement="sequence_number",
replace_reference=True,
),
TransformerUtility.key_value(
"StartingSequenceNumber", "starting_sequence_number", reference_replacement=False
),
TransformerUtility.key_value("ShardId", "shard_id", reference_replacement=False),
TransformerUtility.key_value(
"EndingHashKey", "ending_hash", reference_replacement=False
),
TransformerUtility.key_value(
"StartingHashKey", "starting_hash", reference_replacement=False
),
]

@staticmethod
def sqs_api():
"""
Expand Down
76 changes: 36 additions & 40 deletions tests/integration/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import re
import time
from datetim 10000 e import datetime
from unittest.mock import patch

import cbor2
import pytest
Expand Down Expand Up @@ -32,26 +33,29 @@ def get_shard_iterator(stream_name, kinesis_client):
return response.get("ShardIterator")


@pytest.fixture(autouse=True)
def kinesis_snapshot_transformer(snapshot):
snapshot.add_transformer(snapshot.transform.kinesis_api())


class TestKinesis:
@pytest.mark.aws_validated
def test_create_stream_without_shard_count(
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, snapshot
):
stream_name = kinesis_create_stream()
wait_for_stream_ready(stream_name)
describe_stream = kinesis_client.describe_stream(StreamName=stream_name)
assert describe_stream
assert "StreamDescription" in describe_stream
assert "Shards" in describe_stream["StreamDescription"]
# By default, new streams have a shard count of 4
assert len(describe_stream["StreamDescription"]["Shards"]) == 4

shards = describe_stream["StreamDescription"]["Shards"]
shards.sort(key=lambda k: k.get("ShardId"))

snapshot.match("Shards", shards)

@pytest.mark.aws_validated
def test_stream_consumers(
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready
):
stream_name = "test-%s" % short_uid()

def assert_consumers(**kwargs):
consumer_list = kinesis_client.list_stream_consumers(StreamARN=stream_arn).get(
"Consumers"
Expand All @@ -60,7 +64,7 @@ def assert_consumers(**kwargs):
return consumer_list

# create stream and assert 0 consumers
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
"StreamARN"
]
Expand Down Expand Up @@ -108,10 +112,8 @@ def assert_consumers(**kwargs):
def test_subscribe_to_shard(
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready
):
stream_name = "test-%s" % short_uid()

# create stream and consumer
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
"StreamARN"
]
Expand Down Expand Up @@ -167,11 +169,10 @@ def test_subscribe_to_shard(
def test_subscribe_to_shard_with_sequence_number_as_iterator(
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready
):
stream_name = "test-%s" % short_uid()
record_data = "Hello world"

# create stream and consumer
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
"StreamARN"
]
Expand Down Expand Up @@ -226,9 +227,8 @@ def test_subscribe_to_shard_with_sequence_number_as_iterator(
kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")

def test_get_records(self, kinesis_client, kinesis_create_stream, wait_for_stream_ready):
stream_name = "test-%s" % short_uid()

kinesis_create_stream(StreamName=stream_name, ShardCount=1)
# create stream
stream_name = kinesis_create_stream(ShardCount=1)
wait_for_stream_ready(stream_name)

kinesis_client.put_records(
Expand Down Expand Up @@ -261,9 +261,7 @@ def test_get_records(self, kinesis_client, kinesis_create_stream, wait_for_strea
def test_get_records_empty_stream(
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready
):
stream_name = "test-%s" % short_uid()

kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
wait_for_stream_ready(stream_name)

# empty get records with JSON encoding
Expand Down Expand Up @@ -291,9 +289,8 @@ def test_record_lifecycle_data_integrity(
"""
kinesis records should contain the same data from when they are sent to when they are received
"""
stream_name = "test-%s" % short_uid()
records_data = {"test", "ünicödé 统一码 💣💻🔥", "a" * 1000, ""}
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
wait_for_stream_ready(stream_name)

iterator = get_shard_iterator(stream_name, kinesis_client)
Expand All @@ -312,21 +309,17 @@ def test_record_lifecycle_data_integrity(
assert response_record.get("Data").decode("utf-8") in records_data

@pytest.mark.aws_validated
@patch.object(kinesis_listener, "MAX_SUBSCRIPTION_SECONDS", 3)
def test_subscribe_to_shard_timeout(
self,
kinesis_client,
kinesis_create_stream,
wait_for_stream_ready,
wait_for_consumer_ready,
monkeypatch,
snapshot,
):

monkeypatch.setattr(kinesis_listener, "MAX_SUBSCRIPTION_SECONDS", 5)

stream_name = "test-%s" % short_uid()

# create stream and consumer
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
"StreamARN"
]
Expand All @@ -349,10 +342,10 @@ def test_subscribe_to_shard_timeout(
stream = result["EventStream"]

# letting the subscription run out
time.sleep(7)
time.sleep(5)

# put records
msg = b"Hello world1"
msg = b"Hello world"
kinesis_client.put_records(
StreamName=stream_name, Records=[{"Data": msg, "PartitionKey": "1"}]
)
Expand All @@ -363,29 +356,32 @@ def test_subscribe_to_shard_timeout(
records = entry["SubscribeToShardEvent"]["Records"]
results.extend(records)

assert len(results) == 0
snapshot.match("Records", results)

# clean up
kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")

def test_subscribe_to_shard_pings(self):
pass

@pytest.mark.aws_validated
def test_add_tags_to_stream(self, kinesis_client, kinesis_create_stream, wait_for_stream_ready):
stream_name = "test-%s" % short_uid()
test_tags = {"Hello": "world"}
def test_add_tags_to_stream(
self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, snapshot
):
test_tags = {"foo": "bar"}

# create stream
kinesis_create_stream(StreamName=stream_name, ShardCount=1)
stream_name = kinesis_create_stream(ShardCount=1)
wait_for_stream_ready(stream_name)

# adding tags
kinesis_client.add_tags_to_stream(StreamName=stream_name, Tags=test_tags)

# reading stream tags
list_tags_response = kinesis_client.list_tags_for_stream(StreamName=stream_name)
stream_tags_response = kinesis_client.list_tags_for_stream(StreamName=stream_name)

assert list_tags_response["Tags"][0]["Key"] == "Hello"
assert list_tags_response["Tags"][0]["Value"] == test_tags["Hello"]
assert not list_tags_response["HasMoreTags"]
snapshot.match("Tags", stream_tags_response["Tags"][0])
assert not stream_tags_response["HasMoreTags"]

@pytest.mark.aws_validated
def test_get_records_next_shard_iterator(
Expand Down
62 changes: 62 additions & 0 deletions tests/integration/test_kinesis.snapshot.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"tests/integration/test_kinesis.py::TestKinesis::test_subscribe_to_shard_timeout": {
"recorded-date": "25-08-2022, 09:47:30",
"recorded-content": {}
},
"tests/integration/test_kinesis.py::TestKinesis::test_create_stream_without_shard_count": {
"recorded-date": "25-08-2022, 09:34:05",
"recorded-content": {
"Shards": [
{
"ShardId": "shard_id",
"HashKeyRange": {
"StartingHashKey": "starting_hash",
"EndingHashKey": "ending_hash"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "starting_sequence_number"
}
},
{
"ShardId": "shard_id",
"HashKeyRange": {
"StartingHashKey": "starting_hash",
"EndingHashKey": "ending_hash"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "starting_sequence_number"
}
},
{
"ShardId": "shard_id",
"HashKeyRange": {
"StartingHashKey": "starting_hash",
"EndingHashKey": "ending_hash"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "starting_sequence_number"
}
},
{
"ShardId": "shard_id",
"HashKeyRange": {
"StartingHashKey": "starting_hash",
"EndingHashKey": "ending_hash"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "starting_sequence_number"
}
}
]
}
},
"tests/integration/test_kinesis.py::TestKinesis::test_add_tags_to_stream": {
"recorded-date": "25-08-2022, 08:56:43",
"recorded-content": {
"Tags": {
"Key": "foo",
"Value": "bar"
}
}
}
}
0