8000 gh-91231: Add shutdown_timeout to multiprocessing BaseManager (#32112) · python/cpython@061a8bf · GitHub
[go: up one dir, main page]

Skip to content

Commit 061a8bf

Browse files
authored
gh-91231: Add shutdown_timeout to multiprocessing BaseManager (#32112)
Add an optional keyword 'shutdown_timeout' parameter to the multiprocessing.BaseManager constructor. Kill the process if terminate() takes longer than the timeout. Multiprocessing tests pass test.support.SHORT_TIMEOUT to BaseManager.shutdown_timeout.
1 parent 7407008 commit 061a8bf

File tree

4 files changed

+49
-20
lines changed

4 files changed

+49
-20
lines changed

Doc/library/multiprocessing.rst

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1676,7 +1676,7 @@ Manager processes will be shutdown as soon as they are garbage collected or
16761676
their parent process exits. The manager classes are defined in the
16771677
:mod:`multiprocessing.managers` module:
16781678

1679-
.. class:: BaseManager([address[, authkey]])
1679+
.. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)
16801680

16811681
Create a BaseManager object.
16821682

@@ -1691,6 +1691,20 @@ their parent process exits. The manager classes are defined in the
16911691
*authkey* is ``None`` then ``current_process().authkey`` is used.
16921692
Otherwise *authkey* is used and it must be a byte string.
16931693

1694+
*serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or
1695+
``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization).
1696+
1697+
*ctx* is a context object, or ``None`` (use the current context). See the
1698+
:func:`get_context` function.
1699+
1700+
*shutdown_timeout* is a timeout in seconds used to wait until the process
1701+
used by the manager completes in the :meth:`shutdown` method. If the
1702+
shutdown times out, the process is terminated. If terminating the process
1703+
also times out, the process is killed.
1704+
1705+
.. versionchanged: 3.11
1706+
Added the *shutdown_timeout* parameter.
1707+
16941708
.. method:: start([initializer[, initargs]])
16951709

16961710
Start a subprocess to start the manager. If *initializer* is not ``None``

Lib/multiprocessing/managers.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ class BaseManager(object):
497497
_Server = Server
498498

499499
def __init__(self, address=None, authkey=None, serializer='pickle',
500-
ctx=None):
500+
ctx=None, *, shutdown_timeout=1.0):
501501
if authkey is None:
502502
authkey = process.current_process().authkey
503503
self._address = address # XXX not final address if eg ('', 0)
@@ -507,6 +507,7 @@ def __init__(self, address=None, authkey=None, serializer='pickle',
507507
self._serializer = serializer
508508
self._Listener, self._Client = listener_client[serializer]
509509
self._ctx = ctx or get_context()
510+
self._shutdown_timeout = shutdown_timeout
510511

511512
def get_server(self):
512513
'''
@@ -570,8 +571,8 @@ def start(self, initializer=None, initargs=()):
570571
self._state.value = State.STARTED
571572
self.shutdown = util.Finalize(
572573
self, type(self)._finalize_manager,
573-
args=(self._process, self._address, self._authkey,
574-
self._state, self._Client),
574+
args=(self._process, self._address, self._authkey, self._state,
575+
self._Client, self._shutdown_timeout),
575576
exitpriority=0
576577
)
577578

@@ -656,7 +657,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
656657
self.shutdown()
657658

658659
@staticmethod
659-
def _finalize_manager(process, address, authkey, state, _Client):
660+
def _finalize_manager(process, address, authkey, state, _Client,
661+
shutdown_timeout):
660662
'''
661663
Shutdown the manager process; will be registered as a finalizer
662664
'''
@@ -671,15 +673,17 @@ def _finalize_manager(process, address, authkey, state, _Client):
671673
except Exception:
672674
pass
673675

674-
process.join(timeout=1.0)
676+
process.join(timeout=shutdown_timeout)
675677
if process.is_alive():
676678
util.info('manager still alive')
677679
if hasattr(process, 'terminate'):
678680
util.info('trying to `terminate()` manager process')
679681
process.terminate()
680-
process.join(timeout=0.1)
682+
process.join(timeout=shutdown_timeout)
681683
if process.is_alive():
682684
util.info('manager still alive after terminate')
685+
process.kill()
686+
process.join()
683687

684688
state.value = State.SHUTDOWN
685689
try:

Lib/test/_test_multiprocessing.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ def _resource_unlink(name, rtype):
119119
else:
120120
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
121121

122+
# BaseManager.shutdown_timeout
123+
SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT
124+
122125
HAVE_GETVALUE = not getattr(_multiprocessing,
123126
'HAVE_BROKEN_SEM_GETVALUE', False)
124127

@@ -2897,7 +2900,7 @@ class _TestMyManager(BaseTestCase):
28972900
ALLOWED_TYPES = ('manager',)
28982901

28992902
def test_mymanager(self):
2900-
manager = MyManager()
2903+
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
29012904
manager.start()
29022905
self.common(manager)
29032906
manager.shutdown()
@@ -2908,15 +2911,16 @@ def test_mymanager(self):
29082911
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
29092912

29102913
def test_mymanager_context(self):
2911-
with MyManager() as manager:
2914+
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
2915+
with manager:
29122916
self.common(manager)
29132917
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
29142918
# to the manager process if it takes longer than 1 second to stop,
29152919
# which happens on slow buildbots.
29162920
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
29172921

29182922
def test_mymanager_context_prestarted(self):
2919-
manager = MyManager()
2923+
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
29202924
manager.start()
29212925
with manager:
29222926
self.common(manager)
@@ -2978,8 +2982,8 @@ class _TestRemoteManager(BaseTestCase):
29782982
@classmethod
29792983
def _putter(cls, address, authkey):
29802984
manager = QueueManager2(
2981-
address=address, authkey=authkey, serializer=SERIALIZER
2982-
)
2985+
address=address, authkey=authkey, serializer=SERIALIZER,
2986+
shutdown_timeout=SHUTDOWN_TIMEOUT)
29832987
manager.connect()
29842988
queue = manager.get_queue()
29852989
# Note that xmlrpclib will deserialize object as a list not a tuple
@@ -2989,8 +2993,8 @@ def test_remote(self):
29892993
authkey = os.urandom(32)
29902994

29912995
manager = QueueManager(
2992-
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
2993-
)
2996+
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
2997+
shutdown_timeout=SHUTDOWN_TIMEOUT)
29942998
manager.start()
29952999
self.addCleanup(manager.shutdown)
29963000

@@ -2999,8 +3003,8 @@ def test_remote(self):
29993003
p.start()
30003004

30013005
manager2 = QueueManager2(
3002-
address=manager.address, authkey=authkey, serializer=SERIALIZER
3003-
)
3006+
address=manager.address, authkey=authkey, serializer=SERIALIZER,
3007+
shutdown_timeout=SHUTDOWN_TIMEOUT)
30043008
manager2.connect()
30053009
queue = manager2.get_queue()
30063010

@@ -3020,15 +3024,17 @@ class _TestManagerRestart(BaseTestCase):
30203024
@classmethod
30213025
def _putter(cls, address, authkey):
30223026
manager = QueueManager(
3023-
address=address, authkey=authkey, serializer=SERIALIZER)
3027+
address=address, authkey=authkey, serializer=SERIALIZER,
3028+
shutdown_timeout=SHUTDOWN_TIMEOUT)
30243029
manager.connect()
30253030
queue = manager.get_queue()
30263031
queue.put('hello world')
30273032

30283033
def test_rapid_restart(self):
30293034
authkey = os.urandom(32)
30303035
manager = QueueManager(
3031-
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
3036+
address=(socket_helper.HOST, 0), authkey=authkey,
3037+
serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
30323038
try:
30333039
srvr = manager.get_server()
30343040
addr = srvr.address
@@ -3048,7 +3054,8 @@ def test_rapid_restart(self):
30483054
manager.shutdown()
30493055

30503056
manager = QueueManager(
3051-
address=addr, authkey=authkey, serializer=SERIALIZER)
3057+
address=addr, authkey=authkey, serializer=SERIALIZER,
3058+
shutdown_timeout=SHUTDOWN_TIMEOUT)
30523059
try:
30533060
manager.start()
30543061
self.addCleanup(manager.shutdown)
@@ -3059,7 +3066,8 @@ def test_rapid_restart(self):
30593066
# (sporadic failure on buildbots)
30603067
time.sleep(1.0)
30613068
manager = QueueManager(
3062-
address=addr, authkey=authkey, serializer=SERIALIZER)
3069+
address=addr, authkey=authkey, serializer=SERIALIZER,
3070+
shutdown_timeout=SHUTDOWN_TIMEOUT)
30633071
if hasattr(manager, "shutdown"):
30643072
self.addCleanup(manager.shutdown)
30653073

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add an optional keyword *shutdown_timeout* parameter to the
2+
:class:`multiprocessing.BaseManager` constructor. Kill the process if
3+
terminate() takes longer than the timeout. Patch by Victor Stinner.

0 commit comments

Comments
 (0)
0