-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
add: TaggingService functionality to Lambda Provider #11745
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,6 @@ | |
Description, | ||
DestinationConfig, | ||
EventSourceMappingConfiguration, | ||
FunctionArn, | ||
FunctionCodeLocation, | ||
FunctionConfiguration, | ||
FunctionEventInvokeConfig, | ||
|
@@ -127,6 +126,7 @@ | |
StatementId, | ||
StateReasonCode, | ||
String, | ||
TaggableResource, | ||
TagKeyList, | ||
Tags, | ||
TracingMode, | ||
|
@@ -221,8 +221,12 @@ | |
from localstack.services.plugins import ServiceLifecycleHook | ||
from localstack.state import StateVisitor | ||
from localstack.utils.aws.arns import ( | ||
ArnData, | ||
extract_resource_from_arn, | ||
extract_service_from_arn, | ||
get_partition, | ||
lambda_event_source_mapping_arn, | ||
parse_arn, | ||
) | ||
from localstack.utils.bootstrap import is_api_enabled | ||
from localstack.utils.collections import PaginatedList | ||
|
@@ -394,6 +398,18 @@ def _get_function(function_name: str, account_id: str, region: str) -> Function: | |
) | ||
return function | ||
|
||
@staticmethod | ||
def _get_esm(uuid: str, account_id: str, region: str) -> EventSourceMappingConfiguration: | ||
state = lambda_stores[account_id][region] | ||
esm = state.event_source_mappings.get(uuid) | ||
if not esm: | ||
arn = lambda_event_source_mapping_arn(uuid, account_id, region) | ||
raise ResourceNotFoundException( | ||
f"Event source mapping not found: {arn}", | ||
Type="User", | ||
) | ||
return esm | ||
|
||
@staticmethod | ||
def _validate_qualifier_expression(qualifier: str) -> None: | ||
if error_messages := api_utils.validate_qualifier(qualifier): | ||
|
@@ -987,12 +1003,16 @@ def create_function( | |
), | ||
) | ||
fn.versions["$LATEST"] = version | ||
if request.get("Tags"): | ||
self._store_tags(fn, request["Tags"]) | ||
# TODO: should validation failures here "fail" the function creation like it is now? | ||
# TODO: should validation failures here "fail" the function creation like it is now? | ||
joe4dev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
state.functions[function_name] = fn | ||
self.lambda_service.create_function_version(version) | ||
|
||
if tags := request.get("Tags"): | ||
# This will check whether the function exists. | ||
self._store_tags(arn.unqualified_arn(), tags) | ||
# TODO: This should be removed in favour of using the TaggingService | ||
fn.tags = tags | ||
|
||
if request.get("Publish"): | ||
version = self._publish_version_with_changes( | ||
function_name=function_name, region=context_region, account_id=context_account_id | ||
|
@@ -1453,7 +1473,7 @@ def get_function( | |
account_id=account_id, | ||
region=region, | ||
) | ||
tags = self._get_tags(fn) | ||
tags = self._get_tags(api_utils.unqualified_lambda_arn(function_name, account_id, region)) | ||
additional_fields = {} | ||
if tags: | ||
additional_fields["Tags"] = tags | ||
|
@@ -1873,6 +1893,8 @@ def create_event_source_mapping_v2( | |
esm_worker = EsmWorkerFactory(esm_config, function_role, enabled).get_esm_worker() | ||
self.esm_workers[esm_worker.uuid] = esm_worker | ||
# TODO: check StateTransitionReason, LastModified, LastProcessingResult (concurrent updates requires locking!) | ||
if tags := request.get("Tags"): | ||
self._store_tags(esm_config.get("EventSourceMappingArn"), tags) | ||
esm_worker.create() | ||
return esm_config | ||
|
||
|
@@ -2345,6 +2367,7 @@ def create_function_url_config( | |
) | ||
|
||
custom_id: str | None = None | ||
# TODO: References to Function.tags should be replaced with calls to the TaggingService | ||
if fn.tags is not None and TAG_KEY_CUSTOM_URL in fn.tags: | ||
# Note: I really wanted to add verification here that the | ||
# url_id is unique, so we could surface that to the user ASAP. | ||
|
@@ -4118,87 +4141,141 @@ def delete_function_concurrency( | |
# ======================================= | ||
# =============== TAGS =============== | ||
# ======================================= | ||
# only function ARNs are available for tagging | ||
# only Function, Event Source Mapping, and Code Signing Config (not currently supported by LocalStack) ARNs an are available for tagging in AWS | ||
|
||
def _get_tags(self, function: Function) -> dict[str, str]: | ||
return function.tags or {} | ||
def _get_tags(self, resource: TaggableResource) -> dict[str, str]: | ||
state = self.fetch_lambda_store_for_tagging(resource) | ||
lambda_adapted_tags = { | ||
tag["Key"]: tag["Value"] | ||
for tag in state.TAGS.list_tags_for_resource(resource).get("Tags") | ||
} | ||
return lambda_adapted_tags | ||
|
||
def _store_tags(self, function: Function, tags: dict[str, str]): | ||
if len(tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE: | ||
def _store_tags(self, resource: TaggableResource, tags: dict[str, str]): | ||
state = self.fetch_lambda_store_for_tagging(resource) | ||
if len(state.TAGS.tags.get(resource, {}) | tags) > LAMBDA_TAG_LIMIT_PER_RESOURCE: | ||
raise InvalidParameterValueException( | ||
"Number of tags exceeds resource tag limit.", Type="User" | ||
) | ||
with function.lock: | ||
function.tags = tags | ||
# dirty hack for changed revision id, should reevaluate model to prevent this: | ||
latest_version = function.versions["$LATEST"] | ||
function.versions["$LATEST"] = dataclasses.replace( | ||
latest_version, config=dataclasses.replace(latest_version.config) | ||
) | ||
|
||
def _update_tags(self, function: Function, tags: dict[str, str]): | ||
with function.lock: | ||
stored_tags = function.tags or {} | ||
stored_tags |= tags | ||
self._store_tags(function=function, tags=stored_tags) | ||
tag_svc_adapted_tags = [{"Key": key, "Value": value} for key, value in tags.items()] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to adapt the tags There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems Lambda is doing something custom here (tag adaptation). Does it still make sense to use the TaggingService? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like the unified approach to tagging. Even though adapting in this way is a bit janky, I think the benefits of a single svc outweighs the costs of adapting to suit it. In future, maybe we can extend the tag svc to better account for cases like this. |
||
state.TAGS.tag_resource(resource, tag_svc_adapted_tags) | ||
|
||
def tag_resource( | ||
self, context: RequestContext, resource: FunctionArn, tags: Tags, **kwargs | ||
) -> None: | ||
if not tags: | ||
raise InvalidParameterValueException( | ||
"An error occurred and the request cannot be processed.", Type="User" | ||
) | ||
def fetch_lambda_store_for_tagging(self, resource: TaggableResource) -> LambdaStore: | ||
""" | ||
Takes a resource ARN for a TaggableResource (Lambda Function, Event Source Mapping, or Code Signing Config) and returns a corresponding | ||
LambdaStore for its region and account. | ||
|
||
# TODO: test layer (added in snapshot update 2023-11) | ||
pattern_match = api_utils.FULL_FN_ARN_PATTERN.search(resource) | ||
if not pattern_match: | ||
In addition, this function validates that the ARN is a valid TaggableResource type, and that the TaggableResource exists. | ||
|
||
Raises: | ||
ValidationException: If the resource ARN is not a full ARN for a TaggableResource. | ||
ResourceNotFoundException: If the specified resource does not exist. | ||
InvalidParameterValueException: If the resource ARN is a qualified Lambda Function. | ||
""" | ||
|
||
def _raise_validation_exception(): | ||
raise ValidationException( | ||
rf"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{{2}}((-gov)|(-iso(b?)))?-[a-z]+-\d{{1}}:\d{{12}}:(function:[a-zA-Z0-9-_]+(:(\$LATEST|[a-zA-Z0-9-_]+))?|layer:[a-zA-Z0-9-_]+)" | ||
f"1 validation error detected: Value '{resource}' at 'resource' failed to satisfy constraint: Member must satisfy regular expression pattern: {api_utils.TAGGABLE_RESOURCE_ARN_PATTERN}" | ||
) | ||
|
||
groups = pattern_match.groupdict() | ||
fn_name = groups.get("function_name") | ||
# Check whether the ARN we have been passed is correctly formatted | ||
parsed_resource_arn: ArnData = None | ||
try: | ||
parsed_resource_arn = parse_arn(resource) | ||
except Exception: | ||
_raise_validation_exception() | ||
|
||
# TODO: Should we be checking whether this is a full ARN? | ||
region, account_id, resource_type = map( | ||
parsed_resource_arn.get, ("region", "account", "resource") | ||
) | ||
|
||
if groups.get("qualifier"): | ||
if not all((region, account_id, resource_type)): | ||
_raise_validation_exception() | ||
|
||
if not (parts := resource_type.split(":")): | ||
_raise_validation_exception() | ||
|
||
resource_type, resource_identifier, *qualifier = parts | ||
if resource_type not in {"event-source-mapping", "code-signing-config", "function"}: | ||
_raise_validation_exception() | ||
|
||
if qualifier: | ||
if resource_type == "function": | ||
raise InvalidParameterValueException( | ||
"Tags on function aliases and versions are not supported. Please specify a function ARN.", | ||
Type="User", | ||
) | ||
_raise_validation_exception() | ||
|
||
match resource_type: | ||
case "event-source-mapping": | ||
self._get_esm(resource_identifier, account_id, region) | ||
case "code-signing-config": | ||
raise NotImplementedError("Resource tagging on CSC not yet implemented.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We haven't implemented tagging on Code SIgning Configuration as of yet, but we can easily extend the above to include it. |
||
case "function": | ||
self._get_function( | ||
function_name=resource_identifier, account_id=account_id, region=region | ||
) | ||
|
||
# If no exceptions are raised, assume ARN and referenced resource is valid for tag operations | ||
return lambda_stores[account_id][region] | ||
|
||
def tag_resource( | ||
self, context: RequestContext, resource: TaggableResource, tags: Tags, **kwargs | ||
) -> None: | ||
if not tags: | ||
raise InvalidParameterValueException( | ||
"Tags on function aliases and versions are not supported. Please specify a function ARN.", | ||
Type="User", | ||
"An error occurred and the request cannot be processed.", Type="User" | ||
) | ||
self._store_tags(resource, tags) | ||
|
||
account_id, region = api_utils.get_account_and_region(resource, context) | ||
fn = self._get_function(function_name=fn_name, account_id=account_id, region=region) | ||
|
||
self._update_tags(fn, tags) | ||
if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately we need to run these dirty checks to ensure a Revision ID gets updated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, I see that we haven't found a good solution yet for dealing with the revision id updates in general 🤔 A small concern here is that the function and function version update are not synchronized with the tagging service, leading to potential race conditions (tag update and revision id updates not synchronized; quite unlikely, and consequences should be negligible). Would function locking via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Think we should rather look at improving this entire approach in a follow-up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1, good idea |
||
"function" | ||
): | ||
name, _, account, region = function_locators_from_arn(resource) | ||
function = self._get_function(name, account, region) | ||
with function.lock: | ||
function.tags = tags | ||
# dirty hack for changed revision id, should reevaluate model to prevent this: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: yeah, this starts spreading like cancer 😅 |
||
latest_version = function.versions["$LATEST"] | ||
function.versions["$LATEST"] = dataclasses.replace( | ||
latest_version, config=dataclasses.replace(latest_version.config) | ||
) | ||
|
||
def list_tags( | ||
self, context: RequestContext, resource: FunctionArn, **kwargs | ||
self, context: RequestContext, resource: TaggableResource, **kwargs | ||
) -> ListTagsResponse: | ||
account_id, region = api_utils.get_account_and_region(resource, context) | ||
function_name = api_utils.get_function_name(resource, context) | ||
fn = self._get_function(function_name=function_name, account_id=account_id, region=region) | ||
|
||
return ListTagsResponse(Tags=self._get_tags(fn)) | ||
tags = self._get_tags(resource) | ||
return ListTagsResponse(Tags=tags) | ||
|
||
def untag_resource( | ||
self, context: RequestContext, resource: FunctionArn, tag_keys: TagKeyList, **kwargs | ||
self, context: RequestContext, resource: TaggableResource, tag_keys: TagKeyList, **kwargs | ||
) -> None: | ||
if not tag_keys: | ||
raise ValidationException( | ||
"1 validation error detected: Value null at 'tagKeys' failed to satisfy constraint: Member must not be null" | ||
) # should probably be generalized a bit | ||
|
||
account_id, region = api_utils.get_account_and_region(resource, context) | ||
function_name = api_utils.get_function_name(resource, context) | ||
fn = self._get_function(function_name=function_name, account_id=account_id, region=region) | ||
state = self.fetch_lambda_store_for_tagging(resource) | ||
state.TAGS.untag_resource(resource, tag_keys) | ||
|
||
# copy first, then set explicitly in store tags | ||
tags = dict(fn.tags or {}) | ||
if tags: | ||
for key in tag_keys: | ||
if key in tags: | ||
tags.pop(key) | ||
self._store_tags(function=fn, tags=tags) | ||
if (resource_id := extract_resource_from_arn(resource)) and resource_id.startswith( | ||
"function" | ||
): | ||
name, _, account, region = function_locators_from_arn(resource) | ||
function = self._get_function(name, account, region) | ||
with function.lock: | ||
function.tags = { | ||
tag["Key"]: tag["Value"] | ||
for tag in state.TAGS.list_tags_for_resource(resource).get("Tags") | ||
} | ||
# dirty hack for changed revision id, should reevaluate model to prevent this: | ||
latest_version = function.versions["$LATEST"] | ||
function.versions["$LATEST"] = dataclasses.replace( | ||
latest_version, config=dataclasses.replace(latest_version.config) | ||
) | ||
|
||
# ======================================= | ||
# ======= LEGACY / DEPRECATED ======== | ||
|
Uh oh!
There was an error while loading. Please reload this page.