8000 Implement basic persistence for lambda v2 by dominikschubert · Pull Request #7913 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

Implement basic persistence for lambda v2 #7913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions localstack/services/awslambda/invocation/lambda_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
SNAP_START_SUPPORTED_RUNTIMES = [Runtime.java11]


# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it?
@dataclasses.dataclass(frozen=True)
class VersionState:
state: State
Expand Down Expand Up @@ -520,11 +521,6 @@ def status_error(self, executor_id: str) -> None:
raise NotImplementedError()


@dataclasses.dataclass
class EventSourceMapping:
...


@dataclasses.dataclass(frozen=True)
class CodeSigningConfig:
csc_id: str
Expand Down
6 changes: 3 additions & 3 deletions localstack/services/awslambda/invocation/lambda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:

return version_manager

def create_function_version(self, function_version: FunctionVersion) -> None:
def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
"""
Creates a new function version (manager), and puts it in the startup dict

Expand All @@ -130,7 +130,7 @@ def create_function_version(self, function_version: FunctionVersion) -> None:
function_arn=qualified_arn, function_version=function_version, lambda_service=self
)
self.lambda_starting_versions[qualified_arn] = version_manager
self.task_executor.submit(version_manager.start)
return self.task_executor.submit(version_manager.start)

def publish_version(self, function_version: FunctionVersion):
"""
Expand Down Expand Up @@ -238,7 +238,7 @@ def invoke(
)
)

def update_version(self, new_version: FunctionVersion) -> None:
def update_version(self, new_version: FunctionVersion) -> Future[None]:
"""
Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
to be invoked
Expand Down
6 changes: 3 additions & 3 deletions localstack/services/awslambda/invocation/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from localstack.aws.api.lambda_ import EventSourceMappingConfiguration
from localstack.services.awslambda.invocation.lambda_models import (
AccountSettings,
CodeSigningConfig,
EventSourceMapping,
Function,
Layer,
)
Expand All @@ -13,7 +13,7 @@ class LambdaStore(BaseStore):
functions: dict[str, Function] = LocalAttribute(default=dict)

# maps EventSourceMapping UUIDs to the respective EventSourceMapping
event_source_mappings: dict[str, EventSourceMapping] = LocalAttribute(default=dict)
event_source_mappings: dict[str, EventSourceMappingConfiguration] = LocalAttribute(default=dict)

# maps CodeSigningConfig ARNs to the respective CodeSigningConfig
code_signing_configs: dict[str, CodeSigningConfig] = LocalAttribute(default=dict)
Expand All @@ -25,4 +25,4 @@ class LambdaStore(BaseStore):
settings: AccountSettings = LocalAttribute(default=AccountSettings)


lambda_stores = AccountRegionBundle[LambdaStore]("lambda", LambdaStore)
lambda_stores = AccountRegionBundle("lambda", LambdaStore)
16 changes: 9 additions & 7 deletions localstack/services/awslambda/invocation/version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def __init__(
)

def start(self) -> None:
new_state = None
try:
invocation_thread = Thread(target=self.invocation_loop)
invocation_thread.start()
Expand All @@ -167,23 +168,24 @@ def start(self) -> None:

# code and reason not set for success scenario because only failed states provide this field:
# https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
self.state = VersionState(state=State.Active)
new_state = VersionState(state=State.Active)
LOG.debug(
f"Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) changed to active"
f"Changing Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) to active"
)
except Exception as e:
self.state = VersionState(
new_state = VersionState(
state=State.Failed,
code=StateReasonCode.InternalError,
reason=f"Error while creating lambda: {e}",
)
LOG.debug(
f"Lambda '{self.function_arn}' changed to failed. Reason: %s", e, exc_info=True
f"Changing Lambda '{self.function_arn}' to failed. Reason: %s", e, exc_info=True
)
finally:
self.lambda_service.update_version_state(
function_version=self.function_version, new_state=self.state
)
if new_state:
self.lambda_service.update_version_state(
function_version=self.function_version, new_state=new_state
)

def stop(self) -> None:
LOG.debug("Stopping lambda version '%s'", self.function_arn)
Expand Down
39 changes: 39 additions & 0 deletions localstack/services/awslambda/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
from localstack.services.awslambda.urlrouter import FunctionUrlRouter
from localstack.services.edge import ROUTER
from localstack.services.plugins import ServiceLifecycleHook
from localstack.state import StateVisitor
from localstack.utils.aws import aws_stack
from localstack.utils.aws.arns import extract_service_from_arn
from localstack.utils.collections import PaginatedList
Expand Down Expand Up @@ -218,6 +219,44 @@ def __init__(self) -> None:
self.layer_fetcher = None
lambda_hooks.inject_layer_fetcher.run(self)

def accept_state_visitor(self, visitor: StateVisitor):
visitor.visit(lambda_stores)

def on_before_state_load(self):
self.lambda_service.stop()

def on_after_state_load(self):
self.lambda_service = LambdaService()

# TODO: provisioned concurrency
for account_id, account_bundle in lambda_stores.items():
for region_name, state in account_bundle.items():
for fn in state.functions.values():
for fn_version in fn.versions.values():
# restore the "Pending" state for every function version and start it
try:
new_state = VersionState(
state=State.Pending,
code=StateReasonCode.Creating,
reason="The function is being created.",
)
new_config = dataclasses.replace(fn_version.config, state=new_state)
new_version = dataclasses.replace(fn_version, config=new_config)
fn.versions[fn_version.id.qualifier] = new_version
self.lambda_service.create_function_version(fn_version).result(
timeout=5
)
except Exception:
LOG.warning(
"Failed to restore function version %s",
fn_version.id.qualified_arn(),
exc_info=True,
)

# Restore event source listeners
for esm in state.event_source_mappings.values():
EventSourceListener.start_listeners_for_asf(esm, self.lambda_service)

def on_after_init(self):
self.router.register_routes()

Expand Down
0