10000 Issue #18923: Update subprocess to use the new selectors module. · python/cpython@3a4586a · GitHub
[go: up one dir, main page]

Skip to content

Commit 3a4586a

Browse files
committed
Issue #18923: Update subprocess to use the new selectors module.
1 parent 2ce6c44 commit 3a4586a

File tree

2 files changed

+75
-173
lines changed

2 files changed

+75
-173
lines changed

Lib/subprocess.py

Lines changed: 69 additions & 169 deletions
F438
Original file line numberDiff line numberDiff line change
@@ -404,15 +404,23 @@ class STARTUPINFO:
404404
hStdError = None
405405
wShowWindow = 0
406406
else:
407-
import select
408-
_has_poll = hasattr(select, 'poll')
409407
import _posixsubprocess
408+
import select
409+
import selectors
410410

411411
# When select or poll has indicated that the file is writable,
412412
# we can write up to _PIPE_BUF bytes without risk of blocking.
413413
# POSIX defines PIPE_BUF as >= 512.
414414
_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
415415

416+
# poll/select have the advantage of not requiring any extra file
417+
# descriptor, contrarily to epoll/kqueue (also, they require a single
418+
# syscall).
419+
if hasattr(selectors, 'PollSelector'):
420+
_PopenSelector = selectors.PollSelector
421+
else:
422+
_PopenSelector = selectors.SelectSelector
423+
416424

417425
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
418426
"getoutput", "check_output", "CalledProcessError", "DEVNULL"]
@@ -1530,12 +1538,65 @@ def _communicate(self, input, endtime, orig_timeout):
15301538
if not input:
15311539
self.stdin.close()
15321540

1533-
if _has_poll:
1534-
stdout, stderr = self._communicate_with_poll(input, endtime,
1535-
orig_timeout)
1536-
else:
1537-
stdout, stderr = self._communicate_with_select(input, endtime,
1538-
orig_timeout)
1541+
stdout = None
1542+
stderr = None
1543+
1544+
# Only create this mapping if we haven't already.
1545+
if not self._communication_started:
1546+
self._fileobj2output = {}
1547+
if self.stdout:
1548+
self._fileobj2output[self.stdout] = []
1549+
if self.stderr:
1550+
self._fileobj2output[self.stderr] = []
1551+
1552+
if self.stdout:
1553+
stdout = self._fileobj2output[self.stdout]
1554+
if self.stderr:
1555+
stderr = self._fileobj2output[self.stderr]
1556+
1557+
self._save_input(input)
1558+
1559+
with _PopenSelector() as selector:
1560+
if self.stdin and input:
1561+
selector.register(self.stdin, selectors.EVENT_WRITE)
1562+
if self.stdout:
1563+
selector.register(self.stdout, selectors.EVENT_READ)
1564+
if self.stderr:
1565+
selector.register(self.stderr, selectors.EVENT_READ)
1566+
1567+
while selector.get_map():
1568+
timeout = self._remaining_time(endtime)
1569+
if timeout is not None and timeout < 0:
1570+
raise TimeoutExpired(self.args, orig_timeout)
1571+
1572+
ready = selector.select(timeout)
1573+
self._check_timeout(endtime, orig_timeout)
1574+
1575+
# XXX Rewrite these to use non-blocking I/O on the file
1576+
# objects; they are no longer using C stdio!
1577+
1578+
for key, events in ready:
1579+
if key.fileobj is self.stdin:
1580+
chunk = self._input[self._input_offset :
1581+
self._input_offset + _PIPE_BUF]
1582+
try:
1583+
self._input_offset += os.write(key.fd, chunk)
1584+
except OSError as e:
1585+
if e.errno == errno.EPIPE:
1586+
selector.unregister(key.fileobj)
1587+
key.fileobj.close()
1588+
else:
1589+
raise
1590+
else:
1591+
if self._input_offset >= len(self._input):
1592+
selector.unregister(key.fileobj)
1593+
key.fileobj.close()
1594+
elif key.fileobj in (self.stdout, self.stderr):
1595+
data = os.read(key.fd, 4096)
1596+
if not data:
1597+
selector.unregister(key.fileobj)
1598+
key.fileobj.close()
1599+
self._fileobj2output[key.fileobj].append(data)
15391600

15401601
self.wait(timeout=self._remaining_time(endtime))
15411602

@@ -1569,167 +1630,6 @@ def _save_input(self, input):
15691630
self._input = self._input.encode(self.stdin.encoding)
15701631

15711632

1572-
def _communicate_with_poll(self, input, endtime, orig_timeout):
1573-
stdout = None # Return
1574-
stderr = None # Return
1575-
1576-
if not self._communication_started:
1577-
self._fd2file = {}
1578-
1579-
poller = select.poll()
1580-
def register_and_append(file_obj, eventmask):
1581-
poller.register(file_obj.fileno(), eventmask)
1582-
self._fd2file[file_obj.fileno()] = file_obj
1583-
1584-
def close_unregister_and_remove(fd):
1585-
poller.unregister(fd)
1586-
self._fd2file[fd].close()
1587-
self._fd2file.pop(fd)
1588-
1589-
if self.stdin and input:
1590-
register_and_append(self.stdin, select.POLLOUT)
1591-
1592-
# Only create this mapping if we haven't already.
1593-
if not self._communication_started:
1594-
self._fd2output = {}
1595-
if self.stdout:
1596-
self._fd2output[self.stdout.fileno()] = []
1597-
if self.stderr:
1598-
self._fd2output[self.stderr.fileno()] = []
1599-
1600-
select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
1601-
if self.stdout:
1602-
register_and_append(self.stdout, select_POLLIN_POLLPRI)
1603-
stdout = self._fd2output[self.stdout.fileno()]
1604-
if self.stderr:
1605-
register_and_append(self.stderr, select_POLLIN_POLLPRI)
1606-
stderr = self._fd2output[self.stderr.fileno()]
1607-
1608-
self._save_input(input)
1609-
1610-
while self._fd2file:
1611-
timeout = self._remaining_time(endtime)
1612-
if timeout is not None and timeout < 0:
1613-
raise TimeoutExpired(self.args, orig_timeout)
1614-
try:
1615-
ready = poller.poll(timeout)
1616-
except OSError as e:
1617-
if e.args[0] == errno.EINTR:
1618-
continue
1619-
raise
1620-
self._check_timeout(endtime, orig_timeout)
1621-
1622-
# XXX Rewrite these to use non-blocking I/O on the
1623-
# file objects; they are no longer using C stdio!
1624-
1625-
for fd, mode in ready:
1626-
if mode & select.POLLOUT:
1627-
chunk = self._input[self._input_offset :
1628-
self._input_offset + _PIPE_BUF]
1629-
try:
1630-
self._input_offset += os.write(fd, chunk)
1631-
except OSError as e:
1632-
if e.errno == errno.EPIPE:
1633-
close_unregister_and_remove(fd)
1634-
else:
1635-
raise
1636-
else:
1637-
if self._input_offset >= len(self._input):
1638-
close_unregister_and_remove(fd)
1639-
elif mode & select_POLLIN_POLLPRI:
1640-
data = os.read(fd, 4096)
1641-
if not data:
1642-
close_unregister_and_remove(fd)
1643-
self._fd2output[fd].append(data)
1644-
else:
1645-
# Ignore hang up or errors.
1646-
close_unregister_and_remove(fd)
1647-
1648-
return (stdout, stderr)
1649-
1650-
1651-
def _communicate_with_select(self, input, endtime, orig_timeout):
1652-
if not self._communication_started:
1653-
self._read_set = []
1654-
self._write_set = []
1655-
if self.stdin and input:
1656-
self._write_set.append(self.stdin)
1657-
if self.stdout:
1658-
self._read_set.append(self.stdout)
1659-
if self.stderr:
1660-
self._read_set.append(self.stderr)
1661-
1662-
self._save_input(input)
1663-
1664-
stdout = None # Return
1665-
stderr = None # Return
1666-
1667-
if self.stdout:
1668-
if not self._communication_started:
1669-
self._stdout_buff = []
1670-
stdout = self._stdout_buff
1671-
if self.stderr:
1672-
if not self._communication_started:
1673-
self._stderr_buff = []
1674-
stderr = self._stderr_buff
1675-
1676-
while self._read_set or self._write_set:
1677-
timeout = self._remaining_time(endtime)
1678-
if timeout is not None and timeout < 0:
1679-
raise TimeoutExpired(self.args, orig_timeout)
1680-
try:
1681-
(rlist, wlist, xlist) = \
1682-
select.select(self._read_set, self._write_set, [],
1683-
timeout)
1684-
except OSError as e:
1685-
if e.args[0] == errno.EINTR:
1686-
continue
1687-
raise
1688-
1689-
# According to the docs, returning three empty lists indicates
1690-
# that the timeout expired.
1691-
if not (rlist or wlist or xlist):
1692-
raise TimeoutExpired(self.args, orig_timeout)
1693-
# We also check what time it is ourselves for good measure.
1694-
self._check_timeout(endtime, orig_timeout)
1695-
1696-
# XXX Rewrite these to use non-blocking I/O on the
1697-
# file objects; they are no longer using C stdio!
1698-
1699-
if self.stdin in wlist:
1700-
chunk = self._input[self._input_offset :
1701-
self._input_offset + _PIPE_BUF]
1702-
try:
1703-
bytes_written = os.write(self.stdin.fileno(), chunk)
1704-
except OSError as e:
1705-
if e.errno == errno.EPIPE:
1706-
self.stdin.close()
1707-
self._write_set.remove(self.stdin)
1708-
else:
1709-
raise
1710-
else:
1711-
self._input_offset += bytes_written
1712-
if self._input_offset >= len(self._input):
1713-
self.stdin.close()
1714-
self._write_set.remove(self.stdin)
1715-
1716-
if self.stdout in rlist:
1717-
data = os.read(self.stdout.fileno(), 1024)
1718-
if not data:
1719-
self.stdout.close()
1720-
self._read_set.remove(self.stdout)
1721-
stdout.append(data)
1722-
1723-
if self.stderr in rlist:
1724-
data = os.read(self.stderr.fileno(), 1024)
1725-
if not data:
1726-
self.stderr.close()
1727-
self._read_set.remove(self.stderr)
1728-
stderr.append(data)
1729-
1730-
return (stdout, stderr)
1731-
1732-
17331633
def send_signal(self, sig):
17341634
"""Send a signal to the process
17351635
"""

Lib/test/test_subprocess.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import tempfile
1212
import time
1313
import re
14+
import selectors
1415
import sysconfig
1516
import warnings
1617
import select
@@ -2179,15 +2180,16 @@ def test_getoutput(self):
21792180
os.rmdir(dir)
21802181

21812182

2182-
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
2183-
"poll system call not supported")
2183+
@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
2184+
"Test needs selectors.PollSelector")
21842185
class ProcessTestCaseNoPoll(ProcessTestCase):
21852186
def setUp(self):
2186-
subprocess._has_poll = False
2187+
self.orig_selector = subprocess._PopenSelector
2188+
subprocess._PopenSelector = selectors.SelectSelector
21872189
ProcessTestCase.setUp(self)
21882190

21892191
def tearDown(self):
2190-
subprocess._has_poll = True
2192+
subprocess._PopenSelector = self.orig_selector
21912193
ProcessTestCase.tearDown(self)
21922194

21932195

0 commit comments

Comments
 (0)
0