From 47b162aac9d4d4c14449506989715a4f1738f449 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 10:31:10 -0800 Subject: [PATCH 01/33] gh-128041 - Add a terminate_workers method to ProcessPoolExecutor Provides a way to forcefully stop all the workers in the pool Typically this would be used as a last effort to stop all workers if unable to shutdown / join in the expected way --- Lib/concurrent/futures/process.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 42eee72bc1457f..72525cea1a56e6 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -57,6 +57,7 @@ import weakref from functools import partial import itertools +import signal import sys from traceback import format_exception @@ -855,3 +856,29 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + + def terminate_workers(self, signal=signal.SIGINT): + """Attempts to terminate the executor's workers using the given signal. + Iterates through all of the current processes and sends the given signal if + the process is still alive. + + After terminating workers, the pool will be in a broken state and no longer usable. + + Args: + signal: The signal to send to each worker process. Defaults to + signal.SIGINT. + """ + if self._processes: + for pid, proc in self._processes.items(): + try: + is_alive = proc.is_alive() + except ValueError: + # The process is already exited/closed out. + is_alive = False + + if is_alive: + try: + os.kill(pid, signal) + except ProcessLookupError: + # The process just ended before our signal + pass From 6ef88338b9050e9b2b6f5068dce1015e3e4bf8e4 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 18:53:24 +0000 Subject: [PATCH 02/33] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst new file mode 100644 index 00000000000000..1afedd97d071ac --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -0,0 +1 @@ +Add a `terminate_workers` method to `ProcessPoolExecutor` to allow a way to attempt to force kill the worker processes. From 61c9b1481746e7a7d18c6d53a4f454228983465d Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 10:54:51 -0800 Subject: [PATCH 03/33] Fix lint --- .../next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst index 1afedd97d071ac..d33ec0fe9d422d 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -1 +1 @@ -Add a `terminate_workers` method to `ProcessPoolExecutor` to allow a way to attempt to force kill the worker processes. +Add a ``terminate_workers`` method to ``ProcessPoolExecutor`` to allow a way to attempt to force kill the worker processes. From 3bf5464143436a253e5104ae63fb8b294ca35c7a Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 11:16:44 -0800 Subject: [PATCH 04/33] Swap to SIGTERM as the default --- Lib/concurrent/futures/process.py | 32 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 72525cea1a56e6..b69ed7e4492e77 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -857,7 +857,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): shutdown.__doc__ = _base.Executor.shutdown.__doc__ - def terminate_workers(self, signal=signal.SIGINT): + def terminate_workers(self, signal=signal.SIGTERM): """Attempts to terminate the executor's workers using the given signal. Iterates through all of the current processes and sends the given signal if the process is still alive. @@ -866,19 +866,21 @@ def terminate_workers(self, signal=signal.SIGINT): Args: signal: The signal to send to each worker process. Defaults to - signal.SIGINT. + signal.SIGTERM. """ - if self._processes: - for pid, proc in self._processes.items(): - try: - is_alive = proc.is_alive() - except ValueError: - # The process is already exited/closed out. - is_alive = False + if not self._processes: + return - if is_alive: - try: - os.kill(pid, signal) - except ProcessLookupError: - # The process just ended before our signal - pass + for pid, proc in self._processes.items(): + try: + is_alive = proc.is_alive() + except ValueError: + # The process is already exited/closed out. + is_alive = False + + if is_alive: + try: + os.kill(pid, signal) + except ProcessLookupError: + # The process just ended before our signal + pass From 4b285b88b7180de209bb35de6cf1c635fd34ac8f Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 12:16:01 -0800 Subject: [PATCH 05/33] Add some tests --- .../test_process_pool.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 8b1bdaa33d8f5c..14c0edf94749c0 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,8 +1,11 @@ +import multiprocessing import os +import queue import sys import threading import time import unittest +import unittest.mock from concurrent import futures from concurrent.futures.process import BrokenProcessPool @@ -22,6 +25,12 @@ def __init__(self, mgr): def __del__(self): self.event.set() +def _put_sleep_put(queue): + """ Used as part of test_process_pool_executor_terminate_workers """ + queue.put('started') + time.sleep(2) + queue.put('finished') + class ProcessPoolExecutorTest(ExecutorTest): @@ -218,6 +227,60 @@ def mock_start_new_thread(func, *args, **kwargs): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() + def test_process_pool_executor_terminate_workers(self): + manager = multiprocessing.Manager() + q = manager.Queue() + + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(_put_sleep_put, q) + + # We should get started, but not finished since we'll terminate the workers just after + self.assertEqual(q.get(timeout=1), 'started') + + executor.terminate_workers() + + try: + q.get(timeout=1) + raise RuntimeError("Queue should not have gotten a second value") + except queue.Empty: + pass + + def test_process_pool_executor_terminate_workers_dead_workers(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + try: + executor.submit(os._exit, 1).result() + except BrokenProcessPool: + # BrokenProcessPool will be raised by our call to .result() since the worker will die + pass + + # The worker has been killed already, terminate_workers should basically no-op + executor.terminate_workers() + + def test_process_pool_executor_terminate_workers_not_started_yet(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + # The worker has not been started yet, terminate_workers should basically no-op + executor.terminate_workers() + + def test_process_pool_executor_terminate_workers_stops_pool(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(time.sleep, 0).result() + + executor.terminate_workers() + + try: + executor.submit(time.sleep, 0).result() + raise RuntimeError("Should have raised BrokenProcessPool") + except BrokenProcessPool: + pass + + @unittest.mock.patch('concurrent.futures.process.os.kill') + def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(time.sleep, 0).result() + + executor.terminate_workers(9) + mock_kill.assert_called_once_with(list(executor._processes.values())[0].pid, 9) + create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, From b4939fd7ae7c8e326ba9b42718630f3a199a34a6 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 12:27:45 -0800 Subject: [PATCH 06/33] Update some docs --- Doc/library/concurrent.futures.rst | 13 +++++++++++++ Doc/whatsnew/3.14.rst | 3 +++ 2 files changed, 16 insertions(+) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 5a950081a1c98d..693d1356946e5e 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -415,6 +415,19 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. require the *fork* start method for :class:`ProcessPoolExecutor` you must explicitly pass ``mp_context=multiprocessing.get_context("fork")``. + .. method:: terminate_workers(signal=signal.SIGTERM) + + Attempt to terminate all living worker processes immediately by sending each + of them the given signal. If the signal is not specified, the default signal + :data:`signal.SIGTERM` is used. + + After calling :meth:`ProcessPoolExecutor.terminate_workers`, the caller, should + no longer submit tasks to the executor. It is also recommended to still call + :meth:`ProcessPoolExecutor.shutdown` to ensure that all other resources + associated with the executor are freed. + + .. versionadded:: 3.14 + .. _processpoolexecutor-example: ProcessPoolExecutor Example diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index d13cd2d5173a04..54d7909c94482e 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -306,6 +306,9 @@ concurrent.futures incompatible *fork* start method you must explicitly request it by supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`. (Contributed by Gregory P. Smith in :gh:`84559`.) +* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as + a way to terminate all living worker processes in the given pool. + (Contributed by Charles Machalow in :gh:`128043`.) ctypes ------ From ba6a4c04f29945c6932e7718bea26c576ad7a271 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 12:48:32 -0800 Subject: [PATCH 07/33] Fix docs --- Doc/library/concurrent.futures.rst | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 693d1356946e5e..20159488d7c646 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -421,10 +421,9 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. of them the given signal. If the signal is not specified, the default signal :data:`signal.SIGTERM` is used. - After calling :meth:`ProcessPoolExecutor.terminate_workers`, the caller, should - no longer submit tasks to the executor. It is also recommended to still call - :meth:`ProcessPoolExecutor.shutdown` to ensure that all other resources - associated with the executor are freed. + After calling this, the caller, should no longer submit tasks to the executor. + It is also recommended to still call :meth:`Executor.shutdown` to ensure that all + other resources associated with the executor are freed. .. versionadded:: 3.14 From 5d58e50985869436bc373b1647fb9c59f9283db8 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 12:50:48 -0800 Subject: [PATCH 08/33] Fix docs --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 20159488d7c646..0f897f23b9ea4b 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -421,7 +421,7 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. of them the given signal. If the signal is not specified, the default signal :data:`signal.SIGTERM` is used. - After calling this, the caller, should no longer submit tasks to the executor. + After calling this, the caller should no longer submit tasks to the executor. It is also recommended to still call :meth:`Executor.shutdown` to ensure that all other resources associated with the executor are freed. From 0db381bf2041e1784873ccb885048bb124c300e4 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 13:09:13 -0800 Subject: [PATCH 09/33] PR fixes/updates --- Doc/library/concurrent.futures.rst | 14 ++--- Doc/whatsnew/3.14.rst | 1 + .../test_process_pool.py | 52 +++++++++++-------- ...-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 4 +- 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 0f897f23b9ea4b..dd7f1e38357130 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -417,15 +417,15 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. .. method:: terminate_workers(signal=signal.SIGTERM) - Attempt to terminate all living worker processes immediately by sending each - of them the given signal. If the signal is not specified, the default signal - :data:`signal.SIGTERM` is used. + Attempt to terminate all living worker processes immediately by sending + each of them the given signal. If the signal is not specified, the default + signal :data:`signal.SIGTERM` is used. - After calling this, the caller should no longer submit tasks to the executor. - It is also recommended to still call :meth:`Executor.shutdown` to ensure that all - other resources associated with the executor are freed. + After calling this method the caller should no longer submit tasks to the + executor. It is also recommended to still call :meth:`Executor.shutdown` + to ensure that all other resources associated with the executor are freed. - .. versionadded:: 3.14 + .. versionadded:: next .. _processpoolexecutor-example: diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 54d7909c94482e..9955e2676b8b09 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -306,6 +306,7 @@ concurrent.futures incompatible *fork* start method you must explicitly request it by supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`. (Contributed by Gregory P. Smith in :gh:`84559`.) + * Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as a way to terminate all living worker processes in the given pool. (Contributed by Charles Machalow in :gh:`128043`.) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 14c0edf94749c0..f3de3d6571d4d7 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,6 +1,7 @@ import multiprocessing import os import queue +import signal import sys import threading import time @@ -239,47 +240,56 @@ def test_process_pool_executor_terminate_workers(self): executor.terminate_workers() - try: - q.get(timeout=1) - raise RuntimeError("Queue should not have gotten a second value") - except queue.Empty: - pass + self.assertRaises(queue.Empty, q.get, timeout=1) + def test_process_pool_executor_terminate_workers_dead_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: - try: - executor.submit(os._exit, 1).result() - except BrokenProcessPool: - # BrokenProcessPool will be raised by our call to .result() since the worker will die - pass + future = executor.submit(os._exit, 1) + self.assertRaises(BrokenProcessPool, future.result) - # The worker has been killed already, terminate_workers should basically no-op - executor.terminate_workers() + # Patching in here instead of at the function level since we only want + # to patch it for this function call, not other parts of the flow. + with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill: + executor.terminate_workers() - def test_process_pool_executor_terminate_workers_not_started_yet(self): + mock_kill.assert_not_called() + + @unittest.mock.patch('concurrent.futures.process.os.kill') + def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill): with futures.ProcessPoolExecutor(max_workers=1) as executor: # The worker has not been started yet, terminate_workers should basically no-op executor.terminate_workers() + mock_kill.assert_not_called() + def test_process_pool_executor_terminate_workers_stops_pool(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: executor.submit(time.sleep, 0).result() executor.terminate_workers() - try: - executor.submit(time.sleep, 0).result() - raise RuntimeError("Should have raised BrokenProcessPool") - except BrokenProcessPool: - pass + future = executor.submit(time.sleep, 0) + self.assertRaises(BrokenProcessPool, future.result) @unittest.mock.patch('concurrent.futures.process.os.kill') def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(time.sleep, 0).result() + future = executor.submit(time.sleep, 0) + future.result() + + executor.terminate_workers(signal.SIGKILL) + + worker_process = list(executor._processes.values())[0] + mock_kill.assert_called_once_with(worker_process.pid, signal.SIGKILL) + + def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + future = executor.submit(time.sleep, 0) + future.result() - executor.terminate_workers(9) - mock_kill.assert_called_once_with(list(executor._processes.values())[0].pid, 9) + # 'potatoes' isn't a valid signal, so os.kill will raise a TypeError + self.assertRaises(TypeError, executor.terminate_workers, 'potatoes') create_executor_tests(globals(), ProcessPoolExecutorTest, diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst index d33ec0fe9d422d..2097360b59cd38 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -1 +1,3 @@ -Add a ``terminate_workers`` method to ``ProcessPoolExecutor`` to allow a way to attempt to force kill the worker processes. +Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as +a way to terminate all living worker processes in the given pool. +(Contributed by Charles Machalow in :gh:`128043`.) From 7ae16857ebf144d86b129ec5c9f5f7d3f3506f15 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 14:38:08 -0800 Subject: [PATCH 10/33] SIGKILL doesn't exist on windows --- Lib/test/test_concurrent_futures/test_process_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index f3de3d6571d4d7..54c23dd5629545 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -278,10 +278,10 @@ def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): future = executor.submit(time.sleep, 0) future.result() - executor.terminate_workers(signal.SIGKILL) + executor.terminate_workers(signal.SIGABRT) worker_process = list(executor._processes.values())[0] - mock_kill.assert_called_once_with(worker_process.pid, signal.SIGKILL) + mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT) def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: From f7ad96cf9b87c344706badafd4e0b7f2b76417aa Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 15:59:04 -0800 Subject: [PATCH 11/33] Update Lib/concurrent/futures/process.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Lib/concurrent/futures/process.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index b69ed7e4492e77..50cc2a18d29875 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -862,7 +862,9 @@ def terminate_workers(self, signal=signal.SIGTERM): Iterates through all of the current processes and sends the given signal if the process is still alive. - After terminating workers, the pool will be in a broken state and no longer usable. + After terminating workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). Args: signal: The signal to send to each worker process. Defaults to From 2c0b578d653e7e33ce723bca446f5af466f287bd Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 16:01:47 -0800 Subject: [PATCH 12/33] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Lib/concurrent/futures/process.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 50cc2a18d29875..9bbd71aaa515af 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -875,14 +875,14 @@ def terminate_workers(self, signal=signal.SIGTERM): for pid, proc in self._processes.items(): try: - is_alive = proc.is_alive() + if not proc.is_alive(): + continue except ValueError: # The process is already exited/closed out. - is_alive = False + continue - if is_alive: - try: - os.kill(pid, signal) - except ProcessLookupError: - # The process just ended before our signal - pass + try: + os.kill(pid, signal) + except ProcessLookupError: + # The process just ended before our signal + pass From a878221e8d8e7a2f6297d7b114762867ea2ce558 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 17 Dec 2024 16:24:05 -0800 Subject: [PATCH 13/33] Fix indenting from suggestions --- Lib/concurrent/futures/process.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9bbd71aaa515af..f6f2337246890f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -881,8 +881,8 @@ def terminate_workers(self, signal=signal.SIGTERM): # The process is already exited/closed out. continue - try: - os.kill(pid, signal) - except ProcessLookupError: - # The process just ended before our signal - pass + try: + os.kill(pid, signal) + except ProcessLookupError: + # The process just ended before our signal + continue From 794ee25a979342de4d7ebccfecc2f46ba3319ca3 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Fri, 20 Dec 2024 13:31:57 -0800 Subject: [PATCH 14/33] Internally call shutdown to prevent a resource leak when calling terminate_workers --- Doc/library/concurrent.futures.rst | 7 ++++--- Lib/concurrent/futures/process.py | 12 ++++++++++-- .../test_concurrent_futures/test_process_pool.py | 9 +++++---- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index dd7f1e38357130..52aa16f38e3d91 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -419,11 +419,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. Attempt to terminate all living worker processes immediately by sending each of them the given signal. If the signal is not specified, the default - signal :data:`signal.SIGTERM` is used. + signal :data:`signal.SIGTERM` is used. Internally, it will also call + :meth:`Executor.shutdown` to ensure that all other resources associated with + the executor are freed. After calling this method the caller should no longer submit tasks to the - executor. It is also recommended to still call :meth:`Executor.shutdown` - to ensure that all other resources associated with the executor are freed. + executor. .. versionadded:: next diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f6f2337246890f..60c1fcd72e3009 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -870,10 +870,18 @@ def terminate_workers(self, signal=signal.SIGTERM): signal: The signal to send to each worker process. Defaults to signal.SIGTERM. """ - if not self._processes: + processes = {} + if self._processes: + processes = self._processes.copy() + + # shutdown will invalidate ._processes, so we copy it right before calling. + # If we waited here, we would deadlock if a process decides not to exit. + self.shutdown(wait=False, cancel_futures=True) + + if not processes: return - for pid, proc in self._processes.items(): + for pid, proc in processes.items(): try: if not proc.is_alive(): continue diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 54c23dd5629545..ed10b9356be0ac 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -251,7 +251,9 @@ def test_process_pool_executor_terminate_workers_dead_workers(self): # Patching in here instead of at the function level since we only want # to patch it for this function call, not other parts of the flow. with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill: - executor.terminate_workers() + with unittest.mock.patch.object(executor, 'shutdown') as mock_shutdown: + executor.terminate_workers() + mock_shutdown.assert_called_once_with(wait=False, cancel_futures=True) mock_kill.assert_not_called() @@ -269,8 +271,7 @@ def test_process_pool_executor_terminate_workers_stops_pool(self): executor.terminate_workers() - future = executor.submit(time.sleep, 0) - self.assertRaises(BrokenProcessPool, future.result) + self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) @unittest.mock.patch('concurrent.futures.process.os.kill') def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): @@ -278,9 +279,9 @@ def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): future = executor.submit(time.sleep, 0) future.result() + worker_process = list(executor._processes.values())[0] executor.terminate_workers(signal.SIGABRT) - worker_process = list(executor._processes.values())[0] mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT) def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self): From 64693a7f69bf81923ddb191d2c837f20d5c82e0e Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Fri, 20 Dec 2024 13:52:32 -0800 Subject: [PATCH 15/33] Change test to not validate calling of os.kill since shutdown may call that --- Lib/test/test_concurrent_futures/test_process_pool.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index ed10b9356be0ac..e61b7a118bbd68 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -242,21 +242,11 @@ def test_process_pool_executor_terminate_workers(self): self.assertRaises(queue.Empty, q.get, timeout=1) - def test_process_pool_executor_terminate_workers_dead_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: future = executor.submit(os._exit, 1) self.assertRaises(BrokenProcessPool, future.result) - # Patching in here instead of at the function level since we only want - # to patch it for this function call, not other parts of the flow. - with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill: - with unittest.mock.patch.object(executor, 'shutdown') as mock_shutdown: - executor.terminate_workers() - mock_shutdown.assert_called_once_with(wait=False, cancel_futures=True) - - mock_kill.assert_not_called() - @unittest.mock.patch('concurrent.futures.process.os.kill') def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill): with futures.ProcessPoolExecutor(max_workers=1) as executor: From 926dff1d4aecaab9a14d050dae4fd44c63a2507f Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Fri, 20 Dec 2024 14:19:54 -0800 Subject: [PATCH 16/33] Commit to retrigger CI From 4429b2f0b50476101c827973d02c601dc57c6d58 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Mon, 24 Feb 2025 19:14:52 -0800 Subject: [PATCH 17/33] PR feedback. Split terminate_workers into terminate_workers and kill_workers. Remove the ability to give an arbitrary signal, though leave plumbing to make it easy enough to re-add if needed. --- Doc/library/concurrent.futures.rst | 23 +++-- Doc/whatsnew/3.14.rst | 5 +- Lib/concurrent/futures/process.py | 57 ++++++++++--- .../test_process_pool.py | 83 +++++++++++-------- ...-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 7 +- 5 files changed, 118 insertions(+), 57 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 52aa16f38e3d91..eebeee4561115d 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -415,13 +415,24 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. require the *fork* start method for :class:`ProcessPoolExecutor` you must explicitly pass ``mp_context=multiprocessing.get_context("fork")``. - .. method:: terminate_workers(signal=signal.SIGTERM) + .. method:: terminate_workers() - Attempt to terminate all living worker processes immediately by sending - each of them the given signal. If the signal is not specified, the default - signal :data:`signal.SIGTERM` is used. Internally, it will also call - :meth:`Executor.shutdown` to ensure that all other resources associated with - the executor are freed. + Attempt to terminate all living worker processes immediately by calling + :meth:`Process.terminate ` on each of them. + Internally, it will also call :meth:`Executor.shutdown` to ensure that all + other resources associated with the executor are freed. + + After calling this method the caller should no longer submit tasks to the + executor. + + .. versionadded:: next + + .. method:: kill_workers() + + Attempt to kill all living worker processes immediately by calling + :meth:`Process.terminate ` on each of them. + Internally, it will also call:meth:`Executor.shutdown` to ensure that all + other resources associated with the executor are freed. After calling this method the caller should no longer submit tasks to the executor. diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index c2f31986519dbe..b34610e76f2afe 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -440,8 +440,9 @@ contextvars (Contributed by Andrew Svetlov in :gh:`129889`.) -* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as - a way to terminate all living worker processes in the given pool. +* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and + :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as + ways to terminate or kill all living worker processes in the given pool. (Contributed by Charles Machalow in :gh:`128043`.) ctypes diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 60c1fcd72e3009..6890fda982ec67 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -45,6 +45,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' +import enum import os from concurrent.futures import _base import queue @@ -627,6 +628,18 @@ class BrokenProcessPool(_base.BrokenExecutor): while a future was in the running state. """ +class _TerminateOrKillOperation(enum.Enum): + """Enum for _terminate_or_kill_workers(). + + Used to determine the operation used by the + _terminate_or_kill_workers() method. + """ + # Delegate to call process.terminate() + TERMINATE = 1 + + # Delegate to call process.kill() + KILL = 2 + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, @@ -857,19 +870,18 @@ def shutdown(self, wait=True, *, cancel_futures=False): shutdown.__doc__ = _base.Executor.shutdown.__doc__ - def terminate_workers(self, signal=signal.SIGTERM): - """Attempts to terminate the executor's workers using the given signal. - Iterates through all of the current processes and sends the given signal if - the process is still alive. + def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation): + """Attempts to terminate or kill the executor's workers based off the given + operation. Iterates through all of the current processes and performs the + relevant task if the process is still alive. After terminating workers, the pool will be in a broken state and no longer usable (for instance, new tasks should not be submitted). - - Args: - signal: The signal to send to each worker process. Defaults to - signal.SIGTERM. """ + if operation not in _TerminateOrKillOperation._member_map_.values(): + raise ValueError(f"Unsupported operation: {operation}") + processes = {} if self._processes: processes = self._processes.copy() @@ -881,7 +893,7 @@ def terminate_workers(self, signal=signal.SIGTERM): if not processes: return - for pid, proc in processes.items(): + for proc in processes.values(): try: if not proc.is_alive(): continue @@ -890,7 +902,32 @@ def terminate_workers(self, signal=signal.SIGTERM): continue try: - os.kill(pid, signal) + if operation == _TerminateOrKillOperation.TERMINATE: + proc.terminate() + elif operation == _TerminateOrKillOperation.KILL: + proc.kill() except ProcessLookupError: # The process just ended before our signal continue + + def terminate_workers(self): + """Attempts to terminate the executor's workers. + Iterates through all of the current worker processes and terminates + each one that is still alive. + + After terminating workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.TERMINATE) + + def kill_workers(self): + """Attempts to kill the executor's workers. + Iterates through all of the current worker processes and kills + each one that is still alive. + + After killing workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.KILL) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index e61b7a118bbd68..de002b5fdac550 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -228,59 +228,70 @@ def mock_start_new_thread(func, *args, **kwargs): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() - def test_process_pool_executor_terminate_workers(self): - manager = multiprocessing.Manager() - q = manager.Queue() + def test_process_pool_executor_terminate_kill_workers(self): + for function_name in ('terminate_workers', 'kill_workers'): + manager = multiprocessing.Manager() + q = manager.Queue() - with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(_put_sleep_put, q) + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(_put_sleep_put, q) - # We should get started, but not finished since we'll terminate the workers just after - self.assertEqual(q.get(timeout=1), 'started') + # We should get started, but not finished since we'll terminate the workers just after + self.assertEqual(q.get(timeout=1), 'started') - executor.terminate_workers() + getattr(executor, function_name)() - self.assertRaises(queue.Empty, q.get, timeout=1) + self.assertRaises(queue.Empty, q.get, timeout=1) - def test_process_pool_executor_terminate_workers_dead_workers(self): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - future = executor.submit(os._exit, 1) - self.assertRaises(BrokenProcessPool, future.result) + def test_process_pool_executor_terminate_kill_workers_dead_workers(self): + for function_name in ('terminate_workers', 'kill_workers'): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + future = executor.submit(os._exit, 1) + self.assertRaises(BrokenProcessPool, future.result) - @unittest.mock.patch('concurrent.futures.process.os.kill') - def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - # The worker has not been started yet, terminate_workers should basically no-op - executor.terminate_workers() + # even though the pool is broken, this shouldn't raise + getattr(executor, function_name)() - mock_kill.assert_not_called() + def test_process_pool_executor_terminate_kill_workers_not_started_yet(self): + for function_name in ('terminate_workers', 'kill_workers'): - def test_process_pool_executor_terminate_workers_stops_pool(self): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(time.sleep, 0).result() + context_with_mocked_process = multiprocessing.get_context() + with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process: - executor.terminate_workers() + with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor: + # The worker has not been started yet, terminate/kill_workers should basically no-op + getattr(executor, function_name)() + + mock_process.return_value.kill.assert_not_called() + mock_process.return_value.terminate.assert_not_called() - self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) + def test_process_pool_executor_terminate_kill_workers_stops_pool(self): + for function_name in ('terminate_workers', 'kill_workers'): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(time.sleep, 0).result() - @unittest.mock.patch('concurrent.futures.process.os.kill') - def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill): + getattr(executor, function_name)() + + self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) + + def test_process_pool_executor_terminate_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: - future = executor.submit(time.sleep, 0) - future.result() + executor._terminate_or_kill_workers = unittest.mock.Mock() + executor.terminate_workers() + + executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.TERMINATE) - worker_process = list(executor._processes.values())[0] - executor.terminate_workers(signal.SIGABRT) + def test_process_pool_executor_kill_workers(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor._terminate_or_kill_workers = unittest.mock.Mock() + executor.kill_workers() - mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT) + executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.KILL) - def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self): + def test_process_pool_executor_terminate_or_kill_workers_invalid_operation(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: - future = executor.submit(time.sleep, 0) - future.result() + self.assertRaises(ValueError, executor._terminate_or_kill_workers, operation='invalid operation'), - # 'potatoes' isn't a valid signal, so os.kill will raise a TypeError - self.assertRaises(TypeError, executor.terminate_workers, 'potatoes') create_executor_tests(globals(), ProcessPoolExecutorTest, diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst index 2097360b59cd38..e59a5310006d11 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -1,3 +1,4 @@ -Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as -a way to terminate all living worker processes in the given pool. -(Contributed by Charles Machalow in :gh:`128043`.) +* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and + :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as + ways to terminate or kill all living worker processes in the given pool. + (Contributed by Charles Machalow in :gh:`128043`.) \ No newline at end of file From b8d6e5f04d4cc3a9f1db274adbac21cade825f5c Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Mon, 24 Feb 2025 19:16:39 -0800 Subject: [PATCH 18/33] Remove un-needed imports --- Lib/concurrent/futures/process.py | 1 - Lib/test/test_concurrent_futures/test_process_pool.py | 1 - 2 files changed, 2 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 6890fda982ec67..4d1516f0df88b1 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -58,7 +58,6 @@ import weakref from functools import partial import itertools -import signal import sys from traceback import format_exception diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index de002b5fdac550..ca2cff89a72e7c 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,7 +1,6 @@ import multiprocessing import os import queue -import signal import sys import threading import time From f9a77143b2e0642da9ed18f2d017a88bf0925423 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Mon, 24 Feb 2025 19:18:35 -0800 Subject: [PATCH 19/33] lint --- Doc/library/concurrent.futures.rst | 2 +- .../next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index eebeee4561115d..d0ebcdd9a2fbf8 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -431,7 +431,7 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. Attempt to kill all living worker processes immediately by calling :meth:`Process.terminate ` on each of them. - Internally, it will also call:meth:`Executor.shutdown` to ensure that all + Internally, it will also call :meth:`Executor.shutdown` to ensure that all other resources associated with the executor are freed. After calling this method the caller should no longer submit tasks to the diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst index e59a5310006d11..1bc0ba4fd12dbb 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -1,4 +1,4 @@ * Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as ways to terminate or kill all living worker processes in the given pool. - (Contributed by Charles Machalow in :gh:`128043`.) \ No newline at end of file + (Contributed by Charles Machalow in :gh:`128043`.) From 7cfa42eb1f2fe28560eda239517c86901e8428d2 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Mon, 24 Feb 2025 19:39:07 -0800 Subject: [PATCH 20/33] Harden a test a bit to ensure the correct type of kill/terminate was used --- Lib/test/test_concurrent_futures/test_process_pool.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index ca2cff89a72e7c..922d2602f7c90e 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,6 +1,7 @@ import multiprocessing import os import queue +import signal import sys import threading import time @@ -238,7 +239,15 @@ def test_process_pool_executor_terminate_kill_workers(self): # We should get started, but not finished since we'll terminate the workers just after self.assertEqual(q.get(timeout=1), 'started') + worker_process = list(executor._processes.values())[0] getattr(executor, function_name)() + worker_process.join() + + if function_name == 'terminate_workers' or sys.platform == 'win32': + # On windows, kill and terminate both send SIGTERM + self.assertEqual(worker_process.exitcode, -signal.SIGTERM) + elif function_name == 'kill_workers': + self.assertEqual(worker_process.exitcode, -signal.SIGKILL) self.assertRaises(queue.Empty, q.get, timeout=1) @@ -256,7 +265,6 @@ def test_process_pool_executor_terminate_kill_workers_not_started_yet(self): context_with_mocked_process = multiprocessing.get_context() with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process: - with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor: # The worker has not been started yet, terminate/kill_workers should basically no-op getattr(executor, function_name)() From 2b31fab267f31d56c626e5592bca50c8ae9159f0 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Mon, 24 Feb 2025 20:26:08 -0800 Subject: [PATCH 21/33] rekick ci From ad15ee5f22a7aff2362ce8a5ef3d520a4caa736a Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Mon, 24 Feb 2025 20:47:22 -0800 Subject: [PATCH 22/33] Allow more time for queue to get data back Technically this should probably be faster than the 1 second, but sometimes can be a bit slower --- Lib/test/test_concurrent_futures/test_process_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 922d2602f7c90e..836a519903cdd6 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -237,7 +237,7 @@ def test_process_pool_executor_terminate_kill_workers(self): executor.submit(_put_sleep_put, q) # We should get started, but not finished since we'll terminate the workers just after - self.assertEqual(q.get(timeout=1), 'started') + self.assertEqual(q.get(timeout=5), 'started') worker_process = list(executor._processes.values())[0] getattr(executor, function_name)() From 0f5791232fb8d54dae000cd796af17f3a6b15e88 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Tue, 25 Feb 2025 17:59:04 -0800 Subject: [PATCH 23/33] Use subTest to break up tests --- .../test_process_pool.py | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 836a519903cdd6..7de560e136c8e0 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -230,56 +230,59 @@ def mock_start_new_thread(func, *args, **kwargs): def test_process_pool_executor_terminate_kill_workers(self): for function_name in ('terminate_workers', 'kill_workers'): - manager = multiprocessing.Manager() - q = manager.Queue() + with self.subTest(function_name=function_name): + manager = multiprocessing.Manager() + q = manager.Queue() - with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(_put_sleep_put, q) + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(_put_sleep_put, q) - # We should get started, but not finished since we'll terminate the workers just after - self.assertEqual(q.get(timeout=5), 'started') + # We should get started, but not finished since we'll terminate the workers just after + self.assertEqual(q.get(timeout=5), 'started') - worker_process = list(executor._processes.values())[0] - getattr(executor, function_name)() - worker_process.join() + worker_process = list(executor._processes.values())[0] + getattr(executor, function_name)() + worker_process.join() - if function_name == 'terminate_workers' or sys.platform == 'win32': - # On windows, kill and terminate both send SIGTERM - self.assertEqual(worker_process.exitcode, -signal.SIGTERM) - elif function_name == 'kill_workers': - self.assertEqual(worker_process.exitcode, -signal.SIGKILL) + if function_name == 'terminate_workers' or sys.platform == 'win32': + # On windows, kill and terminate both send SIGTERM + self.assertEqual(worker_process.exitcode, -signal.SIGTERM) + elif function_name == 'kill_workers': + self.assertEqual(worker_process.exitcode, -signal.SIGKILL) - self.assertRaises(queue.Empty, q.get, timeout=1) + self.assertRaises(queue.Empty, q.get, timeout=1) def test_process_pool_executor_terminate_kill_workers_dead_workers(self): for function_name in ('terminate_workers', 'kill_workers'): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - future = executor.submit(os._exit, 1) - self.assertRaises(BrokenProcessPool, future.result) + with self.subTest(function_name=function_name): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + future = executor.submit(os._exit, 1) + self.assertRaises(BrokenProcessPool, future.result) - # even though the pool is broken, this shouldn't raise - getattr(executor, function_name)() + # even though the pool is broken, this shouldn't raise + getattr(executor, function_name)() def test_process_pool_executor_terminate_kill_workers_not_started_yet(self): for function_name in ('terminate_workers', 'kill_workers'): + with self.subTest(function_name=function_name): + context_with_mocked_process = multiprocessing.get_context() + with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process: + with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor: + # The worker has not been started yet, terminate/kill_workers should basically no-op + getattr(executor, function_name)() - context_with_mocked_process = multiprocessing.get_context() - with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process: - with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor: - # The worker has not been started yet, terminate/kill_workers should basically no-op - getattr(executor, function_name)() - - mock_process.return_value.kill.assert_not_called() - mock_process.return_value.terminate.assert_not_called() + mock_process.return_value.kill.assert_not_called() + mock_process.return_value.terminate.assert_not_called() def test_process_pool_executor_terminate_kill_workers_stops_pool(self): for function_name in ('terminate_workers', 'kill_workers'): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(time.sleep, 0).result() + with self.subTest(function_name=function_name): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(time.sleep, 0).result() - getattr(executor, function_name)() + getattr(executor, function_name)() - self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) + self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) def test_process_pool_executor_terminate_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: From 1bedb284d936579067ad1d005cda9146a8188558 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Wed, 26 Feb 2025 17:28:54 -0800 Subject: [PATCH 24/33] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Doc/whatsnew/3.14.rst | 4 ++-- Lib/concurrent/futures/process.py | 2 +- Lib/test/test_concurrent_futures/test_process_pool.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 4a2b9dfbab6fd1..d1518a5b2a6df1 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -443,13 +443,13 @@ contextvars * Support context manager protocol by :class:`contextvars.Token`. (Contributed by Andrew Svetlov in :gh:`129889`.) - - + * Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as ways to terminate or kill all living worker processes in the given pool. (Contributed by Charles Machalow in :gh:`128043`.) + ctypes ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 4d1516f0df88b1..cde7d527ff0215 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -878,7 +878,7 @@ def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation): and no longer usable (for instance, new tasks should not be submitted). """ - if operation not in _TerminateOrKillOperation._member_map_.values(): + if operation not in _TerminateOrKillOperation: raise ValueError(f"Unsupported operation: {operation}") processes = {} diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 7de560e136c8e0..0c8a2860e239fc 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -230,7 +230,7 @@ def mock_start_new_thread(func, *args, **kwargs): def test_process_pool_executor_terminate_kill_workers(self): for function_name in ('terminate_workers', 'kill_workers'): - with self.subTest(function_name=function_name): + with self.subTest(function_name): manager = multiprocessing.Manager() q = manager.Queue() From f1b0cf6de2adc6926208c581940ec144a4d5f023 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Wed, 26 Feb 2025 18:05:48 -0800 Subject: [PATCH 25/33] PR feedback: swap to dict with constants, better subtest parameterization, lint --- Lib/concurrent/futures/process.py | 40 +++--- .../test_process_pool.py | 124 +++++++++--------- 2 files changed, 83 insertions(+), 81 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index cde7d527ff0215..8b553d750fb121 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -45,7 +45,6 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' -import enum import os from concurrent.futures import _base import queue @@ -627,17 +626,13 @@ class BrokenProcessPool(_base.BrokenExecutor): while a future was in the running state. """ -class _TerminateOrKillOperation(enum.Enum): - """Enum for _terminate_or_kill_workers(). +_TERMINATE = "terminate" +_KILL = "kill" - Used to determine the operation used by the - _terminate_or_kill_workers() method. - """ - # Delegate to call process.terminate() - TERMINATE = 1 - - # Delegate to call process.kill() - KILL = 2 +_TERMINATE_OR_KILL_OPERATION = { + _TERMINATE, + _KILL +} class ProcessPoolExecutor(_base.Executor): @@ -869,24 +864,25 @@ def shutdown(self, wait=True, *, cancel_futures=False): shutdown.__doc__ = _base.Executor.shutdown.__doc__ - def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation): - """Attempts to terminate or kill the executor's workers based off the given - operation. Iterates through all of the current processes and performs the - relevant task if the process is still alive. + def _terminate_or_kill_workers(self, operation): + """Attempts to terminate or kill the executor's workers based off the + given operation. Iterates through all of the current processes and + performs the relevant task if the process is still alive. After terminating workers, the pool will be in a broken state and no longer usable (for instance, new tasks should not be submitted). """ - if operation not in _TerminateOrKillOperation: + if operation not in _TERMINATE_OR_KILL_OPERATION: raise ValueError(f"Unsupported operation: {operation}") processes = {} if self._processes: processes = self._processes.copy() - # shutdown will invalidate ._processes, so we copy it right before calling. - # If we waited here, we would deadlock if a process decides not to exit. + # shutdown will invalidate ._processes, so we copy it right before + # calling. If we waited here, we would deadlock if a process decides not + # to exit. self.shutdown(wait=False, cancel_futures=True) if not processes: @@ -901,9 +897,9 @@ def _terminate_or_kill_workers(self, operation: _TerminateOrKillOperation): continue try: - if operation == _TerminateOrKillOperation.TERMINATE: + if operation == _TERMINATE: proc.terminate() - elif operation == _TerminateOrKillOperation.KILL: + elif operation == _KILL: proc.kill() except ProcessLookupError: # The process just ended before our signal @@ -918,7 +914,7 @@ def terminate_workers(self): and no longer usable (for instance, new tasks should not be submitted). """ - return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.TERMINATE) + return self._terminate_or_kill_workers(operation=_TERMINATE) def kill_workers(self): """Attempts to kill the executor's workers. @@ -929,4 +925,4 @@ def kill_workers(self): and no longer usable (for instance, new tasks should not be submitted). """ - return self._terminate_or_kill_workers(operation=_TerminateOrKillOperation.KILL) + return self._terminate_or_kill_workers(operation=_KILL) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 0c8a2860e239fc..dc541bd415ef9d 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -12,6 +12,7 @@ from test import support from test.support import hashlib_helper +from test.test_importlib.metadata.fixtures import parameterize from .executor import ExecutorTest, mul from .util import ( @@ -26,6 +27,11 @@ def __init__(self, mgr): def __del__(self): self.event.set() +TERMINATE_OR_KILL_PARAMS = [ + dict(function_name='terminate_workers'), + dict(function_name='kill_workers'), +] + def _put_sleep_put(queue): """ Used as part of test_process_pool_executor_terminate_workers """ queue.put('started') @@ -228,80 +234,80 @@ def mock_start_new_thread(func, *args, **kwargs): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() - def test_process_pool_executor_terminate_kill_workers(self): - for function_name in ('terminate_workers', 'kill_workers'): - with self.subTest(function_name): - manager = multiprocessing.Manager() - q = manager.Queue() - - with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(_put_sleep_put, q) - - # We should get started, but not finished since we'll terminate the workers just after - self.assertEqual(q.get(timeout=5), 'started') + def test_process_pool_executor_terminate_workers(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor._terminate_or_kill_workers = unittest.mock.Mock() + executor.terminate_workers() - worker_process = list(executor._processes.values())[0] - getattr(executor, function_name)() - worker_process.join() + executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TERMINATE) - if function_name == 'terminate_workers' or sys.platform == 'win32': - # On windows, kill and terminate both send SIGTERM - self.assertEqual(worker_process.exitcode, -signal.SIGTERM) - elif function_name == 'kill_workers': - self.assertEqual(worker_process.exitcode, -signal.SIGKILL) + def test_process_pool_executor_kill_workers(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor._terminate_or_kill_workers = unittest.mock.Mock() + executor.kill_workers() - self.assertRaises(queue.Empty, q.get, timeout=1) + executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._KILL) - def test_process_pool_executor_terminate_kill_workers_dead_workers(self): - for function_name in ('terminate_workers', 'kill_workers'): - with self.subTest(function_name=function_name): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - future = executor.submit(os._exit, 1) - self.assertRaises(BrokenProcessPool, future.result) + def test_process_pool_executor_terminate_or_kill_workers_invalid_op(self): + with futures.ProcessPoolExecutor(max_workers=1) as executor: + self.assertRaises(ValueError, + executor._terminate_or_kill_workers, + operation='invalid operation'), - # even though the pool is broken, this shouldn't raise - getattr(executor, function_name)() + @parameterize(*TERMINATE_OR_KILL_PARAMS) + def test_process_pool_executor_terminate_kill_workers(self, function_name): + manager = multiprocessing.Manager() + q = manager.Queue() - def test_process_pool_executor_terminate_kill_workers_not_started_yet(self): - for function_name in ('terminate_workers', 'kill_workers'): - with self.subTest(function_name=function_name): - context_with_mocked_process = multiprocessing.get_context() - with unittest.mock.patch.object(context_with_mocked_process, 'Process') as mock_process: - with futures.ProcessPoolExecutor(max_workers=1, mp_context=context_with_mocked_process) as executor: - # The worker has not been started yet, terminate/kill_workers should basically no-op - getattr(executor, function_name)() + with futures.ProcessPoolExecutor(max_workers=1) as executor: + executor.submit(_put_sleep_put, q) - mock_process.return_value.kill.assert_not_called() - mock_process.return_value.terminate.assert_not_called() + # We should get started, but not finished since we'll terminate the + # workers just after + self.assertEqual(q.get(timeout=5), 'started') - def test_process_pool_executor_terminate_kill_workers_stops_pool(self): - for function_name in ('terminate_workers', 'kill_workers'): - with self.subTest(function_name=function_name): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(time.sleep, 0).result() + worker_process = list(executor._processes.values())[0] + getattr(executor, function_name)() + worker_process.join() - getattr(executor, function_name)() + if function_name == 'terminate_workers' or \ + sys.platform == 'win32': + # On windows, kill and terminate both send SIGTERM + self.assertEqual(worker_process.exitcode, -signal.SIGTERM) + elif function_name == 'kill_workers': + self.assertEqual(worker_process.exitcode, -signal.SIGKILL) - self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) + self.assertRaises(queue.Empty, q.get, timeout=1) - def test_process_pool_executor_terminate_workers(self): + @parameterize(*TERMINATE_OR_KILL_PARAMS) + def test_process_pool_executor_terminate_kill_workers_dead_workers(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor._terminate_or_kill_workers = unittest.mock.Mock() - executor.terminate_workers() - - executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.TERMINATE) - - def test_process_pool_executor_kill_workers(self): + future = executor.submit(os._exit, 1) + self.assertRaises(BrokenProcessPool, future.result) + + # even though the pool is broken, this shouldn't raise + getattr(executor, function_name)() + + @parameterize(*TERMINATE_OR_KILL_PARAMS) + def test_process_pool_executor_terminate_kill_workers_not_started_yet(self, function_name): + ctx = self.get_context() + with unittest.mock.patch.object(ctx, 'Process') as mock_process: + with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor: + # The worker has not been started yet, terminate/kill_workers + # should basically no-op + getattr(executor, function_name)() + + mock_process.return_value.kill.assert_not_called() + mock_process.return_value.terminate.assert_not_called() + + @parameterize(*TERMINATE_OR_KILL_PARAMS) + def test_process_pool_executor_terminate_kill_workers_stops_pool(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor._terminate_or_kill_workers = unittest.mock.Mock() - executor.kill_workers() + executor.submit(time.sleep, 0).result() - executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TerminateOrKillOperation.KILL) - - def test_process_pool_executor_terminate_or_kill_workers_invalid_operation(self): - with futures.ProcessPoolExecutor(max_workers=1) as executor: - self.assertRaises(ValueError, executor._terminate_or_kill_workers, operation='invalid operation'), + getattr(executor, function_name)() + self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) create_executor_tests(globals(), ProcessPoolExecutorTest, From cc5f35966a52cc4324cb48ca5cf10f89a232ca80 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Wed, 26 Feb 2025 18:09:34 -0800 Subject: [PATCH 26/33] swap to using context in the test --- Lib/test/test_concurrent_futures/test_process_pool.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index dc541bd415ef9d..c80e86e9a1e6cc 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,4 +1,3 @@ -import multiprocessing import os import queue import signal @@ -256,7 +255,7 @@ def test_process_pool_executor_terminate_or_kill_workers_invalid_op(self): @parameterize(*TERMINATE_OR_KILL_PARAMS) def test_process_pool_executor_terminate_kill_workers(self, function_name): - manager = multiprocessing.Manager() + manager = self.get_context().Manager() q = manager.Queue() with futures.ProcessPoolExecutor(max_workers=1) as executor: From b3cc8a22c947f8c857e968f7f9ea161fc2212fb7 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Wed, 26 Feb 2025 18:13:37 -0800 Subject: [PATCH 27/33] trailing whitespace --- Doc/whatsnew/3.14.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index d1518a5b2a6df1..1d2775cc59d30c 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -443,7 +443,7 @@ contextvars * Support context manager protocol by :class:`contextvars.Token`. (Contributed by Andrew Svetlov in :gh:`129889`.) - + * Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as ways to terminate or kill all living worker processes in the given pool. From dbf9d3238811e8716da164e5f3ffdd5bd34d3e4e Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Sun, 2 Mar 2025 11:12:19 -0800 Subject: [PATCH 28/33] Various pr feedbacks Drop extra prefix on tests, fix formating on gh issue file Separate a couple operations into different lines --- Doc/library/concurrent.futures.rst | 2 +- Lib/concurrent/futures/process.py | 2 +- .../test_process_pool.py | 17 +++++++++-------- ...24-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 8 ++++---- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d0ebcdd9a2fbf8..dc613f2f8f00cd 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -430,7 +430,7 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. .. method:: kill_workers() Attempt to kill all living worker processes immediately by calling - :meth:`Process.terminate ` on each of them. + :meth:`Process.kill ` on each of them. Internally, it will also call :meth:`Executor.shutdown` to ensure that all other resources associated with the executor are freed. diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8b553d750fb121..0c2ae6da6db4e7 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -874,7 +874,7 @@ def _terminate_or_kill_workers(self, operation): submitted). """ if operation not in _TERMINATE_OR_KILL_OPERATION: - raise ValueError(f"Unsupported operation: {operation}") + raise ValueError(f"Unsupported operation: {operation!r}") processes = {} if self._processes: diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c80e86e9a1e6cc..61e82814e0c04c 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -233,28 +233,28 @@ def mock_start_new_thread(func, *args, **kwargs): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() - def test_process_pool_executor_terminate_workers(self): + def test_terminate_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: executor._terminate_or_kill_workers = unittest.mock.Mock() executor.terminate_workers() executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TERMINATE) - def test_process_pool_executor_kill_workers(self): + def test_kill_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: executor._terminate_or_kill_workers = unittest.mock.Mock() executor.kill_workers() executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._KILL) - def test_process_pool_executor_terminate_or_kill_workers_invalid_op(self): + def test_terminate_or_kill_workers_invalid_op(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: self.assertRaises(ValueError, executor._terminate_or_kill_workers, operation='invalid operation'), @parameterize(*TERMINATE_OR_KILL_PARAMS) - def test_process_pool_executor_terminate_kill_workers(self, function_name): + def test_terminate_kill_workers(self, function_name): manager = self.get_context().Manager() q = manager.Queue() @@ -279,7 +279,7 @@ def test_process_pool_executor_terminate_kill_workers(self, function_name): self.assertRaises(queue.Empty, q.get, timeout=1) @parameterize(*TERMINATE_OR_KILL_PARAMS) - def test_process_pool_executor_terminate_kill_workers_dead_workers(self, function_name): + def test_terminate_kill_workers_dead_workers(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: future = executor.submit(os._exit, 1) self.assertRaises(BrokenProcessPool, future.result) @@ -288,7 +288,7 @@ def test_process_pool_executor_terminate_kill_workers_dead_workers(self, functio getattr(executor, function_name)() @parameterize(*TERMINATE_OR_KILL_PARAMS) - def test_process_pool_executor_terminate_kill_workers_not_started_yet(self, function_name): + def test_terminate_kill_workers_not_started_yet(self, function_name): ctx = self.get_context() with unittest.mock.patch.object(ctx, 'Process') as mock_process: with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor: @@ -300,9 +300,10 @@ def test_process_pool_executor_terminate_kill_workers_not_started_yet(self, func mock_process.return_value.terminate.assert_not_called() @parameterize(*TERMINATE_OR_KILL_PARAMS) - def test_process_pool_executor_terminate_kill_workers_stops_pool(self, function_name): + def test_terminate_kill_workers_stops_pool(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor.submit(time.sleep, 0).result() + task = executor.submit(time.sleep, 0) + self.assertIsNone(task.result()) getattr(executor, function_name)() diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst index 1bc0ba4fd12dbb..186574a717bbc7 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -1,4 +1,4 @@ -* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and - :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as - ways to terminate or kill all living worker processes in the given pool. - (Contributed by Charles Machalow in :gh:`128043`.) +Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and +:meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as +ways to terminate or kill all living worker processes in the given pool. +(Contributed by Charles Machalow in :gh:`128043`.) From 1e16da6c2de8b50f29eff23c77a289cb6ca86557 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Sun, 2 Mar 2025 11:16:07 -0800 Subject: [PATCH 29/33] PR feedback: swap name of terminate_or_kill to force_shutdown --- Lib/concurrent/futures/process.py | 10 ++++----- .../test_process_pool.py | 22 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 0c2ae6da6db4e7..d79d6b959c90d3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -629,7 +629,7 @@ class BrokenProcessPool(_base.BrokenExecutor): _TERMINATE = "terminate" _KILL = "kill" -_TERMINATE_OR_KILL_OPERATION = { +_SHUTDOWN_CALLBACK_OPERATION = { _TERMINATE, _KILL } @@ -864,7 +864,7 @@ def shutdown(self, wait=True, *, cancel_futures=False): shutdown.__doc__ = _base.Executor.shutdown.__doc__ - def _terminate_or_kill_workers(self, operation): + def _force_shutdown(self, operation): """Attempts to terminate or kill the executor's workers based off the given operation. Iterates through all of the current processes and performs the relevant task if the process is still alive. @@ -873,7 +873,7 @@ def _terminate_or_kill_workers(self, operation): and no longer usable (for instance, new tasks should not be submitted). """ - if operation not in _TERMINATE_OR_KILL_OPERATION: + if operation not in _SHUTDOWN_CALLBACK_OPERATION: raise ValueError(f"Unsupported operation: {operation!r}") processes = {} @@ -914,7 +914,7 @@ def terminate_workers(self): and no longer usable (for instance, new tasks should not be submitted). """ - return self._terminate_or_kill_workers(operation=_TERMINATE) + return self._force_shutdown(operation=_TERMINATE) def kill_workers(self): """Attempts to kill the executor's workers. @@ -925,4 +925,4 @@ def kill_workers(self): and no longer usable (for instance, new tasks should not be submitted). """ - return self._terminate_or_kill_workers(operation=_KILL) + return self._force_shutdown(operation=_KILL) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 61e82814e0c04c..8dbad0a93b8574 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -26,7 +26,7 @@ def __init__(self, mgr): def __del__(self): self.event.set() -TERMINATE_OR_KILL_PARAMS = [ +FORCE_SHUTDOWN_PARAMS = [ dict(function_name='terminate_workers'), dict(function_name='kill_workers'), ] @@ -235,25 +235,25 @@ def mock_start_new_thread(func, *args, **kwargs): def test_terminate_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor._terminate_or_kill_workers = unittest.mock.Mock() + executor._force_shutdown = unittest.mock.Mock() executor.terminate_workers() - executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._TERMINATE) + executor._force_shutdown.assert_called_once_with(operation=futures.process._TERMINATE) def test_kill_workers(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: - executor._terminate_or_kill_workers = unittest.mock.Mock() + executor._force_shutdown = unittest.mock.Mock() executor.kill_workers() - executor._terminate_or_kill_workers.assert_called_once_with(operation=futures.process._KILL) + executor._force_shutdown.assert_called_once_with(operation=futures.process._KILL) - def test_terminate_or_kill_workers_invalid_op(self): + def test_force_shutdown_workers_invalid_op(self): with futures.ProcessPoolExecutor(max_workers=1) as executor: self.assertRaises(ValueError, - executor._terminate_or_kill_workers, + executor._force_shutdown, operation='invalid operation'), - @parameterize(*TERMINATE_OR_KILL_PARAMS) + @parameterize(*FORCE_SHUTDOWN_PARAMS) def test_terminate_kill_workers(self, function_name): manager = self.get_context().Manager() q = manager.Queue() @@ -278,7 +278,7 @@ def test_terminate_kill_workers(self, function_name): self.assertRaises(queue.Empty, q.get, timeout=1) - @parameterize(*TERMINATE_OR_KILL_PARAMS) + @parameterize(*FORCE_SHUTDOWN_PARAMS) def test_terminate_kill_workers_dead_workers(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: future = executor.submit(os._exit, 1) @@ -287,7 +287,7 @@ def test_terminate_kill_workers_dead_workers(self, function_name): # even though the pool is broken, this shouldn't raise getattr(executor, function_name)() - @parameterize(*TERMINATE_OR_KILL_PARAMS) + @parameterize(*FORCE_SHUTDOWN_PARAMS) def test_terminate_kill_workers_not_started_yet(self, function_name): ctx = self.get_context() with unittest.mock.patch.object(ctx, 'Process') as mock_process: @@ -299,7 +299,7 @@ def test_terminate_kill_workers_not_started_yet(self, function_name): mock_process.return_value.kill.assert_not_called() mock_process.return_value.terminate.assert_not_called() - @parameterize(*TERMINATE_OR_KILL_PARAMS) + @parameterize(*FORCE_SHUTDOWN_PARAMS) def test_terminate_kill_workers_stops_pool(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: task = executor.submit(time.sleep, 0) From 52a5326275a3204b7a62c3166a5eaf93fb7f2398 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Sun, 2 Mar 2025 11:19:01 -0800 Subject: [PATCH 30/33] PR feedback: swap test names --- Lib/test/test_concurrent_futures/test_process_pool.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 8dbad0a93b8574..dc40958f05cfb2 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -254,7 +254,7 @@ def test_force_shutdown_workers_invalid_op(self): operation='invalid operation'), @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_terminate_kill_workers(self, function_name): + def test_force_shutdown_workers(self, function_name): manager = self.get_context().Manager() q = manager.Queue() @@ -279,7 +279,7 @@ def test_terminate_kill_workers(self, function_name): self.assertRaises(queue.Empty, q.get, timeout=1) @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_terminate_kill_workers_dead_workers(self, function_name): + def test_force_shutdown_workers_dead_workers(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: future = executor.submit(os._exit, 1) self.assertRaises(BrokenProcessPool, future.result) @@ -288,7 +288,7 @@ def test_terminate_kill_workers_dead_workers(self, function_name): getattr(executor, function_name)() @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_terminate_kill_workers_not_started_yet(self, function_name): + def test_force_shutdown_workers_not_started_yet(self, function_name): ctx = self.get_context() with unittest.mock.patch.object(ctx, 'Process') as mock_process: with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor: @@ -300,7 +300,7 @@ def test_terminate_kill_workers_not_started_yet(self, function_name): mock_process.return_value.terminate.assert_not_called() @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_terminate_kill_workers_stops_pool(self, function_name): + def test_force_shutdown_workers_stops_pool(self, function_name): with futures.ProcessPoolExecutor(max_workers=1) as executor: task = executor.submit(time.sleep, 0) self.assertIsNone(task.result()) From 7f0958652cbba8828a6ecc2ffa898418d1c397af Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Sun, 2 Mar 2025 11:22:49 -0800 Subject: [PATCH 31/33] PR feedback: use self.executor_type instead of ProcessPoolExecutor directly --- .../test_concurrent_futures/test_process_pool.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index dc40958f05cfb2..d3fa74e6a8e3ad 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -234,21 +234,21 @@ def mock_start_new_thread(func, *args, **kwargs): executor.shutdown() def test_terminate_workers(self): - with futures.ProcessPoolExecutor(max_workers=1) as executor: + with self.executor_type(max_workers=1) as executor: executor._force_shutdown = unittest.mock.Mock() executor.terminate_workers() executor._force_shutdown.assert_called_once_with(operation=futures.process._TERMINATE) def test_kill_workers(self): - with futures.ProcessPoolExecutor(max_workers=1) as executor: + with self.executor_type(max_workers=1) as executor: executor._force_shutdown = unittest.mock.Mock() executor.kill_workers() executor._force_shutdown.assert_called_once_with(operation=futures.process._KILL) def test_force_shutdown_workers_invalid_op(self): - with futures.ProcessPoolExecutor(max_workers=1) as executor: + with self.executor_type(max_workers=1) as executor: self.assertRaises(ValueError, executor._force_shutdown, operation='invalid operation'), @@ -258,7 +258,7 @@ def test_force_shutdown_workers(self, function_name): manager = self.get_context().Manager() q = manager.Queue() - with futures.ProcessPoolExecutor(max_workers=1) as executor: + with self.executor_type(max_workers=1) as executor: executor.submit(_put_sleep_put, q) # We should get started, but not finished since we'll terminate the @@ -280,7 +280,7 @@ def test_force_shutdown_workers(self, function_name): @parameterize(*FORCE_SHUTDOWN_PARAMS) def test_force_shutdown_workers_dead_workers(self, function_name): - with futures.ProcessPoolExecutor(max_workers=1) as executor: + with self.executor_type(max_workers=1) as executor: future = executor.submit(os._exit, 1) self.assertRaises(BrokenProcessPool, future.result) @@ -291,7 +291,7 @@ def test_force_shutdown_workers_dead_workers(self, function_name): def test_force_shutdown_workers_not_started_yet(self, function_name): ctx = self.get_context() with unittest.mock.patch.object(ctx, 'Process') as mock_process: - with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor: + with self.executor_type(max_workers=1, mp_context=ctx) as executor: # The worker has not been started yet, terminate/kill_workers # should basically no-op getattr(executor, function_name)() @@ -301,7 +301,7 @@ def test_force_shutdown_workers_not_started_yet(self, function_name): @parameterize(*FORCE_SHUTDOWN_PARAMS) def test_force_shutdown_workers_stops_pool(self, function_name): - with futures.ProcessPoolExecutor(max_workers=1) as executor: + with self.executor_type(max_workers=1) as executor: task = executor.submit(time.sleep, 0) self.assertIsNone(task.result()) From d5f757871f2c45122c61f91650ceda1e50a2af2a Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Sun, 2 Mar 2025 11:26:18 -0800 Subject: [PATCH 32/33] Add constants for terminate/kill methods --- .../test_concurrent_futures/test_process_pool.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index d3fa74e6a8e3ad..dcaef92338d7bb 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -26,13 +26,15 @@ def __init__(self, mgr): def __del__(self): self.event.set() +TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__ +KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__ FORCE_SHUTDOWN_PARAMS = [ - dict(function_name='terminate_workers'), - dict(function_name='kill_workers'), + dict(function_name=TERMINATE_WORKERS), + dict(function_name=KILL_WORKERS), ] def _put_sleep_put(queue): - """ Used as part of test_process_pool_executor_terminate_workers """ + """ Used as part of test_terminate_workers """ queue.put('started') time.sleep(2) queue.put('finished') @@ -269,12 +271,14 @@ def test_force_shutdown_workers(self, function_name): getattr(executor, function_name)() worker_process.join() - if function_name == 'terminate_workers' or \ + if function_name == TERMINATE_WORKERS or \ sys.platform == 'win32': # On windows, kill and terminate both send SIGTERM self.assertEqual(worker_process.exitcode, -signal.SIGTERM) - elif function_name == 'kill_workers': + elif function_name == KILL_WORKERS: self.assertEqual(worker_process.exitcode, -signal.SIGKILL) + else: + self.fail(f"Unknown operation: {function_name}") self.assertRaises(queue.Empty, q.get, timeout=1) From 0e42eca198599b4a90373b5d50472915e35447b1 Mon Sep 17 00:00:00 2001 From: Charles Machalow Date: Sun, 2 Mar 2025 12:10:49 -0800 Subject: [PATCH 33/33] feedback to get below 80 chars per pep8 --- Lib/test/test_concurrent_futures/test_process_pool.py | 10 ++++++---- .../2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index dcaef92338d7bb..354b7d0a346970 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -236,18 +236,20 @@ def mock_start_new_thread(func, *args, **kwargs): executor.shutdown() def test_terminate_workers(self): + mock_fn = unittest.mock.Mock() with self.executor_type(max_workers=1) as executor: - executor._force_shutdown = unittest.mock.Mock() + executor._force_shutdown = mock_fn executor.terminate_workers() - executor._force_shutdown.assert_called_once_with(operation=futures.process._TERMINATE) + mock_fn.assert_called_once_with(operation=futures.process._TERMINATE) def test_kill_workers(self): + mock_fn = unittest.mock.Mock() with self.executor_type(max_workers=1) as executor: - executor._force_shutdown = unittest.mock.Mock() + executor._force_shutdown = mock_fn executor.kill_workers() - executor._force_shutdown.assert_called_once_with(operation=futures.process._KILL) + mock_fn.assert_called_once_with(operation=futures.process._KILL) def test_force_shutdown_workers_invalid_op(self): with self.executor_type(max_workers=1) as executor: diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst index 186574a717bbc7..bb9ef96d45eb79 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -1,4 +1,4 @@ Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and -:meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as +:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as ways to terminate or kill all living worker processes in the given pool. (Contributed by Charles Machalow in :gh:`128043`.)