8000 Merge (manually) the subprocess_stream into default · python/asyncio@1e4d71f · GitHub
[go: up one dir, main page]

Skip to content
8000
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit 1e4d71f

Browse files
committed
Merge (manually) the subprocess_stream into default
* Add a new asyncio.subprocess module * Add new create_subprocess_exec() and create_subprocess_shell() functions * The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers for stdout and stderr and a stream writer for stdin. * The new asyncio.subprocess.Process class offers an API close to the subprocess.Popen class: - pid, returncode, stdin, stdout and stderr attributes - communicate(), wait(), send_signal(), terminate() and kill() methods * Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess and unix_events, to not be confused with the symbols with the same name of subprocess and asyncio.subprocess modules * _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size of the pending write * _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if the write buffer size is greater than the high water mark (64 KB by default) * Add new subprocess examples: shell.py, subprocess_shell.py, * subprocess_attach_read_pipe.py and subprocess_attach_write_pipe.py
1 parent ecc2418 commit 1e4d71f

11 files changed

+631
-34
lines changed

asyncio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from .protocols import *
2525
from .queues import *
2626
from .streams import *
27+
from .subprocess import *
2728
from .tasks import *
2829
from .transports import *
2930

@@ -39,5 +40,6 @@
3940
protocols.__all__ +
4041
queues.__all__ +
4142
streams.__all__ +
43+
subprocess.__all__ +
4244
tasks.__all__ +
4345
transports.__all__)

asyncio/base_subprocess.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@
66
from . import transports
77

88

9-
STDIN = 0
10-
STDOUT = 1
11-
STDERR = 2
12-
13-
149
class BaseSubprocessTransport(transports.SubprocessTransport):
1510

1611
def __init__(self, loop, protocol, args, shell,
@@ -22,11 +17,11 @@ def __init__(self, loop, protocol, args, shell,
2217

2318
self._pipes = {}
2419
if stdin == subprocess.PIPE:
25-
self._pipes[STDIN] = None
20+
self._pipes[0] = None
2621
if stdout == subprocess.PIPE:
27-
self._pipes[STDOUT] = None
22+
self._pipes[1] = None
2823
if stderr == subprocess.PIPE:
29-
self._pipes[STDERR] = None
24+
self._pipes[2] = None
3025
sel A93C f._pending_calls = collections.deque()
3126
self._finished = False
3227
self._returncode = None
@@ -76,19 +71,19 @@ def _post_init(self):
7671
loop = self._loop
7772
if proc.stdin is not None:
7873
_, pipe = yield from loop.connect_write_pipe(
79-
lambda: WriteSubprocessPipeProto(self, STDIN),
74+
lambda: WriteSubprocessPipeProto(self, 0),
8075
proc.stdin)
81-
self._pipes[STDIN] = pipe
76+
self._pipes[0] = pipe
8277
if proc.stdout is not None:
8378
_, pipe = yield from loop.connect_read_pipe(
84-
lambda: ReadSubprocessPipeProto(self, STDOUT),
79+
lambda: ReadSubprocessPipeProto(self, 1),
8580
proc.stdout)
86-
self._pipes[STDOUT] = pipe
81+
self._pipes[1] = pipe
8782
if proc.stderr is not None:
8883
_, pipe = yield from loop.connect_read_pipe(
89-
lambda: ReadSubprocessPipeProto(self, STDERR),
84+
lambda: ReadSubprocessPipeProto(self, 2),
9085
proc.stderr)
91-
self._pipes[STDERR] = pipe
86+
self._pipes[2] = pipe
9287

9388
assert self._pending_calls is not None
9489

asyncio/proactor_events.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(self, loop, sock, protocol, waiter=None,
2929
self._buffer = None # None or bytearray.
3030
self._read_fut = None
3131
self._write_fut = None
32+
self._pending_write = 0
3233
self._conn_lost = 0
3334
self._closing = False # Set when close() called.
3435
self._eof_written = False
@@ -68,6 +69,7 @@ def _force_close(self, exc):
6869
if self._read_fut:
6970
self._read_fut.cancel()
7071
self._write_fut = self._read_fut = None
72+
self._pending_write = 0
7173
self._buffer = None
7274
self._loop.call_soon(self._call_connection_lost, exc)
7375

@@ -128,11 +130,10 @@ def set_write_buffer_limits(self, high=None, low=None):
128130
self._low_water = low
129131

130132
def get_write_buffer_size(self):
131-
# NOTE: This doesn't take into account data already passed to
132-
# send() even if send() hasn't finished yet.
133-
if not self._buffer:
134-
return 0
135-
return len(self._buffer)
133+
size = self._pending_write
134+
if self._buffer is not None:
135+
size += len(self._buffer)
136+
return size
136137

137138

138139
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
@@ -206,7 +207,7 @@ def _loop_reading(self, fut=None):
206207

207208

208209
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
209-
transports.WriteTransport):
210+
transports.WriteTransport):
210211
"""Transport for write pipes."""
211212

212213
def write(self, data):
@@ -252,6 +253,7 @@ def _loop_writing(self, f=None, data=None):
252253
try:
253254
assert f is self._write_fut
254255
self._write_fut = None
256+
self._pending_write = 0
255257
if f:
256258
f.result()
257259
if data is None:
@@ -262,15 +264,21 @@ def _loop_writing(self, f=None, data=None):
262264
self._loop.call_soon(self._call_connection_lost, None)
263265
if self._eof_written:
264266
self._sock.shutdown(socket.SHUT_WR)
267+
# Now that we've reduced the buffer size, tell the
268+
# protocol to resume writing if it was paused. Note that
269+
# we do this last since the callback is called immediately
270+
# and it may add more data to the buffer (even causing the
271+
# protocol to be paused again).
272+
self._maybe_resume_protocol()
265273
else:
266274
self._write_fut = self._loop._proactor.send(self._sock, data)
267-
self._write_fut.add_done_callback(self._loop_writing)
268-
# Now that we've reduced the buffer size, tell the
269-
# protocol to resume writing if it was paused. Note that
270-
# we do this last since the callback is called immediately
271-
# and it may add more data to the buffer (even causing the
272-
# protocol to be paused again).
273-
self._maybe_resume_protocol()
275+
if not self._write_fut.done():
276+
assert self._pending_write == 0
277+
self._pending_write = len(data)
278+
self._write_fut.add_done_callback(self._loop_writing)
279+
self._maybe_pause_protocol()
280+
else:
281+
self._write_fut.add_done_callback(self._loop_writing)
274282
except ConnectionResetError as exc:
275283
self._force_close(exc)
276284
except OSError as exc:

asyncio/subprocess.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2+
3+
import collections
4+
import subprocess
5+
6+
from . import events
7+
from . import futures
8+
from . import protocols
9+
from . import streams
10+
from . import tasks
11+
12+
13+
PIPE = subprocess.PIPE
14+
STDOUT = subprocess.STDOUT
15+
DEVNULL = subprocess.DEVNULL
16+
17+
18+
class SubprocessStreamProtocol(streams.FlowControlMixin,
19+
protocols.SubprocessProtocol):
20+
"""Like StreamReaderProtocol, but for a subprocess."""
21+
22+
def __init__(self, limit, loop):
23+
super().__init__(loop=loop)
24+
self._limit = limit
25+
self.stdin = self.stdout = self.stderr = None
26+
self.waiter = futures.Future(loop=loop)
27+
self._waiters = collections.deque()
28+
self._transport = None
29+
30+
def connection_made(self, transport):
31+
self._transport = transport
32+
if transport.get_pipe_transport(1):
33+
self.stdout = streams.StreamReader(limit=self._limit,
34+
loop=self._loop)
35+
if transport.get_pipe_transport(2):
36+
self.stderr = streams.StreamReader(limit=self._limit,
37+
loop=self._loop)
38+
stdin = transport.get_pipe_transport(0)
39+
if stdin is not None:
40+
self.stdin = streams.StreamWriter(stdin,
41+
protocol=self,
42+
reader=None,
43+
loop=self._loop)
44+
self.waiter.set_result(None)
45+
46+
def pipe_data_received(self, fd, data):
47+
if fd == 1:
48+
reader = self.stdout
49+
elif fd == 2:
50+
reader = self.stderr
51+
else:
52+
reader = None
53+
if reader is not None:
54+
reader.feed_data(data)
55+
56+
def pipe_connection_lost(self, fd, exc):
57+
if fd == 0:
58+
pipe = self.stdin
59+
if pipe is not None:
60+
pipe.close()
61+
self.connection_lost(exc)
62+
return
63+
if fd == 1:
64+
reader = self.stdout
65+
elif fd == 2:
66+
reader = self.stderr
67+
else:
68+
reader = None
69+
if reader != None:
70+
if exc is None:
71+
reader.feed_eof()
72+
else:
73+
reader.set_exception(exc)
74+
75+
def process_exited(self):
76+
# wake up futures waiting for wait()
77+
returncode = self._transport.get_returncode()
78+
while self._waiters:
79+
waiter = self._waiters.popleft()
80+
waiter.set_result(returncode)
81+
82+
83+
class Process:
84+
def __init__(self, transport, protocol, loop):
85+
self._transport = transport
86+
self._protocol = protocol
87+
self._loop = loop
88+
self.stdin = protocol.stdin
89+
self.stdout = protocol.stdout
90+
self.stderr = protocol.stderr
91+
self.pid = transport.get_pid()
92+
93+
@property
94+
def returncode(self):
95+
return self._transport.get_returncode()
96+
97+
@tasks.coroutine
98+
def wait(self):
99+
"""Wait until the process exit and return the process return code."""
100+
returncode = self._transport.get_returncode()
101+
if returncode is not None:
102+
return returncode
103+
104+
waiter = futures.Future(loop=self._loop)
105+
self._protocol._waiters.append(waiter)
106+
yield from waiter
107+
return waiter.result()
108+
109+
def get_subprocess(self):
110+
return self._transport.get_extra_info('subprocess')
111+
112+
def _check_alive(self):
113+
if self._transport.get_returncode() is not None:
114+
raise ProcessLookupError()
115+
116+
def send_signal(self, signal):
117+
self._check_alive()
118+
self._transport.send_signal(signal)
119+
120+
def terminate(self):
121+
self._check_alive()
122+
self._transport.terminate()
123+
124+
def kill(self):
125+
self._check_alive()
126+
self._transport.kill()
127+
128+
@tasks.coroutine
129+
def _feed_stdin(self, input):
130+
self.stdin.write(input)
131+
yield from self.stdin.drain()
132+
self.stdin.close()
133+
134+
@tasks.coroutine
135+
def _noop(self):
136+
return None
137+
138+
@tasks.coroutine
139+
def _read_stream(self, fd):
140+
transport = self._transport.get_pipe_transport(fd)
141+
if fd == 2:
142+
stream = self.stderr
143+
else:
144+
assert fd == 1
145+
stream = self.stdout
146+
output = yield from stream.read()
147+
transport.close()
148+
return output
149+
150+
@tasks.coroutine
151+
def communicate(self, input=None):
152+
loop = self._transport._loop
153+
if input:
154+
stdin = self._feed_stdin(input)
155+
else:
156+
stdin = self._noop()
157+
if self.stdout is not None:
158+
stdout = self._read_stream(1)
159+
else:
160+
stdout = self._noop()
161+
if self.stderr is not None:
162+
stderr = self._read_stream(2)
163+
else:
164+
stderr = self._noop()
165+
stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
166+
loop=loop)
167+
yield from self.wait()
168+
return (stdout, stderr)
169+
170+
171+
@tasks.coroutine
172+
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
173+
loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
174+
if loop is None:
175+
loop = events.get_event_loop()
176+
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
177+
loop=loop)
178+
transport, protocol = yield from loop.subprocess_shell(
179+
protocol_factory,
180+
cmd, stdin=stdin, stdout=stdout,
181+
stderr=stderr, **kwds)
182+
yield from protocol.waiter
183+
return Process(transport, protocol, loop)
184+
185+
@tasks.coroutine
186+
def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
187+
loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
188+
if loop is None:
189+
loop = events.get_event_loop()
190+
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
191+
loop=loop)
192+
transport, protocol = yield from loop.subprocess_exec(
193+
protocol_factory,
194+
*args, stdin=stdin, stdout=stdout,
195+
stderr=stderr, **kwds)
196+
yield from protocol.waiter
197+
return Process(transport, protocol, loop)
198+

asyncio/unix_events.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,11 @@
2121
from .log import logger
2222

2323

24-
__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
24+
__all__ = ['SelectorEventLoop',
2525
'AbstractChildWatcher', 'SafeChildWatcher',
2626
'FastChildWatcher', 'DefaultEventLoopPolicy',
2727
]
2828

29-
STDIN = 0
30-
STDOUT = 1
31-
STDERR = 2
32-
33-
3429
if sys.platform == 'win32': # pragma: no cover
3530
raise ImportError('Signals are not really supported on Windows')
3631

examples/child_process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""
22
Example of asynchronous interaction with a child python process.
33
4-
Note that on Windows we must use the IOCP event loop.
4+
This example shows how to attach an existing Popen object and use the low level
5+
transport-protocol API. See shell.py and subprocess_shell.py for higher level
6+
examples.
57
"""
68

79
import os

0 commit comments

Comments
 (0)
0