8000 bpo-31308: If multiprocessing's forkserver dies, launch it again when… · python/cpython@fc6b348 · GitHub
[go: up one dir, main page]

Skip to content

Commit fc6b348

Browse files
authored
bpo-31308: If multiprocessing's forkserver dies, launch it again when necessary (#3246)
* bpo-31308: If multiprocessing's forkserver dies, launch it again when necessary. * Fix test on Windows * Add NEWS entry * Adopt a different approach: ignore SIGINT and SIGTERM, as in semaphore tracker. * Fix comment * Make sure the test doesn't muck with process state * Also test previously-started processes * Update 2017-08-30-17-59-36.bpo-31308.KbexyC.rst * Avoid masking SIGTERM in forkserver. It's not necessary and causes a race condition in test_many_processes.
1 parent 4f57409 commit fc6b348

File tree

3 files changed

+66
-5
lines changed

3 files changed

+66
-5
lines changed

Lib/multiprocessing/forkserver.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class ForkServer(object):
3434
def __init__(self):
3535
self._forkserver_address = None
3636
self._forkserver_alive_fd = None
37+
self._forkserver_pid = None
3738
self._inherited_fds = None
3839
self._lock = threading.Lock()
3940
self._preload_modules = ['__main__']
@@ -90,8 +91,17 @@ def ensure_running(self):
9091
'''
9192
with self._lock:
9293
semaphore_tracker.ensure_running()
93-
if self._forkserver_alive_fd is not None:
94-
return
94+
if self._forkserver_pid is not None:
95+
# forkserver was launched before, is it still running?
96+
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
97+
8000 if not pid:
98+
# still alive
99+
return
100+
# dead, launch it again
101+
os.close(self._forkserver_alive_fd)
102+
self._forkserver_address = None
103+
self._forkserver_alive_fd = None
104+
self._forkserver_pid = None
95105

96106
cmd = ('from multiprocessing.forkserver import main; ' +
97107
'main(%d, %d, %r, **%r)')
@@ -127,6 +137,7 @@ def ensure_running(self):
127137
os.close(alive_r)
128138
self._forkserver_address = address
129139
self._forkserver_alive_fd = alive_w
140+
self._forkserver_pid = pid
130141

131142
#
132143
#
@@ -157,11 +168,11 @@ def sigchld_handler(*_unused):
157168
# Dummy signal handler, doesn't do anything
158169
pass
159170

160-
# letting SIGINT through avoids KeyboardInterrupt tracebacks
161-
# unblocking SIGCHLD allows the wakeup fd to notify our event loop
162171
handlers = {
172+
# unblocking SIGCHLD allows the wakeup fd to notify our event loop
163173
signal.SIGCHLD: sigchld_handler,
164-
signal.SIGINT: signal.SIG_DFL,
174+
# protect the process from ^C
175+
signal.SIGINT: signal.SIG_IGN,
165176
}
166177
old_handlers = {sig: signal.signal(sig, val)
167178
for (sig, val) in handlers.items()}

Lib/test/_test_multiprocessing.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,54 @@ def test_error_on_stdio_flush(self):
603603
finally:
604604
setattr(sys, stream_name, old_stream)
605605

606+
@classmethod
607+
def _sleep_and_set_event(self, evt, delay=0.0):
608+
time.sleep(delay)
609+
evt.set()
610+
611+
def check_forkserver_death(self, signum):
612+
# bpo-31308: if the forkserver process has died, we should still
613+
# be able to create and run new Process instances (the forkserver
614+
# is implicitly restarted).
615+
if self.TYPE == 'threads':
616+
self.skipTest('test not appropriate for {}'.format(self.TYPE))
617+
sm = multiprocessing.get_start_method()
618+
if sm != 'forkserver':
619+
# The fork method by design inherits all fds from the parent,
620+
# trying to go against it is a lost battle
621+
self.skipTest('test not appropriate for {}'.format(sm))
622+
623+
from multiprocessing.forkserver import _forkserver
624+
_forkserver.ensure_running()
625+
626+
evt = self.Event()
627+
proc = self.Process(target=self._sleep_and_set_event, args=(evt, 1.0))
628+
proc.start()
629+
630+
pid = _forkserver._forkserver_pid
631+
os.kill(pid, signum)
632+
time.sleep(1.0) # give it time to die
633+
634+
evt2 = self.Event()
635+
proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
636+
proc2.start()
637+
proc2.join()
638+
self.assertTrue(evt2.is_set())
639+
self.assertEqual(proc2.exitcode, 0)
640+
641+
proc.join()
642+
self.assertTrue(evt.is_set())
643+
self.assertIn(proc.exitcode, (0, 255))
644+
645+
def test_forkserver_sigint(self):
646+
# Catchable signal
647+
self.check_forkserver_death(signal.SIGINT)
648+
649+
def test_forkserver_sigkill(self):
650+
# Uncatchable signal
651+
if os.name != 'nt':
652+
self.check_forkserver_death(signal.SIGKILL)
653+
606654

607655
#
608656
#
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Make multiprocessing's forkserver process immune to Ctrl-C and other user interruptions.
2+
If it crashes, restart it when necessary.

0 commit comments

Comments
 (0)
0