gh-124309: fix staggered race on eager tasks by graingert · Pull Request #124847 · python/cpython · GitHub
gh-124309: fix staggered race on eager tasks #124847

Merged
merged 5 commits into from
Oct 11, 2024
in asyncio.staggered wait for the current task to resume before starting work on child tasks
graingert committed Oct 1, 2024
commit c936c0f2a5ee45cc38cbf684b4ac12cf76994a0f
11 changes: 8 additions & 3 deletions Lib/asyncio/staggered.py
@@ -69,7 +69,8 @@ async def staggered_race(coro_fns, delay, *, loop=None):
     exceptions = []
     running_tasks = []

-    async def run_one_coro(previous_failed) -> None:
+    async def run_one_coro(ok_to_start, previous_failed) -> None:
+        await ok_to_start.wait()
         # Wait for the previous task to finish, or for delay seconds
         if previous_failed is not None:
             with contextlib.suppress(exceptions_mod.TimeoutError):
@@ -85,8 +86,10 @@ async def run_one_coro(previous_failed) -> None:
             return
         # Start task that will run the next coroutine
         this_failed = locks.Event()
-        next_task = loop.create_task(run_one_coro(this_failed))
+        next_ok_to_start = locks.Event()
+        next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
         running_tasks.append(next_task)
+        next_ok_to_start.set()
         assert len(running_tasks) == this_index + 2
         # Prepare place to put this coroutine's exceptions if not won
         exceptions.append(None)
@@ -116,8 +119,10 @@ async def run_one_coro(previous_failed) -> None:
                 if i != this_index:
                     t.cancel()

-    first_task = loop.create_task(run_one_coro(None))
+    ok_to_start = locks.Event()
+    first_task = loop.create_task(run_one_coro(ok_to_start, None))
     running_tasks.append(first_task)
Member

Actually, while we're here, I'd replace running_tasks with an explicit TaskGroup (which should be passed to run_one_coro via an argument for other nested tasks).

Member

This would also solve this problem: #124847 (comment)

Contributor Author

We tried to use a TaskGroup before, but it doesn't abort fast enough to avoid two tasks winning at the same time. This is resolved in trio by closing sockets that are immediate runners-up.

+    ok_to_start.set()
Contributor

For the first one, should the event be preset so it doesn't negate the benefits of an eager task factory?

Or maybe make the event optional for the first one since we want it to start right away?

Contributor Author

No, it's still needed: all of the coro_fns can return immediately without awaiting anything, which results in none of the tasks being appended to running_tasks.
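
The ordering problem described above can be reproduced in isolation. The following is a minimal sketch, not the code from `Lib/asyncio/staggered.py`: `spawn_child`, `run`, and `EAGER_FACTORY` are hypothetical names invented for illustration. It assumes Python 3.12 or later for `asyncio.eager_task_factory`; on older versions it falls back to the default task factory, where the problem does not appear.

```python
import asyncio

# asyncio.eager_task_factory exists on Python 3.12+. On older versions this
# sketch falls back to the default (non-eager) task factory.
EAGER_FACTORY = getattr(asyncio, "eager_task_factory", None)


async def spawn_child(gated: bool) -> int:
    """Return how many entries running_tasks held when the child body ran."""
    loop = asyncio.get_running_loop()
    running_tasks = []
    seen = []

    async def child(ok_to_start: asyncio.Event) -> None:
        if gated:
            # Suspend until the parent has finished its bookkeeping.
            await ok_to_start.wait()
        seen.append(len(running_tasks))

    ok_to_start = asyncio.Event()
    # With an eager task factory, the child starts running inside create_task,
    # before the append on the next line has happened.
    task = loop.create_task(child(ok_to_start))
    running_tasks.append(task)
    ok_to_start.set()
    await task
    return seen[0]


def run(gated: bool) -> int:
    async def main() -> int:
        if EAGER_FACTORY is not None:
            asyncio.get_running_loop().set_task_factory(EAGER_FACTORY)
        return await spawn_child(gated)

    return asyncio.run(main())


if __name__ == "__main__":
    print("ungated:", run(gated=False))
    print("gated:", run(gated=True))
```

With the gate, the child always observes its own task in `running_tasks`, whichever task factory is installed. Without it, an eager child that never awaits finishes before the parent has recorded it.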

Contributor Author

It's not terribly important to maintain the benefits of an eager task factory here, as it's supposed to be used for happy_eyeballs over the internet, where there is always going to be some delay.
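
For readers unfamiliar with the function under discussion, here is a small usage sketch of `asyncio.staggered.staggered_race` with the signature shown in the diff (`coro_fns, delay, *, loop=None`). The coroutines `slow` and `fast` are hypothetical stand-ins for connection attempts, and the sleep durations are arbitrary values chosen for illustration.

```python
import asyncio
from asyncio import staggered


async def slow() -> str:
    # Hypothetical stand-in for a connection attempt that takes a long time.
    await asyncio.sleep(0.5)
    return "slow"


async def fast() -> str:
    # Hypothetical stand-in for a connection attempt that completes quickly.
    await asyncio.sleep(0.01)
    return "fast"


async def main():
    # staggered_race takes zero-argument callables that return coroutines.
    # The second one is started once the first has failed or `delay` seconds
    # have passed, whichever comes first.
    result, index, exceptions = await staggered.staggered_race(
        [slow, fast], delay=0.05
    )
    return result, index


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the second callable is started after the 0.05 s delay and finishes long before the first, so it is reported as the winner and the first attempt is cancelled.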

Contributor
@bdraco bdraco Oct 2, 2024

I was only thinking about the first one, since there is a chance the first IP could connect right away (not sure if it can happen synchronously or not).

In aiohttp we don't know whether the host is on the internet, on a LAN, or localhost, so everything ends up going through this path.

It's also possible that a user connecting to localhost has working IPv4 but broken IPv6, so 127.0.0.1 will connect right away and ::1 never will, or vice versa.

Contributor Author

Even connecting to a localhost socket raises BlockingIOError and asyncio needs to wait for it to be writable
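
The behaviour mentioned above can be observed with plain sockets. This is a rough sketch that uses only the standard `socket` and `select` modules. The helper name `nonblocking_connect_to_localhost` is hypothetical, and whether `connect()` actually raises `BlockingIOError` for a loopback address may depend on the platform, so the sketch handles both outcomes.

```python
import select
import socket


def nonblocking_connect_to_localhost():
    """Return (raised_blocking_io_error, SO_ERROR once the socket is writable)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        server.listen(1)
        client.setblocking(False)
        try:
            client.connect(server.getsockname())
            raised = False
        except BlockingIOError:
            # The connection is still in progress, so the caller has to wait
            # for the socket to become writable.
            raised = True
        # Wait (up to 5 seconds) for the socket to become writable.
        _, writable, _ = select.select([], [client], [], 5.0)
        # SO_ERROR is 0 when the connection was established successfully.
        so_error = (
            client.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if writable else -1
        )
        return raised, so_error
    finally:
        client.close()
        server.close()


if __name__ == "__main__":
    print(nonblocking_connect_to_localhost())
```

When the first element is `True`, the connect could not complete synchronously even though the peer is on the same machine, which matches the point made in the comment.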

     try:
         # Wait for a growing list of tasks to all finish: poor man's version of
         # curio's TaskGroup or trio's nursery