diff --git a/localstack/services/awslambda/invocation/lambda_models.py b/localstack/services/awslambda/invocation/lambda_models.py index e62917f7f3446..7ca9333ff2cbb 100644 --- a/localstack/services/awslambda/invocation/lambda_models.py +++ b/localstack/services/awslambda/invocation/lambda_models.py @@ -82,6 +82,7 @@ SNAP_START_SUPPORTED_RUNTIMES = [Runtime.java11] +# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it? @dataclasses.dataclass(frozen=True) class VersionState: state: State @@ -520,11 +521,6 @@ def status_error(self, executor_id: str) -> None: raise NotImplementedError() -@dataclasses.dataclass -class EventSourceMapping: - ... - - @dataclasses.dataclass(frozen=True) class CodeSigningConfig: csc_id: str diff --git a/localstack/services/awslambda/invocation/lambda_service.py b/localstack/services/awslambda/invocation/lambda_service.py index 89ddd75beea0d..911f65bebe9dc 100644 --- a/localstack/services/awslambda/invocation/lambda_service.py +++ b/localstack/services/awslambda/invocation/lambda_service.py @@ -111,7 +111,7 @@ def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager: return version_manager - def create_function_version(self, function_version: FunctionVersion) -> None: + def create_function_version(self, function_version: FunctionVersion) -> Future[None]: """ Creates a new function version (manager), and puts it in the startup dict @@ -130,7 +130,7 @@ def create_function_version(self, function_version: FunctionVersion) -> None: function_arn=qualified_arn, function_version=function_version, lambda_service=self ) self.lambda_starting_versions[qualified_arn] = version_manager - self.task_executor.submit(version_manager.start) + return self.task_executor.submit(version_manager.start) def publish_version(self, function_version: FunctionVersion): """ @@ -238,7 +238,7 @@ def invoke( ) ) - def update_version(self, new_version: FunctionVersion) -> None: + def update_version(self, new_version: FunctionVersion) -> Future[None]: """ Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready to be invoked diff --git a/localstack/services/awslambda/invocation/models.py b/localstack/services/awslambda/invocation/models.py index 6794438fc9127..3b29abaac33a8 100644 --- a/localstack/services/awslambda/invocation/models.py +++ b/localstack/services/awslambda/invocation/models.py @@ -1,7 +1,7 @@ +from localstack.aws.api.lambda_ import EventSourceMappingConfiguration from localstack.services.awslambda.invocation.lambda_models import ( AccountSettings, CodeSigningConfig, - EventSourceMapping, Function, Layer, ) @@ -13,7 +13,7 @@ class LambdaStore(BaseStore): functions: dict[str, Function] = LocalAttribute(default=dict) # maps EventSourceMapping UUIDs to the respective EventSourceMapping - event_source_mappings: dict[str, EventSourceMapping] = LocalAttribute(default=dict) + event_source_mappings: dict[str, EventSourceMappingConfiguration] = LocalAttribute(default=dict) # maps CodeSigningConfig ARNs to the respective CodeSigningConfig code_signing_configs: dict[str, CodeSigningConfig] = LocalAttribute(default=dict) @@ -25,4 +25,4 @@ class LambdaStore(BaseStore): settings: AccountSettings = LocalAttribute(default=AccountSettings) -lambda_stores = AccountRegionBundle[LambdaStore]("lambda", LambdaStore) +lambda_stores = AccountRegionBundle("lambda", LambdaStore) diff --git a/localstack/services/awslambda/invocation/version_manager.py b/localstack/services/awslambda/invocation/version_manager.py index a9e2007dd9b81..50e39f3211a61 100644 --- a/localstack/services/awslambda/invocation/version_manager.py +++ b/localstack/services/awslambda/invocation/version_manager.py @@ -158,6 +158,7 @@ def __init__( ) def start(self) -> None: + new_state = None try: invocation_thread = Thread(target=self.invocation_loop) invocation_thread.start() @@ -167,23 +168,24 @@ def start(self) -> None: # code and reason not set for success scenario because only failed states provide this field: # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode - self.state = VersionState(state=State.Active) + new_state = VersionState(state=State.Active) LOG.debug( - f"Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) changed to active" + f"Changing Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) to active" ) except Exception as e: - self.state = VersionState( + new_state = VersionState( state=State.Failed, code=StateReasonCode.InternalError, reason=f"Error while creating lambda: {e}", ) LOG.debug( - f"Lambda '{self.function_arn}' changed to failed. Reason: %s", e, exc_info=True + f"Changing Lambda '{self.function_arn}' to failed. Reason: %s", e, exc_info=True ) finally: - self.lambda_service.update_version_state( - function_version=self.function_version, new_state=self.state - ) + if new_state: + self.lambda_service.update_version_state( + function_version=self.function_version, new_state=new_state + ) def stop(self) -> None: LOG.debug("Stopping lambda version '%s'", self.function_arn) diff --git a/localstack/services/awslambda/provider.py b/localstack/services/awslambda/provider.py index 2eb85b1ecfaba..fcc4350e59dfa 100644 --- a/localstack/services/awslambda/provider.py +++ b/localstack/services/awslambda/provider.py @@ -187,6 +187,7 @@ from localstack.services.awslambda.urlrouter import FunctionUrlRouter from localstack.services.edge import ROUTER from localstack.services.plugins import ServiceLifecycleHook +from localstack.state import StateVisitor from localstack.utils.aws import aws_stack from localstack.utils.aws.arns import extract_service_from_arn from localstack.utils.collections import PaginatedList @@ -218,6 +219,44 @@ def __init__(self) -> None: self.layer_fetcher = None lambda_hooks.inject_layer_fetcher.run(self) + def accept_state_visitor(self, visitor: StateVisitor): + visitor.visit(lambda_stores) + + def on_before_state_load(self): + self.lambda_service.stop() + + def on_after_state_load(self): + self.lambda_service = LambdaService() + + # TODO: provisioned concurrency + for account_id, account_bundle in lambda_stores.items(): + for region_name, state in account_bundle.items(): + for fn in state.functions.values(): + for fn_version in fn.versions.values(): + # restore the "Pending" state for every function version and start it + try: + new_state = VersionState( + state=State.Pending, + code=StateReasonCode.Creating, + reason="The function is being created.", + ) + new_config = dataclasses.replace(fn_version.config, state=new_state) + new_version = dataclasses.replace(fn_version, config=new_config) + fn.versions[fn_version.id.qualifier] = new_version + self.lambda_service.create_function_version(fn_version).result( + timeout=5 + ) + except Exception: + LOG.warning( + "Failed to restore function version %s", + fn_version.id.qualified_arn(), + exc_info=True, + ) + + # Restore event source listeners + for esm in state.event_source_mappings.values(): + EventSourceListener.start_listeners_for_asf(esm, self.lambda_service) + def on_after_init(self): self.router.register_routes()