8000 [SFN] Support for SFN and Sync, reworked exception handling, fixes, tests by MEPalma · Pull Request #8623 · localstack/localstack · GitHub
[go: up one dir, main page]

Skip to content

[SFN] Support for SFN and Sync, reworked exception handling, fixes, tests #8623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3b0c2e9
base
MEPalma May 8, 2023
2f392e7
fixes
MEPalma May 10, 2023
4d63d30
update snap test
MEPalma May 10, 2023
21f10ee
reworked parameters, improved double dep of lambda state tasks, tests
MEPalma May 11, 2023
0b750d2
minor conflicts
MEPalma May 16, 2023
0cd04f9
Merge branch 'MEP-sfn-callbacks' into MEP-sfn-lambda-fix-params
MEPalma May 16, 2023
d2f539b
Merge branch 'master' into MEP-sfn-lambda-fix-params
MEPalma May 24, 2023
a24ee43
timeouts support
MEPalma May 26, 2023
f07015a
Merge branch 'master' into MEP-sfn-lambda-fix-params
MEPalma May 27, 2023
eb5f0ea
task failure support and tests
MEPalma May 29, 2023
1382972
heartbeat base support
MEPalma May 29, 2023
4c75562
provider
MEPalma May 29, 2023
4f83e57
heartbeat tests, fixes
MEPalma Jun 2, 2023
406f3c0
Merge branch 'master' into MEP-sfn-timeout
MEPalma Jun 21, 2023
ae5ee1e
minor
MEPalma Jun 21, 2023
d0a65b1
pr items
MEPalma Jun 22, 2023
2682ffa
minor cleanup
MEPalma Jun 22, 2023
95ee8e0
Merge branch 'master' into MEP-sfn-timeout
MEPalma Jun 22, 2023
07107db
conflicts, minors
MEPalma Jun 22, 2023
de9a320
fix cycle on send success
MEPalma Jun 22, 2023
1ceb08a
fix fail sender machine
MEPalma Jun 23, 2023
387016f
Merge branch 'master' into MEP-sfn-timeout
MEPalma Jun 23, 2023
51a9f84
Merge branch 'MEP-sfn-timeout' into MEP-sfn-taskfailure
MEPalma Jun 23, 2023
157993c
conflicts
MEPalma Jun 25, 2023
03ba9ce
conflicts, iterator in heartbeat_success machine
MEPalma Jun 25, 2023
497186c
conflicts
MEPalma Jun 26, 2023
d55ace6
update heartbeat snapshot tests
MEPalma Jun 26, 2023
44a83eb
base sfn support working
MEPalma Jun 28, 2023
2e3bb6e
input stringification
MEPalma Jun 28, 2023
e77df2b
sync happy path
MEPalma Jul 3, 2023
a73fe68
[]
MEPalma Jul 4, 2023
a90ecb8
timeout error types
MEPalma Jul 4, 2023
840a8a4
Merge branch 'master' into MEP-snf-heartbeat
MEPalma Jul 5, 2023
ad247a5
Merge branch 'MEP-snf-heartbeat' into MEP-sfn-sync-and-sfn
MEPalma Jul 5, 2023
525fae0
reworked exceptinos handling, fixes, tests
MEPalma Jul 5, 2023
7b85932
Merge branch 'master' into MEP-snf-heartbeat
MEPalma Jul 6, 2023
581f46f
cleanup
MEPalma Jul 6, 2023
b8d2526
Merge branch 'MEP-snf-heartbeat' into MEP-sfn-sync-and-sfn
MEPalma Jul 6, 2023
fc69033
Merge branch 'master' into MEP-sfn-sync-and-sfn
MEPalma Jul 6, 2023
0942505
Merge branch 'master' into MEP-sfn-sync-and-sfn
MEPalma Jul 7, 2023
9edad2d
connect_to in sqs
MEPalma Jul 13, 2023
562c236
Merge branch 'master' into MEP-sfn-sync-and-sfn
MEPalma Jul 13, 2023
e4985ee
Merge branch 'master' into MEP-sfn-sync-and-sfn
MEPalma Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Final

from localstack.aws.api.stepfunctions import HistoryEventType
from localstack.aws.api.stepfunctions import ExecutionFailedEventDetails, HistoryEventType
from localstack.services.stepfunctions.asl.component.common.error_name.error_name import ErrorName
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails

Expand All @@ -16,3 +16,20 @@ def __init__(
self.error_name = error_name
self.event_type = event_type
self.event_details = event_details


class FailureEventException(Exception):
failure_event: Final[ 8000 FailureEvent]

def __init__(self, failure_event: FailureEvent):
self.failure_event = failure_event

def get_execution_failed_event_details(self) -> ExecutionFailedEventDetails:
failure_event_spec = list(self.failure_event.event_details.values())[0]
execution_failed_event_details = ExecutionFailedEventDetails(
error=failure_event_spec.get("error")
or f"NoErrorSpecification in {failure_event_spec}",
)
if "cause" in failure_event_spec:
execution_failed_event_details["cause"] = failure_event_spec["cause"]
return execution_failed_event_details
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
HistoryEventType,
)
from localstack.services.stepfunctions.asl.component.common.comment import Comment
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
FailureEventException,
)
from localstack.services.stepfunctions.asl.component.common.flow.start_at import StartAt
from localstack.services.stepfunctions.asl.component.eval_component import EvalComponent
from localstack.services.stepfunctions.asl.component.state.state import CommonStateField
Expand Down Expand Up @@ -46,8 +49,16 @@ def _eval_body(self, env: Environment) -> None:
while env.is_running():
next_state: CommonStateField = self._get_state(env.next_state_name)
next_state.eval(env)
except FailureEventException as ex:
env.set_error(error=ex.get_execution_failed_event_details())
except Exception as ex:
LOG.debug(f"Stepfunctions computation ended with exception '{ex}'.")
cause = f"{type(ex)}({str(ex)})"
LOG.error(f"Stepfunctions computation ended with exception '{cause}'.")
env.set_error(
ExecutionFailedEventDetails(
error="Internal Error", cause=f"Internal Error due to '{cause}'"
)
)

program_state: ProgramState = env.program_state()
if isinstance(program_state, ProgramError):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@
from threading import Thread
from typing import Any, Optional

from localstack.aws.api.stepfunctions import (
ExecutionFailedEventDetails,
HistoryEventType,
TaskFailedEventDetails,
)
from localstack.aws.api.stepfunctions import HistoryEventType, TaskFailedEventDetails
from localstack.services.stepfunctions.asl.component.common.catch.catch_decl import CatchDecl
from localstack.services.stepfunctions.asl.component.common.catch.catch_outcome import (
CatchOutcome,
CatchOutcomeNotCaught,
)
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
FailureEvent,
FailureEventException,
)
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name import (
StatesErrorName,
Expand Down Expand Up @@ -126,13 +123,17 @@ def from_state_props(self, state_props: StateProps) -> None:
self.heartbeat = heartbeat

def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
LOG.warning("State Task executed generic failure event reporting logic.")
if isinstance(ex, FailureEventException):
return ex.failure_event
LOG.warning(
"State Task encountered an unhandled exception that lead to a State.Runtime error."
)
return FailureEvent(
error_name=StatesErrorName(typ=StatesErrorNameType.StatesTaskFailed),
error_name=StatesErrorName(typ=StatesErrorNameType.StatesRuntime),
event_type=HistoryEventType.TaskFailed,
event_details=EventDetails(
taskFailedEventDetails=TaskFailedEventDetails(
error="Unsupported Error Handling",
error=StatesErrorNameType.StatesRuntime.to_name(),
cause=str(ex),
)
),
Expand Down Expand Up @@ -184,10 +185,7 @@ def _handle_uncaught(self, ex: Exception, env: Environment):

@staticmethod
def _terminate_with_event(failure_event: FailureEvent, env: Environment) -> None:
# Halt execution with the given failure event.
env.set_error(
ExecutionFailedEventDetails(**(list(failure_event.event_details.values())[0]))
)
raise FailureEventException(failure_event=failure_event)

def _evaluate_with_timeout(self, env: Environment) -> None:
self.timeout.eval(env=env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

class ResourceCondition(str):
WaitForTaskToken = "waitForTaskToken"
Sync = "sync"


class ResourceARN(TypedDict):
Expand Down Expand Up @@ -121,5 +122,7 @@ def __init__(
match tail_parts[-1]:
case "waitForTaskToken":
self.condition = ResourceCondition.WaitForTaskToken
case "sync":
self.condition = ResourceCondition.Sync
case unsupported:
raise RuntimeError(f"Unsupported condition '{unsupported}'.")
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ def for_service(cls, service_name: str) -> StateTaskService:
)

return StateTaskServiceSqs()
case "states":
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_sfn import (
StateTaskServiceSfn,
)

return StateTaskServiceSfn()

case unknown:
raise NotImplementedError(f"Unsupported service: '{unknown}'.") # noqa
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from botocore.config import Config
from botocore.exceptions import ClientError

from localstack.aws.api.stepfunctions import HistoryEventType, TaskFailedEventDetails
Expand All @@ -15,7 +16,6 @@
StateTaskServiceCallback,
)
from localstack.services.stepfunctions.asl.component.state.state_props import StateProps
from localstack.services.stepfunctions.asl.eval.callback.callback import CallbackOutcomeFailureError
from localstack.services.stepfunctions.asl.eval.environment import Environment
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
from localstack.utils.aws import aws_stack
Expand Down Expand Up @@ -75,14 +75,10 @@ def _get_task_failure_event(self, error: str, cause: str) -> FailureEvent:
)

def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
if isinstance(ex, CallbackOutcomeFailureError):
return self._get_callback_outcome_failure_event(ex=ex)
if isinstance(ex, TimeoutError):
return self._get_timed_out_failure_event()

norm_service_name: str = self._normalise_service_name(self.resource.api_name)
error: str = self._normalise_exception_name(norm_service_name, ex)
if isinstance(ex, ClientError):
norm_service_name: str = self._normalise_service_name(self.resource.api_name)
error: str = self._normalise_exception_name(norm_service_name, ex)

error_message: str = ex.response["Error"]["Message"]
cause_details = [
f"Service: {norm_service_name}",
Expand All @@ -97,14 +93,12 @@ def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
cause: str = f"{error_message} ({', '.join(cause_details)})"
failure_event = self._get_task_failure_event(error=error, cause=cause)
return failure_event

failure_event = self._get_task_failure_event(
error=error, cause=str(ex) # TODO: update cause decoration.
)
return failure_event
return super()._from_error(env=env, ex=ex)

def _eval_service_task(self, env: Environment, parameters: dict) -> None:
api_client = aws_stack.create_external_boto_client(service_name=self._normalised_api_name)
api_client = aws_stack.create_external_boto_client(
service_name=self._normalised_api_name, config=Config(parameter_validation=False)
)
response = getattr(api_client, self._normalised_api_action)(**parameters) or dict()
if response:
response.pop("ResponseMetadata", None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ def _wait_for_task_token(self, env: Environment) -> None: # noqa
else:
raise NotImplementedError(f"Unsupported CallbackOutcome type '{type(outcome)}'.")

def _sync(self, env: Environment) -> None:
raise RuntimeError(
f"Unsupported .sync callback procedure in resource {self.resource.resource_arn}"
F987 )

def _is_condition(self):
return self.resource.condition is not None

Expand All @@ -105,6 +110,11 @@ def _get_callback_outcome_failure_event(self, ex: CallbackOutcomeFailureError) -
),
)

def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
if isinstance(ex, CallbackOutcomeFailureError):
return self._get_callback_outcome_failure_event(ex=ex)
return super()._from_error(env=env, ex=ex)

def _eval_execution(self, env: Environment) -> None:
parameters = self._eval_parameters(env=env)
parameters_str = to_json_str(parameters)
Expand Down Expand Up @@ -137,10 +147,11 @@ def _eval_execution(self, env: Environment) -> None:
),
)

self._eval_service_task(env=env, parameters=parameters)
normalised_parameters = self._normalised_parameters_bindings(parameters)
self._eval_service_task(env=env, parameters=normalised_parameters)

if self._is_condition():
output = env.stack.pop()
output = env.stack[-1]
env.event_history.add_event(
hist_type_event=HistoryEventType.TaskSubmitted,
event_detail=EventDetails(
Expand All @@ -155,6 +166,8 @@ def _eval_execution(self, env: Environment) -> None:
match self.resource.condition:
case ResourceCondition.WaitForTaskToken:
self._wait_for_task_token(env=env)
case ResourceCondition.Sync:
self._sync(env=env)
case unsupported:
raise NotImplementedError(f"Unsupported callback type '{unsupported}'.")

Expand Down
7D99
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,12 @@
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
FailureEvent,
)
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name import (
StatesErrorName,
)
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name_type import (
StatesErrorNameType,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task import (
lambda_eval_utils,
)
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_callback import (
StateTaskServiceCallback,
)
from localstack.services.stepfunctions.asl.eval.callback.callback import CallbackOutcomeFailureError
from localstack.services.stepfunctions.asl.eval.environment import Environment
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails

Expand Down Expand Up @@ -60,11 +53,6 @@ def _error_cause_from_client_error(client_error: ClientError) -> tuple[str, str]
return error, cause

def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
if isinstance(ex, CallbackOutcomeFailureError):
return self._get_callback_outcome_failure_event(ex=ex)
if isinstance(ex, TimeoutError):
return self._get_timed_out_failure_event()

if isinstance(ex, lambda_eval_utils.LambdaFunctionErrorException):
error = "Exception"
error_name = CustomErrorName(error)
Expand All @@ -73,10 +61,7 @@ def _from_error(self, env: Environment, ex: Exception) -> FailureEvent:
error, cause = self._error_cause_from_client_error(ex)
error_name = CustomErrorName(error)
else:
error = "Exception"
error_name = StatesErrorName(typ=StatesErrorNameType.StatesTaskFailed)
cause = str(ex)

return super()._from_error(env=env, ex=ex)
return FailureEvent(
error_name=error_name,
event_type=HistoryEventType.TaskFailed,
Expand Down
Loading
0