-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread #14344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
3695efd
Close pid waiters on watcher.close()
asvetlov 3559581
Revert "Revert "bpo-35621: Support running subprocesses in asyncio wh…
asvetlov b18e6b9
Merge branch 'dangling-threads-in-watcher' into threaded-waiter
asvetlov 1bc1d8e
Merge remote-tracking branch 'upstream/master' into threaded-waiter
asvetlov ef1e21c
Work on
asvetlov c982e4a
Merge branch 'master' into threaded-waiter
asvetlov 6912ec7
Fix dangling thread warnings in tests
asvetlov efb19c0
Update documentation
asvetlov eb07403
Add docstrings
asvetlov 6aa3357
Update Doc/library/asyncio-policy.rst
asvetlov 25d9ac2
Fix doc text
asvetlov ae3a706
Update Doc/library/asyncio-policy.rst
asvetlov 8baf48c
Update Doc/library/asyncio-policy.rst
asvetlov 6e6a783
Update Doc/library/asyncio-policy.rst
asvetlov 1e1281a
Update Lib/asyncio/unix_events.py
asvetlov 936aeb0
Update Lib/asyncio/unix_events.py
asvetlov cc5a752
Update docs
asvetlov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Revert "Revert "bpo-35621: Support running subprocesses in asyncio wh…
- Loading branch information
commit 3559581ad6ea84a9b5a00f0102999b629add3b95
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import errno | ||
import io | ||
import itertools | ||
import os | ||
import selectors | ||
import signal | ||
|
@@ -29,7 +30,9 @@ | |
__all__ = ( | ||
'SelectorEventLoop', | ||
'AbstractChildWatcher', 'SafeChildWatcher', | ||
'FastChildWatcher', 'DefaultEventLoopPolicy', | ||
'FastChildWatcher', | ||
'MultiLoopChildWatcher', 'ThreadedChildWatcher', | ||
'DefaultEventLoopPolicy', | ||
) | ||
|
||
|
||
|
@@ -184,6 +187,13 @@ async def _make_subprocess_transport(self, protocol, args, shell, | |
stdin, stdout, stderr, bufsize, | ||
extra=None, **kwargs): | ||
with events.get_child_watcher() as watcher: | ||
if not watcher.is_active(): | ||
# Check early. | ||
# Raising exception before process creation | ||
# prevents subprocess execution if the watcher | ||
# is not ready to handle it. | ||
raise RuntimeError("asyncio.get_child_watcher() is not activated, " | ||
"subprocess support is not installed.") | ||
waiter = self.create_future() | ||
transp = _UnixSubprocessTransport(self, protocol, args, shell, | ||
stdin, stdout, stderr, bufsize, | ||
|
@@ -838,6 +848,15 @@ def close(self): | |
""" | ||
raise NotImplementedError() | ||
|
||
def is_active(self): | ||
"""Watcher status. | ||
|
||
Return True if the watcher is installed and ready to handle process exit | ||
notifications. | ||
|
||
""" | ||
raise NotImplementedError() | ||
|
||
def __enter__(self): | ||
"""Enter the watcher's context and allow starting new processes | ||
|
||
|
@@ -849,6 +868,20 @@ def __exit__(self, a, b, c): | |
raise NotImplementedError() | ||
|
||
|
||
def _compute_returncode(status): | ||
if os.WIFSIGNALED(status): | ||
# The child process died because of a signal. | ||
return -os.WTERMSIG(status) | ||
elif os.WIFEXITED(status): | ||
# The child process exited (e.g sys.exit()). | ||
return os.WEXITSTATUS(status) | ||
else: | ||
# The child exited, but we don't understand its status. | ||
# This shouldn't happen, but if it does, let's just | ||
# return that status; perhaps that helps debug it. | ||
return status | ||
|
||
|
||
class BaseChildWatcher(AbstractChildWatcher): | ||
|
||
def __init__(self): | ||
|
@@ -858,6 +891,9 @@ def __init__(self): | |
def close(self): | ||
self.attach_loop(None) | ||
|
||
def is_active(self): | ||
return self._loop is not None and self._loop.is_running() | ||
|
||
def _do_waitpid(self, expected_pid): | ||
raise NotImplementedError() | ||
|
||
|
@@ -898,19 +934,6 @@ def _sig_chld(self): | |
'exception': exc, | ||
}) | ||
|
||
def _compute_returncode(self, status): | ||
if os.WIFSIGNALED(status): | ||
# The child process died because of a signal. | ||
return -os.WTERMSIG(status) | ||
elif os.WIFEXITED(status): | ||
# The child process exited (e.g sys.exit()). | ||
return os.WEXITSTATUS(status) | ||
else: | ||
# The child exited, but we don't understand its status. | ||
# This shouldn't happen, but if it does, let's just | ||
# return that status; perhaps that helps debug it. | ||
return status | ||
|
||
|
||
class SafeChildWatcher(BaseChildWatcher): | ||
"""'Safe' child watcher implementation. | ||
|
@@ -934,11 +957,6 @@ def __exit__(self, a, b, c): | |
pass | ||
|
||
def add_child_handler(self, pid, callback, *args): | ||
if self._loop is None: | ||
raise RuntimeError( | ||
"Cannot add child handler, " | ||
"the child watcher does not have a loop attached") | ||
|
||
self._callbacks[pid] = (callback, args) | ||
|
||
# Prevent a race condition in case the child is already terminated. | ||
|
@@ -974,7 +992,7 @@ def _do_waitpid(self, expected_pid): | |
# The child process is still alive. | ||
return | ||
|
||
returncode = self._compute_returncode(status) | ||
returncode = _compute_returncode(status) | ||
if self._loop.get_debug(): | ||
logger.debug('process %s exited with returncode %s', | ||
expected_pid, returncode) | ||
|
@@ -1035,11 +1053,6 @@ def __exit__(self, a, b, c): | |
def add_child_handler(self, pid, callback, *args): | ||
assert self._forks, "Must use the context manager" | ||
|
||
if self._loop is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The check is replaced by |
||
raise RuntimeError( | ||
"Cannot add child handler, " | ||
"the child watcher does not have a loop attached") | ||
|
||
with self._lock: | ||
try: | ||
returncode = self._zombies.pop(pid) | ||
|
@@ -1072,7 +1085,7 @@ def _do_waitpid_all(self): | |
# A child process is still alive. | ||
return | ||
|
||
returncode = self._compute_returncode(status) | ||
returncode = _compute_returncode(status) | ||
|
||
with self._lock: | ||
try: | ||
|
@@ -1101,6 +1114,177 @@ def _do_waitpid_all(self): | |
callback(pid, returncode, *args) | ||
|
||
|
||
class MultiLoopChildWatcher(AbstractChildWatcher): | ||
# The class keeps compatibility with AbstractChildWatcher ABC | ||
# To achieve this it has empty attach_loop() method | ||
# and doesn't accept explicit loop argument | ||
# for add_child_handler()/remove_child_handler() | ||
# but retrieves the current loop by get_running_loop() | ||
|
||
def __init__(self): | ||
self._callbacks = {} | ||
self._saved_sighandler = None | ||
|
||
def is_active(self): | ||
return self._saved_sighandler is not None | ||
|
||
def close(self): | ||
self._callbacks.clear() | ||
if self._saved_sighandler is not None: | ||
handler = signal.getsignal(signal.SIGCHLD) | ||
if handler != self._sig_chld: | ||
logger.warning("SIGCHLD handler was changed by outside code") | ||
else: | ||
signal.signal(signal.SIGCHLD, self._saved_sighandler) | ||
self._saved_sighandler = None | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
pass | ||
|
||
def add_child_handler(self, pid, callback, *args): | ||
loop = events.get_running_loop() | ||
self._callbacks[pid] = (loop, callback, args) | ||
|
||
# Prevent a race condition in case the child is already terminated. | ||
self._do_waitpid(pid) | ||
|
||
def remove_child_handler(self, pid): | ||
try: | ||
del self._callbacks[pid] | ||
return True | ||
except KeyError: | ||
return False | ||
|
||
def attach_loop(self, loop): | ||
# Don't save the loop but initialize itself if called first time | ||
# The reason to do it here is that attach_loop() is called from | ||
# unix policy only for the main thread. | ||
# Main thread is required for subscription on SIGCHLD signal | ||
if self._saved_sighandler is None: | ||
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) | ||
if self._saved_sighandler is None: | ||
logger.warning("Previous SIGCHLD handler was set by non-Python code, " | ||
"restore to default handler on watcher close.") | ||
self._saved_sighandler = signal.SIG_DFL | ||
|
||
# Set SA_RESTART to limit EINTR occurrences. | ||
signal.siginterrupt(signal.SIGCHLD, False) | ||
|
||
def _do_waitpid_all(self): | ||
for pid in list(self._callbacks): | ||
self._do_waitpid(pid) | ||
|
||
def _do_waitpid(self, expected_pid): | ||
assert expected_pid > 0 | ||
|
||
try: | ||
pid, status = os.waitpid(expected_pid, os.WNOHANG) | ||
except ChildProcessError: | ||
# The child process is already reaped | ||
# (may happen if waitpid() is called elsewhere). | ||
pid = expected_pid | ||
returncode = 255 | ||
logger.warning( | ||
"Unknown child process pid %d, will report returncode 255", | ||
pid) | ||
debug_log = False | ||
else: | ||
if pid == 0: | ||
# The child process is still alive. | ||
return | ||
|
||
returncode = _compute_returncode(status) | ||
debug_log = True | ||
try: | ||
loop, callback, args = self._callbacks.pop(pid) | ||
except KeyError: # pragma: no cover | ||
# May happen if .remove_child_handler() is called | ||
# after os.waitpid() returns. | ||
logger.warning("Child watcher got an unexpected pid: %r", | ||
pid, exc_info=True) | ||
else: | ||
if loop.is_closed(): | ||
logger.warning("Loop %r that handles pid %r is closed", loop, pid) | ||
else: | ||
if debug_log and loop.get_debug(): | ||
logger.debug('process %s exited with returncode %s', | ||
expected_pid, returncode) | ||
loop.call_soon_threadsafe(callback, pid, returncode, *args) | ||
|
||
def _sig_chld(self, signum, frame): | ||
try: | ||
self._do_waitpid_all() | ||
except (SystemExit, KeyboardInterrupt): | ||
raise | ||
except BaseException: | ||
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) | ||
|
||
|
||
class ThreadedChildWatcher(AbstractChildWatcher): | ||
# The watcher uses a thread per process | ||
# for waiting for the process finish. | ||
# It doesn't require subscription on POSIX signal | ||
|
||
def __init__(self): | ||
self._pid_counter = itertools.count(0) | ||
|
||
def is_active(self): | ||
return True | ||
|
||
def close(self): | ||
pass | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
pass | ||
|
||
def add_child_handler(self, pid, callback, *args): | ||
loop = events.get_running_loop() | ||
thread = threading.Thread(target=self._do_waitpid, | ||
name=f"waitpid-{next(self._pid_counter)}", | ||
args=(loop, pid, callback, args), | ||
daemon=True) | ||
thread.start() | ||
|
||
def remove_child_handler(self, pid): | ||
# asyncio never calls remove_child_handler() !!! | ||
# The method is no-op but is implemented because | ||
# abstract base classe requires it | ||
return True | ||
|
||
def attach_loop(self, loop): | ||
pass | ||
|
||
def _do_waitpid(self, loop, expected_pid, callback, args): | ||
assert expected_pid > 0 | ||
|
||
try: | ||
pid, status = os.waitpid(expected_pid, 0) | ||
except ChildProcessError: | ||
# The child process is already reaped | ||
# (may happen if waitpid() is called elsewhere). | ||
pid = expected_pid | ||
returncode = 255 | ||
logger.warning( | ||
"Unknown child process pid %d, will report returncode 255", | ||
pid) | ||
else: | ||
returncode = _compute_returncode(status) | ||
if loop.get_debug(): | ||
logger.debug('process %s exited with returncode %s', | ||
expected_pid, returncode) | ||
|
||
if loop.is_closed(): | ||
logger.warning("Loop %r that handles pid %r is closed", loop, pid) | ||
else: | ||
loop.call_soon_threadsafe(callback, pid, returncode, *args) | ||
|
||
|
||
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): | ||
"""UNIX event loop policy with a watcher for child processes.""" | ||
_loop_factory = _UnixSelectorEventLoop | ||
|
@@ -1112,7 +1296,7 @@ def __init__(self): | |
def _init_watcher(self): | ||
with events._lock: | ||
if self._watcher is None: # pragma: no branch | ||
self._watcher = SafeChildWatcher() | ||
self._watcher = ThreadedChildWatcher() | ||
if isinstance(threading.current_thread(), | ||
threading._MainThread): | ||
self._watcher.attach_loop(self._local._loop) | ||
|
@@ -1134,7 +1318,7 @@ def set_event_loop(self, loop): | |
def get_child_watcher(self): | ||
"""Get the watcher for child processes. | ||
|
||
If not yet set, a SafeChildWatcher object is automatically created. | ||
If not yet set, a ThreadedChildWatcher object is automatically created. | ||
""" | ||
if self._watcher is None: | ||
self._init_watcher() | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.