8000 Copy a bunch of fixes by Victor for the Proactor event loop from the … · python/asyncio@ecc2418 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit ecc2418

Browse files
committed
Copy a bunch of fixes by Victor for the Proactor event loop from the CPython repo.
1 parent 0307f1a commit ecc2418

File tree

5 files changed

+47
-19
lines changed

5 files changed

+47
-19
lines changed

asyncio/proactor_events.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def _loop_reading(self, fut=None):
205205
self.close()
206206

207207

208-
class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
208+
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
209209
transports.WriteTransport):
210210
"""Transport for write pipes."""
211211

@@ -286,8 +286,27 @@ def abort(self):
286286
self._force_close(None)
287287

288288

289+
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
290+
def __init__(self, *args, **kw):
291+
super().__init__(*args, **kw)
292+
self._read_fut = self._loop._proactor.recv(self._sock, 16)
293+
self._read_fut.add_done_callback(self._pipe_closed)
294+
295+
def _pipe_closed(self, fut):
296+
if fut.cancelled():
297+
# the transport has been closed
298+
return
299+
assert fut is self._read_fut, (fut, self._read_fut)
300+
self._read_fut = None
301+
assert fut.result() == b''
302+
if self._write_fut is not None:
303+
self._force_close(exc)
304+
else:
305+
self.close()
306+
307+
289308
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
290-
_ProactorWritePipeTransport,
309+
_ProactorBaseWritePipeTransport,
291310
transports.Transport):
292311
"""Transport for duplex pipes."""
293312

@@ -299,7 +318,7 @@ def write_eof(self):
299318

300319

301320
class _ProactorSocketTransport(_ProactorReadPipeTransport,
302-
_ProactorWritePipeTransport,
321+
_ProactorBaseWritePipeTransport,
303322
transports.Transport):
304323
"""Transport for connected sockets."""
305324

@@ -335,6 +354,7 @@ def __init__(self, proactor):
335354
self._selector = proactor # convenient alias
336355
self._self_reading_future = None
337356
self._accept_futures = {} # socket file descriptor => Future
357+
self._granularity = max(proactor.resolution, self._granularity)
338358
proactor.set_loop(self)
339359
self._make_self_pipe()
340360

@@ -353,15 +373,10 @@ def _make_read_pipe_transport(self, sock, protocol, waiter=None,
353373
return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
354374

355375
def _make_write_pipe_transport(self, sock, protocol, waiter=None,
356-
extra=None, check_for_hangup=True):
357-
if check_for_hangup:
358-
# We want connection_lost() to be called when other end closes
359-
return _ProactorDuplexPipeTransport(self,
360-
sock, protocol, waiter, extra)
361-
else:
362-
# If other end closes we may not notice for a long time
363-
return _ProactorWritePipeTransport(self, sock, protocol, waiter,
364-
extra)
376+
extra=None):
377+
# We want connection_lost() to be called when other end closes
378+
return _ProactorWritePipeTransport(self,
379+
sock, protocol, waiter, extra)
365380

366381
def close(self):
367382
if self._proactor is not None:

asyncio/selectors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ def select(self, timeout=None):
372372
else:
373373
# poll() has a resolution of 1 millisecond, round away from
374374
# zero to wait *at least* timeout seconds.
375-
timeout = int(math.ceil(timeout * 1e3))
375+
timeout = math.ceil(timeout * 1e3)
376376
ready = []
377377
try:
378378
fd_event_list = self._poll.poll(timeout)

asyncio/windows_events.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
"""Selector and proactor eventloops for Windows."""
22

3+
import _winapi
34
import errno
5+
import math
46
import socket
7+
import struct
58
import subprocess
69
import weakref
7-
import struct
8-
import _winapi
910

1011
from . import events
1112
from . import base_subprocess
@@ -190,6 +191,7 @@ def __init__(self, concurrency=0xffffffff):
190191
self._cache = {}
191192
self._registered = weakref.WeakSet()
192193
self._stopped_serving = weakref.WeakSet()
194+
self.resolution = 1e-3
193195

194196
def set_loop(self, loop):
195197
self._loop = loop
@@ -325,7 +327,9 @@ def wait_for_handle(self, handle, timeout=None):
325327
if timeout is None:
326328
ms = _winapi.INFINITE
327329
else:
328-
ms = int(timeout * 1000 + 0.5)
330+
# RegisterWaitForSingleObject() has a resolution of 1 millisecond,
331+
# round away from zero to wait *at least* timeout seconds.
332+
ms = math.ceil(timeout * 1e3)
329333

330334
# We only create ov so we can use ov.address as a key for the cache.
331335
ov = _overlapped.Overlapped(NULL)
@@ -396,7 +400,9 @@ def _poll(self, timeout=None):
396400
elif timeout < 0:
397401
raise ValueError("negative timeout")
398402
else:
399-
ms = int(timeout * 1000 + 0.5)
403+
# GetQueuedCompletionStatus() has a resolution of 1 millisecond,
404+
# round away from zero to wait *at least* timeout seconds.
405+
ms = math.ceil(timeout * 1e3)
400406
if ms >= INFINITE:
401407
raise ValueError("timeout too big")
402408
while True:

tests/test_base_events.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,13 @@ def cb():
120120
self.loop.call_at(when, cb)
121121
t0 = self.loop.time()
122122
self.loop.run_forever()
123-
t1 = self.loop.time()
124-
self.assertTrue(0.09 <= t1-t0 <= 0.9, t1-t0)
123+
dt = self.loop.time() - t0
124+
self.assertTrue(0.09 <= dt <= 0.9,
125+
# Issue #20452: add more info in case of failure,
126+
# to try to investigate the bug
127+
(dt,
128+
self.loop._granularity,
129+
time.get_clock_info('monotonic')))
125130

126131
def test_run_once_in_executor_handle(self):
127132
def cb():

tests/test_proactor_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class ProactorSocketTransportTests(unittest.TestCase):
1717
def setUp(self):
1818
self.loop = test_utils.TestLoop()
1919
self.proactor = unittest.mock.Mock()
20+
self.proactor.resolution = 1e-3
2021
self.loop._proactor = self.proactor
2122
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
2223
self.sock = unittest.mock.Mock(socket.socket)
@@ -342,6 +343,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
342343
def setUp(self):
343344
self.sock = unittest.mock.Mock(socket.socket)
344345
self.proactor = unittest.mock.Mock()
346+
self.proactor.resolution = 1e-3
345347

346348
self.ssock, self.csock = unittest.mock.Mock(), unittest.mock.Mock()
347349

0 commit comments

Comments
 (0)
0