8000 Copy a bunch of fixes by Victor for the Proactor event loop from the … · python/asyncio@ecc2418 · GitHub
[go: up one dir, main page]

Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit ecc2418

Browse files
committed
Copy a bunch of fixes by Victor for the Proactor event loop from the CPython repo.
1 parent 0307f1a commit ecc2418

File tree

5 files changed

+47
-19
lines changed

5 files changed

+47
-19
lines changed

asyncio/proactor_events.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def _loop_reading(self, fut=None):
205205
self.close()
206206

207207

208-
class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
208+
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
209209
transports.WriteTransport):
210210
"""Transport for write pipes."""
211211

@@ -286,8 +286,27 @@ def abort(self):
286286
self._force_close(None)
287287

288288

289+
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
290+
def __init__(self, *args, **kw):
291+
super().__init__(*args, **kw)
292+
self._read_fut = self._loop._proactor.recv(self._sock, 16)
293+
self._read_fut.add_done_callback(self._pipe_closed)
294+
295+
def _pipe_closed(self, fut):
296+
if fut.cancelled():
297+
# the transport has been closed
298+
return
299+
assert fut is self._read_fut, (fut, self._read_fut)
300+
self._read_fut = None
301+
assert fut.result() == b''
302+
if self._write_fut is not None:
303+
self._force_close(exc)
304+
else:
305+
self.close()
306+
307+
289308
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
290-
_ProactorWritePipeTransport,
309+
_ProactorBaseWritePipeTransport,
291310
transports.Transport):
292311
"""Transport for duplex pipes."""
293312

@@ -299,7 +318,7 @@ def write_eof(self):
299318

300319

301320
class _ProactorSocketTransport(_ProactorReadPipeTransport,
302-
_ProactorWritePipeTransport,
321+
_ProactorBaseWritePipeTransport,
303322
transports.Transport):
304323
"""Transport for connected sockets."""
305324

@@ -335,6 +354,7 @@ def __init__(self, proactor):
335354
self._selector = proactor # convenient alias
336355
self._self_reading_future = None
337356
self._accept_futures = {} # socket file descriptor => Future
357+
self._granularity = max(proactor.resolution, self._granularity)
338358
proactor.set_loop(self)
339359
self._make_self_pipe()
340360

@@ -353,15 +373,10 @@ def _make_read_pipe_transport(self, sock, protocol, waiter=None,
353373
return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
354374

355375
def _make_write_pipe_transport(self, sock, protocol, waiter=None,
356-
extra=None, check_for_hangup=True):
357-
if check_for_hangup:
358-
# We want connection_lost() to be called when other end closes
359-
return _ProactorDuplexPipeTransport(self,
360-
sock, protocol, waiter, extra)
361-
else:
362-
# If other end closes we may not notice for a long time
363-
return _ProactorWritePipeTransport(self, sock, protocol, waiter,
364-
extra)
376+
extra=None):
377+
# We want connection_lost() to be called when other end closes
378+
return _ProactorWritePipeTransport(self,
379+
sock, protocol, waiter, extra)
365380

366381
def close(self):
367382
if self._proactor is not None:

asyncio/selectors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ def select(self, timeout=None):
372372
else:
373373
# poll() has a resolution of 1 millisecond, round away from
374374
# zero to wait *at least* timeout seconds.
375-
timeout = int(math.ceil(timeout * 1e3))
375+
timeout = math.ceil(timeout * 1e3)
376376
ready = []
377377
try:
378378
fd_event_list = self._poll.poll(timeout)

asyncio/windows_events.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
"""Selector and proactor eventloops for Windows."""
22

3+
import _winapi
34
import errno
5+
import math
46
import socket
7+
import struct
58
import subprocess
69
import weakref
7-
import struct
8-
import _winapi
910

1011
from . import events
1112
from . import base_subprocess
@@ -190,6 +191,7 @@ def __init__(self, concurrency=0xffffffff):
190191
self._cache = {}
191 A3DB 192
self._registered = weakref.WeakSet()
192193
self._stopped_serving = weakref.WeakSet()
194+
self.resolution = 1e-3
193195

194196
def set_loop(self, loop):
195197
self._loop = loop
@@ -325,7 +327,9 @@ def wait_for_handle(self, handle, timeout=None):
325327
if timeout is None:
326328
ms = _winapi.INFINITE
327329
else:
328-
ms = int(timeout * 1000 + 0.5)
330+
# RegisterWaitForSingleObject() has a resolution of 1 millisecond,
331+
# round away from zero to wait *at least* timeout seconds.
332+
ms = math.ceil(timeout * 1e3)
329333

330334
# We only create ov so we can use ov.address as a key for the cache.
331335
ov = _overlapped.Overlapped(NULL)
@@ -396,7 +400,9 @@ def _poll(self, timeout=None):
396400
elif timeout < 0:
397401
raise ValueError("negative timeout")
398402
else:
399-
ms = int(timeout * 1000 + 0.5)
403+
# GetQueuedCompletionStatus() has a resolution of 1 millisecond,
404+
# round away from zero to wait *at least* timeout seconds.
405+
ms = math.ceil(timeout * 1e3)
400406
if ms >= INFINITE:
401407
raise ValueError("timeout too big")
402408
while True:

tests/test_base_events.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,13 @@ def cb():
120120
self.loop.call_at(when, cb)
121121
t0 = self.loop.time()
122122
self.loop.run_forever()
123-
t1 = self.loop.time()
124-
self.assertTrue(0.09 <= t1-t0 <= 0.9, t1-t0)
123+
dt = self.loop.time() - t0
124+
self.assertTrue(0.09 <= dt <= 0.9,
125+
# Issue #20452: add more info in case of failure,
126+
# to try to investigate the bug
127+
(dt,
128+
self.loop._granularity,
129+
time.get_clock_info('monotonic')))
125130

126131
def test_run_once_in_executor_handle(self):
127132
def cb():

tests/test_proactor_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
1717
def setUp(self):
1818
self.loop = test_utils.TestLoop()
1919
self.proactor = unittest.mock.Mock()
20+
self.proactor.resolution = 1e-3
2021
self.loop._proactor = self.proactor
2122
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
2223
self.sock = unittest.mock.Mock(socket.socket)
@@ -342,6 +343,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
342343
def setUp(self):
343344
self.sock = unittest.mock.Mock(socket.socket)
344345
self.proactor = unittest.mock.Mock()
346+
self.proactor.resolution = 1e-3
345347

346348
self.ssock, self.csock = unittest.mock.Mock(), unittest.mock.Mock()
347349

0 commit comments

Comments
 (0)
0