8000 Python issue #23347: Refactor creation of subprocess transports · python/asyncio@c92da15 · GitHub
[go: up one dir, main page]

Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit c92da15

Browse files
committed
Python issue #23347: Refactor creation of subprocess transports
Changes on BaseSubprocessTransport: * Add a wait() method to wait until the child process exit * The constructor now accepts an optional waiter parameter. The _post_init() coroutine must not be called explicitly anymore. It makes subprocess transports closer to other transports, and it gives more freedom if we want later to change completly how subprocess transports are created. * close() now kills the process instead of kindly terminate it: the child process may ignore SIGTERM and continue to run. Call explicitly terminate() and wait() if you want to kindly terminate the child process. * close() now logs a warning in debug mode if the process is still running and needs to be killed * _make_subprocess_transport() is now fully asynchronous again: if the creation of the transport failed, wait asynchronously for the process eixt. Before the wait was synchronous. This change requires close() to *kill*, and not terminate, the child process. * Remove the _kill_wait() method, replaced with a more agressive close() method. It fixes _make_subprocess_transport() on error. BaseSubprocessTransport.close() calls the close() method of pipe transports, whereas _kill_wait() closed directly pipes of the subprocess.Popen object without unregistering file descriptors from the selector (which caused severe bugs). These changes simplifies the code of subprocess.py.
1 parent 241c710 commit c92da15

File tree

5 files changed

+92
-97
lines changed

5 files changed

+92
-97
lines changed

asyncio/base_subprocess.py

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import sys
44
import warnings
55

6+
from . import futures
67
from . import protocols
78
from . import transports
89
from .coroutines import coroutine
@@ -13,27 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
1314

1415
def __init__(self, loop, protocol, args, shell,
1516
stdin, stdout, stderr, bufsize,
16-
extra=None, **kwargs):
17+
waiter=None, extra=None, **kwargs):
1718
super().__init__(extra)
1819
self._closed = False
1920
self._protocol = protocol
2021
self._loop = loop
22+
self._proc = None
2123
self._pid = None
22-
24+
self._returncode = None
25+
self._exit_waiters = []
26+
self._pending_calls = collections.deque()
2327
self._pipes = {}
28+
self._finished = False
29+
2430
if stdin == subprocess.PIPE:
2531
self._pipes[0] = None
2632
if stdout == subprocess.PIPE:
2733
self._pipes[1] = None
2834
if stderr == subprocess.PIPE:
2935
self._pipes[2] = None
30-
self._pending_calls = collections.deque()
31-
self._finished = False
32-
self._returncode = None
36+
37+
# Create the child process: set the _proc attribute
3338
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
3439
stderr=stderr, bufsize=bufsize, **kwargs)
3540
self._pid = self._proc.pid
3641
self._extra['subprocess'] = self._proc
42+
3743
if self._loop.get_debug():
3844
if isinstance(args, (bytes, str)):
3945
program = args
@@ -42,6 +48,8 @@ def __init__(self, loop, protocol, args, shell,
4248
logger.debug('process %r created: pid %s',
4349
program, self._pid)
4450

51+
self._loop.create_task(self._connect_pipes(waiter))
52+
4553
def __repr__(self):
4654
info = [self.__class__.__name__]
4755
if self._closed:
@@ -77,12 +85,23 @@ def _make_read_subprocess_pipe_proto(self, fd):
7785

7886
def close(self):
7987
self._closed = True
88+
8089
for proto in self._pipes.values():
8190
if proto is None:
8291
continue
8392
proto.pipe.close()
84-
if self._returncode is None:
85-
self.terminate()
93+
94+
if self._proc is not None and self._returncode is None:
95+
if self._loop.get_debug():
96+
logger.warning('Close running child process: kill %r', self)
97+
98+
try:
99+
self._proc.kill()
100+
except ProcessLookupError:
101+
pass
102+
103+
# Don't clear the _proc reference yet because _post_init() may
104+
# still run
86105

87106
# On Python 3.3 and older, objects with a destructor part of a reference
88107
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
@@ -114,50 +133,24 @@ def terminate(self):
114133
def kill(self):
115134
self._proc.kill()
116135

117-
def _kill_wait(self):
118-
"""Close pipes, kill the subprocess and read its return status.
119-
120-
Function called when an exception is raised during the creation
121-
of a subprocess.
122-
"""
123-
self._closed = True
124-
if self._loop.get_debug():
125-
logger.warning('Exception during subprocess creation, '
126-
'kill the subprocess %r',
127-
self,
128-
exc_info=True)
129-
130-
proc = self._proc
131-
if proc.stdout:
132-
proc.stdout.close()
133-
if proc.stderr:
134-
proc.stderr.close()
135-
if proc.stdin:
136-
proc.stdin.close()
137-
138-
try:
139-
proc.kill()
140-
except ProcessLookupError:
141-
pass
142-
self._returncode = proc.wait()
143-
144-
self.close()
145-
146136
@coroutine
147-
def _post_init(self):
137+
def _connect_pipes(self, waiter):
148138
try:
149139
proc = self._proc
150140
loop = self._loop
141+
151142
if proc.stdin is not None:
152143
_, pipe = yield from loop.connect_write_pipe(
153144
lambda: WriteSubprocessPipeProto(self, 0),
154145
proc.stdin)
155146
self._pipes[0] = pipe
147+
156148
if proc.stdout is not None:
157149
_, pipe = yield from loop.connect_read_pipe(
158150
lambda: ReadSubprocessPipeProto(self, 1),
159151
proc.stdout)
160152
self._pipes[1] = pipe
153+
161154
if proc.stderr is not None:
162155
_, pipe = yield from loop.connect_read_pipe(
163156
lambda: ReadSubprocessPipeProto(self, 2),
@@ -166,13 +159,16 @@ def _post_init(self):
166159

167160
assert self._pending_calls is not None
168161

169-
self._loop.call_soon(self._protocol.connection_made, self)
162+
loop.call_soon(self._protocol.connection_made, self)
170163
for callback, data in self._pending_calls:
171-
self._loop.call_soon(callback, *data)
164+
loop.call_soon(callback, *data)
172165
self._pending_calls = None
173-
except:
174-
self._kill_wait()
175-
raise
166+
except Exception as exc:
167+
if waiter is not None and not waiter.cancelled():
168+
waiter.set_exception(exc)
169+
else:
170+
if waiter is not None and not waiter.cancelled():
171+
waiter.set_result(None)
176172

177173
def _call(self, cb, *data):
178174
if self._pending_calls is not None:
@@ -197,6 +193,23 @@ def _process_exited(self, returncode):
197193
self._call(self._protocol.process_exited)
198194
self._try_finish()
199195

196+
# wake up futures waiting for wait()
197+
for waiter in self._exit_waiters:
198+
if not waiter.cancelled():
199+
waiter.set_result(returncode)
200+
self._exit_waiters = None
201+
202+
def wait(self):
203+
"""Wait until the process exit and return the process return code.
< 2851 code>204+
205+
This method is a coroutine."""
206+
if self._returncode is not None:
207+
return self._returncode
208+
209+
waiter = futures.Future(loop=self._loop)
210+
self._exit_waiters.append(waiter)
211+
return (yield from waiter)
212+
200213
def _try_finish(self):
201214
assert not self._finished
202215
if self._returncode is None:
@@ -210,9 +223,9 @@ def _call_connection_lost(self, exc):
210223
try:
211224
self._protocol.connection_lost(exc)
212225
finally:
226+
self._loop = None
213227
self._proc = None
214228
self._protocol = None
215-
self._loop = None
216229

217230

218231
class WriteSubprocessPipeProto(protocols.BaseProtocol):

asyncio/subprocess.py

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ def __init__(self, limit, loop):
2525
super().__init__(loop=loop)
2626
self._limit = limit
2727
self.stdin = self.stdout = self.stderr = None
28-
self.waiter = futures.Future(loop=loop)
29-
self._waiters = collections.deque()
3028
self._transport = None
3129

3230
def __repr__(self):
@@ -61,9 +59,6 @@ def connection_made(self, transport):
6159
reader=None,
6260
loop=self._loop)
6361

64-
if not self.waiter.cancelled():
65-
self.waiter.set_result(None)
66-
6762
def pipe_data_received(self, fd, data):
6863
if fd == 1:
6964
reader = self.stdout
@@ -94,16 +89,9 @@ def pipe_connection_lost(self, fd, exc):
9489
reader.set_exception(exc)
9590

9691
def process_exited(self):
97-
returncode = self._transport.get_returncode()
9892
self._transport.close()
9993
self._transport = None
10094

101-
# wake up futures waiting for wait()
102-
while self._waiters:
103-
waiter = self._waiters.popleft()
104-
if not waiter.cancelled():
105-
waiter.set_result(returncode)
106-
10795

10896
class Process:
10997
def __init__(self, transport, protocol, loop):
@@ -124,15 +112,10 @@ def returncode(self):
124112

125113
@coroutine
126114
def wait(self):
127-
"""Wait until the process exit and return the process return code."""
128-
returncode = self._transport.get_returncode()
129-
if returncode is not None:
130-
return returncode
115+
"""Wait until the process exit and return the process return code.
131116
132-
waiter = futures.Future(loop=self._loop)
133-
self._protocol._waiters.append(waiter)
134-
yield from waiter
135-
return waiter.result()
117+
This method is a coroutine."""
118+
return (yield from self._transport< 10000 /span>.wait())
136119

137120
def _check_alive(self):
138121
if self._transport.get_returncode() is not None:
@@ -221,11 +204,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
221204
protocol_factory,
222205
cmd, stdin=stdin, stdout=stdout,
223206
stderr=stderr, **kwds)
224-
try:
225-
yield from protocol.waiter
226-
except:
227-
transport._kill_wait()
228-
raise
229207
return Process(transport, protocol, loop)
230208

231209
@coroutine
@@ -241,9 +219,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
241219
program, *args,
242220
stdin=stdin, stdout=stdout,
243221
stderr=stderr, **kwds)
244-
try:
245-
yield from protocol.waiter
246-
except:
247-
transport._kill_wait()
248-
raise
249222
return Process(transport, protocol, loop)

asyncio/unix_events.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from . import constants
1717
from . import coroutines
1818
from . import events
19+
from . import futures
1920
from . import selector_events
2021
from . import selectors
2122
from . import transports
@@ -175,16 +176,20 @@ def _make_subprocess_transport(self, protocol, args, shell,
175176
stdin, stdout, stderr, bufsize,
176177
extra=None, **kwargs):
177178
with events.get_child_watcher() as watcher:
179+
waiter = futures.Future(loop=self)
178180
transp = _UnixSubprocessTransport(self, protocol, args, shell,
179181
stdin, stdout, stderr, bufsize,
180-
extra=extra, **kwargs)
182+
waiter=waiter, extra=extra,
183+
**kwargs)
184+
185+
watcher.add_child_handler(transp.get_pid(),
186+
self._child_watcher_callback, transp)
181187
try:
182-
yield from transp._post_init()
188+
yield from waiter
183189
except:
184190
transp.close()
191+
yield from transp.wait()
185192
raise
186-
watcher.add_child_handler(transp.get_pid(),
187-
self._child_watcher_callback, transp)
188193

189194
return transp
190195

@@ -774,7 +779,7 @@ def __exit__(self, a, b, c):
774779
pass
775780

776781
def add_child_handler(self, pid, callback, *args):
777-
self._callbacks[pid] = callback, args
782+
self._callbacks[pid] = (callback, args)
778783

779784
# Prevent a race condition in case the child is already terminated.
780785
self._do_waitpid(pid)

asyncio/windows_events.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,13 +366,16 @@ def loop_accept_pipe(f=None):
366366
def _make_subprocess_transport(self, protocol, args, shell,
367367
stdin, stdout, stderr, bufsize,
368368
extra=None, **kwargs):
369+
waiter = futures.Future(loop=self)
369370
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
370371
stdin, stdout, stderr, bufsize,
371-
extra=extra, **kwargs)
372+
waiter=waiter, extra=extra,
373+
**kwargs)
372374
try:
373-
yield from transp._post_init()
375+
yield from waiter
374376
except:
375377
transp.close()
378+
yield from transp.wait()
376379
raise
377380

378381
return transp

tests/test_events.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,9 +1551,10 @@ def test_subprocess_exec(self):
15511551
stdin = transp.get_pipe_transport(0)
15521552
stdin.write(b'Python The Winner')
15531553
self.loop.run_until_complete(proto.got_data[1].wait())
1554-
transp.close()
1554+
with test_utils.disable_logger():
1555+
transp.close()
15551556
self.loop.run_until_complete(proto.comp 341A leted)
1556-
self.check_terminated(proto.returncode)
1557+
self.check_killed(proto.returncode)
15571558
self.assertEqual(b'Python The Winner', proto.data[1])
15581559

15591560
def test_subprocess_interactive(self):
@@ -1567,21 +1568,20 @@ def test_subprocess_interactive(self):
15671568
self.loop.run_until_complete(proto.connected)
15681569
self.assertEqual('CONNECTED', proto.state)
15691570

1570-
try:
1571-
stdin = transp.get_pipe_transport(0)
1572-
stdin.write(b'Python ')
1573-
self.loop.run_until_complete(proto.got_data[1].wait())
1574-
proto.got_data[1].clear()
1575-
self.assertEqual(b'Python ', proto.data[1])
1576-
1577-
stdin.write(b'The Winner')
1578-
self.loop.run_until_complete(proto.got_data[1].wait())
1579-
self.assertEqual(b'Python The Winner', proto.data[1])
1580-
finally:
1581-
transp.close()
1571+
stdin = transp.get_pipe_transport(0)
1572+
stdin.write(b'Python ')
1573+
self.loop.run_until_complete(proto.got_data[1].wait())
1574+
proto.got_data[1].clear()
1575+
self.assertEqual(b'Python ', proto.data[1])
15821576

1577+
stdin.write(b'The Winner')
1578+
self.loop.run_until_complete(proto.got_data[1].wait())
1579+
self.assertEqual(b'Python The Winner', proto.data[1])
1580+
1581+
with test_utils.disable_logger():
1582+
transp.close()
15831583
self.loop.run_until_complete(proto.completed)
1584-
self.check_terminated(proto.returncode)
1584+
self.check_killed(proto.returncode)
15851585

15861586
def test_subprocess_shell(self):
15871587
connect = self.loop.subprocess_shell(
@@ -1739,9 +1739,10 @@ def test_subprocess_close_client_stream(self):
17391739
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
17401740
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
17411741
self.assertEqual(b'ERR:OSError', proto.data[2])
1742-
transp.close()
1742+
with test_utils.disable_logger():
1743+
transp.close()
17431744
self.loop.run_until_complete(proto.completed)
1744-
self.check_terminated(proto.returncode)
1745+
self.check_killed(proto.returncode)
17451746

17461747
def test_subprocess_wait_no_same_group(self):
17471748
# start the new process in a new session

0 commit comments

Comments
 (0)
0