@@ -404,15 +404,23 @@ class STARTUPINFO:
404
404
hStdError = None
405
405
wShowWindow = 0
406
406
else :
407
- import select
408
- _has_poll = hasattr (select , 'poll' )
409
407
import _posixsubprocess
408
+ import select
409
+ import selectors
410
410
411
411
# When select or poll has indicated that the file is writable,
412
412
# we can write up to _PIPE_BUF bytes without risk of blocking.
413
413
# POSIX defines PIPE_BUF as >= 512.
414
414
_PIPE_BUF = getattr (select , 'PIPE_BUF' , 512 )
415
415
416
+ # poll/select have the advantage of not requiring any extra file
417
+ # descriptor, contrarily to epoll/kqueue (also, they require a single
418
+ # syscall).
419
+ if hasattr (selectors , 'PollSelector' ):
420
+ _PopenSelector = selectors .PollSelector
421
+ else :
422
+ _PopenSelector = selectors .SelectSelector
423
+
416
424
417
425
__all__ = ["Popen" , "PIPE" , "STDOUT" , "call" , "check_call" , "getstatusoutput" ,
418
426
"getoutput" , "check_output" , "CalledProcessError" , "DEVNULL" ]
@@ -1530,12 +1538,65 @@ def _communicate(self, input, endtime, orig_timeout):
1530
1538
if not input :
1531
1539
self .stdin .close ()
1532
1540
1533
- if _has_poll :
1534
- stdout , stderr = self ._communicate_with_poll (input , endtime ,
1535
- orig_timeout )
1536
- else :
1537
- stdout , stderr = self ._communicate_with_select (input , endtime ,
1538
- orig_timeout )
1541
+ stdout = None
1542
+ stderr = None
1543
+
1544
+ # Only create this mapping if we haven't already.
1545
+ if not self ._communication_started :
1546
+ self ._fileobj2output = {}
1547
+ if self .stdout :
1548
+ self ._fileobj2output [self .stdout ] = []
1549
+ if self .stderr :
1550
+ self ._fileobj2output [self .stderr ] = []
1551
+
1552
+ if self .stdout :
1553
+ stdout = self ._fileobj2output [self .stdout ]
1554
+ if self .stderr :
1555
+ stderr = self ._fileobj2output [self .stderr ]
1556
+
1557
+ self ._save_input (input )
1558
+
1559
+ with _PopenSelector () as selector :
1560
+ if self .stdin and input :
1561
+ selector .register (self .stdin , selectors .EVENT_WRITE )
1562
+ if self .stdout :
1563
+ selector .register (self .stdout , selectors .EVENT_READ )
1564
+ if self .stderr :
1565
+ selector .register (self .stderr , selectors .EVENT_READ )
1566
+
1567
+ while selector .get_map ():
1568
+ timeout = self ._remaining_time (endtime )
1569
+ if timeout is not None and timeout < 0 :
1570
+ raise TimeoutExpired (self .args , orig_timeout )
1571
+
1572
+ ready = selector .select (timeout )
1573
+ self ._check_timeout (endtime , orig_timeout )
1574
+
1575
+ # XXX Rewrite these to use non-blocking I/O on the file
1576
+ # objects; they are no longer using C stdio!
1577
+
1578
+ for key , events in ready :
1579
+ if key .fileobj is self .stdin :
1580
+ chunk = self ._input [self ._input_offset :
1581
+ self ._input_offset + _PIPE_BUF ]
1582
+ try :
1583
+ self ._input_offset += os .write (key .fd , chunk )
1584
+ except OSError as e :
1585
+ if e .errno == errno .EPIPE :
1586
+ selector .unregister (key .fileobj )
1587
+ key .fileobj .close ()
1588
+ else :
1589
+ raise
1590
+ else :
1591
+ if self ._input_offset >= len (self ._input ):
1592
+ selector .unregister (key .fileobj )
1593
+ key .fileobj .close ()
1594
+ elif key .fileobj in (self .stdout , self .stderr ):
1595
+ data = os .read (key .fd , 4096 )
1596
+ if not data :
1597
+ selector .unregister (key .fileobj )
1598
+ key .fileobj .close ()
1599
+ self ._fileobj2output [key .fileobj ].append (data )
1539
1600
1540
1601
self .wait (timeout = self ._remaining_time (endtime ))
1541
1602
@@ -1569,167 +1630,6 @@ def _save_input(self, input):
1569
1630
self ._input = self ._input .encode (self .stdin .encoding )
1570
1631
1571
1632
1572
- def _communicate_with_poll (self , input , endtime , orig_timeout ):
1573
- stdout = None # Return
1574
- stderr = None # Return
1575
-
1576
- if not self ._communication_started :
1577
- self ._fd2file = {}
1578
-
1579
- poller = select .poll ()
1580
- def register_and_append (file_obj , eventmask ):
1581
- poller .register (file_obj .fileno (), eventmask )
1582
- self ._fd2file [file_obj .fileno ()] = file_obj
1583
-
1584
- def close_unregister_and_remove (fd ):
1585
- poller .unregister (fd )
1586
- self ._fd2file [fd ].close ()
1587
- self ._fd2file .pop (fd )
1588
-
1589
- if self .stdin and input :
1590
- register_and_append (self .stdin , select .POLLOUT )
1591
-
1592
- # Only create this mapping if we haven't already.
1593
- if not self ._communication_started :
1594
- self ._fd2output = {}
1595
- if self .stdout :
1596
- self ._fd2output [self .stdout .fileno ()] = []
1597
- if self .stderr :
1598
- self ._fd2output [self .stderr .fileno ()] = []
1599
-
1600
- select_POLLIN_POLLPRI = select .POLLIN | select .POLLPRI
1601
- if self .stdout :
1602
- register_and_append (self .stdout , select_POLLIN_POLLPRI )
1603
- stdout = self ._fd2output [self .stdout .fileno ()]
1604
- if self .stderr :
1605
- register_and_append (self .stderr , select_POLLIN_POLLPRI )
1606
- stderr = self ._fd2output [self .stderr .fileno ()]
1607
-
1608
- self ._save_input (input )
1609
-
1610
- while self ._fd2file :
1611
- timeout = self ._remaining_time (endtime )
1612
- if timeout is not None and timeout < 0 :
1613
- raise TimeoutExpired (self .args , orig_timeout )
1614
- try :
1615
- ready = poller .poll (timeout )
1616
- except OSError as e :
1617
- if e .args [0 ] == errno .EINTR :
1618
- continue
1619
- raise
1620
- self ._check_timeout (endtime , orig_timeout )
1621
-
1622
- # XXX Rewrite these to use non-blocking I/O on the
1623
- # file objects; they are no longer using C stdio!
1624
-
1625
- for fd , mode in ready :
1626
- if mode & select .POLLOUT :
1627
- chunk = self ._input [self ._input_offset :
1628
- self ._input_offset + _PIPE_BUF ]
1629
- try :
1630
- self ._input_offset += os .write (fd , chunk )
1631
- except OSError as e :
1632
- if e .errno == errno .EPIPE :
1633
- close_unregister_and_remove (fd )
1634
- else :
1635
- raise
1636
- else :
1637
- if self ._input_offset >= len (self ._input ):
1638
- close_unregister_and_remove (fd )
1639
- elif mode & select_POLLIN_POLLPRI :
1640
- data = os .read (fd , 4096 )
1641
- if not data :
1642
- close_unregister_and_remove (fd )
1643
- self ._fd2output [fd ].append (data )
1644
- else :
1645
- # Ignore hang up or errors.
1646
- close_unregister_and_remove (fd )
1647
-
1648
- return (stdout , stderr )
1649
-
1650
-
1651
- def _communicate_with_select (self , input , endtime , orig_timeout ):
1652
- if not self ._communication_started :
1653
- self ._read_set = []
1654
- self ._write_set = []
1655
- if self .stdin and input :
1656
- self ._write_set .append (self .stdin )
1657
- if self .stdout :
1658
- self ._read_set .append (self .stdout )
1659
- if self .stderr :
1660
- self ._read_set .append (self .stderr )
1661
-
1662
- self ._save_input (input )
1663
-
1664
- stdout = None # Return
1665
- stderr = None # Return
1666
-
1667
- if self .stdout :
1668
- if not self ._communication_started :
1669
- self ._stdout_buff = []
1670
- stdout = self ._stdout_buff
1671
- if self .stderr :
1672
- if not self ._communication_started :
1673
- self ._stderr_buff = []
1674
- stderr = self ._stderr_buff
1675
-
1676
- while self ._read_set or self ._write_set :
1677
- timeout = self ._remaining_time (endtime )
1678
- if timeout is not None and timeout < 0 :
1679
- raise TimeoutExpired (self .args , orig_timeout )
1680
- try :
1681
- (rlist , wlist , xlist ) = \
1682
- select .select (self ._read_set , self ._write_set , [],
1683
- timeout )
1684
- except OSError as e :
1685
- if e .args [0 ] == errno .EINTR :
1686
- continue
1687
- raise
1688
-
1689
- # According to the docs, returning three empty lists indicates
1690
- # that the timeout expired.
1691
- if not (rlist or wlist or xlist ):
1692
- raise TimeoutExpired (self .args , orig_timeout )
1693
- # We also check what time it is ourselves for good measure.
1694
- self ._check_timeout (endtime , orig_timeout )
1695
-
1696
- # XXX Rewrite these to use non-blocking I/O on the
1697
- # file objects; they are no longer using C stdio!
1698
-
1699
- if self .stdin in wlist :
1700
- chunk = self ._input [self ._input_offset :
1701
- self ._input_offset + _PIPE_BUF ]
1702
- try :
1703
- bytes_written = os .write (self .stdin .fileno (), chunk )
1704
- except OSError as e :
1705
- if e .errno == errno .EPIPE :
1706
F438
- self .stdin .close ()
1707
- self ._write_set .remove (self .stdin )
1708
- else :
1709
- raise
1710
- else :
1711
- self ._input_offset += bytes_written
1712
- if self ._input_offset >= len (self ._input ):
1713
- self .stdin .close ()
1714
- self ._write_set .remove (self .stdin )
1715
-
1716
- if self .stdout in rlist :
1717
- data = os .read (self .stdout .fileno (), 1024 )
1718
- if not data :
1719
- self .stdout .close ()
1720
- self ._read_set .remove (self .stdout )
1721
- stdout .append (data )
1722
-
1723
- if self .stderr in rlist :
1724
- data = os .read (self .stderr .fileno (), 1024 )
1725
- if not data :
1726
- self .stderr .close ()
1727
- self ._read_set .remove (self .stderr )
1728
- stderr .append (data )
1729
-
1730
- return (stdout , stderr )
1731
-
1732
-
1733
1633
def send_signal (self , sig ):
1734
1634
"""Send a signal to the process
1735
1635
"""
0 commit comments