8000 bpo-47075: Add shutdown_timeout to multiprocessing BaseManager by vstinner · Pull Request #32112 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-47075: Add shutdown_timeout to multiprocessing BaseManager #32112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@ Manager processes will be shutdown as soon as they are garbage collected or
their parent process exits. The manager classes are defined in the
:mod:`multiprocessing.managers` module:

.. class:: BaseManager([address[, authkey]])
.. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Create a BaseManager object.

Expand All @@ -1691,6 +1691,20 @@ their parent process exits. The manager classes are defined in the
*authkey* is ``None`` then ``current_process().authkey`` is used.
Otherwise *authkey* is used and it must be a byte string.

*serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or
``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization).

*ctx* is a context object, or ``None`` (use the current context). See the
:func:`get_context` function.

*shutdown_timeout* is a timeout in seconds used to wait until the process
used by the manager completes in the :meth:`shutdown` method. If the
shutdown times out, the process is terminated. If terminating the process
also times out, the process is killed.

.. versionchanged: 3.11
Added the *shutdown_timeout* parameter.

.. method:: start([initializer[, initargs]])

Start a subprocess to start the manager. If *initializer* is not ``None``
Expand Down
16 changes: 10 additions & 6 deletions Lib/multiprocessing/managers.py
10000
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ class BaseManager(object):
_Server = Server

def __init__(self, address=None, authkey=None, serializer='pickle',
ctx=None):
ctx=None, *, shutdown_timeout=1.0):
if authkey is None:
authkey = process.current_process().authkey
self._address = address # XXX not final address if eg ('', 0)
Expand All @@ -507,6 +507,7 @@ def __init__(self, address=None, authkey=None, serializer='pickle',
self._serializer = serializer
self._Listener, self._Client = listener_client[serializer]
self._ctx = ctx or get_context()
self._shutdown_timeout = shutdown_timeout

def get_server(self):
'''
Expand Down Expand Up @@ -570,8 +571,8 @@ def start(self, initializer=None, initargs=()):
self._state.value = State.STARTED
self.shutdown = util.Finalize(
self, type(self)._finalize_manager,
args=(self._process, self._address, self._authkey,
self._state, self._Client),
args=(self._process, self._address, self._authkey, self._state,
self._Client, self._shutdown_timeout),
exitpriority=0
)

Expand Down Expand Up @@ -656,7 +657,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
def _finalize_manager(process, address, authkey, state, _Client,
shutdown_timeout):
'''
Shutdown the manager process; will be registered as a finalizer
'''
Expand All @@ -671,15 +673,17 @@ def _finalize_manager(process, address, authkey, state, _Client):
except Exception:
pass

process.join(timeout=1.0)
process.join(timeout=shutdown_timeout)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
util.info('trying to `terminate()` manager process')
process.terminate()
process.join(timeout=0.1)
process.join(timeout=shutdown_timeout)
if process.is_alive():
util.info('manager still alive after terminate')
process.kill()
process.join()

state.value = State.SHUTDOWN
try:
Expand Down
34 changes: 21 additions & 13 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ def _resource_unlink(name, rtype):
else:
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# BaseManager.shutdown_timeout
SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT

HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)

Expand Down Expand Up @@ -2897,7 +2900,7 @@ class _TestMyManager(BaseTestCase):
ALLOWED_TYPES = ('manager',)

def test_mymanager(self):
manager = MyManager()
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.common(manager)
manager.shutdown()
Expand All @@ -2908,15 +2911,16 @@ def test_mymanager(self):
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

def test_mymanager_context(self):
with MyManager() as manager:
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
with manager:
self.common(manager)
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
# to the manager process if it takes longer than 1 second to stop,
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

def test_mymanager_context_prestarted(self):
manager = MyManager()
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
with manager:
self.common(manager)
Expand Down Expand Up @@ -2978,8 +2982,8 @@ class _TestRemoteManager(BaseTestCase):
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager2(
address=address, authkey=authkey, serializer=SERIALIZER
)
address=address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect()
queue = manager.get_queue()
# Note that xmlrpclib will deserialize object as a list not a tuple
Expand All @@ -2989,8 +2993,8 @@ def test_remote(self):
authkey = os.urandom(32)

manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
)
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.addCleanup(manager.shutdown)

Expand All @@ -2999,8 +3003,8 @@ def test_remote(self):
p.start()

manager2 = QueueManager2(
address=manager.address, authkey=authkey, serializer=SERIALIZER
)
address=manager.address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager2.connect()
queue = manager2.get_queue()

Expand All @@ -3020,15 +3024,17 @@ class _TestManagerRestart(BaseTestCase):
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager(
address=address, authkey=authkey, serializer=SERIALIZER)
address=address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect()
queue = manager.get_queue()
queue.put('hello world')

def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
address=(socket_helper.HOST, 0), authkey=authkey,
serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
try:
srvr = manager.get_server()
addr = srvr.address
Expand All @@ -3048,7 +3054,8 @@ def test_rapid_restart(self):
manager.shutdown()

manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER)
address=addr, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
try:
manager.start()
self.addCleanup(manager.shutdown)
Expand All @@ -3059,7 +3066,8 @@ def test_rapid_restart(self):
# (sporadic failure on buildbots)
time.sleep(1.0)
manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER)
address=addr, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
if hasattr(manager, "shutdown"):
self.addCleanup(manager.shutdown)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add an optional keyword *shutdown_timeout* parameter to the
:class:`multiprocessing.BaseManager` constructor. Kill the process if
terminate() takes longer than the timeout. Patch by Victor Stinner.
0