8000 migrate Kinesis to ASF by alexrashed · Pull Request #6166 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

migrate Kinesis to ASF #6166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add content-negotiation support to ASF serializer
  • Loading branch information
alexrashed committed Aug 30, 2022
commit 187b3f83605597fc229b93bf47d9c05f6b24701b
7 changes: 6 additions & 1 deletion localstack/aws/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ def _forward_request(context, service_request: ServiceRequest = None) -> Service
parameters=service_request,
region=context.region,
)
local_context.request.headers.update(context.request.headers)
# update the newly created context with non-payload specific request headers (the payload can differ from
# the original request, f.e. it could be JSON encoded now while the initial request was CBOR encoded)
headers = Headers(context.request.headers)
headers.pop("Content-Type", None)
headers.pop("Content-Length", None)
local_context.request.headers.update(headers)
context = local_context
return forward_request(context, forward_url_getter)

Expand Down
4 changes: 2 additions & 2 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def create_not_implemented_response(self, context):
message = f"no handler for operation '{operation_name}' on service '{service_name}'"
error = CommonServiceException("InternalFailure", message, status_code=501)
serializer = create_serializer(context.service)
return serializer.serialize_error_to_response(error, operation)
return serializer.serialize_error_to_response(error, operation, context.request.headers)


class ServiceExceptionSerializer(ExceptionHandler):
Expand Down Expand Up @@ -225,7 +225,7 @@ def create_exception_response(self, exception: Exception, context: RequestContex
context.service_exception = error

serializer = create_serializer(context.service) # TODO: serializer cache
return serializer.serialize_error_to_response(error, operation)
return serializer.serialize_error_to_response(error, operation, context.request.headers)


class ServiceResponseParser(Handler):
Expand Down
343 changes: 245 additions & 98 deletions localstack/aws/protocol/serializer.py

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions localstack/aws/skeleton.py
E30A
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def dispatch_request(self, context: RequestContext, instance: ServiceRequest) ->
context.service_response = result

# Serialize result dict to an HTTPResponse and return it
return self.serializer.serialize_to_response(result, operation)
return self.serializer.serialize_to_response(result, operation, context.request.headers)

def on_service_exception(
self, context: RequestContext, exception: ServiceException
Expand All @@ -185,7 +185,9 @@ def on_service_exception(
"""
context.service_exception = exception

return self.serializer.serialize_error_to_response(exception, context.operation)
return self.serializer.serialize_error_to_response(
exception, context.operation, context.request.headers
)

def on_not_implemented_error(self, context: RequestContext) -> HttpResponse:
"""
Expand All @@ -211,4 +213,4 @@ def on_not_implemented_error(self, context: RequestContext) -> HttpResponse:
)
context.service_exception = error

return serializer.serialize_error_to_response(error, operation)
return serializer.serialize_error_to_response(error, operation, context.request.headers)
1 change: 1 addition & 0 deletions localstack/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@

# content types / encodings
HEADER_CONTENT_TYPE = "Content-Type"
TEXT_XML = "text/xml"
APPLICATION_AMZ_JSON_1_0 = "application/x-amz-json-1.0"
APPLICATION_AMZ_JSON_1_1 = "application/x-amz-json-1.1"
APPLICATION_AMZ_CBOR_1_1 = "application/x-amz-cbor-1.1"
Expand Down
9 changes: 0 additions & 9 deletions localstack/services/kinesis/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,6 @@

LOG = logging.getLogger(__name__)

# TODO ASF: Check if we need to implement CBOR encoding in the serializer!
# TODO ASF: Set "X-Amzn-Errortype" (HEADER_AMZN_ERROR_TYPE) on responses
# TODO ASF: Rewrite responses
# - Region in content of responses
# - Record rewriting:
# - SDKv2: Transform timestamps to int?
# - Remove double quotes for JSON responses
# - Convert base64 encoded data back to bytes for the cbor encoding


class KinesisBackend(RegionBackend):
def __init__(self):
Expand Down
53 changes: 26 additions & 27 deletions localstack/services/moto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
This module provides tools to call moto using moto and botocore internals without going through the moto HTTP server.
"""
import sys
from functools import lru_cache
from typing import Callable
from functools import lru_cache, partial
from typing import Callable, Optional, Union

from moto.backends import get_backend as get_moto_backend
from moto.core.exceptions import RESTError
Expand All @@ -21,8 +21,11 @@
ServiceRequest,
ServiceResponse,
)
from localstack.aws.client import parse_response, raise_service_exception
from localstack.aws.forwarder import ForwardingFallbackDispatcher, create_aws_request_context
from localstack.aws.forwarder import (
ForwardingFallbackDispatcher,
create_aws_request_context,
dispatch_to_backend,
)
from localstack.aws.skeleton import DispatchTable
from localstack.http import Response

Expand All @@ -31,23 +34,6 @@
user_agent = f"Localstack/{localstack_version} Python/{sys.version.split(' ')[0]}"


def call_moto(context: RequestContext, include_response_metadata=False) -> ServiceResponse:
"""
Call moto with the given request context and receive a parsed ServiceResponse.

:param context: the request context
:param include_response_metadata: whether to include botocore's "ResponseMetadata" attribute
:return: an AWS ServiceResponse (same as a service provider would return)
:raises ServiceException: if moto returned an error response
"""
status, headers, content = dispatch_to_moto(context)
response = Response(content, status, headers)
parsed_response = parse_response(context.operation, response, include_response_metadata)
raise_service_exception(response, parsed_response)

return parsed_response


def call_moto_with_request(
context: RequestContext, service_request: ServiceRequest
) -> ServiceResponse:
Expand All @@ -72,18 +58,19 @@ def call_moto_with_request(
return call_moto(local_context)


def proxy_moto(context: RequestContext, service_request: ServiceRequest = None) -> Response:
def proxy_moto(
context: RequestContext, service_request: ServiceRequest = None
) -> Optional[Union[ServiceResponse]]:
"""
Similar to ``call``, only that ``proxy`` does not parse the HTTP response into a ServiceResponse, but instead
returns directly the HTTP response. This can be useful to pass through moto's response directly to the client.

:param context: the request context
:param service_request: currently not being used, added to satisfy ServiceRequestHandler contract
:return: the Response from moto
:return: the Response from moto or the ServiceResponse dictionary (to be serialized again) in case the Content-Type
of the response does not explicitly match the Accept header of the request
"""
status, headers, content = dispatch_to_moto(context)

return Response(response=content, status=status, headers=headers)
return dispatch_to_backend(context, dispatch_to_moto)


def MotoFallbackDispatcher(provider: object) -> DispatchTable:
Expand Down Expand Up @@ -111,7 +98,8 @@ def dispatch_to_moto(context: RequestContext) -> Response:
dispatch = get_dispatcher(service.service_name, request.path)

try:
return dispatch(request, request.url, request.headers)
status, headers, content = dispatch(request, request.url, request.headers)
return Response(content, status, headers)
except RESTError as e:
raise CommonServiceException(e.error_type, e.message, status_code=e.code) from e

Expand Down Expand Up @@ -171,3 +159,14 @@ def load_moto_routing_table(service: str) -> Map:
url_map.add(Rule(url_path, endpoint=endpoint, strict_slashes=strict_slashes))

return url_map


call_moto = partial(dispatch_to_backend, http_request_dispatcher=dispatch_to_moto)
"""
Call moto with the given request context and receive a parsed ServiceResponse.

:param context: the request context
:param include_response_metadata: whether to include botocore's "ResponseMetadata" attribute
:return: an AWS ServiceResponse (same as a service provider would return)
:raises ServiceException: if moto returned an error response
"""
6 changes: 3 additions & 3 deletions localstack/services/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ def iam():

@aws_provider()
def sts():
from localstack.services.sts.provider import StsAwsApiListener
from localstack.services.sts.provider import StsProvider

listener = StsAwsApiListener()
return Service("sts", listener=listener)
provider = StsProvider()
return Service("sts", listener=AwsApiListener("sts", MotoFallbackDispatcher(provider)))


@aws_provider(api="kinesis", name="legacy")
Expand Down
5 changes: 3 additions & 2 deletions localstack/services/sqs/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ def handle_request(request: Request, region: str) -> Response:
try:
response, operation = try_call_sqs(request, region)
del response["ResponseMetadata"]
return serializer.serialize_to_response(response, operation)
return serializer.serialize_to_response(response, operation, request.headers)
except UnknownOperationException:
return Response("<UnknownOperationException/>", 404)
except CommonServiceException as e:
# use a dummy operation for the serialization to work
op = service.operation_model(service.operation_names[0])
return serializer.serialize_error_to_response(e, op)
return serializer.serialize_error_to_response(e, op, request.headers)
except Exception as e:
LOG.exception("exception")
op = service.operation_model(service.operation_names[0])
Expand All @@ -134,6 +134,7 @@ def handle_request(request: Request, region: str) -> Response:
"InternalError", f"An internal error ocurred: {e}", status_code=500
),
op,
request.headers,
)


Expand Down
40 changes: 1 addition & 39 deletions localstack/services/sts/provider.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
import logging
import re

import xmltodict

from localstack.aws.api import RequestContext
from localstack.aws.api.sts import GetCallerIdentityResponse, StsApi
from localstack.aws.proxy import AwsApiListener
from localstack.constants import APPLICATION_JSON
from localstack.http import Request, Response
from localstack.services.moto import MotoFallbackDispatcher, call_moto
from localstack.services.moto import call_moto
from localstack.services.plugins import ServiceLifecycleHook
from localstack.utils.strings import to_str
from localstack.utils.time import parse_timestamp
from localstack.utils.xml import strip_xmlns

LOG = logging.getLogger(__name__)

Expand All @@ -23,32 +14,3 @@ def get_caller_identity(self, context: RequestContext) -> GetCallerIdentityRespo
if "user/moto" in result["Arn"] and "sts" in result["Arn"]:
result["Arn"] = f"arn:aws:iam::{result['Account']}:root"
return result


class StsAwsApiListener(AwsApiListener):
def __init__(self):
self.provider = StsProvider()
super().__init__("sts", MotoFallbackDispatcher(self.provider))

def request(self, request: Request) -> Response:
response = super().request(request)

if request.headers.get("Accept") == APPLICATION_JSON:
# convert "Expiration" to int for JSON response format (tested against AWS)
# TODO: introduce a proper/generic approach that works across arbitrary date fields in JSON

def _replace(match):
timestamp = parse_timestamp(match.group(1).strip())
return f"<Expiration>{int(timestamp.timestamp())}</Expiration>"

def _replace_response_content(_pattern, _replacement):
content = to_str(response.data or "")
data = re.sub(_pattern, _replacement, content)
content = xmltodict.parse(data)
stripped_content = strip_xmlns(content)
response.set_json(stripped_content)

pattern = r"<Expiration>([^<]+)</Expiration>"
_replace_response_content(pattern, _replace)

return response
6 changes: 3 additions & 3 deletions tests/integration/apigateway_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def create_rest_resource_method(apigateway_client, **kwargs):
< 6D4E br>
def create_rest_authorizer(apigateway_client, **kwargs):
response = apigateway_client.create_authorizer(**kwargs)
assert_response_is_200(response)
assert_response_is_201(response)
return response.get("id"), response.get("type")


Expand Down Expand Up @@ -127,12 +127,12 @@ def create_rest_api_integration_response(apigateway_client, **kwargs):

def create_domain_name(apigateway_client, **kwargs):
response = apigateway_client.create_domain_name(**kwargs)
assert_response_is_200(response)
assert_response_is_201(response)


def create_base_path_mapping(apigateway_client, **kwargs):
response = apigateway_client.create_base_path_mapping(**kwargs)
assert_response_is_200(response)
assert_response_is_201(response)
return response.get("basePath"), response.get("stage")


Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_apigateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ def test_api_gateway_handle_domain_name(self):
domain_name = f"{short_uid()}.example.com"
apigw_client = aws_stack.create_external_boto_client("apigateway")
rs = apigw_client.create_domain_name(domainName=domain_name)
assert 200 == rs["ResponseMetadata"]["HTTPStatusCode"]
assert 201 == rs["ResponseMetadata"]["HTTPStatusCode"]
rs = apigw_client.get_domain_name(domainName=domain_name)
assert 200 == rs["ResponseMetadata"]["HTTPStatusCode"]
assert domain_name == rs["domainName"]
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/test_apigateway_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def test_integration_response(apigateway_client):
# this is hard to match against, so remove it
response["ResponseMetadata"].pop("HTTPHeaders", None)
response["ResponseMetadata"].pop("RetryAttempts", None)
response["ResponseMetadata"].pop("RequestId", None)
assert response == (
{
"statusCode": "200",
Expand All @@ -274,6 +275,7 @@ def test_integration_response(apigateway_client):
# this is hard to match against, so remove it
response["ResponseMetadata"].pop("HTTPHeaders", None)
response["ResponseMetadata"].pop("RetryAttempts", None)
response["ResponseMetadata"].pop("RequestId", None)
assert response == (
{
"statusCode": "200",
Expand All @@ -287,6 +289,7 @@ def test_integration_response(apigateway_client):
# this is hard to match against, so remove it
response["ResponseMetadata"].pop("HTTPHeaders", None)
response["ResponseMetadata"].pop("RetryAttempts", None)
response["ResponseMetadata"].pop("RequestId", None)
assert response["methodIntegration"]["integrationResponses"] == (
{
"200": {
Expand Down Expand Up @@ -338,6 +341,7 @@ def test_integration_response(apigateway_client):
# this is hard to match against, so remove it
response["ResponseMetadata"].pop("HTTPHeaders", None)
response["ResponseMetadata"].pop("RetryAttempts", None)
response["ResponseMetadata"].pop("RequestId", None)
assert response == (
{
"statusCode": "200",
Expand All @@ -354,6 +358,7 @@ def test_integration_response(apigateway_client):
# this is hard to match against, so remove it
response["ResponseMetadata"].pop("HTTPHeaders", None)
response["ResponseMetadata"].pop("RetryAttempts", None)
response["ResponseMetadata"].pop("RequestId", None)
assert response == (
{
"statusCode": "200",
Expand Down Expand Up @@ -402,6 +407,7 @@ def test_put_integration_response_with_response_template(apigateway_client):
# this is hard to match against, so remove it
response["ResponseMetadata"].pop("HTTPHeaders", None)
response["ResponseMetadata"].pop("RetryAttempts", None)
response["ResponseMetadata"].pop("RequestId", None)
assert response == {
"statusCode": "200",
"selectionPattern": "foobar",
Expand Down
Loading
0