8000 SQS: Improve update support for CloudFormation handlers. by peter-smith-phd · Pull Request #13477 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,26 @@ def recursive_convert(obj):
return recursive_convert(input_dict)


def resource_tags_to_remove_or_update(
prev_tags: list[dict], new_tags: list[dict]
) -> tuple[list[str], dict[str, str]]:
"""
When updating resources that have tags, we need to determine which tags to remove and which to add/update,
as these are typically done in separate API calls. The format of prev_tags and new_tags is expected to
be [{ "Key": tagName, "Value": tagValue }, ...]. The return value will be a tuple of (tags_to_remove, tags_to_update),
where:
- tags_to_remove is a list of tag keys that are present in prev_tags but not in new_tags.
- tags_to_update is a dict of tags to add or update, with the format: { tagName: tagValue, ... }.
"""
prev_tag_keys = [tag["Key"] for tag in prev_tags]
new_tag_keys = [tag["Key"] for tag in new_tags]
tags_to_remove = list(set(prev_tag_keys) - set(new_tag_keys))

# convert from list of dicts, to a single dict because that's what tag_queue APIs expect.
tags_to_update = {tag["Key"]: tag["Value"] for tag in new_tags}
return (tags_to_remove, tags_to_update)


# LocalStack specific utilities
def get_schema_path(file_path: Path) -> dict:
file_name_base = file_path.name.removesuffix(".py").removesuffix(".py.enc")
Expand Down
6 changes: 6 additions & 0 deletions localstack-core/localstack/services/sqs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
QueueAttributeName.QueueArn,
]

#
# If these attributes are set to their default values, they are effectively
# deleted from the queue attributes and not returned in future calls to get_queue_attributes()
#
DELETE_IF_DEFAULT = {"KmsMasterKeyId": "", "KmsDataKeyReusePeriodSeconds": "300"}
Comment on lines +34 to +38
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra nitty nit: changing this to a list of keys and then accessing the new DEFAULT_ATTRIBUTE_VALUES avoids duplicating those values. Just in case they ever change the KmsDataKeyReusePeriodSeconds 🙃

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting thought. I had deliberately not done this, because DEFAULT_ATTRIBUTE_VALUE is defined in the resource providers, but DELETE_IF_DEFAULT is defined inside the service provider. I guess there's nothing stopping this from working, but it feels odd to have a service -> resource provider dependency in the code.

(not to mention, this would be impossible in a real AWS resource provider where they execute in completely separate domains).

Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the dependency problem, I was more referring to this in a general manner, which could maybe solved by moving this to constants? In any case, it's absolutely non-blocking :)


INVALID_STANDARD_QUEUE_ATTRIBUTES = [
QueueAttributeName.FifoQueue,
QueueAttributeName.ContentBasedDeduplication,
Expand Down
6 changes: 5 additions & 1 deletion localstack-core/localstack/services/sqs/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,11 @@ def set_queue_attributes(
for k, v in attributes.items():
if k in sqs_constants.INTERNAL_QUEUE_ATTRIBUTES:
raise InvalidAttributeName(f"Unknown Attribute {k}.")
queue.attributes[k] = v
if k in sqs_constants.DELETE_IF_DEFAULT and v == sqs_constants.DELETE_IF_DEFAULT[k]:
if k in queue.attributes:
del queue.attributes[k]
else:
queue.attributes[k] = v
Comment on lines +1276 to +1280
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Have you considered hide the values in the response for the get_attributes operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I hadn't thought of that. Is there a benefit to hiding it in the output of get_queue_attributes? I had done it this way because I wasn't sure if there's internal code in the service that might check if it exists.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thrau, @gregfurman or @baermat should know what's the best approach here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thrau @gregfurman or @baermat, do any of you want to review this PR. Particularly for the couple of files within localstack-core/localstack/services/sqs that I've updated?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think deleting the attributes is the better approach right now, because we do not include KmsMasterKeyId or KmsDataKeyReusePeriodSeconds in our current default values. I don't know if this is simply a discrepancy between the cloudformation and the "regular" logic, or if this is something we should handle at some point. For now, deleting them seems like the strategy with less friction to me.
We probably also want to test this behaviour in our provider tests with something like this.

    @markers.aws.validated
    def test_set_queue_attributes_default_values(self, sqs_create_queue, snapshot, aws_sqs_client):
        attributes = {
            "KmsMasterKeyId": "",
            "KmsDataKeyReusePeriodSeconds": "300",
        }
        queue_url = sqs_create_queue()
        response = aws_sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])
        snapshot.match("get-queue-attributes-before-set", response)
        aws_sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)

        response = aws_sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])
        snapshot.match("get-queue-attributes-after-set", response)


# Special cases
if queue.attributes.get(QueueAttributeName.Policy) == "":
Expand Down
8020
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ class SQSQueueProvider(ResourceProvider[SQSQueueProperties]):
TYPE = "AWS::SQS::Queue" # Autogenerated. Don't change
SCHEMA = util.get_schema_path(Path(__file__)) # Autogenerated. Don't change

# Values used when a property is removed from a template and needs to be set to its default.
# If AWS changes their defaults in the future, our parity tests should break.
DEFAULT_ATTRIBUTE_VALUES = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Praise: We should add this type of constant in more resource providers 🙌

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree, but @peter-smith-phd 's comment below "Note: CloudFormation sets this to 256KB on update, but 1MB on create" makes me rethink my position.

Context: in the previous CFn resource provider implementation we had the GenericBaseModel subclasses, which tried to abstract to much from the process, and were difficult to maintain. This default values dictionary is starting to become like this.

As an example, I was about to write something like

perhaps we should add an additional layer of structure to this dictionary, e.g.

DEFAULT_ATTRIBUTE_VALUES: dict[str, str | dict[str, str]] = {
    "MaximumMessageSize": {"CREATE": "1048576", "UPDATE": 262144},
}

but the type signature alone reminds me of the old way where we would define these deploy templates which was an attempt to make this API declarative, but as you can see by the number of closures in that AWS::IAM::User method, it was not straightforward.

I don't think this approach is as bad since it's only the defaults, but I personally don't like having very dynamic types in my type signatures, which is a code smell for how we have to add conditional logic down the line when checking.

"ReceiveMessageWaitTimeSeconds": "0",
"DelaySeconds": "0",
"KmsMasterKeyId": "",
"RedrivePolicy": "",
"MessageRetentionPeriod": "345600",
"MaximumMessageSize": "262144", # Note: CloudFormation sets this to 256KB on update, but 1MB on create
"VisibilityTimeout": "30",
"KmsDataKeyReusePeriodSeconds": "300",
}

# Private method for creating a unique queue name, if none is specified.
def _autogenerated_queue_name(self, request: ResourceRequest[SQSQueueProperties]) -> str:
Comment on lines +80 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit(minor): I tend to prefer methods that perform work be named as such

Suggested change
# Private method for creating a unique queue name, if none is specified.
def _autogenerated_queue_name(self, request: ResourceRequest[SQSQueueProperties]) -> str:
# Private method for creating a unique queue name, if none is specified.
def _generate_queue_name(self, request: ResourceRequest[SQSQueueProperties]) -> str:

queue_name = util.generate_default_name(request.stack_name, request.logical_resource_id)
isFifoQueue = request.desired_state.get("FifoQueue")

# Note that it's an SQS FIFO queue only if the FifoQueue property is set to boolean True, or the string "true"
# (case insensitive). If it's None (property was omitted) or False, or any type of string (e.g. a typo
# such as "Fasle"), then it's not a FIFO queue. This extra check is needed because the CloudFormation engine
# doesn't fully validate the FifoQueue property before passing it to the resource provider.
if (
isFifoQueue == True # noqa: E712
or (isinstance(isFifoQueue, str) and isFifoQueue.lower() == "true")
):
queue_name = f"{queue_name[:-5]}.fifo"
return queue_name

def create(
self,
request: ResourceRequest[SQSQueueProperties],
Expand All @@ -74,8 +103,6 @@ def create(
Primary identifier fields:
- /properties/QueueUrl



Create-only properties:
- /properties/FifoQueue
- /properties/QueueName
Expand All @@ -92,26 +119,13 @@ def create(
- sqs:TagQueue

"""
# TODO: validations
# TODO: validations - what validations are needed?
model = request.desired_state
sqs = request.aws_client_factory.sqs

if model.get("FifoQueue", False):
model["FifoQueue"] = model["FifoQueue"]

queue_name = model.get("QueueName")
if not queue_name:
# TODO: verify patterns here
if model.get("FifoQueue"):
queue_name = util.generate_default_name(
request.stack_name, request.logical_resource_id
)[:-5]
queue_name = f"{queue_name}.fifo"
else:
queue_name = util.generate_default_name(
request.stack_name, request.logical_resource_id
)
model["QueueName"] = queue_name
# if no QueueName is specified, automatically generate one
if not model.get("QueueName"):
model["QueueName"] = self._autogenerated_queue_name(request)

attributes = self._compile_sqs_queue_attributes(model)
result = request.aws_client_factory.sqs.create_queue(
Expand Down Expand Up @@ -184,38 +198,30 @@ def update(
"""
sqs = request.aws_client_factory.sqs
model = request.desired_state
prev_model = request.previous_state

assert request.previous_state is not None

should_replace = (
request.desired_state.get("QueueName", request.previous_state["QueueName"])
!= request.previous_state["QueueName"]
) or (
request.desired_state.get("FifoQueue", request.previous_state.get("FifoQueue&quo E988 t;))
!= request.previous_state.get("FifoQueue")
queue_url = prev_model["QueueUrl"]
self._populate_missing_attributes_with_defaults(model)
sqs.set_queue_attributes(
QueueUrl=queue_url, Attributes=self._compile_sqs_queue_attributes(model)
)

if not should_replace:
return ProgressEvent(OperationStatus.SUCCESS, resource_model=request.previous_state)

# TODO: copied from the create handler, extract?
if model.get("FifoQueue"):
queue_name = util.generate_default_name(
request.stack_name, request.logical_resource_id
)[:-5]
queue_name = f"{queue_name}.fifo"
else:
queue_name = util.generate_default_name(request.stack_name, request.logical_resource_id)

# replacement (TODO: find out if we should handle this in the provider or outside of it)
# delete old queue
sqs.delete_queue(QueueUrl=request.previous_state["QueueUrl"])
# create new queue (TODO: re-use create logic to make this more robust, e.g. for
# auto-generated queue names)
model["QueueUrl"] = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
model["Arn"] = sqs.get_queue_attributes(
QueueUrl=model["QueueUrl"], AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
(tags_to_remove, tags_to_add_or_update) = util.resource_tags_to_remove_or_update(
prev_model.get("Tags", []), model.get("Tags", [])
)
sqs.untag_queue(QueueUrl=queue_url, TagKeys=tags_to_remove)
sqs.tag_queue(QueueUrl=queue_url, Tags=tags_to_add_or_update)

model["QueueUrl"] = queue_url
model["Arn"] = request.previous_state["Arn"]

# For QueueName and FifoQueue, always use the value from the previous model. These fields
# are create-only, so they cannot be changed via an update (even though they might be omitted)
model["QueueName"] = prev_model.get("QueueName")
model["FifoQueue"] = prev_model.get("FifoQueue", False)

return ProgressEvent(OperationStatus.SUCCESS, resource_model=model)

def _compile_sqs_queue_attributes(self, properties: SQSQueueProperties) -> dict[str, str]:
Expand Down Expand Up @@ -250,6 +256,15 @@ def _compile_sqs_queue_attributes(self, properties: SQSQueueProperties) -> dict[

return result

def _populate_missing_attributes_with_defaults(self, properties: SQSQueueProperties) -> None:
"""
For any attribute that is missing from the desired state, populate it with the default value.
This is the only way to remove an attribute from an existing SQS queue's configuration.
:param properties: the properties passed from cloudformation
"""
for k, v in self.DEFAULT_ATTRIBUTE_VALUES.items():
properties.setdefault(k, v)

def list(
self,
request: ResourceRequest[SQSQueueProperties],
Expand Down
1 change: 1 addition & 0 deletions tests/aws/services/cloudformation/api/test_stacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def test_update_stack_with_same_template_withoutchange_transformation(
)

@markers.aws.validated
@skip_if_legacy_engine()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 it's great to see more skips for the old engine meaning that the new engine is a higher parity!

def test_update_stack_actual_update(self, deploy_cfn_template, aws_client):
template = load_file(
os.path.join(os.path.dirname(__file__), "../../../templates/sqs_queue_update.yml")
Expand Down
143 changes: 0 additions & 143 deletions tests/aws/services/cloudformation/resources/test_sqs.py

This file was deleted.

Loading
Loading
0