10000 [3.6] bpo-31310: multiprocessing's semaphore tracker should be launch… · python/cpython@b5f09ac · GitHub
[go: up one dir, main page]

Skip to content

Commit b5f09ac

Browse files
authored
[3.6] bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (GH-3247) (#4254)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed * Avoid mucking with process state in test. Add a warning if the semaphore process died, as semaphores may then be leaked. * Add NEWS entry (cherry picked from commit cbe1756)
1 parent f8b3f6b commit b5f09ac

File tree

3 files changed

+57
-7
lines changed

3 files changed

+57
-7
lines changed

Lib/multiprocessing/semaphore_tracker.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class SemaphoreTracker(object):
2929
def __init__(self):
3030
self._lock = threading.Lock()
3131
self._fd = None
32+
self._pid = None
3233

3334
def getfd(self):
3435
self.ensure_running()
@@ -40,8 +41,20 @@ def ensure_running(self):
4041
This can be run from any process. Usually a child process will use
4142
the semaphore created by its parent.'''
4243
with self._lock:
43-
if self._fd is not None:
44-
return
44+
if self._pid is not None:
45+
# semaphore tracker was launched before, is it still running?
46+
pid, status = os.waitpid(self._pid, os.WNOHANG)
47+
if not pid:
48+
# => still alive
49+
return
50+
# => dead, launch it again
51+
os.close(self._fd)
52+
self._fd = None
53+
self._pid = None
54+
55+
warnings.warn('semaphore_tracker: process died unexpectedly, '
56+
'relaunching. Some semaphores might leak.')
57+
4558
fds_to_pass = []
4659
try:
4760
fds_to_pass.append(sys.stderr.fileno())
@@ -55,12 +68,13 @@ def ensure_running(self):
5568
exe = spawn.get_executable()
5669
args = [exe] + util._args_from_interpreter_flags()
5770
args += ['-c', cmd % r]
58-
util.spawnv_passfds(exe, args, fds_to_pass)
71+
pid = util.spawnv_passfds(exe, args, fds_to_pass)
5972
except:
6073
os.close(w)
6174
raise
6275
else:
6376
self._fd = w
77+
self._pid = pid
6478
finally:
6579
os.close(r)
6680

Lib/test/_test_multiprocessing.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import unittest
66
import queue as pyqueue
7+
import contextlib
78
import time
89
import io
910
import itertools
@@ -4125,14 +4126,14 @@ def test_preload_resources(self):
41254126
self.fail("failed spawning forkserver or grandchild")
41264127

41274128

4128-
#
4129-
# Check that killing process does not leak named semaphores
4130-
#
4131-
41324129
@unittest.skipIf(sys.platform == "win32",
41334130
"test semantics don't make sense on Windows")
41344131
class TestSemaphoreTracker(unittest.TestCase):
4132+
41354133
def test_semaphore_tracker(self):
4134+
#
4135+
# Check that killing process does not leak named semaphores
4136+
#
41364137
import subprocess
41374138
cmd = '''if 1:
41384139
import multiprocessing as mp, time, os
@@ -4166,6 +4167,40 @@ def test_semaphore_tracker(self):
41664167
self.assertRegex(err, expected)
41674168
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
41684169

4170+
def check_semaphore_tracker_death(self, signum, should_die):
4171+
# bpo-31310: if the semaphore tracker process has died, it should
4172+
# be restarted implicitly.
4173+
from multiprocessing.semaphore_tracker import _semaphore_tracker
4174+
_semaphore_tracker.ensure_running()
4175+
pid = _semaphore_tracker._pid
4176+
os.kill(pid, signum)
4177+
time.sleep(1.0) # give it time to die
4178+
4179+
ctx = multiprocessing.get_context("spawn")
4180+
with contextlib.ExitStack() as stack:
4181+
if should_die:
4182+
stack.enter_context(self.assertWarnsRegex(
4183+
UserWarning,
4184+
"semaphore_tracker: process died"))
4185+
sem = ctx.Semaphore()
4186+
sem.acquire()
4187+
sem.release()
4188+
wr = weakref.ref(sem)
4189+
# ensure `sem` gets collected, which triggers communication with
4190+
# the semaphore tracker
4191+
del sem
4192+
gc.collect()
4193+
self.assertIsNone(wr())
4194+
4195+
def test_semaphore_tracker_sigint(self):
4196+
# Catchable signal (ignored by semaphore tracker)
4197+
self.check_semaphore_tracker_death(signal.SIGINT, False)
4198+
4199+
def test_semaphore_tracker_sigkill(self):
4200+
# Uncatchable signal.
4201+
self.check_semaphore_tracker_death(signal.SIGKILL, True)
4202+
4203+
41694204
class TestSimpleQueue(unittest.TestCase):
41704205

41714206
@classmethod
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
multiprocessing's semaphore tracker should be launched again if crashed.

0 commit comments

Comments
 (0)
0