gh-108973: Fix asyncio test_subprocess_consistent_callbacks() #109431
Lib/asyncio/unix_events.py:
@@ -226,8 +226,7 @@ async def _make_subprocess_transport(self, protocol, args, shell,
         return transp

     def _child_watcher_callback(self, pid, returncode, transp):
-        # Skip one iteration for callbacks to be executed
-        self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
+        self.call_soon_threadsafe(transp._process_exited, returncode)

     async def create_unix_connection(
             self, protocol_factory, path=None, *,

Review thread on the new call_soon_threadsafe() call:

Reviewer: I think it's safe to call the method immediately. It's up to the child watcher to take care to call this in the loop thread.

Author: The child watcher runs in a different thread. call_soon_threadsafe() is needed to make sure that the callback is run in the same thread as the event loop. It's an important principle in asyncio, no?

Reviewer: What I meant is that the child watcher itself already uses call_soon_threadsafe().

Reviewer: I did a bit of digging, and the comment requiring the callback to be thread-safe was already there before there was even support for running the loop on different threads. The watchers have since been updated to use call_soon_threadsafe(). Possibly one of the redundant calls can still be removed, but considering that 3.14 will remove watchers altogether, it might not be worth it. Let's say that I'm skeptical of this approach of several layers of scheduling callbacks (there is one more when the transport calls the protocol's process_exited()).

Author: Would you be OK if this call is removed in a separate PR? I'm not comfortable touching asyncio, but the bug is impacting many CIs and it's very annoying. I would prefer that someone who is more comfortable with asyncio makes this change :-)

Reviewer: Yeah, clearly this would need more discussion/investigation/etc. Let's forget about it for this PR.
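To illustrate the principle the thread is debating, here is a minimal standalone sketch (not CPython code; the fake_child_watcher name and the toy payload are invented for illustration): a worker thread hands a result back to the running event loop through call_soon_threadsafe(), the one scheduling method that is safe to call from outside the loop's thread.

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    exited = loop.create_future()

    def fake_child_watcher(pid, returncode):
        # Runs in a worker thread, like a child watcher that just reaped a
        # process.  call_soon_threadsafe() wakes the loop and runs the
        # callback in the loop's own thread; plain call_soon() or setting
        # the future directly from here would not be thread-safe.
        loop.call_soon_threadsafe(exited.set_result, (pid, returncode))

    threading.Thread(target=fake_child_watcher, args=(1234, 0)).start()
    print(await exited)   # prints (1234, 0)

asyncio.run(main())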
Lib/test/test_asyncio/test_subprocess.py:
@@ -753,21 +753,44 @@ async def main() -> None:

         self.loop.run_until_complete(main())

-    def test_subprocess_consistent_callbacks(self):
+    def test_subprocess_protocol_events(self):
+        # gh-108973: Test that all subprocess protocol methods are called.
+        # The protocol methods are not called in a deterministic order.
+        # The order depends on the event loop and the operating system.
         events = []
+        fds = [1, 2]
+        expected = [
+            ('pipe_data_received', 1, b'stdout'),
+            ('pipe_data_received', 2, b'stderr'),
+            ('pipe_connection_lost', 1),
+            ('pipe_connection_lost', 2),
+            'process_exited',
+        ]
+        per_fd_expected = [
+            'pipe_data_received',
+            'pipe_connection_lost',
+        ]

         class MyProtocol(asyncio.SubprocessProtocol):
             def __init__(self, exit_future: asyncio.Future) -> None:
                 self.exit_future = exit_future

             def pipe_data_received(self, fd, data) -> None:
                 events.append(('pipe_data_received', fd, data))
+                self.exit_maybe()

             def pipe_connection_lost(self, fd, exc) -> None:
-                events.append('pipe_connection_lost')
+                events.append(('pipe_connection_lost', fd))
+                self.exit_maybe()

             def process_exited(self) -> None:
                 events.append('process_exited')
-                self.exit_future.set_result(True)
+                self.exit_maybe()
+
+            def exit_maybe(self):
+                # Only exit when we got all expected events
+                if len(events) >= len(expected):
+                    self.exit_future.set_result(True)

         async def main() -> None:
             loop = asyncio.get_running_loop()

Review comment on the exit_maybe() check:

Author: Before, we stopped as soon as we saw process_exited. But sometimes process_exited arrives earlier than the other events, so events then contains only some of the expected events and the test fails in that case as well :-(
@@ -777,15 +800,24 @@ async def main() -> None:
                 sys.executable, '-c', code, stdin=None)
             await exit_future
             transport.close()
-            self.assertEqual(events, [
-                ('pipe_data_received', 1, b'stdout'),
-                ('pipe_data_received', 2, b'stderr'),
-                'pipe_connection_lost',
-                'pipe_connection_lost',
-                'process_exited',
-            ])
+            return events

-        self.loop.run_until_complete(main())
+        events = self.loop.run_until_complete(main())
+
+        # First, make sure that we received all events
+        self.assertSetEqual(set(events), set(expected))
+
+        # Second, check order of pipe events per file descriptor
+        per_fd_events = {fd: [] for fd in fds}
+        for event in events:
+            if event == 'process_exited':
+                continue
+            name, fd = event[:2]
+            per_fd_events[fd].append(name)
+
+        for fd in fds:
+            self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))

     def test_subprocess_communicate_stdout(self):
         # See https://github.com/python/cpython/issues/100133
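The new assertion strategy can be seen in isolation with a small sketch (plain Python, no asyncio; the events list below is invented for illustration): events from different pipes may interleave in any order, and process_exited may land anywhere, but per file descriptor the data event must still precede the connection-lost event.

# Sketch of the per-fd ordering check used by the new test.
events = [
    ('pipe_data_received', 2, b'stderr'),   # stderr before stdout: allowed
    'process_exited',                       # may even arrive early
    ('pipe_data_received', 1, b'stdout'),
    ('pipe_connection_lost', 2),
    ('pipe_connection_lost', 1),
]

per_fd_expected = ['pipe_data_received', 'pipe_connection_lost']
per_fd_events = {fd: [] for fd in (1, 2)}
for event in events:
    if event == 'process_exited':
        continue
    name, fd = event[:2]
    per_fd_events[fd].append(name)

for fd in (1, 2):
    # Global interleaving is ignored; only the per-pipe order is enforced.
    assert per_fd_events[fd] == per_fd_expected, (fd, events)
print("per-fd ordering OK")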