8000 bpo-46805: Add low level UDP socket functions to asyncio (GH-31455) · python/cpython@9f04ee5 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9f04ee5

Browse files
authored
bpo-46805: Add low level UDP socket functions to asyncio (GH-31455)
1 parent 7e473e9 commit 9f04ee5

File tree

12 files changed

+488
-6
lines changed

12 files changed

+488
-6
lines changed

Doc/library/asyncio-eventloop.rst

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,29 @@ convenient.
922922

923923
.. versionadded:: 3.7
924924

925+
.. coroutinemethod:: loop.sock_recvfrom(sock, bufsize)
926+
927+
Receive a datagram of up to *bufsize* from *sock*. Asynchronous version of
928+
:meth:`socket.recvfrom() <socket.socket.recvfrom>`.
929+
930+
Return a tuple of (received data, remote address).
931+
932+
*sock* must be a non-blocking socket.
933+
934+
.. versionadded:: 3.11
935+
936+
.. coroutinemethod:: loop.sock_recvfrom_into(sock, buf, nbytes=0)
937+
938+
Receive a datagram of up to *nbytes* from *sock* into *buf*.
939+
Asynchronous version of
940+
:meth:`socket.recvfrom_into() <socket.socket.recvfrom_into>`.
941+
942+
Return a tuple of (number of bytes received, remote address).
943+
944+
*sock* must be a non-blocking socket.
945+
946+
.. versionadded:: 3.11
947+
925948
.. coroutinemethod:: loop.sock_sendall(sock, data)
926949

927950
Send *data* to the *sock* socket. Asynchronous version of
@@ -940,6 +963,18 @@ convenient.
940963
method, before Python 3.7 it returned a :class:`Future`.
941964
Since Python 3.7, this is an ``async def`` method.
942965

966+
.. coroutinemethod:: loop.sock_sendto(sock, data, address)
967+
968+
Send a datagram from *sock* to *address*.
969+
Asynchronous version of
970+
:meth:`socket.sendto() <socket.socket.sendto>`.
971+
972+
Return the number of bytes sent.
973+
974+
*sock* must be a non-blocking socket.
975+
976+
.. versionadded:: 3.11
977+
943978
.. coroutinemethod:: loop.sock_connect(sock, address)
944979

945980
Connect *sock* to a remote socket at *address*.

Doc/library/asyncio-llapi-index.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,18 @@ See also the main documentation section about the
189189
* - ``await`` :meth:`loop.sock_recv_into`
190190
- Receive data from the :class:`~socket.socket` into a buffer.
191191

192+
* - ``await`` :meth:`loop.sock_recvfrom`
193+
- Receive a datagram from the :class:`~socket.socket`.
194+
195+
* - ``await`` :meth:`loop.sock_recvfrom_into`
196+
- Receive a datagram from the :class:`~socket.socket` into a buffer.
197+
192198
* - ``await`` :meth:`loop.sock_sendall`
193199
- Send data to the :class:`~socket.socket`.
194200

201+
* - ``await`` :meth:`loop.sock_sendto`
202+
- Send a datagram via the :class:`~socket.socket` to the given address.
203+
195204
* - ``await`` :meth:`loop.sock_connect`
196205
- Connect the :class:`~socket.socket`.
197206

Doc/whatsnew/3.11.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,15 @@ New Modules
226226
Improved Modules
227227
================
228228

229+
asyncio
230+
-------
231+
232+
* Add raw datagram socket functions to the event loop:
233+
:meth:`~asyncio.AbstractEventLoop.sock_sendto`,
234+
:meth:`~asyncio.AbstractEventLoop.sock_recvfrom` and
235+
:meth:`~asyncio.AbstractEventLoop.sock_recvfrom_into`.
236+
(Contributed by Alex Grönholm in :issue:`46805`.)
237+
229238
fractions
230239
---------
231240

Lib/asyncio/events.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,9 +546,18 @@ async def sock_recv(self, sock, nbytes):
546546
async def sock_recv_into(self, sock, buf):
547547
raise NotImplementedError
548548

549+
async def sock_recvfrom(self, sock, bufsize):
550+
raise NotImplementedError
551+
552+
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
553+
raise NotImplementedError
554+
549555
async def sock_sendall(self, sock, data):
550556
raise NotImplementedError
551557

558+
async def sock_sendto(self, sock, data, address):
559+
raise NotImplementedError
560+
552561
async def sock_connect(self, sock, address):
553562
raise NotImplementedError
554563

Lib/asyncio/proactor_events.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,9 +700,21 @@ async def sock_recv(self, sock, n):
700700
async def sock_recv_into(self, sock, buf):
701701
return await self._proactor.recv_into(sock, buf)
702702

703+
async def sock_recvfrom(self, sock, bufsize):
704+
return await self._proactor.recvfrom(sock, bufsize)
705+
706+
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
707+
if not nbytes:
708+
nbytes = len(buf)
709+
710+
return await self._proactor.recvfrom_into(sock, buf, nbytes)
711+
703712
async def sock_sendall(self, sock, data):
704713
return await self._proactor.send(sock, data)
705714

715+
async def sock_sendto(self, sock, data, address):
716+
return await self._proactor.sendto(sock, data, 0, address)
717+
706718
async def sock_connect(self, sock, address):
707719
return await self._proactor.connect(sock, address)
708720

Lib/asyncio/selector_events.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,88 @@ def _sock_recv_into(self, fut, sock, buf):
434434
else:
435435
fut.set_result(nbytes)
436436

437+
async def sock_recvfrom(self, sock, bufsize):
438+
"""Receive a datagram from a datagram socket.
439+
440+
The return value is a tuple of (bytes, address) representing the
441+
datagram received and the address it came from.
442+
The maximum amount of data to be received at once is specified by
443+
nbytes.
444+
"""
445+
base_events._check_ssl_socket(sock)
446+
if self._debug and sock.gettimeout() != 0:
447+
raise ValueError("the socket must be non-blocking")
448+
try:
449+
return sock.recvfrom(bufsize)
450+
except (BlockingIOError, InterruptedError):
451+
pass
452+
fut = self.create_future()
453+
fd = sock.fileno()
454+
self._ensure_fd_no_transport(fd)
455+
handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
456+
fut.add_done_callback(
457+
functools.partial(self._sock_read_done, fd, handle=handle))
458+
return await fut
459+
460+
def _sock_recvfrom(self, fut, sock, bufsize):
461+
# _sock_recvfrom() can add itself as an I/O callback if the operation
462+
# can't be done immediately. Don't use it directly, call
463+
# sock_recvfrom().
464+
if fut.done():
465+
return
466+
try:
467+
result = sock.recvfrom(bufsize)
468+
except (BlockingIOError, InterruptedError):
469+
return # try again next time
470+
except (SystemExit, KeyboardInterrupt):
471+
raise
472+
except BaseException as exc:
473+
fut.set_exception(exc)
474+
else:
475+
fut.set_result(result)
476+
477+
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
478+
"""Receive data from the socket.
479+
480+
The received data is written into *buf* (a writable buffer).
481+
The return value is a tuple of (number of bytes written, address).
482+
"""
483+
base_events._check_ssl_socket(sock)
484+
if self._debug and sock.gettimeout() != 0:
485+
raise ValueError("the socket must be non-blocking")
486+
if not nbytes:
487+
nbytes = len(buf)
488+
489+
try:
490+
return sock.recvfrom_into(buf, nbytes)
491+
except (BlockingIOError, InterruptedError):
492+
pass
493+
fut = self.create_future()
494+
fd = sock.fileno()
495+
self._ensure_fd_no_transport(fd)
496+
handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
497+
nbytes)
498+
fut.add_done_callback(
499+
functools.partial(self._sock_read_done, fd, handle=handle))
500+
return await fut
501+
502+
def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
503+
# _sock_recv_into() can add itself as an I/O callback if the operation
504+
# can't be done immediately. Don't use it directly, call
505+
# sock_recv_into().
506+
if fut.done():
507+
return
508+
try:
509+
result = sock.recvfrom_into(buf, bufsize)
510+
except (BlockingIOError, InterruptedError):
511+
return # try again next time
512+
except (SystemExit, KeyboardInterrupt):
513+
raise
514+
except BaseException as exc:
515+
fut.set_exception(exc)
516+
else:
517+
fut.set_result(result)
518+
437519
async def sock_sendall(self, sock, data):
438520
"""Send data to the socket.
439521
@@ -487,6 +569,48 @@ def _sock_sendall(self, fut, sock, view, pos):
487569
else:
488570
pos[0] = start
489571

572+
async def sock_sendto(self, sock, data, address):
573+
"""Send data to the socket.
574+
575+
The socket must be connected to a remote socket. This method continues
576+
to send data from data until either all data has been sent or an
577+
error occurs. None is returned on success. On error, an exception is
578+
raised, and there is no way to determine how much data, if any, was
579+
successfully processed by the receiving end of the connection.
580+
"""
581+
base_events._check_ssl_socket(sock)
582+
if self._debug and sock.gettimeout() != 0:
583+
raise ValueError("the socket must be non-blocking")
584+
try:
585+
return sock.sendto(data, address)
586+
except (BlockingIOError, InterruptedError):
587+
pass
588+
589+
fut = self.create_future()
590+
fd = sock.fileno()
591+
self._ensure_fd_no_transport(fd)
592+
# use a trick with a list in closure to store a mutable state
593+
handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
594+
address)
595+
fut.add_done_callback(
596+
functools.partial(self._sock_write_done, fd, handle=handle))
597+
return await fut
598+
599+
def _sock_sendto(self, fut, sock, data, address):
600+
if fut.done():
601+
# Future cancellation can be scheduled on previous loop iteration
602+
return
603+
try:
604+
n = sock.sendto(data, 0, address)
605+
except (BlockingIOError, InterruptedError):
606+
return
607+
except (SystemExit, KeyboardInterrupt):
608+
raise
609+
except BaseException as exc:
610+
fut.set_exception(exc)
611+
else:
612+
fut.set_result(n)
613+
490614
async def sock_connect(self, sock, address):
491615
"""Connect to a remote socket at address.
492616

Lib/asyncio/windows_events.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,26 @@ def finish_recv(trans, key, ov):
512512

513513
return self._register(ov, conn, finish_recv)
514514

515+
def recvfrom_into(self, conn, buf, flags=0):
516+
self._register_with_iocp(conn)
517+
ov = _overlapped.Overlapped(NULL)
518+
try:
519+
ov.WSARecvFromInto(conn.fileno(), buf, flags)
520+
except BrokenPipeError:
521+
return self._result((0, None))
522+
523+
def finish_recv(trans, key, ov):
524+
try:
525+
return ov.getresult()
526+
except OSError as exc:
527+
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
528+
_overlapped.ERROR_OPERATION_ABORTED):
529+
raise ConnectionResetError(*exc.args)
530+
else:
531+
raise
532+
533+
return self._register(ov, conn, finish_recv)
534+
515535
def sendto(self, conn, buf, flags=0, addr=None):
516536
self._register_with_iocp(conn)
517537
ov = _overlapped.Overlapped(NULL)

Lib/test/test_asyncio/test_sock_lowlevel.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55

66
from asyncio import proactor_events
77
from itertools import cycle, islice
8+
from unittest.mock import patch, Mock
89
from test.test_asyncio import utils as test_utils
910
from test import support
1011
from test.support import socket_helper
1112

12-
1313
def tearDownModule():
1414
asyncio.set_event_loop_policy(None)
1515

@@ -380,6 +380,79 @@ def test_huge_content_recvinto(self):
380380
self.loop.run_until_complete(
381381
self._basetest_huge_content_recvinto(httpd.address))
382382

383+
async def _basetest_datagram_recvfrom(self, server_address):
384+
# Happy path, sock.sendto() returns immediately
385+
data = b'\x01' * 4096
386+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
387+
sock.setblocking(False)
388+
await self.loop.sock_sendto(sock, data, server_address)
389+
received_data, from_addr = await self.loop.sock_recvfrom(
390+
sock, 4096)
391+
self.assertEqual(received_data, data)
392+
self.assertEqual(from_addr, server_address)
393+
394+
def test_recvfrom(self):
395+
with test_utils.run_udp_echo_server() as server_address:
396+
self.loop.run_until_complete(
397+
self._basetest_datagram_recvfrom(server_address))
398+
399+
async def _basetest_datagram_recvfrom_into(self, server_address):
400+
# Happy path, sock.sendto() returns immediately
401+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
402+
sock.setblocking(False)
403+
404+
buf = bytearray(4096)
405+
data = b'\x01' * 4096
406+
await self.loop.sock_sendto(sock, data, server_address)
407+
num_bytes, from_addr = await self.loop.sock_recvfrom_into(
408+
sock, buf)
409+
self.assertEqual(num_bytes, 4096)
410+
self.assertEqual(buf, data)
411+
self.assertEqual(from_addr, server_address)
412+
413+
buf = bytearray(8192)
414+
await self.loop.sock_sendto(sock, data, server_address)
415+
num_bytes, from_addr = await self.loop.sock_recvfrom_into(
416+
sock, buf, 4096)
417+
self.assertEqual(num_bytes, 4096)
418+
self.assertEqual(buf[:4096], data[:4096])
419+
self.assertEqual(from_addr, server_address)
420+
421+
def test_recvfrom_into(self):
422+
with test_utils.run_udp_echo_server() as server_address:
423+
self.loop.run_until_complete(
424+
self._basetest_datagram_recvfrom_into(server_address))
425+
426+
async def _basetest_datagram_sendto_blocking(self, server_address):
427+
# Sad path, sock.sendto() raises BlockingIOError
428+
# This involves patching sock.sendto() to raise BlockingIOError but
429+
# sendto() is not used by the proactor event loop
430+
data = b'\x01' * 4096
431+
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
432+
sock.setblocking(False)
433+
mock_sock = Mock(sock)
434+
mock_sock.gettimeout = sock.gettimeout
435+
mock_sock.sendto.configure_mock(side_effect=BlockingIOError)
436+
mock_sock.fileno = sock.fileno
437+
self.loop.call_soon(
438+
lambda: setattr(mock_sock, 'sendto', sock.sendto)
439+
)
440+
await self.loop.sock_sendto(mock_sock, data, server_address)
441+
442+
received_data, from_addr = await self.loop.sock_recvfrom(
443+
sock, 4096)
444+
self.assertEqual(received_data, data)
445+
self.assertEqual(from_addr, server_address)
446+
447+
def test_sendto_blocking(self):
448+
if sys.platform == 'win32':
449+
if isinstance(self.loop, asyncio.ProactorEventLoop):
450+
raise unittest.SkipTest('Not relevant to ProactorEventLoop')
451+
452+
with test_utils.run_udp_echo_server() as server_address:
453+
self.loop.run_until_complete(
454+
self._basetest_datagram_sendto_blocking(server_address))
455+
383456
@socket_helper.skip_unless_bind_unix_socket
384457
def test_unix_sock_client_ops(self):
385458
with test_utils.run_test_unix_server() as httpd:

0 commit comments

Comments
 (0)
0