8000 bpo-31310: multiprocessing's semaphore tracker should be launched aga… · python/cpython@cbe1756 · GitHub
[go: up one dir, main page]

Skip to content

Commit cbe1756

Browse files
authored
bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (#3247)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed * Avoid mucking with process state in test. Add a warning if the semaphore process died, as semaphores may then be leaked. * Add NEWS entry
1 parent fc6b348 commit cbe1756

File tree

3 files changed

+57
-7
lines changed

3 files changed

+57
-7
lines changed

Lib/multiprocessing/semaphore_tracker.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class SemaphoreTracker(object):
2929
def __init__(self):
3030
self._lock = threading.Lock()
3131
self._fd = None
32+
self._pid = None
3233

3334
def getfd(self):
3435
self.ensure_running()
@@ -40,8 +41,20 @@ def ensure_running(self):
4041
This can be run from any process. Usually a child process will use
4142
the semaphore created by its parent.'''
4243
with self._lock:
43-
if self._fd is not None:
44-
return
44+
if self._pid is not None:
45+
# semaphore tracker was launched before, is it still running?
46+
pid, status = os.waitpid(self._pid, os.WNOHANG)
47+
if not pid:
48+
# => still alive
49+
return
50+
# => dead, launch it again
51+
os.close(self._fd)
52+
self._fd = None
53+
self._pid = None
54+
55+
warnings.warn('semaphore_tracker: process died unexpectedly, '
56+
'relaunching. Some semaphores might leak.')
57+
4558
fds_to_pass = []
4659
try:
4760
fds_to_pass.append(sys.stderr.fileno())
@@ -55,12 +68,13 @@ def ensure_running(self):
5568
exe = spawn.get_executable()
5669
args = [exe] + util._args_from_interpreter_flags()
5770
args += ['-c', cmd % r]
58-
util.spawnv_passfds(exe, args, fds_to_pass)
71+
pid = util.spawnv_passfds(exe, args, fds_to_pass)
5972
except:
6073
os.close(w)
6174
raise
6275
else:
6376
self._fd = w
77+
self._pid = pid
6478
finally:
6579
os.close(r)
6680

Lib/test/_test_multiprocessing.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import unittest
66
import queue as pyqueue
7+
import contextlib
78
import time
89
import io
910
import itertools
@@ -4344,14 +4345,14 @@ def test_preload_resources(self):
43444345
self.fail("failed spawning forkserver or grandchild")
43454346

43464347

4347-
#
4348-
# Check that killing process does not leak named semaphores
4349-
#
4350-
43514348
@unittest.skipIf(sys.platform == "win32",
43524349
"test semantics don't make sense on Windows")
43534350
class TestSemaphoreTracker(unittest.TestCase):
4351+
43544352
def test_semaphore_tracker(self):
4353+
#
4354+
# Check that killing process does not leak named semaphores
4355+
#
43554356
import subprocess
43564357
cmd = '''if 1:
43574358
import multiprocessing as mp, time, os
@@ -4385,6 +4386,40 @@ def test_semaphore_tracker(self):
43854386
self.assertRegex(err, expected)
43864387
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
43874388

4389+
def check_semaphore_tracker_death(self, signum, should_die):
4390+
# bpo-31310: if the semaphore tracker process has died, it should
4391+
# be restarted implicitly.
4392+
from multiprocessing.semaphore_tracker import _semaphore_tracker
4393+
_semaphore_tracker.ensure_running()
4394+
pid = _semaphore_tracker._pid
4395+
os.kill(pid, signum)
4396+
time.sleep(1.0) # give it time to die
4397+
4398+
ctx = multiprocessing.get_context("spawn")
4399+
with contextlib.ExitStack() as stack:
4400+
if should_die:
4401+
stack.enter_context(self.assertWarnsRegex(
4402+
UserWarning,
4403+
"semaphore_tracker: process died"))
4404+
sem = ctx.Semaphore()
4405+
sem.acquire()
4406+
sem.release()
4407+
wr = weakref.ref(sem)
4408+
# ensure `sem` gets collected, which triggers communication with
4409+
# the semaphore tracker
4410+
del sem
4411+
gc.collect()
4412+
self.assertIsNone(wr())
4413+
4414+
def test_semaphore_tracker_sigint(self):
4415+
# Catchable signal (ignored by semaphore tracker)
4416+
self.check_semaphore_tracker_death(signal.SIGINT, False)
4417+
4418+
def test_semaphore_tracker_sigkill(self):
4419+
# Uncatchable signal.
4420+
self.check_semaphore_tracker_death(signal.SIGKILL, True)
4421+
4422+
43884423
class TestSimpleQueue(unittest.TestCase):
43894424

43904425
@classmethod
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
multiprocessing's semaphore tracker should be launched again if crashed.

0 commit comments

Comments
 (0)
0