8000 GH-120804: Remove `PidfdChildWatcher`, `ThreadedChildWatcher` and `AbstractChildWatcher` from asyncio APIs by kumaraditya303 · Pull Request #120893 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

GH-120804: Remove PidfdChildWatcher, ThreadedChildWatcher and AbstractChildWatcher from asyncio APIs #120893

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 23 additions & 176 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@

__all__ = (
'SelectorEventLoop',
'AbstractChildWatcher',
'PidfdChildWatcher',
'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
'EventLoop',
)
Expand Down Expand Up @@ -65,6 +62,10 @@ def __init__(self, selector=None):
super().__init__(selector)
self._signal_handlers = {}
self._unix_server_sockets = {}
if can_use_pidfd():
self._watcher = _PidfdChildWatcher()
else:
self._watcher = _ThreadedChildWatcher()

def close(self):
super().close()
Expand Down Expand Up @@ -197,33 +198,22 @@ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
watcher = events.get_event_loop_policy()._watcher

with watcher:
if not watcher.is_active():
# Check early.
# Raising exception before process creation
# prevents subprocess execution if the watcher
# is not ready to handle it.
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
"subprocess support is not installed.")
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra,
**kwargs)
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
try:
await waiter
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
transp.close()
await transp._wait()
raise
watcher = self._watcher
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra,
**kwargs)
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
try:
await waiter
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
transp.close()
await transp._wait()
raise

return transp

Expand Down Expand Up @@ -865,93 +855,7 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
stdin_w.close()


class AbstractChildWatcher:
"""Abstract base class for monitoring child processes.

Objects derived from this class monitor a collection of subprocesses and
report their termination or interruption by a signal.

New callbacks are registered with .add_child_handler(). Starting a new
process must be done within a 'with' block to allow the watcher to suspend
its activity until the new process if fully registered (this is needed to
prevent a race condition in some implementations).

Example:
with watcher:
proc = subprocess.Popen("sleep 1")
watcher.add_child_handler(proc.pid, callback)

Notes:
Implementations of this class must be thread-safe.

Since child watcher objects may catch the SIGCHLD signal and call
waitpid(-1), there should be only one active object per process.
"""

def __init_subclass__(cls) -> None:
if cls.__module__ != __name__:
warnings._deprecated("AbstractChildWatcher",
"{name!r} is deprecated as of Python 3.12 and will be "
"removed in Python {remove}.",
remove=(3, 14))

def add_child_handler(self, pid, callback, *args):
"""Register a new child handler.

Arrange for callback(pid, returncode, *args) to be called when
process 'pid' terminates. Specifying another callback for the same
process replaces the previous handler.

Note: callback() must be thread-safe.
"""
raise NotImplementedError()

def remove_child_handler(self, pid):
"""Removes the handler for process 'pid'.

The function returns True if the handler was successfully removed,
False if there was nothing to remove."""

raise NotImplementedError()

def attach_loop(self, loop):
"""Attach the watcher to an event loop.

If the watcher was previously attached to an event loop, then it is
first detached before attaching to the new loop.

Note: loop may be None.
"""
raise NotImplementedError()

def close(self):
"""Close the watcher.

This must be called to make sure that any underlying resource is freed.
"""
raise NotImplementedError()

def is_active(self):
"""Return ``True`` if the watcher is active and is used by the event loop.

Return True if the watcher is installed and ready to handle process exit
notifications.

"""
raise NotImplementedError()

def __enter__(self):
"""Enter the watcher's context and allow starting new processes

This function must return self"""
raise NotImplementedError()

def __exit__(self, a, b, c):
"""Exit the watcher's context"""
raise NotImplementedError()


class PidfdChildWatcher(AbstractChildWatcher):
class _PidfdChildWatcher:
"""Child watcher implementation using Linux's pid file descriptors.

This child watcher polls process file descriptors (pidfds) to await child
Expand All @@ -963,21 +867,9 @@ class PidfdChildWatcher(AbstractChildWatcher):
recent (5.3+) kernels.
"""

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
pass

def is_active(self):
return True

def close(self):
pass

def attach_loop(self, loop):
pass

def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
pidfd = os.pidfd_open(pid)
Expand All @@ -1002,14 +894,7 @@ def _do_wait(self, pid, pidfd, callback, args):
os.close(pidfd)
callback(pid, returncode, *args)

def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classes require it.
return True


class ThreadedChildWatcher(AbstractChildWatcher):
class _ThreadedChildWatcher:
"""Threaded child watcher implementation.

The watcher uses a thread per process
Expand All @@ -1029,15 +914,6 @@ def __init__(self):
def is_active(self):
return True

def close(self):
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def __del__(self, _warn=warnings.warn):
threads = [thread for thread in list(self._threads.values())
if thread.is_alive()]
Expand All @@ -1055,15 +931,6 @@ def add_child_handler(self, pid, callback, *args):
self._threads[pid] = thread
thread.start()

def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classes require it.
return True

def attach_loop(self, loop):
pass

def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0

Expand Down Expand Up @@ -1103,29 +970,9 @@ def can_use_pidfd():


class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
"""UNIX event loop policy"""
_loop_factory = _UnixSelectorEventLoop

def __init__(self):
super().__init__()
if can_use_pidfd():
self._watcher = PidfdChildWatcher()
else:
self._watcher = ThreadedChildWatcher()

def set_event_loop(self, loop):
"""Set the event loop.

As a side effect, if a child watcher was set before, then calling
.set_event_loop() from the main thread will call .attach_loop(loop) on
the child watcher.
"""

super().set_event_loop(loop)

if (self._watcher is not None and
threading.current_thread() is threading.main_thread()):
self._watcher.attach_loop(loop)

SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
Expand Down
26 changes: 4 additions & 22 deletions Lib/test/test_asyncio/test_events.py
9E81
Original file line number Diff line number Diff line change
Expand Up @@ -2209,22 +2209,8 @@ def test_remove_fds_after_closing(self):
else:
import selectors

class UnixEventLoopTestsMixin(EventLoopTestsMixin):
def setUp(self):
super().setUp()
watcher = asyncio.ThreadedChildWatcher()
watcher.attach_loop(self.loop)
policy = asyncio.get_event_loop_policy()
policy._watcher = watcher

def tearDown(self):
policy = asyncio.get_event_loop_policy()
policy._watcher = None
super().tearDown()


if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
class KqueueEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):

Expand All @@ -2249,23 +2235,23 @@ def test_write_pty(self):
super().test_write_pty()

if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(UnixEventLoopTestsMixin,
class EPollEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):

def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.EpollSelector())

if hasattr(selectors, 'PollSelector'):
class PollEventLoopTests(UnixEventLoopTestsMixin,
class PollEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):

def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.PollSelector())

# Should always exist.
class SelectEventLoopTests(UnixEventLoopTestsMixin,
class SelectEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
test_utils.TestCase):

Expand Down Expand Up @@ -2830,10 +2816,6 @@ def setUp(self):

def tearDown(self):
try:
if sys.platform != 'win32':
policy = asyncio.get_event_loop_policy()
policy._watcher = None

super().tearDown()
finally:
self.loop.close()
Expand Down
30 changes: 12 additions & 18 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,31 +869,27 @@ async def main():
# Unix
class SubprocessWatcherMixin(SubprocessMixin):

Watcher = None

def setUp(self):
super().setUp()
policy = asyncio.get_event_loop_policy()
self.loop = policy.new_event_loop()
self.set_event_loop(self.loop)

watcher = self._get_watcher()
watcher.attach_loop(self.loop)
policy._watcher = watcher
def test_watcher_implementation(self):
loop = self.loop
watcher = loop._watcher
if unix_events.can_use_pidfd():
self.assertIsInstance(watcher, unix_events._PidfdChildWatcher)
else:
self.assertIsInstance(watcher, unix_events._ThreadedChildWatcher)

def tearDown(self):
super().tearDown()
policy = asyncio.get_event_loop_policy()
watcher = policy._watcher
policy._watcher = None
watcher.attach_loop(None)
watcher.close()

class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):

def _get_watcher(self):
return unix_events.ThreadedChildWatcher()
def setUp(self):
# Force the use of the threaded child watcher
unix_events.can_use_pidfd = mock.Mock(return_value=False)
super().setUp()

@unittest.skipUnless(
unix_events.can_use_pidfd(),
Expand All @@ -902,9 +898,7 @@ def _get_watcher(self):
class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):

def _get_watcher(self):
return unix_events.PidfdChildWatcher()

pass

else:
# Windows
Expand Down
Loading
Loading
0