10000 Add idempotency when calling StartExecution in Standard Step Function… · localstack/localstack@63d8634 · GitHub
[go: up one dir, main page]

Skip to content

Commit 63d8634

Browse files
authored
Add idempotency when calling StartExecution in Standard Step Functions (#11118)
1 parent 8b67be3 commit 63d8634

File tree

4 files changed

+186
-10
lines changed

4 files changed

+186
-10
lines changed

localstack-core/localstack/services/stepfunctions/provider.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,36 @@ def _idempotent_revision(
284284
return state_machine
285285
return None
286286

287+
def _idempotent_start_execution(
288+
self,
289+
execution: Optional[Execution],
290+
state_machine: StateMachineInstance,
291+
name: Name,
292+
input_data: SensitiveData,
293+
) -> Optional[Execution]:
294+
# StartExecution is idempotent for STANDARD workflows. For a STANDARD workflow,
295+
# if you call StartExecution with the same name and input as a running execution,
296+
# the call succeeds and return the same response as the original request.
297+
# If the execution is closed or if the input is different,
298+
# it returns a 400 ExecutionAlreadyExists error. You can reuse names after 90 days.
299+
300+
if not execution:
301+
return None
302+
303+
match (name, input_data, execution.exec_status, state_machine.sm_type):
304+
case (
305+
execution.name,
306+
execution.input_data,
307+
ExecutionStatus.RUNNING,
308+
StateMachineType.STANDARD,
309+
):
310+
return execution
311+
312+
raise CommonServiceException(
313+
code="ExecutionAlreadyExists",
314+
message=f"Execution Already Exists: '{execution.exec_arn}'",
315+
)
316+
287317
def _revision_by_name(
288318
self, context: RequestContext, name: str
289319
) -> Optional[StateMachineInstance]:
@@ -570,8 +600,17 @@ def start_execution(
570600
# Exhaustive check on STANDARD and EXPRESS type, validated on creation.
571601
exec_arn = stepfunctions_express_execution_arn(normalised_state_machine_arn, exec_name)
572602

573-
if exec_arn in self.get_store(context).executions:
574-
raise InvalidName() # TODO
603+
if execution := self.get_store(context).executions.get(exec_arn):
604+
# Return already running execution if name and input match
605+
existing_execution = self._idempotent_start_execution(
606+
execution=execution,
607+
state_machine=state_machine_clone,
608+
name=name,
609+
input_data=input_data,
610+
)
611+
612+
if existing_execution:
613+
return existing_execution.to_start_output()
575614

576615
# Create the execution logging session, if logging is configured.
577616
cloud_watch_logging_session = None
@@ -595,6 +634,7 @@ def start_execution(
595634
trace_header=trace_header,
596635
activity_store=self.get_store(context).activities,
597636
)
637+
598638
self.get_store(context).executions[exec_arn] = execution
599639

600640
execution.start()

tests/aws/services/stepfunctions/v2/test_sfn_api.py

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
import pytest
44
import yaml
55
from botocore.exceptions import ClientError
6-
from localstack_snapshot.snapshots.transformer import RegexTransformer
6+
from localstack_snapshot.snapshots.transformer import JsonpathTransformer, RegexTransformer
77

88
from localstack.aws.api.lambda_ import Runtime
99
from localstack.aws.api.stepfunctions import HistoryEventList, StateMachineType
1010
from localstack.testing.pytest import markers
1111
from localstack.testing.pytest.stepfunctions.utils import (
1212
await_execution_aborted,
13+
await_execution_started,
1314
await_execution_success,
1415
await_execution_terminated,
1516
await_list_execution_status,
@@ -22,6 +23,9 @@
2223
from localstack.utils.sync import retry, wait_until
2324
from tests.aws.services.stepfunctions.lambda_functions import lambda_functions
2425
from tests.aws.services.stepfunctions.templates.base.base_templates import BaseTemplate
26+
from tests.aws.services.stepfunctions.templates.callbacks.callback_templates import (
27+
CallbackTemplates as CT,
28+
)
2529

2630

2731
@markers.snapshot.skip_snapshot_verify(paths=["$..tracingConfiguration"])
@@ -414,7 +418,88 @@ def _verify_paginate_results() -> list:
414418
# expect no state machines created in this test to be leftover after deletion
415419
wait_until(lambda: not _list_state_machines(expected_results_count=0), max_retries=20)
416420

417-
@markers.aws.needs_fixing
421+
@markers.aws.validated
422+
def test_start_execution_idempotent(
423+
self,
424+
create_iam_role_for_sfn,
425+
create_state_machine,
426+
sqs_send_task_success_state_machine,
427+
sqs_create_queue,
428+
sfn_snapshot,
429+
aws_client,
430+
):
431+
sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sqs_integration())
432+
sfn_snapshot.add_transformer(
433+
JsonpathTransformer(
434+
jsonpath="$..TaskToken",
435+
replacement="<task_token>",
436+
replace_reference=True,
437+
)
438+
)
439+
440+
queue_name = f"queue-{short_uid()}"
441+
queue_url = sqs_create_queue(QueueName=queue_name)
442+
sfn_snapshot.add_transformer(RegexTransformer(queue_url, "<sqs_queue_url>"))
443+
sfn_snapshot.add_transformer(RegexTransformer(queue_name, "<sqs_queue_name>"))
444+
445+
snf_role_arn = create_iam_role_for_sfn()
446+
sfn_snapshot.add_transformer(RegexTransformer(snf_role_arn, "snf_role_arn"))
447+
448+
sm_name: str = f"statemachine_{short_uid()}"
449+
execution_name: str = f"execution_name_{short_uid()}"
450+
451+
template = BaseTemplate.load_sfn_template(CT.SQS_WAIT_FOR_TASK_TOKEN)
452+
definition = json.dumps(template)
453+
454+
creation_resp = create_state_machine(
455+
name=sm_name, definition=definition, roleArn=snf_role_arn
456+
)
457+
sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_create_arn(creation_resp, 0))
458+
sfn_snapshot.match("creation_resp", creation_resp)
459+
state_machine_arn = creation_resp["stateMachineArn"]
460+
461+
input_data = json.dumps({"QueueUrl": queue_url, "Message": "test_message_txt"})
462+
exec_resp = aws_client.stepfunctions.start_execution(
463+
stateMachineArn=state_machine_arn, input=input_data, name=execution_name
464+
)
465+
sfn_snapshot.add_transformer(sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp, 0))
466+
sfn_snapshot.match("exec_resp", exec_resp)
467+
execution_arn = exec_resp["executionArn"]
468+
469+
await_execution_started(
470+
stepfunctions_client=aws_client.stepfunctions, execution_arn=execution_arn
471+
)
472+
473+
exec_resp_idempotent = aws_client.stepfunctions.start_execution(
474+
stateMachineArn=state_machine_arn, input=input_data, name=execution_name
475+
)
476+
sfn_snapshot.add_transformer(
477+
sfn_snapshot.transform.sfn_sm_exec_arn(exec_resp_idempotent, 0)
478+
)
479+
sfn_snapshot.match("exec_resp_idempotent", exec_resp_idempotent)
480+
481+
# Should fail because the execution has the same 'name' as another but a different 'input'.
482+
with pytest.raises(Exception) as err:
483+
aws_client.stepfunctions.start_execution(
484+
stateMachineArn=state_machine_arn,
485+
input='{"body" : "different-data"}',
486+
name=execution_name,
487+
)
488+
sfn_snapshot.match("start_exec_already_exists", err.value.response)
489+
490+
stop_res = aws_client.stepfunctions.stop_execution(executionArn=execution_arn)
491+
sfn_snapshot.match("stop_res", stop_res)
492+
493+
sqs_send_task_success_state_machine(queue_name)
494+
495+
await_execution_terminated(
496+
stepfunctions_client=aws_client.stepfunctions, execution_arn=execution_arn
497+
)
498+
499+
assert exec_resp_idempotent["executionArn"] == execution_arn
500+
501+
@markers.aws.validated
502+
@markers.snapshot.skip_snapshot_verify(paths=["$..redriveCount"])
418503
def test_start_execution(
419504
self, create_iam_role_for_sfn, create_state_machine, sfn_snapshot, aws_client
420505
):
@@ -1261,7 +1346,7 @@ def test_describe_execution(
12611346
)
12621347

12631348
describe_execution = aws_client.stepfunctions.describe_execution(executionArn=execution_arn)
1264-
sfn_snapshot.match("ddescribe_execution", describe_execution)
1349+
sfn_snapshot.match("describe_execution", describe_execution)
12651350

12661351
@markers.aws.validated
12671352
def test_describe_execution_no_such_state_machine(

tests/aws/services/stepfunctions/v2/test_sfn_api.snapshot.json

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@
275275
}
276276
},
277277
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_start_execution": {
278-
"recorded-date": "22-06-2023, 13:52:39",
278+
"recorded-date": "30-06-2024, 16:12:59",
279279
"recorded-content": {
280280
"creation_resp": {
281281
"creationDate": "datetime",
@@ -298,6 +298,7 @@
298298
{
299299
"executionArn": "arn:<partition>:states:<region>:111111111111:execution:<ArnPart_0idx>:<ExecArnPart_0idx>",
300300
"name": "<ExecArnPart_0idx>",
301+
"redriveCount": 0,
301302
"startDate": "datetime",
302303
"stateMachineArn": "arn:<partition>:states:<region>:111111111111:stateMachine:<ArnPart_0idx>",
303304
"status": "SUCCEEDED",
@@ -1534,7 +1535,7 @@
15341535
}
15351536
},
15361537
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_describe_execution": {
1537-
"recorded-date": "08-03-2024, 11:52:48",
1538+
"recorded-date": "21-08-2024, 09:14:30",
15381539
"recorded-content": {
15391540
"creation_resp": {
15401541
"creationDate": "datetime",
@@ -1552,7 +1553,7 @@
15521553
"HTTPStatusCode": 200
15531554
}
15541555
},
1555-
"ddescribe_execution": {
1556+
"describe_execution": {
15561557
"executionArn": "arn:<partition>:states:<region>:111111111111:execution:<ArnPart_0idx>:<ExecArnPart_0idx>",
15571558
"input": {},
15581559
"inputDetails": {
@@ -2260,5 +2261,52 @@
22602261
}
22612262
}
22622263
}
2264+
},
2265+
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_start_execution_idempotent": {
2266+
"recorded-date": "21-08-2024, 09:15:48",
2267+
"recorded-content": {
2268+
"creation_resp": {
2269+
"creationDate": "datetime",
2270+
"stateMachineArn": "arn:<partition>:states:<region>:111111111111:stateMachine:<resource:1>",
2271+
"ResponseMetadata": {
2272+
"HTTPHeaders": {},
2273+
"HTTPStatusCode": 200
2274+
}
2275+
},
2276+
"exec_resp": {
2277+
"executionArn": "arn:<partition>:states:<region>:111111111111:execution:<resource:1>:<resource:2>",
2278+
"startDate": "datetime",
2279+
"ResponseMetadata": {
2280+
"HTTPHeaders": {},
2281+
"HTTPStatusCode": 200
2282+
}
2283+
},
2284+
"exec_resp_idempotent": {
2285+
"executionArn": "arn:<partition>:states:<region>:111111111111:execution:<resource:1>:<resource:2>",
2286+
F438 "startDate": "datetime",
2287+
"ResponseMetadata": {
2288+
"HTTPHeaders": {},
2289+
"HTTPStatusCode": 200
2290+
}
2291+
},
2292+
"start_exec_already_exists": {
2293+
"Error": {
2294+
"Code": "ExecutionAlreadyExists",
2295+
"Message": "Execution Already Exists: 'arn:<partition>:states:<region>:111111111111:execution:<resource:1>:<resource:2>'"
2296+
},
2297+
"message": "Execution Already Exists: 'arn:<partition>:states:<region>:111111111111:execution:<resource:1>:<resource:2>'",
2298+
"ResponseMetadata": {
2299+
"HTTPHeaders": {},
2300+
"HTTPStatusCode": 400
2301+
}
2302+
},
2303+
"stop_res": {
2304+
"stopDate": "datetime",
2305+
"ResponseMetadata": {
2306+
"HTTPHeaders": {},
2307+
"HTTPStatusCode": 200
2308+
}
2309+
}
2310+
}
22632311
}
22642312
}

tests/aws/services/stepfunctions/v2/test_sfn_api.validation.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"last_validated_date": "2023-06-22T11:47:36+00:00"
4646
},
4747
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_describe_execution": {
48-
"last_validated_date": "2024-03-08T11:52:48+00:00"
48+
"last_validated_date": "2024-08-21T09:14:30+00:00"
4949
},
5050
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_describe_execution_arn_containing_punctuation": {
5151
"last_validated_date": "2024-06-11T14:54:08+00:00"
@@ -102,7 +102,10 @@
102102
"last_validated_date": "2024-07-01T12:04:17+00:00"
103103
},
104104
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_start_execution": {
105-
"last_validated_date": "2023-06-22T11:52:39+00:00"
105+
"last_validated_date": "2024-06-30T16:12:59+00:00"
106+
},
107+
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_start_execution_idempotent": {
108+
"last_validated_date": "2024-08-21T09:15:48+00:00"
106109
},
107110
"tests/aws/services/stepfunctions/v2/test_sfn_api.py::TestSnfApi::test_start_sync_execution": {
108111
"last_validated_date": "2024-07-03T17:10:25+00:00"

0 commit comments

Comments
 (0)
0