8000 start from scratch · python/cpython@2bd08cf · GitHub
[go: up one dir, main page]

Skip to content

Commit 2bd08cf

Browse files
start from scratch
1 parent 0b87f8d commit 2bd08cf

19 files changed

+1626
-136
lines changed

Lib/ast.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,20 +1476,17 @@ def visit_Call(self, node):
14761476
self.traverse(e)
14771477

14781478
def visit_Subscript(self, node):
1479-
def is_simple_tuple(slice_value):
1480-
# when unparsing a non-empty tuple, the parentheses can be safely
1481-
# omitted if there aren't any elements that explicitly requires
1482-
# parentheses (such as starred expressions).
1479+
def is_non_empty_tuple(slice_value):
14831480
return (
14841481
isinstance(slice_value, Tuple)
14851482
and slice_value.elts
1486-
and not any(isinstance(elt, Starred) for elt in slice_value.elts)
14871483
)
14881484

14891485
self.set_precedence(_Precedence.ATOM, node.value)
14901486
self.traverse(node.value)
14911487
with self.delimit("[", "]"):
1492-
if is_simple_tuple(node.slice):
1488+
if is_non_empty_tuple(node.slice):
1489+
# parentheses can be omitted if the tuple isn't empty
14931490
self.items_view(self.traverse, node.slice.elts)
14941491
else:
14951492
self.traverse(node.slice)

Lib/asyncio/exceptions.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""asyncio exceptions."""
22

33

4-
__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError',
4+
__all__ = ('BrokenBarrierError',
5+
'CancelledError', 'InvalidStateError', 'TimeoutError',
56
'IncompleteReadError', 'LimitOverrunError',
67
'SendfileNotAvailableError')
78

@@ -55,3 +56,7 @@ def __init__(self, message, consumed):
5556

5657
def __reduce__(self):
5758
return type(self), (self.args[0], self.consumed)
59+
60+
61+
class BrokenBarrierError(RuntimeError):
62+
"""Barrier is broken by barrier.abort() call."""

Lib/asyncio/locks.py

Lines changed: 155 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
"""Synchronization primitives."""
22

3-
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
3+
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore',
4+
'BoundedSemaphore', 'Barrier')
45

56
import collections
7+
import enum
68

79
from . import exceptions
810
from . import mixins
911
from . import tasks
1012

11-
1213
class _ContextManagerMixin:
1314
async def __aenter__(self):
1415
await self.acquire()
@@ -416,3 +417,155 @@ def release(self):
416417
if self._value >= self._bound_value:
417418
raise ValueError('BoundedSemaphore released too many times')
418419
super().release()
420+
421+
422+
423+
class _BarrierState(enum.Enum):
424+
FILLING = 'filling'
425+
DRAINING = 'draining'
426+
RESETTING = 'resetting'
427+
BROKEN = 'broken'
428+
429+
430+
class Barrier(mixins._LoopBoundMixin):
431+
"""Asyncio equivalent to threading.Barrier
432+
433+
Implements a Barrier primitive.
434+
Useful for synchronizing a fixed number of tasks at known synchronization
435+
points. Tasks block on 'wait()' and are simultaneously awoken once they
436+
have all made their call.
437+
"""
438+
439+
def __init__(self, parties):
440+
"""Create a barrier, initialised to 'parties' tasks."""
441+
if parties < 1:
442+
raise ValueError('parties must be > 0')
443+
444+
self._cond = Condition() # notify all tasks when state changes
445+
446+
self._parties = parties
447+
self._state = _BarrierState.FILLING
448+
self._count = 0 # count tasks in Barrier
449+
450+
def __repr__(self):
451+
res = super().__repr__()
452+
extra = f'{self._state.value}'
453+
if not self.broken:
454+
extra += f', waiters:{self.n_waiting}/{self.parties}'
455+
return f'<{res[1:-1]} [{extra}]>'
456+
457+
async def __aenter__(self):
458+
# wait for the barrier reaches the parties number
459+
# when start draining release and return index of waited task
460+
return await self.wait()
461+
462+
async def __aexit__(self, *args):
463+
pass
464+
465+
async def wait(self):
466+
"""Wait for the barrier.
467+
468+
When the specified number of tasks have started waiting, they are all
469+
simultaneously awoken.
470+
Returns an unique and individual index number from 0 to 'parties-1'.
471+
"""
472+
async with self._cond:
473+
await self._block() # Block while the barrier drains or resets.
474+
try:
475+
index = self._count
476+
self._count += 1
477+
if index + 1 == self._parties:
478+
# We release the barrier
479+
await self._release()
480+
else:
481+
await self._wait()
482+
return index
483+
finally:
484+
self._count -= 1
485+
# Wake up any tasks waiting for barrier to drain.
486+
self._exit()
487+
488+
async def _block(self):
489+
# Block until the barrier is ready for us,
490+
# or raise an exception if it is broken.
491+
#
492+
# It is draining or resetting, wait until done
493+
# unless a CancelledError occurs
494+
await self._cond.wait_for(
495+
lambda: self._state not in (
496+
_BarrierState.DRAINING, _BarrierState.RESETTING
497+
)
498+
)
499+
500+
# see if the barrier is in a broken state
501+
if self._state is _BarrierState.BROKEN:
502+
raise exceptions.BrokenBarrierError("Barrier aborted")
503+
504+
async def _release(self):
505+
# Release the tasks waiting in the barrier.
506+
507+
# Enter draining state.
508+
# Next waiting tasks will be blocked until the end of draining.
509+
self._state = _BarrierState.DRAINING
510+
self._cond.notify_all()
511+
512+
async def _wait(self):
513+
# Wait in the barrier until we are released. Raise an exception
514+
# if the barrier is reset or broken.
515+
516+
# wait for end of filling
517+
# unless a CancelledError occurs
518+
await self._cond.wait_for(lambda: self._state is not _BarrierState.FILLING)
519+
520+
if self._state in (_BarrierState.BROKEN, _BarrierState.RESETTING):
521+
raise exceptions.BrokenBarrierError("Abort or reset of barrier")
522+
523+
def _exit(self):
524+
# If we are the last tasks to exit the barrier, signal any tasks
525+
# waiting for the barrier to drain.
526+
if self._count == 0:
527+
if self._state in (_BarrierState.RESETTING, _BarrierState.DRAINING):
528+
self._state = _BarrierState.FILLING
529+
self._cond.notify_all()
530+
531+
async def reset(self):
532+
"""Reset the barrier to the initial state.
533+
534+
Any tasks currently waiting will get the BrokenBarrier exception
535+
raised.
536+
"""
537+
async with self._cond:
538+
if self._count > 0:
539+
if self._state is not _BarrierState.RESETTING:
540+
#reset the barrier, waking up tasks
541+
self._state = _BarrierState.RESETTING
542+
else:
543+
self._state = _BarrierState.FILLING
544+
self._cond.notify_all()
545+
546+
async def abort(self):
547+
"""Place the barrier into a 'broken' state.
548+
549+
Useful in case of error. Any currently waiting tasks and tasks
550+
attempting to 'wait()' will have BrokenBarrierError raised.
551+
"""
552+
async with self._cond:
553+
self._state = _BarrierState.BROKEN
554+
self._cond.notify_all()
555+
556+
@property
557+
def parties(self):
558+
"""Return the number of tasks required to trip the barrier."""
559+
return self._parties
560+
561+
@property
562+
def n_waiting(self):
563+
"""Return the number of tasks currently waiting at the barrier."""
564+
if self._state is _BarrierState.FILLING:
565+
return self._count
566+
return 0
567+
568+
@property
569+
def broken(self):
570+
"""Return True if the barrier is in a broken state."""
571+
return self._state is _BarrierState.BROKEN

Lib/asyncio/runners.py

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,112 @@
1-
__all__ = 'run',
1+
__all__ = ('Runner', 'run')
22

3+
import contextvars
4+
import enum
35
from . import coroutines
46
from . import events
57
from . import tasks
68

79

10+
class _State(enum.Enum):
11+
CREATED = "created"
12+
INITIALIZED = "initialized"
13+
CLOSED = "closed"
14+
15+
16+
class Runner:
17+
"""A context manager that controls event loop life cycle.
18+
19+
The context manager always creates a new event loop,
20+
allows to run async functions inside it,
21+
and properly finalizes the loop at the context manager exit.
22+
23+
If debug is True, the event loop will be run in debug mode.
24+
If loop_factory is passed, it is used for new event loop creation.
25+
26+
asyncio.run(main(), debug=True)
27+
28+
is a shortcut for
29+
30+
with asyncio.Runner(debug=True) as runner:
31+
runner.run(main())
32+
33+
The run() method can be called multiple times within the runner's context.
34+
35+
This can be useful for interactive console (e.g. IPython),
36+
unittest runners, console tools, -- everywhere when async code
37+
is called from existing sync framework and where the preferred single
38+
asyncio.run() call doesn't work.
39+
40+
"""
41+
42+
# Note: the class is final, it is not intended for inheritance.
43+
44+
def __init__(self, *, debug=None, loop_factory=None):
45+
self._state = _State.CREATED
46+
self._debug = debug
47+
self._loop_factory = loop_factory
48+
self._loop = None
49+
self._context = None
50+
51+
def __enter__(self):
52+
self._lazy_init()
53+
return self
54+
55+
def __exit__(self, exc_type, exc_val, exc_tb):
56+
self.close()
57+
58+
def close(self):
59+
"""Shutdown and close event loop."""
60+
if self._state is not _State.INITIALIZED:
61+
return
62+
try:
63+
loop = self._loop
64+
_cancel_all_tasks(loop)
65+
loop.run_until_complete(loop.shutdown_asyncgens())
66+
loop.run_until_complete(loop.shutdown_default_executor())
67+
finally:
68+
loop.close()
69+
self._loop = None
70+
self._state = _State.CLOSED
71+
72+
def get_loop(self):
73+
"""Return embedded event loop."""
74+
self._lazy_init()
75+
return self._loop
76+
77+
def run(self, coro, *, context=None):
78+
"""Run a coroutine inside the embedded event loop."""
79+
if not coroutines.iscoroutine(coro):
80+
raise ValueError("a coroutine was expected, got {!r}".format(coro))
81+
82+
if events._get_running_loop() is not None:
83+
# fail fast with short traceback
84+
raise RuntimeError(
85+
"Runner.run() cannot be called from a running event loop")
86+
87+
self._lazy_init()
88+
89+
if context is None:
90+
context = self._context
91+
task = self._loop.create_task(coro, context=context)
92+
return self._loop.run_until_complete(task)
93+
94+
def _lazy_init(self):
95+
if self._state is _State.CLOSED:
96+
raise RuntimeError("Runner is closed")
97+
if self._state is _State.INITIALIZED:
98+
return
99+
if self._loop_factory is None:
100+
self._loop = events.new_event_loop()
101+
else:
102+
self._loop = self._loop_factory()
103+
if self._debug is not None:
104+
self._loop.set_debug(self._debug)
105+
self._context = contextvars.copy_context()
106+
self._state = _State.INITIALIZED
107+
108+
109+
8110
def run(main, *, debug=None):
9111
"""Execute the coroutine and return the result.
10112
@@ -30,26 +132,12 @@ async def main():
30132
asyncio.run(main())
31133
"""
32134
if events._get_running_loop() is not None:
135+
# fail fast with short traceback
33136
raise RuntimeError(
34137
"asyncio.run() cannot be called from a running event loop")
35138

36-
if not coroutines.iscoroutine(main):
37-
raise ValueError("a coroutine was expected, got {!r}".format(main))
38-
39-
loop = events.new_event_loop()
40-
try:
41-
events.set_event_loop(loop)
42-
if debug is not None:
43-
loop.set_debug(debug)
44-
return loop.run_until_complete(main)
45-
finally:
46-
try:
47-
_cancel_all_tasks(loop)
48-
loop.run_until_complete(loop.shutdown_asyncgens())
49-
loop.run_until_complete(loop.shutdown_default_executor())
50-
finally:
51-
events.set_event_loop(None)
52-
loop.close()
139+
with Runner(debug=debug) as runner:
140+
return runner.run(main)
53141

54142

55143
def _cancel_all_tasks(loop):

Lib/asyncio/subprocess.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,7 @@ def process_exited(self):
103103
self._maybe_close_transport()
104104

105105
def _maybe_close_transport(self):
106-
# Since process already exited
107-
# clear all the pipes and close transport
108-
if self._process_exited and self._transport:
106+
if len(self._pipe_fds) == 0 and self._process_exited:
109107
self._transport.close()
110108
self._transport = None
111109

0 commit comments

Comments
 (0)
0