From 3f03bd7dda85f6a67d0a45f21f143aab8029d42a Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Tue, 24 Mar 2020 20:18:49 -0400 Subject: [PATCH 1/8] Remove daemon threads in concurrent.futures --- Lib/concurrent/futures/process.py | 24 ++++++------------------ Lib/concurrent/futures/thread.py | 21 +++++---------------- Lib/threading.py | 27 +++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 34 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 39fadcce027c28..8658b54be009b2 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -59,20 +59,6 @@ import sys import traceback -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - _threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False @@ -107,6 +93,12 @@ def _python_exit(): for t, _ in items: t.join() +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently @@ -306,9 +298,7 @@ def weakref_cb(_, thread_wakeup=self.thread_wakeup): # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items - # Set this thread to be daemonized super().__init__() - self.daemon = True def run(self): # Main loop for the executor manager thread. @@ -732,5 +722,3 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index be79161bf8561d..31d8d97d6caea1 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -13,20 +13,6 @@ import weakref import os -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. - _threads_queues = weakref.WeakKeyDictionary() _shutdown = False # Lock that ensures that new workers are not created while the interpreter is @@ -43,7 +29,11 @@ def _python_exit(): for t, q in items: t.join() -atexit.register(_python_exit) +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) class _WorkItem(object): @@ -197,7 +187,6 @@ def weakref_cb(_, q=self._work_queue): self._work_queue, self._initializer, self._initargs)) - t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue diff --git a/Lib/threading.py b/Lib/threading.py index 59323679c80dcd..9a414d4d1bb26d 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -3,6 +3,7 @@ import os as _os import sys as _sys import _thread +import functools from time import monotonic as _time from _weakrefset import WeakSet @@ -1346,6 +1347,26 @@ def enumerate(): with _active_limbo_lock: return list(_active.values()) + list(_limbo.values()) + +_threading_atexits = None + +def _register_atexit(func, *arg, **kwargs): + """CPython internal: register *func* to be called before joining threads. + + The registered *func* is called with its arguments just before all + non-daemon threads are joined in `_shutdown()`. It provides a similar + purpose to `atexit.register()`, but its functions are called prior to + threading shutdown instead of interpreter shutdown. + + For similarity to atexit, the registed functions are called in reverse. + """ + global _threading_atexits + if _threading_atexits is None: + _threading_atexits = [] + call = functools.partial(func, *arg, **kwargs) + _threading_atexits.append(call) + + from _thread import stack_size # Create the main thread object, @@ -1376,6 +1397,12 @@ def _shutdown(): tlock.release() _main_thread._stop() + # Call registered threading atexit functions before threads are joined + if _threading_atexits is not None: + # Order is reversed, similar to atexit. + for atexit_call in reversed(_threading_atexits): + atexit_call() + # Join all non-deamon threads while True: with _shutdown_locks_lock: From 7686ba556ae1600df1241e0e1bf5d220dc20545b Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Tue, 24 Mar 2020 20:27:44 -0400 Subject: [PATCH 2/8] Fix whitespace --- Lib/concurrent/futures/process.py | 1 + Lib/concurrent/futures/thread.py | 1 + 2 files changed, 2 insertions(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8658b54be009b2..4c39500d675ff5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -59,6 +59,7 @@ import sys import traceback + _threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 31d8d97d6caea1..2aa4e17d47fa7c 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -13,6 +13,7 @@ import weakref import os + _threads_queues = weakref.WeakKeyDictionary() _shutdown = False # Lock that ensures that new workers are not created while the interpreter is From 21a12e9340644c81e758ddf20fc9034f265d1930 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Wed, 25 Mar 2020 00:35:51 +0000 Subject: [PATCH 3/8] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst diff --git a/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst b/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst new file mode 100644 index 00000000000000..4cea878d0ccb44 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst @@ -0,0 +1,4 @@ +Removed daemon threads from :mod:`concurrent.futures` by adding +an internal `threading._register_atexit()`, which calls registered functions +prior to joining all non-daemon threads. This allows for compatibility +with subinterpreters, which don't support daemon threads. \ No newline at end of file From b266ee5b5ac503ddaab487393b209af83cec86d0 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 25 Mar 2020 06:08:49 -0400 Subject: [PATCH 4/8] Fix typo in threading._register_atexit() docstring --- Lib/threading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/threading.py b/Lib/threading.py index 9a414d4d1bb26d..38e7090064284f 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1358,7 +1358,7 @@ def _register_atexit(func, *arg, **kwargs): purpose to `atexit.register()`, but its functions are called prior to threading shutdown instead of interpreter shutdown. - For similarity to atexit, the registed functions are called in reverse. + For similarity to atexit, the registered functions are called in reverse. """ global _threading_atexits if _threading_atexits is None: From 320fc12ec092ecfb4a1ed6a431b1bdc800193e67 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 25 Mar 2020 17:02:05 -0400 Subject: [PATCH 5/8] Immediately initialize _threading_atexits_ --- Lib/threading.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/Lib/threading.py b/Lib/threading.py index 38e7090064284f..fc90d227696021 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1348,7 +1348,7 @@ def enumerate(): return list(_active.values()) + list(_limbo.values()) -_threading_atexits = None +_threading_atexits = [] def _register_atexit(func, *arg, **kwargs): """CPython internal: register *func* to be called before joining threads. @@ -1360,9 +1360,6 @@ def _register_atexit(func, *arg, **kwargs): For similarity to atexit, the registered functions are called in reverse. """ - global _threading_atexits - if _threading_atexits is None: - _threading_atexits = [] call = functools.partial(func, *arg, **kwargs) _threading_atexits.append(call) @@ -1397,11 +1394,10 @@ def _shutdown(): tlock.release() _main_thread._stop() - # Call registered threading atexit functions before threads are joined - if _threading_atexits is not None: - # Order is reversed, similar to atexit. - for atexit_call in reversed(_threading_atexits): - atexit_call() + # Call registered threading atexit functions before threads are joined. + # Order is reversed, similar to atexit. + for atexit_call in reversed(_threading_atexits): + atexit_call() # Join all non-deamon threads while True: From e548615675e21c60f413e57bb1f7e8d73abdbe27 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 25 Mar 2020 17:05:26 -0400 Subject: [PATCH 6/8] Disallow new atexits after shutdown --- Lib/threading.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Lib/threading.py b/Lib/threading.py index fc90d227696021..eda8b05736455a 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1349,6 +1349,7 @@ def enumerate(): _threading_atexits = [] +_SHUTTING_DOWN = False def _register_atexit(func, *arg, **kwargs): """CPython internal: register *func* to be called before joining threads. @@ -1360,6 +1361,9 @@ def _register_atexit(func, *arg, **kwargs): For similarity to atexit, the registered functions are called in reverse. """ + if _SHUTTING_DOWN: + raise RuntimeError("can't register atexit after shutdown") + call = functools.partial(func, *arg, **kwargs) _threading_atexits.append(call) @@ -1385,6 +1389,8 @@ def _shutdown(): # _shutdown() was already called return + global _SHUTTING_DOWN + _SHUTTING_DOWN = True # Main thread tlock = _main_thread._tstate_lock # The main thread isn't finished yet, so its thread state lock can't have From fdf66c39bf25323604516f3cd2191c2913949e4a Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 25 Mar 2020 17:58:56 -0400 Subject: [PATCH 7/8] Add unit tests for _register_atexit() --- Lib/test/test_threading.py | 50 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index f1037b5d940b04..da17e1281d9867 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1397,5 +1397,55 @@ def test_interrupt_main_noerror(self): signal.signal(signal.SIGINT, handler) +class AtexitTests(unittest.TestCase): + + def test_atexit_output(self): + rc, out, err = assert_python_ok("-c", """if True: + import threading + + def run_last(): + print('parrot') + + threading._register_atexit(run_last) + """) + + self.assertFalse(err) + self.assertEqual(out.strip(), b'parrot') + + def test_atexit_called_once(self): + rc, out, err = assert_python_ok("-c", """if True: + import threading + from unittest.mock import Mock + + mock = Mock() + threading._register_atexit(mock) + mock.assert_not_called() + # force early shutdown to ensure it was called once + threading._shutdown() + mock.assert_called_once() + """) + + self.assertFalse(err) + + def test_atexit_after_shutdown(self): + # The only way to do this is by registering an atexit within + # an atexit, which is intended to raise an exception. + rc, out, err = assert_python_ok("-c", """if True: + import threading + + def func(): + pass + + def run_last(): + threading._register_atexit(func) + + threading._register_atexit(run_last) + """) + + self.assertTrue(err) + self.assertIn("RuntimeError: can't register atexit after shutdown", + err.decode()) + + if __name__ == "__main__": unittest.main() From 4cf8c4eebfe85dc780179a822693882a733768ab Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 27 Mar 2020 14:43:07 -0400 Subject: [PATCH 8/8] Add whatsnew --- Doc/whatsnew/3.9.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index b11c29bdf090fa..5171e2402997ae 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -195,6 +195,11 @@ which have not started running, instead of waiting for them to complete before shutting down the executor. (Contributed by Kyle Stanley in :issue:`39349`.) +Removed daemon threads from :class:`~concurrent.futures.ThreadPoolExecutor` +and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves +compatibility with subinterpreters and predictability in their shutdown +processes. (Contributed by Kyle Stanley in :issue:`39812`.) + curses ------