diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index ff2df653e41b89..c22d0777b0b3be 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -28,9 +28,6 @@ __all__ = ( 'SelectorEventLoop', - 'AbstractChildWatcher', - 'PidfdChildWatcher', - 'ThreadedChildWatcher', 'DefaultEventLoopPolicy', 'EventLoop', ) @@ -65,6 +62,10 @@ def __init__(self, selector=None): super().__init__(selector) self._signal_handlers = {} self._unix_server_sockets = {} + if can_use_pidfd(): + self._watcher = _PidfdChildWatcher() + else: + self._watcher = _ThreadedChildWatcher() def close(self): super().close() @@ -197,33 +198,22 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None, async def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - watcher = events.get_event_loop_policy()._watcher - - with watcher: - if not watcher.is_active(): - # Check early. - # Raising exception before process creation - # prevents subprocess execution if the watcher - # is not ready to handle it. - raise RuntimeError("asyncio.get_child_watcher() is not activated, " - "subprocess support is not installed.") - waiter = self.create_future() - transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - waiter=waiter, extra=extra, - **kwargs) - watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) - try: - await waiter - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - transp.close() - await transp._wait() - raise + watcher = self._watcher + waiter = self.create_future() + transp = _UnixSubprocessTransport(self, protocol, args, shell, + stdin, stdout, stderr, bufsize, + waiter=waiter, extra=extra, + **kwargs) + watcher.add_child_handler(transp.get_pid(), + self._child_watcher_callback, transp) + try: + await waiter + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: + transp.close() + await transp._wait() + raise return transp @@ -865,93 +855,7 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): stdin_w.close() -class AbstractChildWatcher: - """Abstract base class for monitoring child processes. - - Objects derived from this class monitor a collection of subprocesses and - report their termination or interruption by a signal. - - New callbacks are registered with .add_child_handler(). Starting a new - process must be done within a 'with' block to allow the watcher to suspend - its activity until the new process if fully registered (this is needed to - prevent a race condition in some implementations). - - Example: - with watcher: - proc = subprocess.Popen("sleep 1") - watcher.add_child_handler(proc.pid, callback) - - Notes: - Implementations of this class must be thread-safe. - - Since child watcher objects may catch the SIGCHLD signal and call - waitpid(-1), there should be only one active object per process. - """ - - def __init_subclass__(cls) -> None: - if cls.__module__ != __name__: - warnings._deprecated("AbstractChildWatcher", - "{name!r} is deprecated as of Python 3.12 and will be " - "removed in Python {remove}.", - remove=(3, 14)) - - def add_child_handler(self, pid, callback, *args): - """Register a new child handler. - - Arrange for callback(pid, returncode, *args) to be called when - process 'pid' terminates. Specifying another callback for the same - process replaces the previous handler. - - Note: callback() must be thread-safe. - """ - raise NotImplementedError() - - def remove_child_handler(self, pid): - """Removes the handler for process 'pid'. - - The function returns True if the handler was successfully removed, - False if there was nothing to remove.""" - - raise NotImplementedError() - - def attach_loop(self, loop): - """Attach the watcher to an event loop. - - If the watcher was previously attached to an event loop, then it is - first detached before attaching to the new loop. - - Note: loop may be None. - """ - raise NotImplementedError() - - def close(self): - """Close the watcher. - - This must be called to make sure that any underlying resource is freed. - """ - raise NotImplementedError() - - def is_active(self): - """Return ``True`` if the watcher is active and is used by the event loop. - - Return True if the watcher is installed and ready to handle process exit - notifications. - - """ - raise NotImplementedError() - - def __enter__(self): - """Enter the watcher's context and allow starting new processes - - This function must return self""" - raise NotImplementedError() - - def __exit__(self, a, b, c): - """Exit the watcher's context""" - raise NotImplementedError() - - -class PidfdChildWatcher(AbstractChildWatcher): +class _PidfdChildWatcher: """Child watcher implementation using Linux's pid file descriptors. This child watcher polls process file descriptors (pidfds) to await child @@ -963,21 +867,9 @@ class PidfdChildWatcher(AbstractChildWatcher): recent (5.3+) kernels. """ - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - pass - def is_active(self): return True - def close(self): - pass - - def attach_loop(self, loop): - pass - def add_child_handler(self, pid, callback, *args): loop = events.get_running_loop() pidfd = os.pidfd_open(pid) @@ -1002,14 +894,7 @@ def _do_wait(self, pid, pidfd, callback, args): os.close(pidfd) callback(pid, returncode, *args) - def remove_child_handler(self, pid): - # asyncio never calls remove_child_handler() !!! - # The method is no-op but is implemented because - # abstract base classes require it. - return True - - -class ThreadedChildWatcher(AbstractChildWatcher): +class _ThreadedChildWatcher: """Threaded child watcher implementation. The watcher uses a thread per process @@ -1029,15 +914,6 @@ def __init__(self): def is_active(self): return True - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - def __del__(self, _warn=warnings.warn): threads = [thread for thread in list(self._threads.values()) if thread.is_alive()] @@ -1055,15 +931,6 @@ def add_child_handler(self, pid, callback, *args): self._threads[pid] = thread thread.start() - def remove_child_handler(self, pid): - # asyncio never calls remove_child_handler() !!! - # The method is no-op but is implemented because - # abstract base classes require it. - return True - - def attach_loop(self, loop): - pass - def _do_waitpid(self, loop, expected_pid, callback, args): assert expected_pid > 0 @@ -1103,29 +970,9 @@ def can_use_pidfd(): class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): - """UNIX event loop policy with a watcher for child processes.""" + """UNIX event loop policy""" _loop_factory = _UnixSelectorEventLoop - def __init__(self): - super().__init__() - if can_use_pidfd(): - self._watcher = PidfdChildWatcher() - else: - self._watcher = ThreadedChildWatcher() - - def set_event_loop(self, loop): - """Set the event loop. - - As a side effect, if a child watcher was set before, then calling - .set_event_loop() from the main thread will call .attach_loop(loop) on - the child watcher. - """ - - super().set_event_loop(loop) - - if (self._watcher is not None and - threading.current_thread() is threading.main_thread()): - self._watcher.attach_loop(loop) SelectorEventLoop = _UnixSelectorEventLoop DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 5b660de28d6fa0..34ea02b4c252d0 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2209,22 +2209,8 @@ def test_remove_fds_after_closing(self): else: import selectors - class UnixEventLoopTestsMixin(EventLoopTestsMixin): - def setUp(self): - super().setUp() - watcher = asyncio.ThreadedChildWatcher() - watcher.attach_loop(self.loop) - policy = asyncio.get_event_loop_policy() - policy._watcher = watcher - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - policy._watcher = None - super().tearDown() - - if hasattr(selectors, 'KqueueSelector'): - class KqueueEventLoopTests(UnixEventLoopTestsMixin, + class KqueueEventLoopTests(EventLoopTestsMixin, SubprocessTestsMixin, test_utils.TestCase): @@ -2249,7 +2235,7 @@ def test_write_pty(self): super().test_write_pty() if hasattr(selectors, 'EpollSelector'): - class EPollEventLoopTests(UnixEventLoopTestsMixin, + class EPollEventLoopTests(EventLoopTestsMixin, SubprocessTestsMixin, test_utils.TestCase): @@ -2257,7 +2243,7 @@ def create_event_loop(self): return asyncio.SelectorEventLoop(selectors.EpollSelector()) if hasattr(selectors, 'PollSelector'): - class PollEventLoopTests(UnixEventLoopTestsMixin, + class PollEventLoopTests(EventLoopTestsMixin, SubprocessTestsMixin, test_utils.TestCase): @@ -2265,7 +2251,7 @@ def create_event_loop(self): return asyncio.SelectorEventLoop(selectors.PollSelector()) # Should always exist. - class SelectEventLoopTests(UnixEventLoopTestsMixin, + class SelectEventLoopTests(EventLoopTestsMixin, SubprocessTestsMixin, test_utils.TestCase): @@ -2830,10 +2816,6 @@ def setUp(self): def tearDown(self): try: - if sys.platform != 'win32': - policy = asyncio.get_event_loop_policy() - policy._watcher = None - super().tearDown() finally: self.loop.close() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index d7f03e6dd0f4a9..23987c70ca7b63 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -869,31 +869,27 @@ async def main(): # Unix class SubprocessWatcherMixin(SubprocessMixin): - Watcher = None - def setUp(self): super().setUp() policy = asyncio.get_event_loop_policy() self.loop = policy.new_event_loop() self.set_event_loop(self.loop) - watcher = self._get_watcher() - watcher.attach_loop(self.loop) - policy._watcher = watcher + def test_watcher_implementation(self): + loop = self.loop + watcher = loop._watcher + if unix_events.can_use_pidfd(): + self.assertIsInstance(watcher, unix_events._PidfdChildWatcher) + else: + self.assertIsInstance(watcher, unix_events._ThreadedChildWatcher) - def tearDown(self): - super().tearDown() - policy = asyncio.get_event_loop_policy() - watcher = policy._watcher - policy._watcher = None - watcher.attach_loop(None) - watcher.close() class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): - - def _get_watcher(self): - return unix_events.ThreadedChildWatcher() + def setUp(self): + # Force the use of the threaded child watcher + unix_events.can_use_pidfd = mock.Mock(return_value=False) + super().setUp() @unittest.skipUnless( unix_events.can_use_pidfd(), @@ -902,9 +898,7 @@ def _get_watcher(self): class SubprocessPidfdWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): - def _get_watcher(self): - return unix_events.PidfdChildWatcher() - + pass else: # Windows diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 2ea698f4d74cf7..4966775acac7be 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1112,32 +1112,6 @@ def test_write_eof_pending(self): self.assertFalse(self.protocol.connection_lost.called) -class AbstractChildWatcherTests(unittest.TestCase): - - def test_warns_on_subclassing(self): - with self.assertWarns(DeprecationWarning): - class MyWatcher(asyncio.AbstractChildWatcher): - pass - - def test_not_implemented(self): - f = mock.Mock() - watcher = asyncio.AbstractChildWatcher() - self.assertRaises( - NotImplementedError, watcher.add_child_handler, f, f) - self.assertRaises( - NotImplementedError, watcher.remove_child_handler, f) - self.assertRaises( - NotImplementedError, watcher.attach_loop, f) - self.assertRaises( - NotImplementedError, watcher.close) - self.assertRaises( - NotImplementedError, watcher.is_active) - self.assertRaises( - NotImplementedError, watcher.__enter__) - self.assertRaises( - NotImplementedError, watcher.__exit__, f, f, f) - - class TestFunctional(unittest.TestCase): def setUp(self): diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index 3fe2ecd2be6d0c..dbb8d27c176950 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -547,23 +547,6 @@ def close_loop(loop): loop._default_executor.shutdown(wait=True) loop.close() - policy = support.maybe_get_event_loop_policy() - if policy is not None: - try: - watcher = policy._watcher - except AttributeError: - # watcher is not implemented by EventLoopPolicy, e.g. Windows - pass - else: - if isinstance(watcher, asyncio.ThreadedChildWatcher): - # Wait for subprocess to finish, but not forever - for thread in list(watcher._threads.values()): - thread.join(timeout=support.SHORT_TIMEOUT) - if thread.is_alive(): - raise RuntimeError(f"thread {thread} still alive: " - "subprocess still running") - - def set_event_loop(self, loop, *, cleanup=True): if loop is None: raise AssertionError('loop is None')