8000 add resource events to CFn v2 by pinzon · Pull Request #12721 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

add resource events to CFn v2 #12721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

LOG = logging.getLogger(__name__)

StatusFromAction = {"Add": "CREATE", "Modify": "UPDATE", "Remove": "DELETE"}


@dataclass
class ChangeSetModelExecutorResult:
Expand All @@ -51,6 +53,24 @@ def __init__(self, change_set: ChangeSet):
self.outputs = dict()
self.resolved_parameters = dict()

def _get_physical_id(self, logical_resource_id, strict: bool = True) -> str:
physical_resource_id = None
try:
physical_resource_id = self._after_resource_physical_id(logical_resource_id)
except RuntimeError:
# The physical id is missing or is set to None, which is invalid.
pass
if physical_resource_id is None:
# The physical resource id is None after an update that didn't rewrite the resource, the previous
# resource id is therefore the current physical id of this resource.

try:
physical_resource_id = self._before_resource_physical_id(logical_resource_id)
except RuntimeError as e:
if strict:
raise e
return physical_resource_id

# TODO: use a structured type for the return value
def execute(self) -> ChangeSetModelExecutorResult:
self.process()
Expand Down Expand Up @@ -128,6 +148,29 @@ def visit_node_resource(
after.physical_resource_id = after_physical_id
return delta

def _add_resource_event(
self,
action: ChangeAction,
logical_resource_id,
event_status: OperationStatus,
special_action: str = None,
reason: str = None,
):
status_from_action = special_action or StatusFromAction[action.value]
if event_status.value == OperationStatus.SUCCESS.value:
status = StackStatus(f"{status_from_action}_COMPLETE")
else:
status = StackStatus(f"{status_from_action}_{event_status.name}")
self._change_set.stack.add_resource_event(
logical_resource_id,
self._get_physical_id(logical_resource_id, False),
status=status,
status_reason=reason,
)

if event_status.value == OperationStatus.FAILED.value:
self._change_set.stack.set_stack_status(StackStatus(status))

def visit_node_output(
self, node_output: NodeOutput
) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
Expand All @@ -150,56 +193,75 @@ def _execute_resource_change(
# XXX hacky, stick the previous resources' properties into the payload
before_properties = self._merge_before_properties(name, before)

self._execute_resource_action(
self._add_resource_event(ChangeAction.Modify, name, OperationStatus.IN_PROGRESS)
event = self._execute_resource_action(
action=ChangeAction.Modify,
logical_resource_id=name,
resource_type=before.resource_type,
before_properties=before_properties,
after_properties=after.properties,
)
self._add_resource_event(
ChangeAction.Modify, name, event.status, reason=event.message
)
# Case: type migration.
# TODO: Add test to assert that on type change the resources are replaced.
else:
# XXX hacky, stick the previous resources' properties into the payload
before_properties = self._merge_before_properties(name, before)
self._add_resource_event(ChangeAction.Modify, name, OperationStatus.IN_PROGRESS)
# Register a Create for the next type.
event = self._execute_resource_action(
action=ChangeAction.Add,
logical_resource_id=name,
resource_type=after.resource_type,
before_properties=None,
after_properties=after.properties,
)
self._add_resource_event(
ChangeAction.Modify, name, event.status, reason=event.message
)

if event.status == OperationStatus.FAILED:
return

# Register a Removed for the previous type.
self._execute_resource_action(
event = self._execute_resource_action(
action=ChangeAction.Remove,
logical_resource_id=name,
resource_type=before.resource_type,
before_properties=before_properties,
after_properties=None,
)
# Register a Create for the next type.
self._execute_resource_action(
action=ChangeAction.Add,
logical_resource_id=name,
resource_type=after.resource_type,
before_properties=None,
after_properties=after.properties,
self._add_resource_event(
ChangeAction.Modify, name, event.status, reason=event.message
)

elif not is_nothing(before):
# Case: removal
# XXX hacky, stick the previous resources' properties into the payload
# XXX hacky, stick the previous resources' properties into the payload
before_properties = self._merge_before_properties(name, before)
< 8000 /td>
self._execute_resource_action(
self._add_resource_event(ChangeAction.Remove, name, OperationStatus.IN_PROGRESS)
event = self._execute_resource_action(
action=ChangeAction.Remove,
logical_resource_id=name,
resource_type=before.resource_type,
before_properties=before_properties,
after_properties=None,
)
self._add_resource_event(ChangeAction.Remove, name, event.status, reason=event.message)
elif not is_nothing(after):
# Case: addition
self._execute_resource_action(
self._add_resource_event(ChangeAction.Add, name, OperationStatus.IN_PROGRESS)
event = self._execute_resource_action(
action=ChangeAction.Add,
logical_resource_id=name,
resource_type=after.resource_type,
before_properties=None,
after_properties=after.properties,
)
self._add_resource_event(ChangeAction.Add, name, event.status, reason=event.message)

def _merge_before_properties(
self, name: str, preproc_resource: PreprocResource
Expand All @@ -219,8 +281,9 @@ def _execute_resource_action(
resource_type: str,
before_properties: Optional[PreprocProperties],
after_properties: Optional[PreprocProperties],
) -> None:
) -> ProgressEvent:
LOG.debug("Executing resource action: %s for resource '%s'", action, logical_resource_id)

resource_provider_executor = ResourceProviderExecutor(
stack_name=self._change_set.stack.stack_name, stack_id=self._change_set.stack.stack_id
)
Expand All @@ -240,74 +303,60 @@ def _execute_resource_action(
event = resource_provider_executor.deploy_loop(
resource_provider, extra_resource_properties, payload
)

except Exception as e:
reason = str(e)
LOG.warning(
"Resource provider operation failed: '%s'",
reason,
exc_info=LOG.isEnabledFor(logging.DEBUG),
)
stack = self._change_set.stack
stack_status = stack.status
if stack_status == StackStatus.CREATE_IN_PROGRESS:
stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
return
event = ProgressEvent(
OperationStatus.FAILED,
resource_model={},
message=f"Resource provider operation failed: {reason}",
)
else:
event = ProgressEvent(OperationStatus.SUCCESS, resource_model={})
LOG.warning(
"Resource provider not found for type: %s",
resource_type,
exc_info=LOG.isEnabledFor(logging.DEBUG),
)
event = ProgressEvent(
OperationStatus.SUCCESS,
resource_model={},
message="Resource Provider not found for this resource",
)

self.resources.setdefault(logical_resource_id, {"Properties": {}})
match event.status:
case OperationStatus.SUCCESS:
# merge the resources state with the external state
# TODO: this is likely a duplicate of updating from extra_resource_properties

# TODO: add typing
# TODO: avoid the use of string literals for sampling from the object, use typed classes instead
# TODO: avoid sampling from resources and use tmp var reference
# TODO: add utils functions to abstract this logic away (resource.update(..))
# TODO: avoid the use of setdefault (debuggability/readability)
# TODO: review the use of merge

self.resources[logical_resource_id]["Properties"].update(event.resource_model)
self.resources[logical_resource_id].update(extra_resource_properties)
# XXX for legacy delete_stack compatibility
self.resources[logical_resource_id]["LogicalResourceId"] = logical_resource_id
self.resources[logical_resource_id]["Type"] = resource_type

# TODO: review why the physical id is returned as None during updates
# TODO: abstract this in member function of resource classes instead
physical_resource_id = None
try:
physical_resource_id = self._after_resource_physical_id(logical_resource_id)
except RuntimeError:
6855 # The physical id is missing or is set to None, which is invalid.
pass
if physical_resource_id is None:
# The physical resource id is None after an update that didn't rewrite the resource, the previous
# resource id is therefore the current physical id of this resource.
physical_resource_id = self._before_resource_physical_id(logical_resource_id)
self.resources[logical_resource_id]["PhysicalResourceId"] = physical_resource_id
physical_resource_id = self._get_physical_id(logical_resource_id)
self.resources[logical_resource_id]["PhysicalResourceId"] = physical_resource_id

case OperationStatus.FAILED:
reason = event.message
LOG.warning(
"Resource provider operation failed: '%s'",
reason,
)
# TODO: duplication
stack = self._change_set.stack
stack_status = stack.status
if stack_status == StackStatus.CREATE_IN_PROGRESS:
stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
else:
raise NotImplementedError(f"Unhandled stack status: '{stack.status}'")

case OperationStatus.IN_PROGRESS:
LOG.info(
"Resource provider operation in progress",
)

case any:
raise NotImplementedError(f"Event status '{any}' not handled")

return event

def create_resource_provider_payload(
self,
action: ChangeAction,
Expand Down
A373
41 changes: 40 additions & 1 deletion localstack-core/localstack/services/cloudformation/v2/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Parameter,
StackDriftInformation,
StackDriftStatus,
StackEvent,
StackStatus,
StackStatusReason,
)
Expand All @@ -25,7 +26,8 @@
NodeTemplate,
)
from localstack.utils.aws import arns
from localstack.utils.strings import short_uid
from localstack.utils.strings import long_uid, short_uid
from localstack.utils.time import timestamp_millis


class ResolvedResource(TypedDict):
Expand All @@ -41,6 +43,7 @@ class Stack:
status_reason: StackStatusReason | None
stack_id: str
creation_time: datetime
events: list[StackEvent]

# state after deploy
resolved_parameters: dict[str, str]
Expand Down Expand Up @@ -84,12 +87,48 @@ def __init__(
self.resolved_parameters = {}
self.resolved_resources = {}
self.resolved_outputs = {}
self.events = []

def set_stack_status(self, status: StackStatus, reason: StackStatusReason | None = None):
self.status = status
if reason:
self.status_reason = reason

self.add_resource_event(
self.stack_name, self.stack_id, status.value, status_reason=reason or ""
)

def add_resource_event(
self,
resource_id: str = None,
physical_res_id: str = None,
status: str = "",
status_reason: str = "",
):
resource_id = resource_id or self.stack_name
physical_res_id = physical_res_id or self.stack_id
resource_type = (
self.template.get("Resources", {})
.get(resource_id, {})
.get("Type", "AWS::CloudFormation::Stack")
)

event: StackEvent = {
"EventId": long_uid(),
"Timestamp": timestamp_millis(),
"StackId": self.stack_id,
"StackName": self.stack_name,
"LogicalResourceId": resource_id,
"PhysicalResourceId": physical_res_id,
"ResourceStatus": status,
"ResourceType": resource_type,
}

if status_reason:
event["ResourceStatusReason"] = status_reason

self.events.insert(0, event)

def describe_details(self) -> ApiStack:
result = {
"ChangeSetId": self.change_set_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,32 @@ def describe_stack_events(
next_token: NextToken = None,
**kwargs,
) -> DescribeStackEventsOutput:
return DescribeStackEventsOutput(StackEvents=[])
state = get_cloudformation_store(context.account_id, context.region)
if stack_name:
if is_stack_arn(stack_name):
stack = state.stacks_v2[stack_name]
else:
stack_candidates = []
for stack in state.stacks_v2.values():
if (
stack.stack_name == stack_name
and stack.status != StackStatus.DELETE_COMPLETE
):
stack_candidates.append(stack)
if len(stack_candidates) == 0:
raise ValidationError(f"No stack with name {stack_name} found")
elif len(stack_candidates) > 1:
raise RuntimeError("Programing error, duplicate stacks found")
else:
stack = stack_candidates[0]
else:
raise NotImplementedError

if not stack:
# aws will silently ignore invalid stack names - we should do the same
return

return DescribeStackEventsOutput(StackEvents=stack.events)

@handler("DeleteStack")
def delete_stack(
Expand Down
Loading
Loading
0