8000 migrate Kinesis to ASF (#6166) · localstack/localstack@122e414 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 122e414

Browse files
authored
migrate Kinesis to ASF (#6166)
1 parent 0c741f7 commit 122e414

File tree

23 files changed

+1722
-273
lines changed

23 files changed

+1722
-273
lines changed

localstack/aws/api/kinesis/__init__.py

Lines changed: 875 additions & 0 deletions
Large diffs are not rendered by default.

localstack/aws/forwarder.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,12 @@ def _forward_request(context, service_request: ServiceRequest = None) -> Service
7474
parameters=service_request,
7575
region=context.region,
7676
)
77-
local_context.request.headers.update(context.request.headers)
77+
# update the newly created context with non-payload specific request headers (the payload can differ from
78+
# the original request, f.e. it could be JSON encoded now while the initial request was CBOR encoded)
79+
headers = Headers(context.request.headers)
80+
headers.pop("Content-Type", None)
81+
headers.pop("Content-Length", None)
82+
local_context.request.headers.update(headers)
7883
context = local_context
7984
return forward_request(context, forward_url_getter)
8085

localstack/aws/handlers/service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def create_not_implemented_response(self, context):
150150
message = f"no handler for operation '{operation_name}' on service '{service_name}'"
151151
error = CommonServiceException("InternalFailure", message, status_code=501)
152152
serializer = create_serializer(context.service)
153-
return serializer.serialize_error_to_response(error, operation)
153+
return serializer.serialize_error_to_response(error, operation, context.request.headers)
154154

155155

156156
class ServiceExceptionSerializer(ExceptionHandler):
@@ -225,7 +225,7 @@ def create_exception_response(self, exception: Exception, context: RequestContex
225225
context.service_exception = error
226226

227227
serializer = create_serializer(context.service) # TODO: serializer cache
228-
return serializer.serialize_error_to_response(error, operation)
228+
return serializer.serialize_error_to_response(error, operation, context.request.headers)
229229

230230

231231
class ServiceResponseParser(Handler):

localstack/aws/protocol/serializer.py

Lines changed: 245 additions & 98 deletions
Large diffs are not rendered by default.

localstack/aws/skeleton.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def dispatch_request(self, context: RequestContext, instance: ServiceRequest) ->
171171
context.service_response = result
172172

173173
# Serialize result dict to an HTTPResponse and return it
174-
return self.serializer.serialize_to_response(result, operation)
174+
return self.serializer.serialize_to_response(result, operation, context.request.headers)
175175

176176
def on_service_exception(
177177
self, context: RequestContext, exception: ServiceException
@@ -185,7 +185,9 @@ def on_service_exception(
185185
"""
186186
context.service_exception = exception
187187

188-
return self.serializer.serialize_error_to_response(exception, context.operation)
188+
return self.serializer.serialize_error_to_response(
189+
exception, context.operation, context.request.headers
190+
)
189191

190192
def on_not_implemented_error(self, context: RequestContext) -> HttpResponse:
191193
"""
@@ -211,4 +213,4 @@ def on_not_implemented_error(self, context: RequestContext) -> HttpResponse:
211213
)
212214
context.service_exception = error
213215

214-
return serializer.serialize_error_to_response(error, operation)
216+
return serializer.serialize_error_to_response(error, operation, context.request.headers)

localstack/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ def in_docker():
554554
# Delay between data persistence (in seconds)
555555
KINESIS_MOCK_PERSIST_INTERVAL = os.environ.get("KINESIS_MOCK_PERSIST_INTERVAL", "").strip() or "5s"
556556

557-
# Kinesis provider - either "kinesis-mock" or "kinesalite"
557+
# Kinesis provider - either "kinesis-mock" or "kinesalite" (deprecated, kinesalite support will be removed)
558558
KINESIS_PROVIDER = os.environ.get("KINESIS_PROVIDER") or "kinesis-mock"
559559

560560
# Whether or not to handle lambda event sources as synchronous invocations

localstack/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090

9191
# content types / encodings
9292
HEADER_CONTENT_TYPE = "Content-Type"
93+
TEXT_XML = "text/xml"
9394
APPLICATION_AMZ_JSON_1_0 = "application/x-amz-json-1.0"
9495
APPLICATION_AMZ_JSON_1_1 = "application/x-amz-json-1.1"
9596
APPLICATION_AMZ_CBOR_1_1 = "application/x-amz-cbor-1.1"

localstack/services/kinesis/kinesis_starter.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from localstack import config
55
from localstack.services.infra import log_startup_message, start_proxy_for_service
66
from localstack.services.kinesis import kinesalite_server, kinesis_mock_server
7+
from localstack.services.plugins import SERVICE_PLUGINS
78
from localstack.utils.aws import aws_stack
89
from localstack.utils.serving import Server
910

@@ -38,12 +39,16 @@ def start_kinesis(
3839
_server.start()
3940
log_startup_message("Kinesis")
4041
port = port or config.service_port("kinesis")
41-
start_proxy_for_service(
42-
"kinesis",
43-
port,
44-
backend_port=_server.port,
45-
update_listener=update_listener,
46-
)
42+
43+
# TODO: flip back to "!= kinesis:asf" to be sure we have the old control path when merging
44+
if SERVICE_PLUGINS.get("kinesis").name() == "kinesis:legacy":
45+
start_proxy_for_service(
46+
"kinesis",
47+
port,
48+
backend_port=_server.port,
49+
update_listener=update_listener,
50+
)
51+
4752
return _server
4853

4954

@@ -76,3 +81,7 @@ def is_kinesis_running() -> bool:
7681
if _server is None:
7782
return False
7883
return _server.is_running()
84+
85+
86+
def get_server():
87+
return _server

0 commit comments

Comments
 (0)
0