From 5388715d3356a4624f11e210789c721a1ab36980 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Mon, 20 Mar 2023 10:46:21 +0100 Subject: [PATCH 1/5] implement basic persistence for lambda v2 --- .../awslambda/invocation/lambda_models.py | 6 +-- .../services/awslambda/invocation/models.py | 6 +-- .../awslambda/invocation/version_manager.py | 16 +++--- localstack/services/awslambda/provider.py | 50 +++++++++++++++++++ 4 files changed, 63 insertions(+), 15 deletions(-) diff --git a/localstack/services/awslambda/invocation/lambda_models.py b/localstack/services/awslambda/invocation/lambda_models.py index e62917f7f3446..7ca9333ff2cbb 100644 --- a/localstack/services/awslambda/invocation/lambda_models.py +++ b/localstack/services/awslambda/invocation/lambda_models.py @@ -82,6 +82,7 @@ SNAP_START_SUPPORTED_RUNTIMES = [Runtime.java11] +# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it? @dataclasses.dataclass(frozen=True) class VersionState: state: State @@ -520,11 +521,6 @@ def status_error(self, executor_id: str) -> None: raise NotImplementedError() -@dataclasses.dataclass -class EventSourceMapping: - ... - - @dataclasses.dataclass(frozen=True) class CodeSigningConfig: csc_id: str diff --git a/localstack/services/awslambda/invocation/models.py b/localstack/services/awslambda/invocation/models.py index 6794438fc9127..3b29abaac33a8 100644 --- a/localstack/services/awslambda/invocation/models.py +++ b/localstack/services/awslambda/invocation/models.py @@ -1,7 +1,7 @@ +from localstack.aws.api.lambda_ import EventSourceMappingConfiguration from localstack.services.awslambda.invocation.lambda_models import ( AccountSettings, CodeSigningConfig, - EventSourceMapping, Function, Layer, ) @@ -13,7 +13,7 @@ class LambdaStore(BaseStore): functions: dict[str, Function] = LocalAttribute(default=dict) # maps EventSourceMapping UUIDs to the respective EventSourceMapping - event_source_mappings: dict[str, EventSourceMapping] = LocalAttribute(default=dict) + event_source_mappings: dict[str, EventSourceMappingConfiguration] = LocalAttribute(default=dict) # maps CodeSigningConfig ARNs to the respective CodeSigningConfig code_signing_configs: dict[str, CodeSigningConfig] = LocalAttribute(default=dict) @@ -25,4 +25,4 @@ class LambdaStore(BaseStore): settings: AccountSettings = LocalAttribute(default=AccountSettings) -lambda_stores = AccountRegionBundle[LambdaStore]("lambda", LambdaStore) +lambda_stores = AccountRegionBundle("lambda", LambdaStore) diff --git a/localstack/services/awslambda/invocation/version_manager.py b/localstack/services/awslambda/invocation/version_manager.py index a9e2007dd9b81..50e39f3211a61 100644 --- a/localstack/services/awslambda/invocation/version_manager.py +++ b/localstack/services/awslambda/invocation/version_manager.py @@ -158,6 +158,7 @@ def __init__( ) def start(self) -> None: + new_state = None try: invocation_thread = Thread(target=self.invocation_loop) invocation_thread.start() @@ -167,23 +168,24 @@ def start(self) -> None: # code and reason not set for success scenario because only failed states provide this field: # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode - self.state = VersionState(state=State.Active) + new_state = VersionState(state=State.Active) LOG.debug( - f"Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) changed to active" + f"Changing Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) to active" ) except Exception as e: - self.state = VersionState( + new_state = VersionState( state=State.Failed, code=StateReasonCode.InternalError, reason=f"Error while creating lambda: {e}", ) LOG.debug( - f"Lambda '{self.function_arn}' changed to failed. Reason: %s", e, exc_info=True + f"Changing Lambda '{self.function_arn}' to failed. Reason: %s", e, exc_info=True ) finally: - self.lambda_service.update_version_state( - function_version=self.function_version, new_state=self.state - ) + if new_state: + self.lambda_service.update_version_state( + function_version=self.function_version, new_state=new_state + ) def stop(self) -> None: LOG.debug("Stopping lambda version '%s'", self.function_arn) diff --git a/localstack/services/awslambda/provider.py b/localstack/services/awslambda/provider.py index 2eb85b1ecfaba..3ecc9205938c5 100644 --- a/localstack/services/awslambda/provider.py +++ b/localstack/services/awslambda/provider.py @@ -187,6 +187,7 @@ from localstack.services.awslambda.urlrouter import FunctionUrlRouter from localstack.services.edge import ROUTER from localstack.services.plugins import ServiceLifecycleHook +from localstack.state import StateVisitor from localstack.utils.aws import aws_stack from localstack.utils.aws.arns import extract_service_from_arn from localstack.utils.collections import PaginatedList @@ -203,12 +204,19 @@ LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5 +@dataclasses.dataclass +class LambdaPersistenceContext: + # TODO: extend for more detailed comparisons + functions_pre_restore: list[str] = dataclasses.field(default_factory=list) + + class LambdaProvider(LambdaApi, ServiceLifecycleHook): lambda_service: LambdaService create_fn_lock: threading.RLock create_layer_lock: threading.RLock router: FunctionUrlRouter layer_fetcher: LayerFetcher | None + lambda_persistence_context: LambdaPersistenceContext def __init__(self) -> None: self.lambda_service = LambdaService() @@ -217,6 +225,48 @@ def __init__(self) -> None: self.router = FunctionUrlRouter(ROUTER, self.lambda_service) self.layer_fetcher = None lambda_hooks.inject_layer_fetcher.run(self) + self.lambda_persistence_context = LambdaPersistenceContext() + + def accept_state_visitor(self, visitor: StateVisitor): + visitor.visit(lambda_stores) + + def on_before_state_load(self): + for account_id, account_bundle in lambda_stores.items(): + for region_name, state in account_bundle.items(): + for fn in state.functions.values(): + self.lambda_persistence_context.functions_pre_restore.append( + fn.latest().id.unqualified_arn() + ) + + def on_after_state_load(self): + # TODO: provisioned concurrency + # TODO: detect new versions + # TODO: detect changes + for account_id, account_bundle in lambda_stores.items(): + for region_name, state in account_bundle.items(): + for fn in state.functions.values(): + # only restore functions that have been loaded and were not in the store before + if ( + fn.latest().id.unqualified_arn() + not in self.lambda_persistence_context.functions_pre_restore + ): + for fn_version in fn.versions.values(): + # restore the "Pending" state for every function version and start it + new_state = VersionState( + state=State.Pending, + code=StateReasonCode.Creating, + reason="The function is being created.", + ) + new_config = dataclasses.replace(fn_version.config, state=new_state) + new_version = dataclasses.replace(fn_version, config=new_config) + fn.versions[fn_version.id.qualifier] = new_version + self.lambda_service.create_function_version(fn_version) + + # Restore event source listeners + for esm in state.event_source_mappings.values(): + EventSourceListener.start_listeners_for_asf( + esm, self.lambda_service + ) def on_after_init(self): self.router.register_routes() From 05902636b0261ed75acf63c40b893efa13a55e6d Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Mon, 20 Mar 2023 15:59:31 +0100 Subject: [PATCH 2/5] refactor lambda restore state --- localstack/services/awslambda/provider.py | 29 +++++------------------ 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/localstack/services/awslambda/provider.py b/localstack/services/awslambda/provider.py index 3ecc9205938c5..792709eac9930 100644 --- a/localstack/services/awslambda/provider.py +++ b/localstack/services/awslambda/provider.py @@ -204,19 +204,12 @@ LAMBDA_LAYERS_LIMIT_PER_FUNCTION = 5 -@dataclasses.dataclass -class LambdaPersistenceContext: - # TODO: extend for more detailed comparisons - functions_pre_restore: list[str] = dataclasses.field(default_factory=list) - - class LambdaProvider(LambdaApi, ServiceLifecycleHook): lambda_service: LambdaService create_fn_lock: threading.RLock create_layer_lock: threading.RLock router: FunctionUrlRouter layer_fetcher: LayerFetcher | None - lambda_persistence_context: LambdaPersistenceContext def __init__(self) -> None: self.lambda_service = LambdaService() @@ -225,31 +218,21 @@ def __init__(self) -> None: self.router = FunctionUrlRouter(ROUTER, self.lambda_service) self.layer_fetcher = None lambda_hooks.inject_layer_fetcher.run(self) - self.lambda_persistence_context = LambdaPersistenceContext() def accept_state_visitor(self, visitor: StateVisitor): visitor.visit(lambda_stores) - def on_before_state_load(self): - for account_id, account_bundle in lambda_stores.items(): - for region_name, state in account_bundle.items(): - for fn in state.functions.values(): - self.lambda_persistence_context.functions_pre_restore.append( - fn.latest().id.unqualified_arn() - ) - def on_after_state_load(self): # TODO: provisioned concurrency - # TODO: detect new versions - # TODO: detect changes for account_id, account_bundle in lambda_stores.items(): for region_name, state in account_bundle.items(): for fn in state.functions.values(): - # only restore functions that have been loaded and were not in the store before - if ( - fn.latest().id.unqualified_arn() - not in self.lambda_persistence_context.functions_pre_restore - ): + # only start functions that don't have a version manager yet + try: + self.lambda_service.get_lambda_version_manager( + fn.latest().id.qualified_arn() + ) + except ValueError: for fn_version in fn.versions.values(): # restore the "Pending" state for every function version and start it new_state = VersionState( From 805067f4b7a6f3ab670f2d2294c54458878a08a9 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Mon, 20 Mar 2023 20:45:29 +0100 Subject: [PATCH 3/5] stop service before load and sync create function version --- .../awslambda/invocation/lambda_service.py | 6 +-- localstack/services/awslambda/provider.py | 40 ++++++++++++------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/localstack/services/awslambda/invocation/lambda_service.py b/localstack/services/awslambda/invocation/lambda_service.py index 89ddd75beea0d..911f65bebe9dc 100644 --- a/localstack/services/awslambda/invocation/lambda_service.py +++ b/localstack/services/awslambda/invocation/lambda_service.py @@ -111,7 +111,7 @@ def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager: return version_manager - def create_function_version(self, function_version: FunctionVersion) -> None: + def create_function_version(self, function_version: FunctionVersion) -> Future[None]: """ Creates a new function version (manager), and puts it in the startup dict @@ -130,7 +130,7 @@ def create_function_version(self, function_version: FunctionVersion) -> None: function_arn=qualified_arn, function_version=function_version, lambda_service=self ) self.lambda_starting_versions[qualified_arn] = version_manager - self.task_executor.submit(version_manager.start) + return self.task_executor.submit(version_manager.start) def publish_version(self, function_version: FunctionVersion): """ @@ -238,7 +238,7 @@ def invoke( ) ) - def update_version(self, new_version: FunctionVersion) -> None: + def update_version(self, new_version: FunctionVersion) -> Future[None]: """ Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready to be invoked diff --git a/localstack/services/awslambda/provider.py b/localstack/services/awslambda/provider.py index 792709eac9930..f6184185e5dc4 100644 --- a/localstack/services/awslambda/provider.py +++ b/localstack/services/awslambda/provider.py @@ -222,7 +222,12 @@ def __init__(self) -> None: def accept_state_visitor(self, visitor: StateVisitor): visitor.visit(lambda_stores) + def on_before_state_load(self): + self.lambda_service.stop() + def on_after_state_load(self): + self.lambda_service = LambdaService() + # TODO: provisioned concurrency for account_id, account_bundle in lambda_stores.items(): for region_name, state in account_bundle.items(): @@ -235,21 +240,28 @@ def on_after_state_load(self): except ValueError: for fn_version in fn.versions.values(): # restore the "Pending" state for every function version and start it - new_state = VersionState( - state=State.Pending, - code=StateReasonCode.Creating, - reason="The function is being created.", - ) - new_config = dataclasses.replace(fn_version.config, state=new_state) - new_version = dataclasses.replace(fn_version, config=new_config) - fn.versions[fn_version.id.qualifier] = new_version - self.lambda_service.create_function_version(fn_version) - - # Restore event source listeners - for esm in state.event_source_mappings.values(): - EventSourceListener.start_listeners_for_asf( - esm, self.lambda_service + try: + new_state = VersionState( + state=State.Pending, + code=StateReasonCode.Creating, + reason="The function is being created.", + ) + new_config = dataclasses.replace(fn_version.config, state=new_state) + new_version = dataclasses.replace(fn_version, config=new_config) + fn.versions[fn_version.id.qualifier] = new_version + self.lambda_service.create_function_version(fn_version).result( + timeout=5 ) + except Exception: + LOG.warning( + "Failed to restore function version %s", + fn_version.id.qualified_arn(), + exc_info=True, + ) + + # Restore event source listeners + for esm in state.event_source_mappings.values(): + EventSourceListener.start_listeners_for_asf(esm, self.lambda_service) def on_after_init(self): self.router.register_routes() From 2e8419eadd557f49cbe3ebf835190dac0677b485 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Tue, 21 Mar 2023 09:19:07 +0100 Subject: [PATCH 4/5] remove try/except --- localstack/services/awslambda/provider.py | 46 ++++++++++------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/localstack/services/awslambda/provider.py b/localstack/services/awslambda/provider.py index f6184185e5dc4..1aa2d8e904acd 100644 --- a/localstack/services/awslambda/provider.py +++ b/localstack/services/awslambda/provider.py @@ -232,32 +232,26 @@ def on_after_state_load(self): for account_id, account_bundle in lambda_stores.items(): for region_name, state in account_bundle.items(): for fn in state.functions.values(): - # only start functions that don't have a version manager yet - try: - self.lambda_service.get_lambda_version_manager( - fn.latest().id.qualified_arn() - ) - except ValueError: - for fn_version in fn.versions.values(): - # restore the "Pending" state for every function version and start it - try: - new_state = VersionState( - state=State.Pending, - code=StateReasonCode.Creating, - reason="The function is being created.", - ) - new_config = dataclasses.replace(fn_version.config, state=new_state) - new_version = dataclasses.replace(fn_version, config=new_config) - fn.versions[fn_version.id.qualifier] = new_version - self.lambda_service.create_function_version(fn_version).result( - timeout=5 - ) - except Exception: - LOG.warning( - "Failed to restore function version %s", - fn_version.id.qualified_arn(), - exc_info=True, - ) + for fn_version in fn.versions.values(): + # restore the "Pending" state for every function version and start it + try: + new_state = VersionState( + state=State.Pending, + code=StateReasonCode.Creating, + reason="The function is being created.", + ) + new_config = dataclasses.replace(fn_version.config, state=new_state) + new_version = dataclasses.replace(fn_version, config=new_config) + fn.versions[fn_version.id.qualifier] = new_version + self.lambda_service.create_function_version(fn_version).result( + timeout=5 + ) + except Exception: + LOG.warning( + "Failed to restore function version %s", + fn_version.id.qualified_arn(), + exc_info=True, + ) # Restore event source listeners for esm in state.event_source_mappings.values(): From 528248ad28399f4f75d135c648a6d3ea3ae72832 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Tue, 21 Mar 2023 13:44:23 +0100 Subject: [PATCH 5/5] remove another level of indentation --- localstack/services/awslambda/provider.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/localstack/services/awslambda/provider.py b/localstack/services/awslambda/provider.py index 1aa2d8e904acd..fcc4350e59dfa 100644 --- a/localstack/services/awslambda/provider.py +++ b/localstack/services/awslambda/provider.py @@ -253,9 +253,9 @@ def on_after_state_load(self): exc_info=True, ) - # Restore event source listeners - for esm in state.event_source_mappings.values(): - EventSourceListener.start_listeners_for_asf(esm, self.lambda_service) + # Restore event source listeners + for esm in state.event_source_mappings.values(): + EventSourceListener.start_listeners_for_asf(esm, self.lambda_service) def on_after_init(self): self.router.register_routes()