8000 gh-135444: fix DatagramTransport buffer_size accounting · python/cpython@860cec6 · GitHub
[go: up one dir, main page]

Skip to content

Commit 860cec6

Browse files
committed
gh-135444: fix DatagramTransport buffer_size accounting
Commit 73e8637 added 8 to the buffer_size when send could not be called right away. However, it did not complete this accounting by removing 8 from the buffer size when sending did finally complete.
1 parent f273fd7 commit 860cec6

File tree

5 files changed

+53
-5
lines changed

5 files changed

+53
-5
lines changed

Lib/asyncio/proactor_events.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,8 @@ def _pipe_closed(self, fut):
460460
class _ProactorDatagramTransport(_ProactorBasePipeTransport,
461461
transports.DatagramTransport):
462462
max_size = 256 * 1024
463+
_header_size = 8
464+
463465
def __init__(self, loop, sock, protocol, address=None,
464466
waiter=None, extra=None):
465467
self._address = address
@@ -499,7 +501,7 @@ def sendto(self, data, addr=None):
499501

500502
# Ensure that what we buffer is immutable.
501503
self._buffer.append((bytes(data), addr))
502-
self._buffer_size += len(data) + 8 # include header bytes
504+
self._buffer_size += len(data) + self._header_size
503505

504506
if self._write_fut is None:
505507
# No current write operations are active, kick one off
@@ -526,7 +528,7 @@ def _loop_writing(self, fut=None):
526528
return
527529

528530
data, addr = self._buffer.popleft()
529-
self._buffer_size -= len(data)
531+
self._buffer_size -= len(data) + self._header_size
530532
if self._address is not None:
531533
self._write_fut = self._loop._proactor.send(self._sock,
532534
data)

Lib/asyncio/selector_events.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,7 @@ def close(self):
12121212
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
12131213

12141214
_buffer_factory = collections.deque
1215+
_header_size = 8
12151216

12161217
def __init__(self, loop, sock, protocol, address=None,
12171218
waiter=None, extra=None):
@@ -1285,21 +1286,21 @@ def sendto(self, data, addr=None):
12851286

12861287
# Ensure that what we buffer is immutable.
12871288
self._buffer.append((bytes(data), addr))
1288-
self._buffer_size += len(data) + 8 # include header bytes
1289+
self._buffer_size += len(data) + self._header_size
12891290
self._maybe_pause_protocol()
12901291

12911292
def _sendto_ready(self):
12921293
while self._buffer:
12931294
data, addr = self._buffer.popleft()
1294-
self._buffer_size -= len(data)
1295+
self._buffer_size -= len(data) + self._header_size
12951296
try:
12961297
if self._extra['peername']:
12971298
self._sock.send(data)
12981299
else:
12991300
self._sock.sendto(data, addr)
13001301
except (BlockingIOError, InterruptedError):
13011302
self._buffer.appendleft((data, addr)) # Try again later.
1302-
self._buffer_size += len(data)
1303+
self._buffer_size += len(data) + self._header_size
13031304
break
13041305
except OSError as exc:
13051306
self._protocol.error_received(exc)

Lib/test/test_asyncio/test_proactor_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,8 @@ def test_sendto(self):
566566
self.assertTrue(self.proactor.sendto.called)
567567
self.proactor.sendto.assert_called_with(
568568
self.sock, data, addr=('0.0.0.0', 1234))
569+
self.assertFalse(transport._buffer)
570+
self.assertEqual(0, transport._buffer_size)
569571

570572
def test_sendto_bytearray(self):
571573
data = bytearray(b'data')

Lib/test/test_asyncio/test_selector_events.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,6 +1497,47 @@ def test_sendto_closing(self):
14971497
transport.sendto(b'data', (1,))
14981498
self.assertEqual(transport._conn_lost, 2)
14991499

1500+
def test_sendto_sendto_ready(self):
1501+
data = b'data'
1502+
1503+
# First queue up a buffer by having the socket block
1504+
self.sock.sendto.side_effect = BlockingIOError
1505+
transport = self.datagram_transport()
1506+
transport.sendto(data, ('0.0.0.0', 12345))
1507+
self.loop.assert_writer(7, transport._sendto_ready)
1508+
self.assertEqual(1, len(transport._buffer))
1509+
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
1510+
1511+
# Now let the socket send the buffer
1512+
self.sock.sendto.side_effect = None
1513+
transport._sendto_ready()
1514+
self.assertTrue(self.sock.sendto.called)
1515+
self.assertEqual(
1516+
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
1517+
self.assertFalse(self.loop.writers)
1518+
self.assertFalse(transport._buffer)
1519+
self.assertEqual(transport._buffer_size, 0)
1520+
1521+
def test_sendto_sendto_ready_blocked(self):
1522+
data = b'data'
1523+
1524+
# First queue up a buffer by having the socket block
1525+
self.sock< 9E88 /span>.sendto.side_effect = BlockingIOError
1526+
transport = self.datagram_transport()
1527+
transport.sendto(data, ('0.0.0.0', 12345))
1528+
self.loop.assert_writer(7, transport._sendto_ready)
1529+
self.assertEqual(1, len(transport._buffer))
1530+
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
1531+
1532+
# Now try to send the buffer and let it get requeued
1533+
transport._sendto_ready()
1534+
self.assertTrue(self.sock.sendto.called)
1535+
self.assertEqual(
1536+
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
1537+
self.assertTrue(self.loop.writers)
1538+
self.assertEqual(1, len(transport._buffer))
1539+
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)
1540+
15001541
def test_sendto_ready(self):
15011542
data = b'data'
15021543
self.sock.sendto.return_value = len(data)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix asyncio DatagramTransport flow control accounting when a packet cannot
2+
be immediately sent.

0 commit comments

Comments
 (0)
0