8000 Added hooks to the SFN provider (#7834) · localstack/localstack@fd25513 · GitHub
[go: up one dir, main page]

Skip to content

Commit fd25513

Browse files
authored
Added hooks to the SFN provider (#7834)
1 parent d47f509 commit fd25513

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

localstack/services/stepfunctions/provider.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import logging
2+
import os
13
import threading
24

35
from localstack import config
@@ -16,12 +18,16 @@
1618
from localstack.services.plugins import ServiceLifecycleHook
1719
from localstack.services.stepfunctions.stepfunctions_starter import (
1820
start_stepfunctions,
21+
stop_stepfunctions,
1922
wait_for_stepfunctions,
2023
)
24+
from localstack.state import AssetDirectory, StateVisitor
2125

2226
# lock to avoid concurrency issues when creating state machines in parallel (required for StepFunctions-Local)
2327
CREATION_LOCK = threading.RLock()
2428

29+
LOG = logging.getLogger(__name__)
30+
2531

2632
class StepFunctionsProvider(StepfunctionsApi, ServiceLifecycleHook):
2733
def __init__(self):
@@ -31,10 +37,27 @@ def get_forward_url(self) -> str:
3137
"""Return the URL of the backend StepFunctions server to forward requests to"""
3238
return f"http://{LOCALHOST}:{config.LOCAL_PORT_STEPFUNCTIONS}"
3339

< 10000 code>40+
def accept_state_visitor(self, visitor: StateVisitor):
41+
visitor.visit(AssetDirectory(os.path.join(config.dirs.data, self.service)))
42+
3443
def on_before_start(self):
3544
start_stepfunctions()
3645
wait_for_stepfunctions()
3746

47+
def on_before_state_reset(self):
48+
stop_stepfunctions()
49+
50+
def on_before_state_load(self):
51+
stop_stepfunctions()
52+
53+
def on_after_state_reset(self):
54+
start_stepfunctions()
55+
wait_for_stepfunctions()
56+
57+
def on_after_state_load(self):
58+
start_stepfunctions()
59+
wait_for_stepfunctions()
60+
3861
def create_state_machine(
3962
self, context: RequestContext, request: CreateStateMachineInput
4063
) -> CreateStateMachineOutput:

localstack/services/stepfunctions/stepfunctions_starter.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import logging
2-
from typing import Optional
2+
import subprocess
33

44
from localstack import config
55
from localstack.aws.accounts import get_aws_account_id
66
from localstack.services.infra import do_run, log_startup_message
77
from localstack.services.stepfunctions.packages import stepfunctions_local_package
88
from localstack.utils.aws import aws_stack
99
from localstack.utils.common import wait_for_port_open
10+
from localstack.utils.net import wait_for_port_closed
11+
from localstack.utils.run import ShellCommandThread, wait_for_process_to_be_killed
1012
from localstack.utils.sync import retry
1113

1214
LOG = logging.getLogger(__name__)
@@ -15,7 +17,7 @@
1517
MAX_HEAP_SIZE = "256m"
1618

1719
# todo: will be replaced with plugin mechanism
18-
PROCESS_THREAD = None
20+
PROCESS_THREAD: ShellCommandThread | subprocess.Popen | None = None
1921

2022

2123
# TODO: pass env more explicitly
@@ -66,7 +68,7 @@ def get_command(backend_port):
6668
return cmd
6769

6870

69-
def start_stepfunctions(asynchronous=True, persistence_path: Optional[str] = None):
71+
def start_stepfunctions(asynchronous: bool = True, persistence_path: str | None = None):
7072
# TODO: introduce Server abstraction for StepFunctions process
7173
global PROCESS_THREAD
7274
backend_port = config.LOCAL_PORT_STEPFUNCTIONS
@@ -91,7 +93,22 @@ def wait_for_stepfunctions():
9193
retry(check_stepfunctions, sleep=0.5, retries=15)
9294

9395

94-
def check_stepfunctions(expect_shutdown=False, print_error=False):
96+
def stop_stepfunctions():
97+
if PROCESS_THREAD or not PROCESS_THREAD.process:
98+
return
99+
LOG.debug("Restarting StepFunctions process ...")
100+
101+
pid = PROCESS_THREAD.process.pid
102+
PROCESS_THREAD.stop()
103+
wait_for_port_closed(config.LOCAL_PORT_STEPFUNCTIONS, sleep_time=0.5, retries=15)
104+
try:
105+
# TODO: currently failing in CI (potentially due to a defunct process) - need to investigate!
106+
wait_for_process_to_be_killed(pid, sleep=0.3, retries=10)
107+
except Exception as e:
108+
LOG.warning("StepFunctions process not properly terminated: %s", e)
109+
110+
111+
def check_stepfunctions(expect_shutdown: bool = False, print_error: bool = False) -> None:
95112
out = None
96113
try:
97114
wait_for_port_open(config.LOCAL_PORT_STEPFUNCTIONS, sleep_time=2)

0 commit comments

Comments
 (0)
0