From 84076970775c929ca2f92b812b1e6581e89f8c16 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Mon, 3 Jun 2024 04:23:46 +0200 Subject: [PATCH 1/5] implement new runtime framework to bootstrap localstack --- localstack-core/localstack/aws/components.py | 22 ++ localstack-core/localstack/config.py | 4 + .../localstack/runtime/__init__.py | 5 + .../localstack/runtime/components.py | 56 +++++ localstack-core/localstack/runtime/current.py | 40 ++++ localstack-core/localstack/runtime/events.py | 1 + localstack-core/localstack/runtime/hooks.py | 9 + localstack-core/localstack/runtime/main.py | 53 ++++- localstack-core/localstack/runtime/patches.py | 45 ++++ localstack-core/localstack/runtime/runtime.py | 211 ++++++++++++++++++ .../localstack/runtime/server/__init__.py | 5 + .../localstack/runtime/server/core.py | 51 +++++ .../localstack/runtime/server/hypercorn.py | 68 ++++++ .../localstack/runtime/server/plugins.py | 19 ++ .../localstack/runtime/server/twisted.py | 52 +++++ .../testing/pytest/in_memory_localstack.py | 33 ++- 16 files changed, 655 insertions(+), 19 deletions(-) create mode 100644 localstack-core/localstack/aws/components.py create mode 100644 localstack-core/localstack/runtime/components.py create mode 100644 localstack-core/localstack/runtime/current.py create mode 100644 localstack-core/localstack/runtime/patches.py create mode 100644 localstack-core/localstack/runtime/runtime.py create mode 100644 localstack-core/localstack/runtime/server/__init__.py create mode 100644 localstack-core/localstack/runtime/server/core.py create mode 100644 localstack-core/localstack/runtime/server/hypercorn.py create mode 100644 localstack-core/localstack/runtime/server/plugins.py create mode 100644 localstack-core/localstack/runtime/server/twisted.py diff --git a/localstack-core/localstack/aws/components.py b/localstack-core/localstack/aws/components.py new file mode 100644 index 0000000000000..82b203741de60 --- /dev/null +++ b/localstack-core/localstack/aws/components.py @@ -0,0 +1,22 @@ +from functools import cached_property + +from rolo.gateway import Gateway + +from localstack.aws.app import LocalstackAwsGateway +from localstack.runtime.components import BaseComponents + + +class AwsComponents(BaseComponents): + """ + Runtime components specific to the AWS emulator. + """ + + name = "aws" + + @cached_property + def gateway(self) -> Gateway: + # FIXME: the ServiceManager should be reworked to be more generic, and then become part of the + # components + from localstack.services.plugins import SERVICE_PLUGINS + + return LocalstackAwsGateway(SERVICE_PLUGINS) diff --git a/localstack-core/localstack/config.py b/localstack-core/localstack/config.py index 20c114067b8dd..20c9d9797c687 100644 --- a/localstack-core/localstack/config.py +++ b/localstack-core/localstack/config.py @@ -660,6 +660,9 @@ def populate_edge_configuration( # the gateway server that should be used (supported: hypercorn, twisted dev: werkzeug) GATEWAY_SERVER = os.environ.get("GATEWAY_SERVER", "").strip() or "twisted" +# whether to use the legacy runtime (``localstack.service.infra``) +LEGACY_RUNTIME = is_env_true("LEGACY_RUNTIME") + # IP of the docker bridge used to enable access between containers DOCKER_BRIDGE_IP = os.environ.get("DOCKER_BRIDGE_IP", "").strip() @@ -1188,6 +1191,7 @@ def use_custom_dns(): "LAMBDA_SQS_EVENT_SOURCE_MAPPING_INTERVAL", "LEGACY_DOCKER_CLIENT", "LEGACY_SNS_GCM_PUBLISHING", + "LEGACY_RUNTIME", "LOCALSTACK_API_KEY", "LOCALSTACK_AUTH_TOKEN", "LOCALSTACK_HOST", diff --git a/localstack-core/localstack/runtime/__init__.py b/localstack-core/localstack/runtime/__init__.py index e69de29bb2d1d..99044a674080a 100644 --- a/localstack-core/localstack/runtime/__init__.py +++ b/localstack-core/localstack/runtime/__init__.py @@ -0,0 +1,5 @@ +from .current import get_current_runtime + +__all__ = [ + "get_current_runtime", +] diff --git a/localstack-core/localstack/runtime/components.py b/localstack-core/localstack/runtime/components.py new file mode 100644 index 0000000000000..db9662b2e030b --- /dev/null +++ b/localstack-core/localstack/runtime/components.py @@ -0,0 +1,56 @@ +""" +This package contains code to define and manage the core components that make up a ``LocalstackRuntime``. +These include: + - A ``Gateway`` + - A ``RuntimeServer`` as the main control loop + - A ``ServiceManager`` to manage service plugins (TODO: once the Service concept has been generalized) + - ... ? + +Components can then be accessed via ``get_current_runtime()``. +""" + +from functools import cached_property + +from plux import Plugin, PluginManager +from rolo.gateway import Gateway + +from .server.core import RuntimeServer, RuntimeServerPlugin + + +class Components(Plugin): + """ + A Plugin that allows a specific localstack runtime implementation (aws, snowflake, ...) to expose its + own component factory. + """ + + namespace = "localstack.runtime.components" + + @cached_property + def gateway(self) -> Gateway: + raise NotImplementedError + + @cached_property + def runtime_server(self) -> RuntimeServer: + raise NotImplementedError + + +class BaseComponents(Components): + """ + A component base, which includes a ``RuntimeServer`` created from the config variable, and a default + ServicePluginManager as ServiceManager. + """ + + @cached_property + def runtime_server(self) -> RuntimeServer: + from localstack import config + + # TODO: rename to RUNTIME_SERVER + server_type = config.GATEWAY_SERVER + + plugins = PluginManager(RuntimeServerPlugin.namespace) + + if not plugins.exists(server_type): + raise ValueError(f"Unknown gateway server type {server_type}") + + plugins.load(server_type) + return plugins.get_container(server_type).load_value diff --git a/localstack-core/localstack/runtime/current.py b/localstack-core/localstack/runtime/current.py new file mode 100644 index 0000000000000..fa033c58844fa --- /dev/null +++ b/localstack-core/localstack/runtime/current.py @@ -0,0 +1,40 @@ +"""This package gives access to the singleton ``LocalstackRuntime`` instance. This is the only global state +that should exist within localstack, which contains the singleton ``LocalstackRuntime`` which is currently +running.""" + +import threading +import typing + +if typing.TYPE_CHECKING: + # make sure we don't have any imports here at runtime, so it can be imported anywhere without conflicts + from .runtime import LocalstackRuntime + +_runtime: typing.Optional["LocalstackRuntime"] = None +"""The singleton LocalStack Runtime""" +_runtime_lock = threading.RLock() + + +def get_current_runtime() -> "LocalstackRuntime": + with _runtime_lock: + if not _runtime: + raise ValueError("LocalStack runtime has not yet been set") + return _runtime + + +def set_current_runtime(runtime: "LocalstackRuntime"): + with _runtime_lock: + global _runtime + _runtime = runtime + + +def initialize_runtime() -> "LocalstackRuntime": + from localstack.runtime import runtime + + with _runtime_lock: + try: + return get_current_runtime() + except ValueError: + pass + rt = runtime.create_from_environment() + set_current_runtime(rt) + return rt diff --git a/localstack-core/localstack/runtime/events.py b/localstack-core/localstack/runtime/events.py index 8256ae78a373c..2382fab6a47a2 100644 --- a/localstack-core/localstack/runtime/events.py +++ b/localstack-core/localstack/runtime/events.py @@ -1,5 +1,6 @@ import threading +# TODO: deprecate and replace access with ``get_current_runtime().starting``, ... infra_starting = threading.Event() infra_ready = threading.Event() infra_stopping = threading.Event() diff --git a/localstack-core/localstack/runtime/hooks.py b/localstack-core/localstack/runtime/hooks.py index 9b5a8b575bb04..1449c32312110 100644 --- a/localstack-core/localstack/runtime/hooks.py +++ b/localstack-core/localstack/runtime/hooks.py @@ -79,6 +79,9 @@ def __repr__(self): on_infra_start = hook_spec(HOOKS_ON_INFRA_START) """Hooks that are executed right before starting the LocalStack infrastructure.""" +on_runtime_start = on_infra_start +"""Alias for on_infra_start. TODO: switch and deprecated `infra` naming.""" + on_pro_infra_start = hook_spec(HOOKS_ON_PRO_INFRA_START) """Hooks that are executed after on_infra_start hooks, and only if LocalStack pro has been activated.""" @@ -86,5 +89,11 @@ def __repr__(self): """Hooks that are execute after all startup hooks have been executed, and the LocalStack infrastructure has become available.""" +on_runtime_ready = on_infra_ready +"""Alias for on_infra_ready. TODO: switch and deprecated `infra` naming.""" + on_infra_shutdown = hook_spec(HOOKS_ON_INFRA_SHUTDOWN) """Hooks that are execute when localstack shuts down.""" + +on_runtime_shutdown = on_infra_shutdown +"""Alias for on_infra_shutdown. TODO: switch and deprecated `infra` naming.""" diff --git a/localstack-core/localstack/runtime/main.py b/localstack-core/localstack/runtime/main.py index 45e6a604a4191..cfc509e9a331a 100644 --- a/localstack-core/localstack/runtime/main.py +++ b/localstack-core/localstack/runtime/main.py @@ -4,10 +4,11 @@ import signal import sys +from localstack import config from localstack.runtime.exceptions import LocalstackExit -def main(): +def main_legacy(): from localstack.services import infra # signal handler to make sure SIGTERM properly shuts down localstack @@ -25,5 +26,55 @@ def _terminate_localstack(sig: int, frame): sys.exit(infra.EXIT_CODE.get()) +def print_runtime_information(): + # FIXME: refactor legacy code + from localstack.services.infra import print_runtime_information + + print_runtime_information() + + +def main_v2(): + from localstack.logging.setup import setup_logging_from_config + from localstack.runtime import current + + try: + setup_logging_from_config() + runtime = current.initialize_runtime() + except Exception as e: + sys.stdout.write(f"ERROR: The LocalStack Runtime could not be initialized: {e}\n") + sys.stdout.flush() + raise + + # TODO: where should this go? + print_runtime_information() + + # signal handler to make sure SIGTERM properly shuts down localstack + def _terminate_localstack(sig: int, frame): + sys.stdout.write(f"Localstack runtime received signal {sig}\n") + sys.stdout.flush() + runtime.exit(0) + + signal.signal(signal.SIGINT, _terminate_localstack) + signal.signal(signal.SIGTERM, _terminate_localstack) + + try: + runtime.run() + except LocalstackExit as e: + sys.exit(e.code) + except Exception as e: + sys.stdout.write(f"ERROR: the LocalStack runtime exited unexpectedly: {e}\n") + sys.stdout.flush() + raise + + sys.exit(runtime.exit_code) + + +def main(): + if config.LEGACY_RUNTIME: + main_legacy() + else: + main_v2() + + if __name__ == "__main__": main() diff --git a/localstack-core/localstack/runtime/patches.py b/localstack-core/localstack/runtime/patches.py new file mode 100644 index 0000000000000..af4da4e5c2f84 --- /dev/null +++ b/localstack-core/localstack/runtime/patches.py @@ -0,0 +1,45 @@ +""" +System-wide patches that should be applied. +""" + +from localstack.runtime import hooks +from localstack.utils.patch import patch + + +def patch_thread_pool(): + """ + This patch to ThreadPoolExecutor makes the executor remove the threads it creates from the global + ``_thread_queues`` of ``concurrent.futures.thread``, which joins all created threads at python exit and + will block interpreter shutdown if any threads are still running, even if they are daemon threads. + """ + + import concurrent.futures.thread + + @patch(concurrent.futures.thread.ThreadPoolExecutor._adjust_thread_count) + def _adjust_thread_count(fn, self) -> None: + fn(self) + + for t in self._threads: + if not t.daemon: + continue + try: + del concurrent.futures.thread._threads_queues[t] + except KeyError: + pass + + +_applied = False + + +@hooks.on_runtime_start(priority=100) # apply patches earlier than other hooks +def apply_runtime_patches(): + # FIXME: find a better way to apply system-wide patches + global _applied + if _applied: + return + _applied = True + + from localstack.services.infra import patch_urllib3_connection_pool + + patch_urllib3_connection_pool(maxsize=128) + patch_thread_pool() diff --git a/localstack-core/localstack/runtime/runtime.py b/localstack-core/localstack/runtime/runtime.py new file mode 100644 index 0000000000000..8281fd556f76a --- /dev/null +++ b/localstack-core/localstack/runtime/runtime.py @@ -0,0 +1,211 @@ +import logging +import os +import threading + +from plux import PluginManager + +from localstack import config, constants +from localstack.runtime import events, hooks +from localstack.utils import files, net, sync, threads + +from .components import Components + +LOG = logging.getLogger(__name__) + + +class LocalstackRuntime: + """ + The localstack runtime. It has the following responsibilities: + + - Manage localstack filesystem directories + - Execute runtime lifecycle hook plugins from ``localstack.runtime.hooks``. + - Manage the localstack SSL certificate + - Serve the gateway (It uses a ``RuntimeServer`` to serve a ``Gateway`` instance coming from the + ``Components`` factory.) + """ + + def __init__(self, components: Components): + self.components = components + + # at some point, far far in the future, we should no longer access a global config object, but rather + # the one from the current runtime. This will allow us to truly instantiate multiple localstack + # runtime instances in one process, which can be useful for many different things. but there is too + # much global state at the moment think about this seriously. however, this assignment here can + # serve as a reminder to avoid global state in general. + self.config = config + + # TODO: move away from `localstack.runtime.events` and instantiate new `threading.Event()` here + # instead + self.starting = events.infra_starting + self.ready = events.infra_ready + self.stopping = events.infra_stopping + self.stopped = events.infra_stopped + + def run(self): + """ + Start the main control loop of the runtime and block the thread. This will initialize the + filesystem, run all lifecycle hooks, initialize the gateway server, and then serve the + ``RuntimeServer`` until ``shutdown()`` is called. + """ + # indicates to the environment that this is an "infra process" (old terminology referring to the + # localstack runtime). this is necessary for disabling certain hooks that may run in the context of + # the CLI host mode. TODO: should not be needed over time. + os.environ[constants.LOCALSTACK_INFRA_PROCESS] = "1" + + self._init_filesystem() + self._on_starting() + self._init_gateway_server() + + # since we are blocking the main thread with the runtime server, we need to run the monitor that + # prints the ready marker asynchronously. this is different from how the runtime was started in the + # past, where the server was running in a thread. + # TODO: ideally we pass down a `shutdown` event that can be waited on so we can cancel the thread + # if the runtime shuts down beforehand + threading.Thread(target=self._run_ready_monitor, daemon=True).start() + # FIXME: legacy compatibility code + threading.Thread(target=self._run_shutdown_monitor, daemon=True).start() + + # run the main control loop of the server and block execution + try: + self.components.runtime_server.run() + finally: + self._on_return() + + def exit(self, code: int = 0): + """ + Sets the exit code and runs ``shutdown``. It does not actually call ``sys.exit``, this is for the + caller to do. + + :param code: the exit code to be set + """ + self.exit_code = code + self.shutdown() + + def shutdown(self): + """ + Initiates an orderly shutdown of the runtime by stopping the main control loop of the + ``RuntimeServer``. The shutdown hooks are actually called by the main control loop (in the main + thread) after it returns. + """ + if self.stopping.is_set(): + return + self.stopping.set() + self.components.runtime_server.shutdown() + + def is_ready(self) -> bool: + return self.ready.is_set() + + def _init_filesystem(self): + self._clear_tmp_directory() + self.config.dirs.mkdirs() + + def _init_gateway_server(self): + from localstack.utils.ssl import create_ssl_cert, install_predefined_cert_if_available + + install_predefined_cert_if_available() + serial_number = self.config.GATEWAY_LISTEN[0].port + _, cert_file_name, key_file_name = create_ssl_cert(serial_number=serial_number) + ssl_creds = (cert_file_name, key_file_name) + + self.components.runtime_server.register( + self.components.gateway, self.config.GATEWAY_LISTEN, ssl_creds + ) + + def _on_starting(self): + self.starting.set() + hooks.on_runtime_start.run() + + def _on_ready(self): + hooks.on_runtime_ready.run() + print(constants.READY_MARKER_OUTPUT, flush=True) + self.ready.set() + + def _on_return(self): + LOG.debug("[shutdown] Running shutdown hooks ...") + hooks.on_runtime_shutdown.run() + LOG.debug("[shutdown] Cleaning up resources ...") + self._cleanup_resources() + self.stopped.set() + LOG.debug("[shutdown] Completed, bye!") + + def _run_ready_monitor(self): + self._wait_for_gateway() + self._on_ready() + + def _wait_for_gateway(self): + host_and_port = self.config.GATEWAY_LISTEN[0] + + if not sync.poll_condition( + lambda: net.is_port_open(host_and_port.port), timeout=15, interval=0.3 + ): + if LOG.isEnabledFor(logging.DEBUG): + # make another call with quiet=False to print detailed error logs + net.is_port_open(host_and_port.port, quiet=False) + raise TimeoutError(f"gave up waiting for gateway server to start on {host_and_port}") + + def _clear_tmp_directory(self): + if self.config.CLEAR_TMP_FOLDER: + # try to clear temp dir on startup + try: + files.rm_rf(self.config.dirs.tmp) + except PermissionError as e: + LOG.error( + "unable to delete temp folder %s: %s, please delete manually or you will " + "keep seeing these errors.", + self.config.dirs.tmp, + e, + ) + + def _cleanup_resources(self): + threads.cleanup_threads_and_processes() + self._clear_tmp_directory() + + # more legacy compatibility code + @property + def exit_code(self): + # FIXME: legacy compatibility code + from localstack.services.infra import EXIT_CODE + + return EXIT_CODE.get() + + @exit_code.setter + def exit_code(self, value): + # FIXME: legacy compatibility code + from localstack.services.infra import EXIT_CODE + + EXIT_CODE.set(value) + + def _run_shutdown_monitor(self): + # FIXME: legacy compatibility code. this can be removed once we replace access to the + # ``SHUTDOWN_INFRA`` event with ``get_current_runtime().shutdown()``. + from localstack.services import infra + + infra.SHUTDOWN_INFRA.wait() + self.shutdown() + + +def create_from_environment() -> LocalstackRuntime: + """ + Creates a new runtime instance from the current environment. It uses a plugin manager to resolve the + necessary components from the ``localstack.runtime.components`` plugin namespace to start the runtime. + + TODO: perhaps we could control which components should be instantiated with a config variable/constant + + :return: a new LocalstackRuntime instance + """ + plugin_manager = PluginManager(Components.namespace) + components = plugin_manager.load_all() + + if not components: + raise ValueError( + f"No component plugins found in namespace {Components.namespace}. Are entry points created " + f"correctly?" + ) + + if len(components) > 1: + LOG.warning( + "There are more than one component plugins, using the first one which is %s", + components[0].name, + ) + + return LocalstackRuntime(components[0]) diff --git a/localstack-core/localstack/runtime/server/__init__.py b/localstack-core/localstack/runtime/server/__init__.py new file mode 100644 index 0000000000000..808f22795246a --- /dev/null +++ b/localstack-core/localstack/runtime/server/__init__.py @@ -0,0 +1,5 @@ +from localstack.runtime.server.core import RuntimeServer + +__all__ = [ + "RuntimeServer", +] diff --git a/localstack-core/localstack/runtime/server/core.py b/localstack-core/localstack/runtime/server/core.py new file mode 100644 index 0000000000000..60644ef80a10a --- /dev/null +++ b/localstack-core/localstack/runtime/server/core.py @@ -0,0 +1,51 @@ +from plux import Plugin +from rolo.gateway import Gateway + +from localstack import config + + +class RuntimeServer: + """ + The main network IO loop of LocalStack. This could be twisted, hypercorn, or any other server + implementation. + """ + + def register( + self, + gateway: Gateway, + listen: list[config.HostAndPort], + ssl_creds: tuple[str, str] | None = None, + ): + """ + Registers the Gateway and the port configuration into the server. Some servers like ``twisted`` or + ``hypercorn`` support multiple calls to ``register``, allowing you to serve several Gateways + through a single event loop. + + :param gateway: the gateway to serve + :param listen: the host and port configuration + :param ssl_creds: ssl credentials (certificate file path, key file path) + """ + raise NotImplementedError + + def run(self): + """ + Run the server and block the thread. + """ + raise NotImplementedError + + def shutdown(self): + """ + Shutdown the running server. + """ + raise NotImplementedError + + +class RuntimeServerPlugin(Plugin): + """ + Plugin to expose RuntimeServer plugins to the + """ + + namespace = "localstack.runtime.server" + + def load(self, *args, **kwargs) -> RuntimeServer: + raise NotImplementedError diff --git a/localstack-core/localstack/runtime/server/hypercorn.py b/localstack-core/localstack/runtime/server/hypercorn.py new file mode 100644 index 0000000000000..ce15ea3d043e0 --- /dev/null +++ b/localstack-core/localstack/runtime/server/hypercorn.py @@ -0,0 +1,68 @@ +import asyncio +import threading + +from hypercorn import Config +from hypercorn.asyncio import serve +from rolo.gateway import Gateway +from rolo.gateway.asgi import AsgiGateway + +from localstack import config +from localstack.logging.setup import setup_hypercorn_logger + +from .core import RuntimeServer + + +class HypercornRuntimeServer(RuntimeServer): + def __init__(self): + self.loop = asyncio.get_event_loop() + + self._close = asyncio.Event() + self._closed = threading.Event() + + self._futures = [] + + def register( + self, + gateway: Gateway, + listen: list[config.HostAndPort], + ssl_creds: tuple[str, str] | None = None, + ): + hypercorn_config = Config() + hypercorn_config.h11_pass_raw_headers = True + hypercorn_config.bind = [str(host_and_port) for host_and_port in listen] + # hypercorn_config.use_reloader = use_reloader + + setup_hypercorn_logger(hypercorn_config) + + if ssl_creds: + cert_file_name, key_file_name = ssl_creds + hypercorn_config.certfile = cert_file_name + hypercorn_config.keyfile = key_file_name + + app = AsgiGateway(gateway, event_loop=self.loop) + + future = asyncio.run_coroutine_threadsafe( + serve(app, hypercorn_config, shutdown_trigger=self._shutdown_trigger), + self.loop, + ) + self._futures.append(future) + + def run(self): + self.loop.run_forever() + + def shutdown(self): + self._close.set() + asyncio.run_coroutine_threadsafe(self._set_closed(), self.loop) + # TODO: correctly wait for all hypercorn serve coroutines to finish + asyncio.run_coroutine_threadsafe(self.loop.shutdown_asyncgens(), self.loop) + self.loop.shutdown_default_executor() + self.loop.stop() + + async def _wait_server_stopped(self): + self._closed.set() + + async def _set_closed(self): + self._close.set() + + async def _shutdown_trigger(self): + await self._close.wait() diff --git a/localstack-core/localstack/runtime/server/plugins.py b/localstack-core/localstack/runtime/server/plugins.py new file mode 100644 index 0000000000000..95746e110375d --- /dev/null +++ b/localstack-core/localstack/runtime/server/plugins.py @@ -0,0 +1,19 @@ +from localstack.runtime.server.core import RuntimeServer, RuntimeServerPlugin + + +class TwistedRuntimeServerPlugin(RuntimeServerPlugin): + name = "twisted" + + def load(self, *args, **kwargs) -> RuntimeServer: + from .twisted import TwistedRuntimeServer + + return TwistedRuntimeServer() + + +class HypercornRuntimeServerPlugin(RuntimeServerPlugin): + name = "hypercorn" + + def load(self, *args, **kwargs) -> RuntimeServer: + from .hypercorn import HypercornRuntimeServer + + return HypercornRuntimeServer() diff --git a/localstack-core/localstack/runtime/server/twisted.py b/localstack-core/localstack/runtime/server/twisted.py new file mode 100644 index 0000000000000..e43350e60b624 --- /dev/null +++ b/localstack-core/localstack/runtime/server/twisted.py @@ -0,0 +1,52 @@ +from rolo.gateway import Gateway +from rolo.serving.twisted import TwistedGateway +from twisted.internet import endpoints, reactor, ssl + +from localstack import config +from localstack.aws.serving.twisted import TLSMultiplexerFactory, stop_thread_pool +from localstack.utils import patch + +from .core import RuntimeServer + + +class TwistedRuntimeServer(RuntimeServer): + def __init__(self): + self.thread_pool = None + + def register( + self, + gateway: Gateway, + listen: list[config.HostAndPort], + ssl_creds: tuple[str, str] | None = None, + ): + # setup twisted webserver Site + site = TwistedGateway(gateway) + + # configure ssl + if ssl_creds: + cert_file_name, key_file_name = ssl_creds + context_factory = ssl.DefaultOpenSSLContextFactory(key_file_name, cert_file_name) + context_factory.getContext().use_certificate_chain_file(cert_file_name) + protocol_factory = TLSMultiplexerFactory(context_factory, False, site) + else: + protocol_factory = site + + # add endpoint for each host/port combination + for host_and_port in listen: + # TODO: interface = host? + endpoint = endpoints.TCP4ServerEndpoint(reactor, host_and_port.port) + endpoint.listen(protocol_factory) + + def run(self): + reactor.suggestThreadPoolSize(config.GATEWAY_WORKER_COUNT) + self.thread_pool = reactor.getThreadPool() + patch.patch(self.thread_pool.stop)(stop_thread_pool) + + # we don't need signal handlers, since all they do is call ``reactor`` stop, which we expect the + # caller to do via ``shutdown``. + return reactor.run(installSignalHandlers=False) + + def shutdown(self): + if self.thread_pool: + self.thread_pool.stop(timeout=10) + reactor.stop() diff --git a/localstack-core/localstack/testing/pytest/in_memory_localstack.py b/localstack-core/localstack/testing/pytest/in_memory_localstack.py index b6ad67a6837aa..2e0f35fa51750 100644 --- a/localstack-core/localstack/testing/pytest/in_memory_localstack.py +++ b/localstack-core/localstack/testing/pytest/in_memory_localstack.py @@ -31,7 +31,6 @@ def pytest_configure(config): if localstack_config.is_collect_metrics_mode(): pytest_plugins = "localstack.testing.pytest.metric_collection" - _started = threading.Event() @@ -54,8 +53,6 @@ def pytest_runtestloop(session: Session): LOG.info("TEST_SKIP_LOCALSTACK_START is set, not starting localstack") return - from localstack.runtime import events - from localstack.services import infra from localstack.utils.common import safe_requests if is_aws_cloud(): @@ -66,11 +63,16 @@ def pytest_runtestloop(session: Session): os.environ[ENV_INTERNAL_TEST_RUN] = "1" safe_requests.verify_ssl = False + from localstack.runtime import current + _started.set() - infra.start_infra(asynchronous=True) - # wait for infra to start (threading event) - if not events.infra_ready.wait(timeout=120): - raise TimeoutError("gave up waiting for infra to be ready") + runtime = current.initialize_runtime() + # start runtime asynchronously + threading.Thread(target=runtime.run).start() + + # wait for runtime to be ready + if not runtime.ready.wait(timeout=120): + raise TimeoutError("gave up waiting for runtime to be ready") @pytest.hookimpl(trylast=True) @@ -79,16 +81,11 @@ def pytest_sessionfinish(session: Session): if not _started.is_set(): return - from localstack.runtime import events - from localstack.services import infra - from localstack.utils.threads import start_thread - - def _stop_infra(*_args): - LOG.info("stopping infra") - infra.stop_infra() + from localstack.runtime import get_current_runtime - start_thread(_stop_infra) - LOG.info("waiting for infra to stop") + get_current_runtime().shutdown() + LOG.info("waiting for runtime to stop") - if not events.infra_stopped.wait(timeout=20): - LOG.warning("gave up waiting for infra to stop, returning anyway") + # wait for runtime to shut down + if not get_current_runtime().stopped.wait(timeout=20): + LOG.warning("gave up waiting for runtime to stop, returning anyway") From 63757018b64813b2416a59879c69638111a7c791 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Thu, 6 Jun 2024 14:45:48 +0200 Subject: [PATCH 2/5] fix duplex socket patch --- .../localstack/http/duplex_socket.py | 30 +++++++++---------- localstack-core/localstack/runtime/patches.py | 2 ++ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/localstack-core/localstack/http/duplex_socket.py b/localstack-core/localstack/http/duplex_socket.py index 015cd4fce2366..8006f398668e5 100644 --- a/localstack-core/localstack/http/duplex_socket.py +++ b/localstack-core/localstack/http/duplex_socket.py @@ -6,6 +6,8 @@ from asyncio.selector_events import BaseSelectorEventLoop from localstack.utils.asyncio import run_sync +from localstack.utils.objects import singleton_factory +from localstack.utils.patch import Patch, patch # set up logger LOG = logging.getLogger(__name__) @@ -52,6 +54,7 @@ def peek_ssl_header(): return False +@singleton_factory def enable_duplex_socket(): """ Function which replaces the ssl.SSLContext.sslsocket_class with the DuplexSocket, enabling serving both, @@ -59,21 +62,16 @@ def enable_duplex_socket(): """ # set globally defined SSL socket implementation class - ssl.SSLContext.sslsocket_class = DuplexSocket + Patch(ssl.SSLContext, "sslsocket_class", DuplexSocket).apply() - async def _accept_connection2(self, protocol_factory, conn, extra, sslcontext, *args, **kwargs): - is_ssl_socket = await run_sync(DuplexSocket.is_ssl_socket, conn) - if is_ssl_socket is False: - sslcontext = None - result = await _accept_connection2_orig( - self, protocol_factory, conn, extra, sslcontext, *args, **kwargs - ) - return result + if hasattr(BaseSelectorEventLoop, "_accept_connection2"): - # patch asyncio server to accept SSL and non-SSL traffic over same port - if hasattr(BaseSelectorEventLoop, "_accept_connection2") and not hasattr( - BaseSelectorEventLoop, "_ls_patched" - ): - _accept_connection2_orig = BaseSelectorEventLoop._accept_connection2 - BaseSelectorEventLoop._accept_connection2 = _accept_connection2 - BaseSelectorEventLoop._ls_patched = True + @patch(BaseSelectorEventLoop._accept_connection2) + async def _accept_connection2( + fn, self, protocol_factory, conn, extra, sslcontext, *args, **kwargs + ): + is_ssl_socket = await run_sync(DuplexSocket.is_ssl_socket, conn) + if is_ssl_socket is False: + sslcontext = None + result = await fn(self, protocol_factory, conn, extra, sslcontext, *args, **kwargs) + return result diff --git a/localstack-core/localstack/runtime/patches.py b/localstack-core/localstack/runtime/patches.py index af4da4e5c2f84..5f080bf661b7e 100644 --- a/localstack-core/localstack/runtime/patches.py +++ b/localstack-core/localstack/runtime/patches.py @@ -39,7 +39,9 @@ def apply_runtime_patches(): return _applied = True + from localstack.http.duplex_socket import enable_duplex_socket from localstack.services.infra import patch_urllib3_connection_pool patch_urllib3_connection_pool(maxsize=128) patch_thread_pool() + enable_duplex_socket() From a43287429a83209c8e6be5ea77108c0d4bd6187e Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Sun, 9 Jun 2024 15:08:36 +0200 Subject: [PATCH 3/5] decouple new runtime from localstack.services.infra --- localstack-core/localstack/runtime/hooks.py | 5 ++ localstack-core/localstack/runtime/legacy.py | 40 ++++++++++++++++ localstack-core/localstack/runtime/main.py | 48 ++++++++++++++++--- localstack-core/localstack/runtime/patches.py | 25 +++++++++- localstack-core/localstack/runtime/runtime.py | 14 +++--- localstack-core/localstack/services/infra.py | 41 ++++------------ .../localstack/services/internal.py | 2 +- 7 files changed, 129 insertions(+), 46 deletions(-) create mode 100644 localstack-core/localstack/runtime/legacy.py diff --git a/localstack-core/localstack/runtime/hooks.py b/localstack-core/localstack/runtime/hooks.py index 1449c32312110..05161679cf54e 100644 --- a/localstack-core/localstack/runtime/hooks.py +++ b/localstack-core/localstack/runtime/hooks.py @@ -4,6 +4,7 @@ # plugin namespace constants HOOKS_CONFIGURE_LOCALSTACK_CONTAINER = "localstack.hooks.configure_localstack_container" +HOOKS_ON_RUNTIME_CREATE = "localstack.hooks.on_runtime_create" HOOKS_ON_INFRA_READY = "localstack.hooks.on_infra_ready" HOOKS_ON_INFRA_START = "localstack.hooks.on_infra_start" HOOKS_ON_PRO_INFRA_START = "localstack.hooks.on_pro_infra_start" @@ -79,6 +80,10 @@ def __repr__(self): on_infra_start = hook_spec(HOOKS_ON_INFRA_START) """Hooks that are executed right before starting the LocalStack infrastructure.""" +on_runtime_create = hook_spec(HOOKS_ON_RUNTIME_CREATE) +"""Hooks that are executed right before the LocalstackRuntime is created. These can be used to apply +patches or otherwise configure the interpreter before any other code is imported.""" + on_runtime_start = on_infra_start """Alias for on_infra_start. TODO: switch and deprecated `infra` naming.""" diff --git a/localstack-core/localstack/runtime/legacy.py b/localstack-core/localstack/runtime/legacy.py new file mode 100644 index 0000000000000..e2215d8daf35a --- /dev/null +++ b/localstack-core/localstack/runtime/legacy.py @@ -0,0 +1,40 @@ +"""Adapter code for the legacy runtime to make sure the new runtime is compatible with the old one, +and at the same time doesn't need ``localstack.services.infra``, which imports AWS-specific modules.""" + +import logging +import os +import signal +import threading + +from localstack.runtime import events +from localstack.utils import objects + +LOG = logging.getLogger(__name__) + +# event flag indicating the infrastructure has been started and that the ready marker has been printed +# TODO: deprecated, use events.infra_ready +INFRA_READY = events.infra_ready + +# event flag indicating that the infrastructure has been shut down +SHUTDOWN_INFRA = threading.Event() + +# can be set +EXIT_CODE: objects.Value[int] = objects.Value(0) + + +def signal_supervisor_restart(): + if pid := os.environ.get("SUPERVISOR_PID"): + os.kill(int(pid), signal.SIGUSR1) + else: + LOG.warning("could not signal supervisor to restart localstack") + + +def exit_infra(code: int): + """ + Triggers an orderly shutdown of the localstack infrastructure and sets the code the main process should + exit with to a specific value. + + :param code: the exit code the main process should return with + """ + EXIT_CODE.set(code) + SHUTDOWN_INFRA.set() diff --git a/localstack-core/localstack/runtime/main.py b/localstack-core/localstack/runtime/main.py index cfc509e9a331a..3107f4248ccd5 100644 --- a/localstack-core/localstack/runtime/main.py +++ b/localstack-core/localstack/runtime/main.py @@ -3,8 +3,9 @@ import signal import sys +import traceback -from localstack import config +from localstack import config, constants from localstack.runtime.exceptions import LocalstackExit @@ -26,11 +27,46 @@ def _terminate_localstack(sig: int, frame): sys.exit(infra.EXIT_CODE.get()) -def print_runtime_information(): - # FIXME: refactor legacy code - from localstack.services.infra import print_runtime_information - - print_runtime_information() +def print_runtime_information(in_docker: bool = False): + # FIXME: this is legacy code from the old CLI, reconcile with new CLI and runtime output + from localstack.utils.container_networking import get_main_container_name + from localstack.utils.container_utils.container_client import ContainerException + from localstack.utils.docker_utils import DOCKER_CLIENT + + print() + print(f"LocalStack version: {constants.VERSION}") + if in_docker: + try: + container_name = get_main_container_name() + print("LocalStack Docker container name: %s" % container_name) + inspect_result = DOCKER_CLIENT.inspect_container(container_name) + container_id = inspect_result["Id"] + print("LocalStack Docker container id: %s" % container_id[:12]) + image_sha = inspect_result["Image"] + print("LocalStack Docker image sha: %s" % image_sha) + except ContainerException: + print( + "LocalStack Docker container info: Failed to inspect the LocalStack docker container. " + "This is likely because the docker socket was not mounted into the container. " + "Without access to the docker socket, LocalStack will not function properly. Please " + "consult the LocalStack documentation on how to correctly start up LocalStack. ", + end="", + ) + if config.DEBUG: + print("Docker debug information:") + traceback.print_exc() + else: + print( + "You can run LocalStack with `DEBUG=1` to get more information about the error." + ) + + if config.LOCALSTACK_BUILD_DATE: + print("LocalStack build date: %s" % config.LOCALSTACK_BUILD_DATE) + + if config.LOCALSTACK_BUILD_GIT_HASH: + print("LocalStack build git hash: %s" % config.LOCALSTACK_BUILD_GIT_HASH) + + print() def main_v2(): diff --git a/localstack-core/localstack/runtime/patches.py b/localstack-core/localstack/runtime/patches.py index 5f080bf661b7e..4772a480bfee1 100644 --- a/localstack-core/localstack/runtime/patches.py +++ b/localstack-core/localstack/runtime/patches.py @@ -28,6 +28,30 @@ def _adjust_thread_count(fn, self) -> None: pass +def patch_urllib3_connection_pool(**constructor_kwargs): + """ + Override the default parameters of HTTPConnectionPool, e.g., set the pool size via maxsize=16 + """ + try: + from urllib3 import connectionpool, poolmanager + + class MyHTTPSConnectionPool(connectionpool.HTTPSConnectionPool): + def __init__(self, *args, **kwargs): + kwargs.update(constructor_kwargs) + super(MyHTTPSConnectionPool, self).__init__(*args, **kwargs) + + poolmanager.pool_classes_by_scheme["https"] = MyHTTPSConnectionPool + + class MyHTTPConnectionPool(connectionpool.HTTPConnectionPool): + def __init__(self, *args, **kwargs): + kwargs.update(constructor_kwargs) + super(MyHTTPConnectionPool, self).__init__(*args, **kwargs) + + poolmanager.pool_classes_by_scheme["http"] = MyHTTPConnectionPool + except Exception: + pass + + _applied = False @@ -40,7 +64,6 @@ def apply_runtime_patches(): _applied = True from localstack.http.duplex_socket import enable_duplex_socket - from localstack.services.infra import patch_urllib3_connection_pool patch_urllib3_connection_pool(maxsize=128) patch_thread_pool() diff --git a/localstack-core/localstack/runtime/runtime.py b/localstack-core/localstack/runtime/runtime.py index 8281fd556f76a..aff1d03820b55 100644 --- a/localstack-core/localstack/runtime/runtime.py +++ b/localstack-core/localstack/runtime/runtime.py @@ -164,23 +164,23 @@ def _cleanup_resources(self): @property def exit_code(self): # FIXME: legacy compatibility code - from localstack.services.infra import EXIT_CODE + from localstack.runtime import legacy - return EXIT_CODE.get() + return legacy.EXIT_CODE.get() @exit_code.setter def exit_code(self, value): # FIXME: legacy compatibility code - from localstack.services.infra import EXIT_CODE + from localstack.runtime import legacy - EXIT_CODE.set(value) + legacy.EXIT_CODE.set(value) def _run_shutdown_monitor(self): # FIXME: legacy compatibility code. this can be removed once we replace access to the # ``SHUTDOWN_INFRA`` event with ``get_current_runtime().shutdown()``. - from localstack.services import infra + from localstack.runtime import legacy - infra.SHUTDOWN_INFRA.wait() + legacy.SHUTDOWN_INFRA.wait() self.shutdown() @@ -193,6 +193,8 @@ def create_from_environment() -> LocalstackRuntime: :return: a new LocalstackRuntime instance """ + hooks.on_runtime_create.run() + plugin_manager = PluginManager(Components.namespace) components = plugin_manager.load_all() diff --git a/localstack-core/localstack/services/infra.py b/localstack-core/localstack/services/infra.py index 164721c4488ba..1fd0957f289c8 100644 --- a/localstack-core/localstack/services/infra.py +++ b/localstack-core/localstack/services/infra.py @@ -1,14 +1,13 @@ import logging import os -import signal import sys -import threading import traceback from localstack import config, constants from localstack.constants import LOCALSTACK_INFRA_PROCESS, VERSION from localstack.http.duplex_socket import enable_duplex_socket from localstack.runtime import events, hooks +from localstack.runtime import legacy as legacy_runtime from localstack.runtime.exceptions import LocalstackExit from localstack.services.plugins import SERVICE_PLUGINS, ServiceDisabled, wait_for_infra_shutdown from localstack.utils import files, objects @@ -42,13 +41,13 @@ # event flag indicating the infrastructure has been started and that the ready marker has been printed # TODO: deprecated, use events.infra_ready -INFRA_READY = events.infra_ready +INFRA_READY = legacy_runtime.INFRA_READY # event flag indicating that the infrastructure has been shut down -SHUTDOWN_INFRA = threading.Event() +SHUTDOWN_INFRA = legacy_runtime.SHUTDOWN_INFRA # can be set -EXIT_CODE: objects.Value[int] = objects.Value(0) +EXIT_CODE: objects.Value[int] = legacy_runtime.EXIT_CODE # --------------- @@ -57,27 +56,9 @@ def patch_urllib3_connection_pool(**constructor_kwargs): - """ - Override the default parameters of HTTPConnectionPool, e.g., set the pool size via maxsize=16 - """ - try: - from urllib3 import connectionpool, poolmanager - - class MyHTTPSConnectionPool(connectionpool.HTTPSConnectionPool): - def __init__(self, *args, **kwargs): - kwargs.update(constructor_kwargs) - super(MyHTTPSConnectionPool, self).__init__(*args, **kwargs) - - poolmanager.pool_classes_by_scheme["https"] = MyHTTPSConnectionPool - - class MyHTTPConnectionPool(connectionpool.HTTPConnectionPool): - def __init__(self, *args, **kwargs): - kwargs.update(constructor_kwargs) - super(MyHTTPConnectionPool, self).__init__(*args, **kwargs) + from localstack.runtime.patches import patch_urllib3_connection_pool - poolmanager.pool_classes_by_scheme["http"] = MyHTTPConnectionPool - except Exception: - pass + patch_urllib3_connection_pool(**constructor_kwargs) def exit_infra(code: int): @@ -87,8 +68,7 @@ def exit_infra(code: int): :param code: the exit code the main process should return with """ - EXIT_CODE.set(code) - SHUTDOWN_INFRA.set() + legacy_runtime.exit_infra(code) def stop_infra(): @@ -153,10 +133,7 @@ def log_startup_message(service): def signal_supervisor_restart(): - if pid := os.environ.get("SUPERVISOR_PID"): - os.kill(int(pid), signal.SIGUSR1) - else: - LOG.warning("could not signal supervisor to restart localstack") + legacy_runtime.signal_supervisor_restart() # ------------- @@ -241,7 +218,7 @@ def start_infra(asynchronous=False, apis=None): if not asynchronous and thread: # We're making sure that we stay in the execution context of the # main thread, otherwise our signal handlers don't work - SHUTDOWN_INFRA.wait() + events.infra_stopped.wait() return thread diff --git a/localstack-core/localstack/services/internal.py b/localstack-core/localstack/services/internal.py index fff0730d1cc04..c9fbbe59d9380 100644 --- a/localstack-core/localstack/services/internal.py +++ b/localstack-core/localstack/services/internal.py @@ -14,7 +14,7 @@ from localstack.deprecations import deprecated_endpoint from localstack.http import Request, Resource, Response, Router from localstack.http.dispatcher import handler_dispatcher -from localstack.services.infra import exit_infra, signal_supervisor_restart +from localstack.runtime.legacy import exit_infra, signal_supervisor_restart from localstack.utils.analytics.metadata import ( get_client_metadata, get_localstack_edition, From 00bccf3a5668d3edf8564683e373377d83dc1e01 Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Tue, 11 Jun 2024 23:25:52 +0200 Subject: [PATCH 4/5] fix tests and pr comments --- localstack-core/localstack/runtime/server/core.py | 2 +- .../localstack/testing/pytest/in_memory_localstack.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/localstack-core/localstack/runtime/server/core.py b/localstack-core/localstack/runtime/server/core.py index 60644ef80a10a..137f276f3d496 100644 --- a/localstack-core/localstack/runtime/server/core.py +++ b/localstack-core/localstack/runtime/server/core.py @@ -42,7 +42,7 @@ def shutdown(self): class RuntimeServerPlugin(Plugin): """ - Plugin to expose RuntimeServer plugins to the + Plugin that serves as a factory for specific ```RuntimeServer`` implementations. """ namespace = "localstack.runtime.server" diff --git a/localstack-core/localstack/testing/pytest/in_memory_localstack.py b/localstack-core/localstack/testing/pytest/in_memory_localstack.py index 2e0f35fa51750..deccde1633f0d 100644 --- a/localstack-core/localstack/testing/pytest/in_memory_localstack.py +++ b/localstack-core/localstack/testing/pytest/in_memory_localstack.py @@ -83,6 +83,12 @@ def pytest_sessionfinish(session: Session): from localstack.runtime import get_current_runtime + try: + get_current_runtime() + except ValueError: + LOG.warning("Could not access the current runtime in a pytest sessionfinish hook.") + return + get_current_runtime().shutdown() LOG.info("waiting for runtime to stop") From 76392257e6fde5d42c0e5aa17fe4b611878c30cd Mon Sep 17 00:00:00 2001 From: Alexander Rashed Date: Wed, 12 Jun 2024 17:23:37 +0200 Subject: [PATCH 5/5] avoid starting LocalStack in pytest collect-only --- .../localstack/testing/pytest/in_memory_localstack.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/localstack-core/localstack/testing/pytest/in_memory_localstack.py b/localstack-core/localstack/testing/pytest/in_memory_localstack.py index deccde1633f0d..8a43b3aba80d7 100644 --- a/localstack-core/localstack/testing/pytest/in_memory_localstack.py +++ b/localstack-core/localstack/testing/pytest/in_memory_localstack.py @@ -44,6 +44,10 @@ def pytest_addoption(parser: Parser, pluginmanager: PytestPluginManager): @pytest.hookimpl(tryfirst=True) def pytest_runtestloop(session: Session): + # avoid starting up localstack if we only collect the tests (-co / --collect-only) + if session.config.option.collectonly: + return + if not session.config.option.start_localstack: return