8000 Implement basic persistence for lambda v2 (#7913) · codeperl/localstack@b0887d6 · GitHub
[go: up one dir, main page]

Skip to content

Commit b0887d6

Browse files
Implement basic persistence for lambda v2 (localstack#7913)
1 parent 80469cc commit b0887d6

File tree

5 files changed

+55
-18
lines changed

5 files changed

+55
-18
lines changed

localstack/services/awslambda/invocation/lambda_models.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
SNAP_START_SUPPORTED_RUNTIMES = [Runtime.java11]
8383

8484

85+
# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it?
8586
@dataclasses.dataclass(frozen=True)
8687
class VersionState:
8788
state: State
@@ -520,11 +521,6 @@ def status_error(self, executor_id: str) -> None:
520521
raise NotImplementedError()
521522

522523

523-
@dataclasses.dataclass
524-
class EventSourceMapping:
525-
...
526-
527-
528524
@dataclasses.dataclass(frozen=True)
529525
class CodeSigningConfig:
530526
csc_id: str

localstack/services/awslambda/invocation/lambda_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
111111

112112
return version_manager
113113

114-
def create_function_version(self, function_version: FunctionVersion) -> None:
114+
def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
115115
"""
116116
Creates a new function version (manager), and puts it in the startup dict
117117
@@ -130,7 +130,7 @@ def create_function_version(self, function_version: FunctionVersion) -> None:
130130
function_arn=qualified_arn, function_version=function_version, lambda_service=self
131131
)
132132
self.lambda_starting_versions[qualified_arn] = version_manager
133-
self.task_executor.submit(version_manager.start)
133+
return self.task_executor.submit(version_manager.start)
134134

135135
def publish_version(self, function_version: FunctionVersion):
136136
"""
@@ -238,7 +238,7 @@ def invoke(
238238
)
239239
)
240240

241-
def update_version(self, new_version: FunctionVersion) -> None:
241+
def update_version(self, new_version: FunctionVersion) -> Future[None]:
242242
"""
243243
Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
244244
to be invoked

localstack/services/awslambda/invocation/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
from localstack.aws.api.lambda_ import EventSourceMappingConfiguration
12
from localstack.services.awslambda.invocation.lambda_models import (
23
AccountSettings,
34
CodeSigningConfig,
4-
EventSourceMapping,
55
Function,
66
Layer,
77
)
@@ -13,7 +13,7 @@ class LambdaStore(BaseStore):
1313
functions: dict[str, Function] = LocalAttribute(default=dict)
1414

1515
# maps EventSourceMapping UUIDs to the respective EventSourceMapping
16-
event_source_mappings: dict[str, EventSourceMapping] = LocalAttribute(default=dict)
16+
event_source_mappings: dict[str, EventSourceMappingConfiguration] = LocalAttribute(default=dict)
1717

1818
# maps CodeSigningConfig ARNs to the respective CodeSigningConfig
1919
code_signing_configs: dict[str, CodeSigningConfig] = LocalAttribute(default=dict)
@@ -25,4 +25,4 @@ class LambdaStore(BaseStore):
2525
settings: AccountSettings = LocalAttribute(default=AccountSettings)
2626

2727

28-
lambda_stores = AccountRegionBundle[LambdaStore]("lambda", LambdaStore)
28+
lambda_stores = AccountRegionBundle("lambda", LambdaStore)

localstack/services/awslambda/invocation/version_manager.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ def __init__(
158158
)
159159

160160
def start(self) -> None:
161+
new_state = None
161162
try:
162163
invocation_thread = Thread(target=self.invocation_loop)
163164
invocation_thread.start()
@@ -167,23 +168,24 @@ def start(self) -> None:
167168

168169
# code and reason not set for success scenario because only failed states provide this field:
169170
# https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
170-
self.state = VersionState(state=State.Active)
171+
new_state = VersionState(state=State.Active)
171172
LOG.debug(
172-
f"Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) changed to active"
173+
f"Changing Lambda '{self.function_arn}' (id {self.function_version.config.internal_revision}) to active"
173174
)
174175
except Exception as e:
175-
self.state = VersionState(
176+
new_state = VersionState(
176177
state=State.Failed,
177178
code=StateReasonCode.InternalError,
178179
reason=f"Error while creating lambda: {e}",
179180
)
180181
LOG.debug(
181-
f"Lambda '{self.function_arn}' changed to failed. Reason: %s", e, exc_info=True
182+
f"Changing Lambda '{self.function_arn}' to failed. Reason: %s", e, exc_info=True
182183
)
183184
finally:
184-
self.lambda_service.update_version_state(
185-
function_version=self.function_version, new_state=self.state
186-
)
185+
if new_state:
186+
self.lambda_service.update_version_state(
187+
function_version=self.function_version, new_state=new_state
188+
)
187189

188190
def stop(self) -> None:
189191
LOG.debug("Stopping lambda version '%s'", self.function_arn)

localstack/services/awslambda/provider.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@
186186
from localstack.services.awslambda.urlrouter import FunctionUrlRouter
187187
from localstack.services.edge import ROUTER
188188
from localstack.services.plugins import ServiceLifecycleHook
189+
from localstack.state import StateVisitor
189190
from localstack.utils.aws import aws_stack
190191
from localstack.utils.aws.arns import extract_service_from_arn
191192
from localstack.utils.collections import PaginatedList
@@ -218,6 +219,44 @@ def __init__(self) -> None:
218219
self.layer_fetcher = None
219220
lambda_hooks.inject_layer_fetcher.run(self)
220221

222+
def accept_state_visitor(self, visitor: StateVisitor):
223+
visitor.visit(lambda_stores)
224+
225+
def on_before_state_load(self):
226+
self.lambda_service.stop()
227+
228+
def on_after_state_load(self):
229+
self.lambda_service = LambdaService()
230+
231+
# TODO: provisioned concurrency
232+
for account_id, account_bundle in lambda_stores.items():
233+
for region_name, state in account_bundle.items():
234+
for fn in state.functions.values():
235+
for fn_version in fn.versions.values():
236+
# restore the "Pending" state for every function version and start it
237+
try:
238+
new_state = VersionState(
239+
state=State.Pending,
240+
code=StateReasonCode.Creating,
241+
reason="The function is being created.",
242+
)
243+
new_config = dataclasses.replace(fn_version.config, state=new_state)
244+
new_version = dataclasses.replace(fn_version, config=new_config)
245+
fn.versions[fn_version.id.qualifier] = new_version
246+
self.lambda_service.create_function_version(fn_version).result(
247+
timeout=5
248+
)
249+
except Exception:
250+
LOG.warning(
251+
"Failed to restore function version %s",
252+
fn_version.id.qualified_arn(),
253+
exc_info=True,
254+
)
255+
256+
# Restore event source listeners
257+
for esm in state.event_source_mappings.values():
258+
EventSourceListener.start_listeners_for_asf(esm, self.lambda_service)
259+
221260
def on_after_init(self):
222261
self.router.register_routes()
223262

0 commit comments

Comments
 (0)
0