8000 StepFunctions: Multi-accounts compatibility by viren-nadkarni · Pull Request #9119 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

StepFunctions: Multi-accounts compatibility #9119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
79ec46e
[DEBUG] Use non default account and region
viren-nadkarni Aug 29, 2023
5cdc0bf
Add account and region context to executions
viren-nadkarni Sep 12, 2023
cfc9743
Use execution context for internal DDB client
viren-nadkarni Sep 12, 2023
2682b40
Build ARNs with proper account ID and region name
viren-nadkarni Sep 12, 2023
14eb9a2
Implement account ID namespacing for legacy stepfunctions provider
viren-nadkarni Sep 12, 2023
f4c8a85
Take account ID and region into consideration for internal requests
viren-nadkarni Sep 13, 2023
9cf16d0
Run the legacy SF tests in a hardcoded region
viren-nadkarni Sep 13, 2023
b2917c0
Use an alternative way to pass the context
viren-nadkarni Sep 13, 2023
e43191c
Use request context account ID and region during program construction
viren-nadkarni Sep 14, 2023
5ef3a98
Merge branch 'master' into stepfunctions-multi-accounts
viren-nadkarni Sep 14, 2023
aad9b80
Maintain backward compatibility
viren-nadkarni Sep 14, 2023
74d02eb
Use proper client utility for eventbridge
viren-nadkarni Sep 14, 2023
a4b0954
Merge branch 'master' into stepfunctions-multi-accounts
MEPalma Sep 29, 2023
54ba905
resources as evaluation components
MEPalma Oct 2, 2023
528eb7e
Merge branch 'master' into stepfunctions-multi-accounts
MEPalma Oct 2, 2023
dfc56cc
split resource into static and runtime parts, minors
MEPalma Oct 2, 2023
5d7cfe4
Merge branch 'master' into stepfunctions-multi-accounts
MEPalma Oct 4, 2023
c75017b
conflicts resolution, add resource evaluation to s3 distributed map r…
MEPalma Oct 4, 2023
90fd710
Merge branch 'master' into stepfunctions-multi-accounts
MEPalma Oct 6, 2023
670df14
revert non default account and region
MEPalma Oct 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
split resource into static and runtime parts, minors
  • Loading branch information
MEPalma committed Oct 2, 2023
commit dfc56cce260a005323a00c0f9c5dd067f879494a
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from localstack.aws.api.stepfunctions import ExecutionFailedEventDetails, HistoryEventType
from localstack.services.stepfunctions.asl.component.common.error_name.error_name import ErrorName
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str


class FailureEvent:
Expand All @@ -28,9 +27,6 @@ class FailureEventException(Exception):
def __init__(self, failure_event: FailureEvent):
self.failure_event = failure_event

def __str__(self) -> str:
return to_json_str(self.failure_event.event_details)

def get_execution_failed_event_details(self) -> Optional[ExecutionFailedEventDetails]:
if self.failure_event.event_details is None:
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,16 @@ def from_arn(cls, arn: str) -> ResourceARN:
)


class Resource(EvalComponent, abc.ABC):
class ResourceOutput:
resource_arn: Final[str]
partition: Final[str]
region: Final[str]
account: Final[str]

def __init__(self, resource_arn: str, partition: str, region: str, account: str):
self.resource_arn = resource_arn
self.partition = partition
self.region = region
self.account = account
class ResourceRuntimePart:
account: Final[str]
region: Final[str]

def __init__(self, account: str, region: str):
self.region = region
self.account = account


class Resource(EvalComponent, abc.ABC):
_region: Final[str]
_account: Final[str]
resource_arn: Final[str]
Expand All @@ -107,97 +104,37 @@ def from_resource_arn(arn: str) -> Resource:
case "states", _:
return ServiceResource(resource_arn=resource_arn)

def _build_resource(self, env: Environment) -> Resource.ResourceOutput:
def _eval_runtime_part(self, env: Environment) -> ResourceRuntimePart:
region = self._region if self._region else env.aws_execution_details.region
account = self._account if self._account else env.aws_execution_details.account
return Resource.ResourceOutput(
resource_arn=self.resource_arn,
partition=self.partition,
region=region,
return ResourceRuntimePart(
account=account,
region=region,
)

def _eval_body(self, env: Environment) -> None:
resource_output = self._build_resource(env=env)
env.stack.append(resource_output)
runtime_part = self._eval_runtime_part(env=env)
env.stack.append(runtime_part)


class ActivityResource(Resource):
class ActivityResourceOutput(Resource.ResourceOutput):
name: Final[str]

def __init__(self, resource_arn: str, partition: str, region: str, account: str, name: str):
super().__init__(
resource_arn=resource_arn, partition=partition, region=region, account=account
)
self.name = name

name: Final[str]

def __init__(self, resource_arn: ResourceARN):
super().__init__(resource_arn=resource_arn)
self.name = resource_arn.name

def _build_resource(self, env: Environment) -> Resource.ResourceOutput:
resource_output: Resource.ResourceOutput = super()._build_resource(env=env)
activity_resource_output = ActivityResource.ActivityResourceOutput(
**vars(resource_output), name=self.name
)
return activity_resource_output


class LambdaResource(Resource):
class LambdaResourceOutput(Resource.ResourceOutput):
function_name: Final[str]

def __init__(
self, resource_arn: str, partition: str, region: str, account: str, function_name: str
):
super().__init__(
resource_arn=resource_arn, partition=partition, region=region, account=account
)
self.function_name = function_name

function_name: Final[str]

def __init__(self, resource_arn: ResourceARN):
super().__init__(resource_arn=resource_arn)
self.function_name = resource_arn.name

def _build_resource(self, env: Environment) -> Resource.ResourceOutput:
resource_output: Resource.ResourceOutput = super()._build_resource(env=env)
lambda_resource_output = LambdaResource.LambdaResourceOutput(
**vars(resource_output), function_name=self.function_name
)
return lambda_resource_output


class ServiceResource(Resource):
class ServiceResourceOutput(Resource.ResourceOutput):
service_name: Final[str]
api_name: Final[str]
api_action: Final[str]
condition: Final[Optional[str]]

def __init__(
self,
resource_arn: str,
partition: str,
region: str,
account: str,
service_name: str,
api_name: str,
api_action: str,
condition: Optional[str],
):
super().__init__(
resource_arn=resource_arn, partition=partition, region=region, account=account
)
self.service_name = service_name
self.api_name = api_name
self.api_action = api_action
self.condition = condition

service_name: Final[str]
api_name: Final[str]
api_action: Final[str]
Expand Down Expand Up @@ -229,14 +166,3 @@ def __init__(self, resource_arn: ResourceARN):
self.condition = ResourceCondition.Sync2
case unsupported:
raise RuntimeError(f"Unsupported condition '{unsupported}'.")

def _build_resource(self, env: Environment) -> Resource.ResourceOutput:
resource_output: Resource.ResourceOutput = super()._build_resource(env=env)
lambda_resource_output = ServiceResource.ServiceResourceOutput(
**vars(resource_output),
service_name=self.service_name,
api_name=self.api_name,
api_action=self.api_action,
condition=self.condition,
)
return lambda_resource_output
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
StatesErrorNameType,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.resource import (
ResourceRuntimePart,
ServiceResource,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.state_task import (
Expand Down Expand Up @@ -55,19 +56,22 @@ def _get_timed_out_failure_event(self) -> FailureEvent:

@abc.abstractmethod
def _eval_service_task(
self, env: Environment, resource: ServiceResource.ServiceResourceOutput, parameters: dict
self,
env: Environment,
resource_runtime_part: ResourceRuntimePart,
normalised_parameters: dict,
):
...

def _before_eval_execution(
self, env: Environment, resource: ServiceResource.ServiceResourceOutput, parameters: dict
self, env: Environment, resource_runtime_part: ResourceRuntimePart, raw_parameters: dict
) -> None:
parameters_str = to_json_str(parameters)
parameters_str = to_json_str(raw_parameters)

scheduled_event_details = TaskScheduledEventDetails(
resource=self._get_sfn_resource(),
resourceType=self._get_sfn_resource_type(),
region=resource.region,
region=resource_runtime_part.region,
parameters=parameters_str,
)
if not self.timeout.is_default_value():
Expand All @@ -92,7 +96,12 @@ def _before_eval_execution(
),
)

def _after_eval_execution(self, env: Environment) -> None:
def _after_eval_execution(
self,
env: Environment,
resource_runtime_part: ResourceRuntimePart,
normalised_parameters: dict,
) -> None:
output = env.stack[-1]
env.event_history.add_event(
hist_type_event=HistoryEventType.TaskSucceeded,
Expand All @@ -107,17 +116,27 @@ def _after_eval_execution(self, env: Environment) -> None:
)

def _eval_execution(self, env: Environment) -> None:
parameters = self._eval_parameters(env=env)

self.resource.eval(env=env)
resource_output: ServiceResource.ServiceResourceOutput = env.stack.pop()
resource_runtime_part: ResourceRuntimePart = env.stack.pop()

raw_parameters = self._eval_parameters(env=env)

self._before_eval_execution(env=env, resource=resource_output, parameters=parameters)
self._before_eval_execution(
env=env, resource_runtime_part=resource_runtime_part, raw_parameters=raw_parameters
)

normalised_parameters = self._normalised_parameters_bindings(parameters)
self._eval_service_task(env=env, resource=resource_output, parameters=normalised_parameters)
normalised_parameters = self._normalised_parameters_bindings(raw_parameters)
self._eval_service_task(
env=env,
resource_runtime_part=resource_runtime_part,
normalised_parameters=normalised_parameters,
)

self._after_eval_execution(env=env)
self._after_eval_execution(
env=env,
resource_runtime_part=resource_runtime_part,
normalised_parameters=normalised_parameters,
)

@classmethod
def for_service(cls, service_name: str) -> StateTaskService:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
FailureEvent,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.resource import (
ServiceResource,
ResourceRuntimePart,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_callback import (
StateTaskServiceCallback,
Expand Down Expand Up @@ -250,10 +250,13 @@ def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
)

def _eval_service_task(
self, env: Environment, resource: ServiceResource.ServiceResourceOutput, parameters: dict
self,
env: Environment,
resource_runtime_part: ResourceRuntimePart,
normalised_parameters: dict,
):
task_parameters: TaskParameters = select_from_typed_dict(
typed_dict=TaskParameters, obj=parameters
typed_dict=TaskParameters, obj=normalised_parameters
)

method = task_parameters["Method"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
StatesErrorNameType,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.resource import (
ServiceResource,
ResourceRuntimePart,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_callback import (
StateTaskServiceCallback,
Expand Down Expand Up @@ -107,14 +107,19 @@ def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
return super()._from_error(env=env, ex=ex)

def _eval_service_task(
self, env: Environment, resource: ServiceResource.ServiceResourceOutput, parameters: dict
self,
env: Environment,
resource_runtime_part: ResourceRuntimePart,
normalised_parameters: dict,
):
api_client = boto_client_for(
region=resource.region,
account=resource.account,
region=resource_runtime_part.region,
account=resource_runtime_part.account,
service=self._normalised_api_name,
)
response = getattr(api_client, self._normalised_api_action)(**parameters) or dict()
response = (
getattr(api_client, self._normalised_api_action)(**normalised_parameters) or dict()
)
if response:
response.pop("ResponseMetadata", None)
env.stack.append(response)
Loading
0