8000 migrate Kinesis to ASF by alexrashed · Pull Request #6166 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

migrate Kinesis to ASF #6166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
875 changes: 875 additions & 0 deletions localstack/aws/api/kinesis/__init__.py

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion localstack/aws/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ def _forward_request(context, service_request: ServiceRequest = None) -> Service
parameters=service_request,
region=context.region,
)
local_context.request.headers.update(context.request.headers)
# update the newly created context with non-payload specific request headers (the payload can differ from
# the original request, f.e. it could be JSON encoded now while the initial request was CBOR encoded)
headers = Headers(context.request.headers)
headers.pop("Content-Type", None)
headers.pop("Content-Length", None)
local_context.request.headers.update(headers)
context = local_context
return forward_request(context, forward_url_getter)

Expand Down
4 changes: 2 additions & 2 deletions localstack/aws/handlers/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def create_not_implemented_response(self, context):
message = f"no handler for operation '{operation_name}' on service '{service_name}'"
error = CommonServiceException("InternalFailure", message, status_code=501)
serializer = create_serializer(context.service)
return serializer.serialize_error_to_response(error, operation)
return serializer.serialize_error_to_response(error, operation, context.request.headers)


class ServiceExceptionSerializer(ExceptionHandler):
Expand Down Expand Up @@ -225,7 +225,7 @@ def create_exception_response(self, exception: Exception, context: RequestContex
context.service_exception = error

serializer = create_serializer(context.service) # TODO: serializer cache
return serializer.serialize_error_to_response(error, operation)
return serializer.serialize_error_to_response(error, operation, context.request.headers)


class ServiceResponseParser(Handler):
Expand Down
343 changes: 245 additions & 98 deletions localstack/aws/protocol/serializer.py

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions localstack/aws/skeleton.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def dispatch_request(self, context: RequestContext, instance: ServiceRequest) ->
context.service_response = result

# Serialize result dict to an HTTPResponse and return it
return self.serializer.serialize_to_response(result, operation)
return self.serializer.serialize_to_response(result, operation, context.request.headers)

def on_service_exception(
self, context: RequestContext, exception: ServiceException
Expand All @@ -185,7 +185,9 @@ def on_service_exception(
"""
context.service_exception = exception

return self.serializer.serialize_error_to_response(exception, context.operation)
return self.serializer.serialize_error_to_response(
exception, context.operation, context.request.headers
)

def on_not_implemented_error(self, context: RequestContext) -> HttpResponse:
"""
Expand All @@ -211,4 +213,4 @@ def on_not_implemented_error(self, context: RequestContext) -> HttpResponse:
)
context.service_exception = error

return serializer.serialize_error_to_response(error, operation)
return serializer.serialize_error_to_response(error, operation, context.request.headers)
2 changes: 1 addition & 1 deletion localstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def in_docker():
# Delay between data persistence (in seconds)
KINESIS_MOCK_PERSIST_INTERVAL = os.environ.get("KINESIS_MOCK_PERSIST_INTERVAL", "").strip() or "5s"

# Kinesis provider - either "kinesis-mock" or "kinesalite"
# Kinesis provider - either "kinesis-mock" or "kinesalite" (deprecated, kinesalite support will be removed)
KINESIS_PROVIDER = os.environ.get("KINESIS_PROVIDER") or "kinesis-mock"

# Whether or not to handle lambda event sources as synchronous invocations
Expand Down
1 change: 1 addition & 0 deletions localstack/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@

# content types / encodings
HEADER_CONTENT_TYPE = "Content-Type"
TEXT_XML = "text/xml"
APPLICATION_AMZ_JSON_1_0 = "application/x-amz-json-1.0"
APPLICATION_AMZ_JSON_1_1 = "application/x-amz-json-1.1"
APPLICATION_AMZ_CBOR_1_1 = "application/x-amz-cbor-1.1"
Expand Down
21 changes: 15 additions & 6 deletions localstack/services/kinesis/kinesis_starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from localstack import config
from localstack.services.infra import log_startup_message, start_proxy_for_service
from localstack.services.kinesis import kinesalite_server, kinesis_mock_server
from localstack.services.plugins import SERVICE_PLUGINS
from localstack.utils.aws import aws_stack
from localstack.utils.serving import Server

Expand Down Expand Up @@ -38,12 +39,16 @@ def start_kinesis(
_server.start()
log_startup_message("Kinesis")
port = port or config.service_port("kinesis")
start_proxy_for_service(
"kinesis",
port,
backend_port=_server.port,
update_listener=update_listener,
)

# TODO: flip back to "!= kinesis:asf" to be sure we have the old control path when merging
if SERVICE_PLUGINS.get("kinesis").name() == "kinesis:legacy":
start_proxy_for_service(
"kinesis",
port,
backend_port=_server.port,
update_listener=update_listener,
)

return _server


Expand Down Expand Up @@ -76,3 +81,7 @@ def is_kinesis_running() -> bool:
if _server is None:
return False
return _server.is_running()


def get_server():
return _server
Loading
0