8000 bpo-43352: Add a Barrier object in asyncio lib by YvesDup · Pull Request #24903 · python/cpython · GitHub
[go: up one dir, main page]

Skip to content

bpo-43352: Add a Barrier object in asyncio lib #24903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 101 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
2cd5ce0
Initial commit
YvesDup Mar 15, 2021
7dfa8ef
make patchcheck
YvesDup Mar 15, 2021
f19ea24
make patchcheck
YvesDup Mar 15, 2021
f4bb33a
Update test_locks.py
YvesDup Mar 16, 2021
5f14a7d
Update locks.py
YvesDup Mar 16, 2021
214fd35
Update test_locks.py
YvesDup Mar 16, 2021
619f60a
Update from patch
YvesDup Mar 16, 2021
04d4f69
remove cpython/doc/*.rst.bak
YvesDup Mar 31, 2021
c2e438a
📜🤖 Added by blurb_it.
blurb-it[bot] Mar 31, 2021
97598f3
Update 2021-03-31-15-22-45.bpo-43352.nSjMuE.rst
YvesDup Apr 2, 2021
dfea4c2
Update documentation about add of Barrier object
YvesDup Apr 11, 2021
8a7d7fa
Merge branch 'fix-issue-43352' of https://github.com/YvesDup/cpython …
YvesDup Apr 11, 2021
44fe563
Update asyncio-sync.rst
YvesDup Apr 30, 2021
e23c860
Update locks.py
YvesDup May 17, 2021
6a51dc6
Update test_locks.py
YvesDup May 17, 2021
04f72a3
Update asyncio-sync.rst
YvesDup May 17, 2021
7a00453
Update asyncio-sync.rst
YvesDup May 17, 2021 8000
3517e33
Update asyncio-sync.rst
YvesDup May 18, 2021
c56e7c6
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
8715b34
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
d15b2a6
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
ed1b361
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
7504a8f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
e3b23c2
Initial commit
YvesDup Mar 15, 2021
bc3d17a
make patchcheck
YvesDup Mar 15, 2021
a8b7b1a
make patchcheck
YvesDup Mar 15, 2021
32cbefb
Update test_locks.py
YvesDup Mar 16, 2021
e5002bc
Update locks.py
YvesDup Mar 16, 2021
b12599c
Update test_locks.py
YvesDup Mar 16, 2021
f12123e
Update from patch
YvesDup Mar 16, 2021
9ef7109
remove cpython/doc/*.rst.bak
YvesDup Mar 31, 2021
7b65fbc
📜🤖 Added by blurb_it.
blurb-it[bot] Mar 31, 2021
9b60d07
Update 2021-03-31-15-22-45.bpo-43352.nSjMuE.rst
YvesDup Apr 2, 2021
576cd53
Update documentation about add of Barrier object
YvesDup Apr 11, 2021
6106f21
Update asyncio-sync.rst
YvesDup Apr 30, 2021
0384561
Update locks.py
YvesDup May 17, 2021
16515ef
Update test_locks.py
YvesDup May 17, 2021
fa217fd
Update asyncio-sync.rst
YvesDup May 17, 2021
a97f4a3
Update asyncio-sync.rst
YvesDup May 17, 2021
b598470
Update asyncio-sync.rst
YvesDup May 18, 2021
3098105
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
2f55acc
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
f056865
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
d9008f9
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
71b923c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup May 19, 2021
41bbbee
Merge remote-tracking branch 'upstream/main' into fix-issue-43352
YvesDup May 29, 2021
606a898
Merge branch 'fix-issue-43352' of github.com:YvesDup/cpython into fix…
YvesDup May 29, 2021
8e302d7
Merge remote-tracking branch 'upstream/main' into fix-issue-43352
YvesDup May 29, 2021
58226e0
Update Doc/library/asyncio-sync.rst
YvesDup Sep 9, 2021
0096cdc
Update Doc/library/asyncio-sync.rst
YvesDup Sep 9, 2021
f275a83
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Dec 3, 2021
e90d556
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Dec 3, 2021
408512d
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Dec 3, 2021
069ed28
Merge branch 'python:main' into fix-issue-43352
YvesDup Feb 18, 2022
b7bb530
Merge branch 'python:main' into fix-issue-43352
YvesDup Feb 21, 2022
49de223
bpo-43352: Add a Barrier object to asyncio synchronized primitives (…
YvesDup Feb 21, 2022
fa4623c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
20fb88e
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
809de71
bpo-43352: Add a Barrier object to asyncio synchroniz 8000 ed primitives (G…
YvesDup Feb 22, 2022
28fc263
Merge branch 'python:main' into fix-issue-43352
YvesDup Feb 22, 2022
f66cd2c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
62272b3
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 22, 2022
f7dbf9b
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Feb 25, 2022
cc64b69
Merge branch 'python:main' into fix-issue-43352
YvesDup Mar 7, 2022
28eba9b
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
cca2918
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
6d3806f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
a2467f5
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
90e5d04
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
bea7e7f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 16, 2022
e97d417
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 17, 2022
b60a931
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 17, 2022
34cfc75
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 17, 2022
c7bbebe
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
fe20bdc
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
edfd7d4
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
27e1768
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
892f389
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 18, 2022
5b8962a
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
63e0688
Update Doc/library/asybpo-43352: Add a Barrier object to asyncio sync…
YvesDup Mar 21, 2022
eb42246
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
e29d0c0
Update Doc/library/asyncio-sync.rstbpo-43352: Add a Barrier object to…
YvesDup Mar 21, 2022
6857cf2
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
d4d5b55
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
5bb6d7e
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
66f023a
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 21, 2022
e5d229c
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 22, 2022
b0d17da
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 22, 2022
1c16176
po-43352: Add a Barrier object to asyncio synchronized primitives (GH…
YvesDup Mar 22, 2022
151ca5d
po-43352: Add a Barrier object to asyncio synchronized primitives (GH…
YvesDup Mar 22, 2022
37efb91
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 23, 2022
337189e
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 23, 2022
ee59e3a
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 23, 2022
df5bd42
Rewrite repr(), use enum for state, drop _count_blocked used by tests…
asvetlov Mar 24, 2022
f1acd93
Inline method
asvetlov Mar 24, 2022
1bb3297
Move BrokenBarrierError to exceptions.py
asvetlov Mar 24, 2022
d82f1a5
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 25, 2022
f89dc5f
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 25, 2022
896e274
Tune docs
asvetlov Mar 25, 2022
b7bdf40
bpo-43352: Add a Barrier object to asyncio synchronized primitives (G…
YvesDup Mar 25, 2022
3c30290
Bump
asvetlov Mar 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Initial commit
Add a Barrier object to asyncio synchronized primitives
  • Loading branch information
YvesDup committed May 28, 2021
commit e3b23c2d816c478aabc1d0bfdfc3f07611dbe6af
173 changes: 172 additions & 1 deletion Lib/asyncio/locks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Synchronization primitives."""

__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore', 'Barrier', 'BrokenBarrierError')

import collections

Expand Down Expand Up @@ -418,3 +418,174 @@ def release(self):
if self._value >= self._bound_value:
raise ValueError('BoundedSemaphore released too many times')
super().release()


# A barrier class. Inspired in part by the pthread_barrier_* api and
# the CyclicBarrier class from Java. See
# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
# CyclicBarrier.html
# for information.
# We maintain two main states, 'filling' and 'draining' enabling the barrier
# to be cyclic. Tasks are not allowed into it until it has fully drained
# since the previous cycle. In addition, a 'resetting' state exists which is
# similar to 'draining' except that tasks leave with a BrokenBarrierError,
# and a 'broken' state in which all tasks get the exception.

class Barrier(mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Barrier

Implements a Barrier.
Useful for synchronizing a fixed number of tasks at known synchronization
points. Tasks block on 'wait()' and are simultaneously awoken once they
have all made that call.
"""

def __init__(self, parties, action=None, *, loop=mixins._marker):
"""Create a barrier, initialised to 'parties' tasks.
'action' is a callable which, when supplied, will be called by one of
the tasks after they have all entered the barrier and just prior to
releasing them all.
"""
super().__init__(loop=loop)
if parties < 1:
raise ValueError('parties must be > 0')

self._waiting = Event() # used notify all waiting tasks
self._blocking = Event() # used block tasks while wainting tasks are draining or broken
self._action = action
self._parties = parties
self._state = 0 # 0 filling, 1, draining, -1 resetting, -2 broken
self._count = 0 # count waiting tasks

def __repr__(self):
res = super().__repr__()
_wait = 'set' if self._waiting.is_set() else 'unset'
_block = 'set' if self._blocking.is_set() else 'unset'
extra = f'{_wait}, count:{self._count}/{self._parties}, {_block}, state:{self._state}'
return f'<{res[1:-1]} [{extra}]>'

async def wait(self):
"""Wait for the barrier.
When the specified number of tasks have started waiting, they are all
simultaneously awoken. If an 'action' was provided for the barrier, one
of the tasks will have executed that callback prior to returning.
Returns an individual index number from 0 to 'parties-1'.
"""
await self._block() # Block while the barrier drains or resets.
index = self._count
self._count += 1
try:
if index + 1 == self._parties:
# We release the barrier
self._release()
else:
# We wait until someone releases us
await self._wait()
return index
finally:
self._count -= 1
# Wake up any tasks waiting for barrier to drain.
self._exit()

# Block until the barrier is ready for us, or raise an exception
# if it is broken.
async def _block (self):
if self._state in (-1, 1):
# It is draining or resetting, wait until done
await self._blocking.wait()

#see if the barrier is in a broken state
if self._state < 0:
raise BrokenBarrierError
assert self._state == 0, repr(self)

# Optionally run the 'action' and release the tasks waiting
# in the barrier.
def _release(self):
try:
if self._action:
self._action()
# enter draining state
self._state = 1
self._blocking.clear()
self._waiting.set()
except:
#an exception during the _action handler. Break and reraise
self._state = -2
self._blocking.clear()
self._waiting.set()
raise

# Wait in the barrier until we are released. Raise an exception
# if the barrier is reset or broken.
async def _wait(self):
await self._waiting.wait()
# no timeout so
if self._state < 0:
raise BrokenBarrierError
assert self._state == 1, repr(self)

# If we are the last tasks to exit the barrier, signal any tasks
# waiting for the barrier to drain.
def _exit(self):
if self._count == 0:
if self._state == 1:
self._state = 0
elif self._state == -1:
self._state = 0
self._waiting.clear()
self._blocking.set()

# async def reset(self):
def reset(self):
"""Reset the barrier to the initial state.
Any tasks currently waiting will get the BrokenBarrier exception
raised.
"""
if self._count > 0:
if self._state in (0, 1):
#reset the barrier, waking up tasks
self._state = -1
elif self._state == -2:
#was broken, set it to reset state
#which clears when the last tasks exits
self._state = -1
self._waiting.set()
self._blocking.clear()
else:
self._state = 0


# async def abort(self):
def abort(self):
"""Place the barrier into a 'broken' state.
Useful in case of error. Any currently waiting tasks and tasks
attempting to 'wait()' will have BrokenBarrierError raised.
"""
self._state = -2
self._waiting.set()
self._blocking.clear()

@property
def parties(self):
"""Return the number of tasks required to trip the barrier. 6F18 """
return self._parties

@property
def n_waiting(self):
"""Return the number of tasks currently waiting at the barrier."""
# We don't need synchronization here since this is an ephemeral result
# anyway. It returns the correct value in the steady state.
if self._state == 0:
return self._count
return 0

@property
def broken(self):
"""Return True if the barrier is in a broken state."""
return self._state == -2

# exception raised by the Barrier class
class BrokenBarrierError(RuntimeError):
pass
Loading
0