8000 StepFunctions, support for SFN and Sync, reworked exception handling,… · codeperl/localstack@102d7bd · GitHub
[go: up one dir, main page]

Skip to content

Commit 102d7bd

Browse files
authored
StepFunctions, support for SFN and Sync, reworked exception handling, fixes, tests (localstack#8623)
1 parent 751c1ec commit 102d7bd

32 files changed

+1940
-98
lines changed

localstack/services/stepfunctions/asl/component/common/error_name/failure_event.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Final
22

3-
from localstack.aws.api.stepfunctions import HistoryEventType
3+
from localstack.aws.api.stepfunctions import ExecutionFailedEventDetails, HistoryEventType
44
from localstack.services.stepfunctions.asl.component.common.error_name.error_name import ErrorName
55
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
66

@@ -16,3 +16,20 @@ def __init__(
1616
self.error_name = error_name
1717
self.event_type = event_type
1818
self.event_details = event_details
19+
20+
21+
class FailureEventException(Exception):
22+
failure_event: Final[FailureEvent]
23+
24+
def __init__(self, failure_event: FailureEvent):
25+
self.failure_event = failure_event
26+
27+
def get_execution_failed_event_details(self) -> ExecutionFailedEventDetails:
28+
failure_event_spec = list(self.failure_event.event_details.values())[0]
29+
execution_failed_event_details = ExecutionFailedEventDetails(
30+
error=failure_event_spec.get("error")
31+
or f"NoErrorSpecification in {failure_event_spec}",
32+
)
33+
if "cause" in failure_event_spec:
34+
execution_failed_event_details["cause"] = failure_event_spec["cause"]
35+
return execution_failed_event_details

localstack/services/stepfunctions/asl/component/program/program.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
HistoryEventType,
1010
)
1111
from localstack.services.stepfunctions.asl.component.common.comment import Comment
12+
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
13+
FailureEventException,
14+
)
1215
from localstack.services.stepfunctions.asl.component.common.flow.start_at import StartAt
1316
from localstack.services.stepfunctions.asl.component.eval_component import EvalComponent
1417
from localstack.services.stepfunctions.asl.component.state.state import CommonStateField
@@ -46,8 +49,16 @@ def _eval_body(self, env: Environment) -> None:
4649
while env.is_running():
4750
next_state: CommonStateField = self._get_state(env.next_state_name)
4851
next_state.eval(env)
52+
except FailureEventException as ex:
53+
env.set_error(error=ex.get_execution_failed_event_details())
4954
except Exception as ex:
50-
LOG.debug(f"Stepfunctions computation ended with exception '{ex}'.")
55+
cause = f"{type(ex)}({str(ex)})"
56+
LOG.error(f"Stepfunctions computation ended with exception '{cause}'.")
57+
env.set_error(
58+
ExecutionFailedEventDetails(
59+
error="Internal Error", cause=f"Internal Error due to '{cause}'"
60+
)
61+
)
5162

5263
program_state: ProgramState = env.program_state()
5364
if isinstance(program_state, ProgramError):

localstack/services/stepfunctions/asl/component/state/state_execution/execute_state.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,15 @@
55
from threading import Thread
66
from typing import Any, Optional
77

8-
from localstack.aws.api.stepfunctions import (
9-
ExecutionFailedEventDetails,
10-
HistoryEventType,
11-
TaskFailedEventDetails,
12-
)
8+
from localstack.aws.api.stepfunctions import HistoryEventType, TaskFailedEventDetails
139
from localstack.services.stepfunctions.asl.component.common.catch.catch_decl import CatchDecl
1410
from localstack.services.stepfunctions.asl.component.common.catch.catch_outcome import (
1511
CatchOutcome,
1612
CatchOutcomeNotCaught,
1713
)
1814
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
1915
FailureEvent,
16+
FailureEventException,
2017
)
2118
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name import (
2219
StatesErrorName,
@@ -126,13 +123,17 @@ def from_state_props(self, state_props: StateProps) -> None:
126123
self.heartbeat = heartbeat
127124

128125
def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
129-
LOG.warning("State Task executed generic failure event reporting logic.")
126+
if isinstance(ex, FailureEventException):
127+
return ex.failure_event
128+
LOG.warning(
129+
"State Task encountered an unhandled exception that lead to a State.Runtime error."
130+
)
130131
return FailureEvent(
131-
error_name=StatesErrorName(typ=StatesErrorNameType.StatesTaskFailed),
132+
error_name=StatesErrorName(typ=StatesErrorNameType.StatesRuntime),
132133
event_type=HistoryEventType.TaskFailed,
133134
event_details=EventDetails(
134135
taskFailedEventDetails=TaskFailedEventDetails(
135-
error="Unsupported Error Handling",
136+
error=StatesErrorNameType.StatesRuntime.to_name(),
136137
cause=str(ex),
137138
)
138139
),
@@ -184,10 +185,7 @@ def _handle_uncaught(self, ex: Exception, env: Environment):
184185

185186
@staticmethod
186187
def _terminate_with_event(failure_event: FailureEvent, env: Environment) -> None:
187-
# Halt execution with the given failure event.
188-
env.set_error(
189-
ExecutionFailedEventDetails(**(list(failure_event.event_details.values())[0]))
190-
)
188+
raise FailureEventException(failure_event=failure_event)
191189

192190
def _evaluate_with_timeout(self, env: Environment) -> None:
193191
self.timeout.eval(env=env)

localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/resource.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
class ResourceCondition(str):
1111
WaitForTaskToken = "waitForTaskToken"
12+
Sync = "sync"
1213

1314

1415
class ResourceARN(TypedDict):
@@ -121,5 +122,7 @@ def __init__(
121122
match tail_parts[-1]:
122123
case "waitForTaskToken":
123124
self.condition = ResourceCondition.WaitForTaskToken
125+
case "sync":
126+
self.condition = ResourceCondition.Sync
124127
case unsupported:
125128
raise RuntimeError(f"Unsupported condition '{unsupported}'.")

localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/state_task_service.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ def for_service(cls, service_name: str) -> StateTaskService:
6565
)
6666

6767
return StateTaskServiceSqs()
68+
case "states":
69+
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_sfn import (
70+
StateTaskServiceSfn,
71+
)
72+
73+
return StateTaskServiceSfn()
6874

6975
case unknown:
7076
raise NotImplementedError(f"Unsupported service: '{unknown}'.") # noqa

localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/state_task_service_aws_sdk.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from botocore.config import Config
12
from botocore.exceptions import ClientError
23

34
from localstack.aws.api.stepfunctions import HistoryEventType, TaskFailedEventDetails
@@ -15,7 +16,6 @@
1516
StateTaskServiceCallback,
1617
)
1718
from localstack.services.stepfunctions.asl.component.state.state_props import StateProps
18-
from localstack.services.stepfunctions.asl.eval.callback.callback import CallbackOutcomeFailureError
1919
from localstack.services.stepfunctions.asl.eval.environment import Environment
2020
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
2121
from localstack.utils.aws import aws_stack
@@ -75,14 +75,10 @@ def _get_task_failure_event(self, error: str, cause: str) -> FailureEvent:
7575
)
7676

7777
def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
78-
if isinstance(ex, CallbackOutcomeFailureError):
79-
return self._get_callback_outcome_failure_event(ex=ex)
80-
if isinstance(ex, TimeoutError):
81-
return self._get_timed_out_failure_event()
82-
83-
norm_service_name: str = self._normalise_service_name(self.resource.api_name)
84-
error: str = self._normalise_exception_name(norm_service_name, ex)
8578
if isinstance(ex, ClientError):
79+
norm_service_name: str = self._normalise_service_name(self.resource.api_name)
80+
error: str = self._normalise_exception_name(norm_service_name, ex)
81+
8682
error_message: str = ex.response["Error"]["Message"]
8783
cause_details = [
8884
f"Service: {norm_service_name}",
@@ -97,14 +93,12 @@ def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
9793
cause: str = f"{error_message} ({', '.join(cause_details)})"
9894
failure_event = self._get_task_failure_event(error=error, cause=cause)
9995
return failure_event
100-
101-
failure_event = self._get_task_failure_event(
102-
error=error, cause=str(ex) # TODO: update cause decoration.
103-
)
104-
return failure_event
96+
return super()._from_error(env=env, ex=ex)
10597

10698
def _eval_service_task(self, env: Environment, parameters: dict) -> None:
107-
api_client = aws_stack.create_external_boto_client(service_name=self._normalised_api_name)
99+
api_client = aws_stack.create_external_boto_client(
100+
service_name=self._normalised_api_name, config=Config(parameter_validation=False)
101+
)
108102
response = getattr(api_client, self._normalised_api_action)(**parameters) or dict()
109103
if response:
110104
response.pop("ResponseMetadata", None)

localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/state_task_service_callback.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ def _wait_for_task_token(self, env: Environment) -> None: # noqa
8686
else:
8787
raise NotImplementedError(f"Unsupported CallbackOutcome type '{type(outcome)}'.")
8888

89+
def _sync(self, env: Environment) -> None:
90+
raise RuntimeError(
91+
f"Unsupported .sync callback procedure in resource {self.resource.resource_arn}"
92+
)
93+
8994
def _is_condition(self):
9095
return self.resource.condition is not None
9196

@@ -105,6 +110,11 @@ def _get_callback_outcome_failure_event(self, ex: CallbackOutcomeFailureError) -
105110
),
106111
)
107112

113+
def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
114+
if isinstance(ex, CallbackOutcomeFailureError):
115+
return self._get_callback_outcome_failure_event(ex=ex)
116+
return super()._from_error(env=env, ex=ex)
117+
108118
def _eval_execution(self, env: Environment) -> None:
109119
parameters = self._eval_parameters(env=env)
110120
parameters_str = to_json_str(parameters)
@@ -137,10 +147,11 @@ def _eval_execution(self, env: Environment) -> None:
137147
),
138148
)
139149

140-
self._eval_service_task(env=env, parameters=parameters)
150+
normalised_parameters = self._normalised_parameters_bindings(parameters)
151+
self._eval_service_task(env=env, parameters=normalised_parameters)
141152

142153
if self._is_condition():
143-
output = env.stack.pop()
154+
output = env.stack[-1]
144155
env.event_history.add_event(
145156
hist_type_event=HistoryEventType.TaskSubmitted,
146157
event_detail=EventDetails(
@@ -155,6 +166,8 @@ def _eval_execution(self, env: Environment) -> None:
155166
match self.resource.condition:
156167
case ResourceCondition.WaitForTaskToken:
157168
self._wait_for_task_token(env=env)
169+
case ResourceCondition.Sync:
170+
self._sync(env=env)
158171
case unsupported:
159172
raise NotImplementedError(f"Unsupported callback type '{unsupported}'.")
160173

localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/state_task_service_lambda.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,12 @@
99
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
1010
FailureEvent,
1111
)
12-
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name import (
13-
StatesErrorName,
14-
)
15-
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name_type import (
16-
StatesErrorNameType,
17-
)
1812
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task import (
1913
lambda_eval_utils,
2014
)
2115
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_callback import (
2216
StateTaskServiceCallback,
2317
)
24-
from localstack.services.stepfunctions.asl.eval.callback.callback import CallbackOutcomeFailureError
2518
from localstack.services.stepfunctions.asl.eval.environment import Environment
2619
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
2720

@@ -60,11 +53,6 @@ def _error_cause_from_client_error(client_error: ClientError) -> tuple[str, str]
6053
return error, cause
6154

6255
def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
63-
if isinstance(ex, CallbackOutcomeFailureError):
64-
return self._get_callback_outcome_failure_event(ex=ex)
65-
if isinstance(ex, TimeoutError):
66-
return self._get_timed_out_failure_event()
67-
6856
if isinstance(ex, lambda_eval_utils.LambdaFunctionErrorException):
6957
error = "Exception"
7058
error_name = CustomErrorName(error)
@@ -73,10 +61,7 @@ def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
7361
error, cause = self._error_cause_from_client_error(ex)
7462
error_name = CustomErrorName(error)
7563
else:
76-
error = "Exception"
77-
error_name = StatesErrorName(typ=StatesErrorNameType.StatesTaskFailed)
78-
cause = str(ex)
79-
64+
return super()._from_error(env=env, ex=ex)
8065
return FailureEvent(
8166
error_name=error_name,
8267
event_type=HistoryEventType.TaskFailed,

0 commit comments

Comments
 (0)
0