1
1
import logging
2
- from typing import Optional
2
+ import subprocess
3
3
4
4
from localstack import config
5
5
from localstack .aws .accounts import get_aws_account_id
6
6
from localstack .services .infra import do_run , log_startup_message
7
7
from localstack .services .stepfunctions .packages import stepfunctions_local_package
8
8
from localstack .utils .aws import aws_stack
9
9
from localstack .utils .common import wait_for_port_open
10
+ from localstack .utils .net import wait_for_port_closed
11
+ from localstack .utils .run import ShellCommandThread , wait_for_process_to_be_killed
10
12
from localstack .utils .sync import retry
11
13
12
14
LOG = logging .getLogger (__name__ )
15
17
MAX_HEAP_SIZE = "256m"
16
18
17
19
# todo: will be replaced with plugin mechanism
18
- PROCESS_THREAD = None
20
+ PROCESS_THREAD : ShellCommandThread | subprocess . Popen | None = None
19
21
20
22
21
23
# TODO: pass env more explicitly
@@ -66,7 +68,7 @@ def get_command(backend_port):
66
68
return cmd
67
69
68
70
69
- def start_stepfunctions (asynchronous = True , persistence_path : Optional [ str ] = None ):
71
+ def start_stepfunctions (asynchronous : bool = True , persistence_path : str | None = None ):
70
72
# TODO: introduce Server abstraction for StepFunctions process
71
73
global PROCESS_THREAD
72
74
backend_port = config .LOCAL_PORT_STEPFUNCTIONS
@@ -91,7 +93,22 @@ def wait_for_stepfunctions():
91
93
retry (check_stepfunctions , sleep = 0.5 , retries = 15 )
92
94
93
95
94
- def check_stepfunctions (expect_shutdown = False , print_error = False ):
96
+ def stop_stepfunctions ():
97
+ if PROCESS_THREAD or not PROCESS_THREAD .process :
98
+ return
99
+ LOG .debug ("Restarting StepFunctions process ..." )
100
+
101
+ pid = PROCESS_THREAD .process .pid
102
+ PROCESS_THREAD .stop ()
103
+ wait_for_port_closed (config .LOCAL_PORT_STEPFUNCTIONS , sleep_time = 0.5 , retries = 15 )
104
+ try :
105
+ # TODO: currently failing in CI (potentially due to a defunct process) - need to investigate!
106
+ wait_for_process_to_be_killed (pid , sleep = 0.3 , retries = 10 )
107
+ except Exception as e :
108
+ LOG .warning ("StepFunctions process not properly terminated: %s" , e )
109
+
110
+
111
+ def check_stepfunctions (expect_shutdown : bool = False , print_error : bool = False ) -> None :
95
112
out = None
96
113
try :
97
114
wait_for_port_open (config .LOCAL_PORT_STEPFUNCTIONS , sleep_time = 2 )
0 commit comments