8000 correctly shutdown kinesis listener once firehose deliverystream is d… · localstack/localstack@bc12737 · GitHub
[go: up one dir, main page]

Skip to content

Commit bc12737

Browse files
authored
correctly shutdown kinesis listener once firehose deliverystream is deleted (#6729)
1 parent 30a1ed8 commit bc12737

File tree

5 files changed

+309
-310
lines changed

5 files changed

+309
-310
lines changed

localstack/services/firehose/provider.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
truncate,
9797
)
9898
from localstack.utils.kinesis import kinesis_connector
99+
from localstack.utils.kinesis.kinesis_connector import KinesisProcessorThread
99100
from localstack.utils.run import run_for_max_seconds
100101
from localstack.utils.tagging import TaggingService
101102

@@ -117,11 +118,13 @@ def next_sequence_number() -> int:
117118
class FirehoseBackend(RegionBackend):
118119
# maps delivery stream names to DeliveryStreamDescription
119120
delivery_streams: Dict[str, DeliveryStreamDescription]
121+
kinesis_listeners: Dict[str, KinesisProcessorThread]
120122
# static tagging service instance
121123
TAGS = TaggingService()
122124

123125
def __init__(self):
124126
self.delivery_streams = {}
127+
self.kinesis_listeners = {}
125128

126129

127130
def _get_description_or_raise_not_found(
@@ -231,13 +234,14 @@ def create_delivery_stream(
231234
def _startup():
232235
stream["DeliveryStreamStatus"] = DeliveryStreamStatus.CREATING
233236
try:
234-
kinesis_connector.listen_to_kinesis(
237+
process = kinesis_connector.listen_to_kinesis(
235238
stream_name=kinesis_stream_name,
236239
fh_d_stream=delivery_stream_name,
237240
listener_func=self._process_records,
238241
wait_until_started=True,
239242
ddb_lease_table_suffix="-firehose",
240243
)
244+
region.kinesis_listeners[delivery_stream_name] = process
241245
stream["DeliveryStreamStatus"] = DeliveryStreamStatus.ACTIVE
242246
except Exception as e:
243247
LOG.warning(
@@ -260,6 +264,10 @@ def delete_delivery_stream(
260264
raise ResourceNotFoundException(
261265
f"Firehose {delivery_stream_name} under account {context.account_id} " f"not found."
262266
)
267+
kinesis_process = region.kinesis_listeners.pop(delivery_stream_name, None)
268+
if kinesis_process:
269+
LOG.debug("Stopping kinesis listener for %s", delivery_stream_name)
270+
kinesis_process.stop()
263271

264272
return DeleteDeliveryStreamOutput()
265273

tests/integration/test_firehose.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ def test_delivery_stream_with_kinesis_as_source(
379379
s3_client,
380380
s3_bucket,
381381
kinesis_create_stream,
382+
cleanups,
382383
):
383384

384385
bucket_arn = aws_stack.s3_bucket_arn(s3_bucket)
@@ -429,9 +430,16 @@ def test_delivery_stream_with_kinesis_as_source(
429430
},
430431
},
431432
)
432-
433+
cleanups.append(
434+
lambda: firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
435+
)
433436
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
434437

435-
firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
436-
kinesis_client.delete_stream(StreamName=stream_name)
437-
s3_client.delete_bucket(Bucket=s3_bucket)
438+
# make sure the stream will come up at some point, for cleaner cleanup
439+
def check_stream_state():
440+
stream = firehose_client.describe_delivery_stream(
441+
DeliveryStreamName=delivery_stream_name
442+
)
443+
return stream["DeliveryStreamDescription"]["DeliveryStreamStatus"] == "ACTIVE"
444+
445+
assert poll_condition(check_stream_state, 45, 1)

tests/integration/test_iam.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import json
2-
import logging
3-
import os
42

53
import pytest
64
from botocore.exceptions import ClientError
@@ -10,7 +8,6 @@
108
from localstack.services.iam.provider import ADDITIONAL_MANAGED_POLICIES
119
from localstack.testing.aws.util import create_client_with_keys, wait_for_user
1210
from localstack.utils.common import short_uid
13-
from localstack.utils.kinesis import kinesis_connector
1411
from localstack.utils.strings import long_uid
1512

1613
GET_USER_POLICY_DOC = """{
@@ -128,30 +125,6 @@ def test_create_user_add_permission_boundary_afterwards(
128125

129126

130127
class TestIAMIntegrations:
131-
132-
# TODO what does this test do?
133-
def test_run_kcl_with_iam_assume_role(self):
134-
env_vars = {}
135-
if os.environ.get("AWS_ASSUME_ROLE_ARN"):
136-
env_vars["AWS_ASSUME_ROLE_ARN"] = os.environ.get("AWS_ASSUME_ROLE_ARN")
137-
env_vars["AWS_ASSUME_ROLE_SESSION_NAME"] = os.environ.get(
138-
"AWS_ASSUME_ROLE_SESSION_NAME"
139-
)
140-
env_vars["ENV"] = os.environ.get("ENV") or "main"
141-
142-
def process_records(records):
143-
print(records)
144-
145-
# start Kinesis client
146-
stream_name = f"test-foobar-{short_uid()}"
147-
kinesis_connector.listen_to_kinesis(
148-
stream_name=stream_name,
149-
listener_func=process_records,
150-
env_vars=env_vars,
151-
kcl_log_level=logging.INFO,
152-
wait_until_started=True,
153-
)
154-
155128
def test_attach_iam_role_to_new_iam_user(self, iam_client):
156129
test_policy_document = {
157130
"Version": "2012-10-17",

0 commit comments

Comments
 (0)
0