8000 implement new runtime framework for bootstrapping localstack by thrau · Pull Request #10942 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

implement new runtime framework for bootstrapping localstack #10942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions localstack-core/localstack/aws/components.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from functools import cached_property

from rolo.gateway import Gateway

from localstack.aws.app import LocalstackAwsGateway
from localstack.runtime.components import BaseComponents


class AwsComponents(BaseComponents):
"""
Runtime components specific to the AWS emulator.
"""

name = "aws"

@cached_property
def gateway(self) -> Gateway:
# FIXME: the ServiceManager should be reworked to be more generic, and then become part of the
# components
from localstack.services.plugins import SERVICE_PLUGINS

return LocalstackAwsGateway(SERVICE_PLUGINS)
4 changes: 4 additions & 0 deletions localstack-core/localstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,9 @@ def populate_edge_configuration(
# the gateway server that should be used (supported: hypercorn, twisted dev: werkzeug)
GATEWAY_SERVER = os.environ.get("GATEWAY_SERVER", "").strip() or "twisted"

# whether to use the legacy runtime (``localstack.service.infra``)
LEGACY_RUNTIME = is_env_true("LEGACY_RUNTIME")

# IP of the docker bridge used to enable access between containers
DOCKER_BRIDGE_IP = os.environ.get("DOCKER_BRIDGE_IP", "").strip()

Expand Down Expand Up @@ -1188,6 +1191,7 @@ def use_custom_dns():
"LAMBDA_SQS_EVENT_SOURCE_MAPPING_INTERVAL",
"LEGACY_DOCKER_CLIENT",
"LEGACY_SNS_GCM_PUBLISHING",
"LEGACY_RUNTIME",
"LOCALSTACK_API_KEY",
"LOCALSTACK_AUTH_TOKEN",
"LOCALSTACK_HOST",
Expand Down
30 changes: 14 additions & 16 deletions localstack-core/localstack/http/duplex_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from asyncio.selector_events import BaseSelectorEventLoop

from localstack.utils.asyncio import run_sync
from localstack.utils.objects import singleton_factory
from localstack.utils.patch import Patch, patch

# set up logger
LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,28 +54,24 @@ def peek_ssl_header():
return False


@singleton_factory
def enable_duplex_socket():
"""
Function which replaces the ssl.SSLContext.sslsocket_class with the DuplexSocket, enabling serving both,
HTTP and HTTPS connections on a single port.
"""

# set globally defined SSL socket implementation class
ssl.SSLContext.sslsocket_class = DuplexSocket
Patch(ssl.SSLContext, "sslsocket_class", DuplexSocket).apply()

async def _accept_connection2(self, protocol_factory, conn, extra, sslcontext, *args, **kwargs):
is_ssl_socket = await run_sync(DuplexSocket.is_ssl_socket, conn)
if is_ssl_socket is False:
sslcontext = None
result = await _accept_connection2_orig(
self, protocol_factory, conn, extra, sslcontext, *args, **kwargs
)
return result
if hasattr(BaseSelectorEventLoop, "_accept_connection2"):

# patch asyncio server to accept SSL and non-SSL traffic over same port
if hasattr(BaseSelectorEventLoop, "_accept_connection2") and not hasattr(
BaseSelectorEventLoop, "_ls_patched"
):
_accept_connection2_orig = BaseSelectorEventLoop._accept_connection2
BaseSelectorEventLoop._accept_connection2 = _accept_connection2
BaseSelectorEventLoop._ls_patched = True
@patch(BaseSelectorEventLoop._accept_connection2)
async def _accept_connection2(
fn, self, protocol_factory, conn, extra, sslcontext, *args, **kwargs
):
is_ssl_socket = await run_sync(DuplexSocket.is_ssl_socket, conn)
if is_ssl_socket is False:
sslcontext = None
result = await fn(self, protocol_factory, conn, extra, sslcontext, *args, **kwargs)
return result
5 changes: 5 additions & 0 deletions localstack-core/localstack/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .current import get_current_runtime

__all__ = [
"get_current_runtime",
]
56 changes: 56 additions & 0 deletions localstack-core/localstack/runtime/components.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
This package contains code to define and manage the core components that make up a ``LocalstackRuntime``.
These include:
- A ``Gateway``
- A ``RuntimeServer`` as the main control loop
- A ``ServiceManager`` to manage service plugins (TODO: once the Service concept has been generalized)
- ... ?

Components can then be accessed via ``get_current_runtime()``. 6D40
"""

from functools import cached_property

from plux import Plugin, PluginManager
from rolo.gateway import Gateway

from .server.core import RuntimeServer, RuntimeServerPlugin


class Components(Plugin):
"""
A Plugin that allows a specific localstack runtime implementation (aws, snowflake, ...) to expose its
own component factory.
"""

namespace = "localstack.runtime.components"

@cached_property
def gateway(self) -> Gateway:
raise NotImplementedError

@cached_property
def runtime_server(self) -> RuntimeServer:
raise NotImplementedError


class BaseComponents(Components):
"""
A component base, which includes a ``RuntimeServer`` created from the config variable, and a default
ServicePluginManager as ServiceManager.
"""

@cached_property
def runtime_server(self) -> RuntimeServer:
from localstack import config

# TODO: rename to RUNTIME_SERVER
server_type = config.GATEWAY_SERVER
Comment on lines +47 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: That comment should probably go into config.py


plugins = PluginManager(RuntimeServerPlugin.namespace)

if not plugins.exists(server_type):
raise ValueError(f"Unknown gateway server type {server_type}")

plugins.load(server_type)
return plugins.get_container(server_type).load_value
40 changes: 40 additions & 0 deletions localstack-core/localstack/runtime/current.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""This package gives access to the singleton ``LocalstackRuntime`` instance. This is the only global state
that should exist within localstack, which contains the singleton ``LocalstackRuntime`` which is currently
running."""

import threading
import typing

if typing.TYPE_CHECKING:
# make sure we don't have any imports here at runtime, so it can be imported anywhere without conflicts
from .runtime import LocalstackRuntime

_runtime: typing.Optional["LocalstackRuntime"] = None
"""The singleton LocalStack Runtime"""
_runtime_lock = threading.RLock()


def get_current_runtime() -> "LocalstackRuntime":
with _runtime_lock:
if not _runtime:
raise ValueError("LocalStack runtime has not yet been set")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure a ValueError is necessarily the clearest Exception here.

What's the reason for not just returning an optional/None here? Downstream typing having to handle the None?

return _runtime


def set_current_runtime(runtime: "LocalstackRuntime"):
with _runtime_lock:
global _runtime
_runtime = runtime


def initialize_runtime() -> "LocalstackRuntime":
from localstack.runtime import runtime

with _runtime_lock:
try:
return get_current_runtime()
except ValueError:
pass
rt = runtime.create_from_environment()
set_current_runtime(rt)
return rt
1 change: 1 addition & 0 deletions localstack-core/localstack/runtime/events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import threading

# TODO: deprecate and replace access with ``get_current_runtime().starting``, ...
infra_starting = threading.Event()
infra_ready = threading.Event()
infra_stopping = threading.Event()
Expand Down
14 changes: 14 additions & 0 deletions localstack-core/localstack/runtime/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

# plugin namespace constants
HOOKS_CONFIGURE_LOCALSTACK_CONTAINER = "localstack.hooks.configure_localstack_container"
HOOKS_ON_RUNTIME_CREATE = "localstack.hooks.on_runtime_create"
HOOKS_ON_INFRA_READY = "localstack.hooks.on_infra_ready"
HOOKS_ON_INFRA_START = "localstack.hooks.on_infra_start"
HOOKS_ON_PRO_INFRA_START = "localstack.hooks.on_pro_infra_start"
Expand Down Expand Up @@ -79,12 +80,25 @@ def __repr__(self):
on_infra_start = hook_spec(HOOKS_ON_INFRA_START)
"""Hooks that are executed right before starting the LocalStack infrastructure."""

on_runtime_create = hook_spec(HOOKS_ON_RUNTIME_CREATE)
"""Hooks that are executed right before the LocalstackRuntime is created. These can be used to apply
patches or otherwise configure the interpreter before any other code is imported."""

on_runtime_start = on_infra_start
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's start deprecating this asap so that we can remove it with 4.0 at least

"""Alias for on_infra_start. TODO: switch and deprecated `infra` naming."""

on_pro_infra_start = hook_spec(HOOKS_ON_PRO_INFRA_START)
"""Hooks that are executed after on_infra_start hooks, and only if LocalStack pro has been activated."""

on_infra_ready = hook_spec(HOOKS_ON_INFRA_READY)
"""Hooks that are execute after all startup hooks have been executed, and the LocalStack infrastructure has become
available."""

on_runtime_ready = on_infra_ready
"""Alias for on_infra_ready. TODO: switch and deprecated `infra` naming."""

on_infra_shutdown = hook_spec(HOOKS_ON_INFRA_SHUTDOWN)
"""Hooks that are execute when localstack shuts down."""

on_runtime_shutdown = on_infra_shutdown
"""Alias for on_infra_shutdown. TODO: switch and deprecated `infra` naming."""
40 changes: 40 additions & 0 deletions localstack-core/localstack/runtime/legacy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Adapter code for the legacy runtime to make sure the new runtime is compatible with the old one,
and at the same time doesn't need ``localstack.services.infra``, which imports AWS-specific modules."""

import logging
import os
import signal
import threading

from localstack.runtime import events
from localstack.utils import objects

LOG = logging.getLogger(__name__)

# event flag indicating the infrastructure has been started and that the ready marker has been printed
# TODO: deprecated, use events.infra_ready
INFRA_READY = events.infra_ready

# event flag indicating that the infrastructure has been shut down
SHUTDOWN_INFRA = threading.Event()
Comment on lines +14 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I don't think this is used anywhere (yet), could this maybe be removed?
Actually, I don't think there's even a usage across all repos in this org:
https://github.com/search?q=org%3Alocalstack+%22INFRA_READY%22&type=code
https://github.com/search?q=org%3Alocalstack+%22SHUTDOWN_INFRA%22&type=code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, I'll remove it as part of the cleanup I'm a second PR.


# can be set
EXIT_CODE: objects.Value[int] = objects.Value(0)


def signal_supervisor_restart():
if pid := os.environ.get("SUPERVISOR_PID"):
os.kill(int(pid), signal.SIGUSR1)
else:
LOG.warning("could not signal supervisor to restart localstack")


def exit_infra(code: int):
"""
Triggers an orderly shutdown of the localstack infrastructure and sets the code the main process should
exit with to a specific value.

:param code: the exit code the main process should return with
"""
EXIT_CODE.set(code)
SHUTDOWN_INFRA.set()
89 changes: 88 additions & 1 deletion localstack-core/localstack/runtime/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

import signal
import sys
import traceback

from localstack import config, constants
from localstack.runtime.exceptions import LocalstackExit


def main():
def main_legacy():
from localstack.services import infra

# signal handler to make sure SIGTERM properly shuts down localstack
Expand All @@ -25,5 +27,90 @@ def _terminate_localstack(sig: int, frame):
sys.exit(infra.EXIT_CODE.get())


def print_runtime_information(in_docker: bool = False):
# FIXME: this is legacy code from the old CLI, reconcile with new CLI and runtime output
from localstack.utils.container_networking import get_main_container_name
from localstack.utils.container_utils.container_client import ContainerException
from localstack.utils.docker_utils import DOCKER_CLIENT

print()
print(f"LocalStack version: {constants.VERSION}")
if in_docker:
try:
container_name = get_main_container_name()
print("LocalStack Docker container name: %s" % container_name)
inspect_result = DOCKER_CLIENT.inspect_container(container_name)
container_id = inspect_result["Id"]
print("LocalStack Docker container id: %s" % container_id[:12])
image_sha = inspect_result["Image"]
print("LocalStack Docker image sha: %s" % image_sha)
Comment on lines +45 to +46
1241 Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be adapted for #10951 but it's not critical and can be done in a follow-up, just shouldn't forget about it 👍

except ContainerException:
print(
"LocalStack Docker container info: Failed to inspect the LocalStack docker container. "
"This is likely because the docker socket was not mounted into the container. "
"Without access to the docker socket, LocalStack will not function properly. Please "
"consult the LocalStack documentation on how to correctly start up LocalStack. ",
end="",
)
if config.DEBUG:
print("Docker debug information:")
traceback.print_exc()
else:
print(
"You can run LocalStack with `DEBUG=1` to get more information about the error."
)

if config.LOCALSTACK_BUILD_DATE:
print("LocalStack build date: %s" % config.LOCALSTACK_BUILD_DATE)

if config.LOCALSTACK_BUILD_GIT_HASH:
print("LocalStack build git hash: %s" % config.LOCALSTACK_BUILD_GIT_HASH)

print()


def main_v2():
from localstack.logging.setup import setup_logging_from_config
from localstack.runtime import current

try:
setup_logging_from_config()
runtime = current.initialize_runtime()
except Exception as e:
sys.stdout.write(f"ERROR: The LocalStack Runtime could not be initialized: {e}\n")
sys.stdout.flush()
raise

# TODO: where should this go?
print_runtime_information()

# signal handler to make sure SIGTERM properly shuts down localstack
def _terminate_localstack(sig: int, frame):
sys.stdout.write(f"Localstack runtime received signal {sig}\n")
sys.stdout.flush()
runtime.exit(0)

signal.signal(signal.SIGINT, _terminate_localstack)
signal.signal(signal.SIGTERM, _terminate_localstack)

try:
runtime.run()
except LocalstackExit as e:
sys.exit(e.code)
except Exception as e:
sys.stdout.write(f"ERROR: the LocalStack runtime exited unexpectedly: {e}\n")
sys.stdout.flush()
raise

sys.exit(runtime.exit_code)


def main():
if config.LEGACY_RUNTIME:
main_legacy()
else:
main_v2()


if __name__ == "__main__":
main()
Loading
Loading
0