@@ -434,6 +434,88 @@ def _sock_recv_into(self, fut, sock, buf):
434
434
else :
435
435
fut .set_result (nbytes )
436
436
437
+ async def sock_recvfrom (self , sock , bufsize ):
438
+ """Receive a datagram from a datagram socket.
439
+
440
+ The return value is a tuple of (bytes, address) representing the
441
+ datagram received and the address it came from.
442
+ The maximum amount of data to be received at once is specified by
443
+ nbytes.
444
+ """
445
+ base_events ._check_ssl_socket (sock )
446
+ if self ._debug and sock .gettimeout () != 0 :
447
+ raise ValueError ("the socket must be non-blocking" )
448
+ try :
449
+ return sock .recvfrom (bufsize )
450
+ except (BlockingIOError , InterruptedError ):
451
+ pass
452
+ fut = self .create_future ()
453
+ fd = sock .fileno ()
454
+ self ._ensure_fd_no_transport (fd )
455
+ handle = self ._add_reader (fd , self ._sock_recvfrom , fut , sock , bufsize )
456
+ fut .add_done_callback (
457
+ functools .partial (self ._sock_read_done , fd , handle = handle ))
458
+ return await fut
459
+
460
+ def _sock_recvfrom (self , fut , sock , bufsize ):
461
+ # _sock_recvfrom() can add itself as an I/O callback if the operation
462
+ # can't be done immediately. Don't use it directly, call
463
+ # sock_recvfrom().
464
+ if fut .done ():
465
+ return
466
+ try :
467
+ result = sock .recvfrom (bufsize )
468
+ except (BlockingIOError , InterruptedError ):
469
+ return # try again next time
470
+ except (SystemExit , KeyboardInterrupt ):
471
+ raise
472
+ except BaseException as exc :
473
+ fut .set_exception (exc )
474
+ else :
475
+ fut .set_result (result )
476
+
477
+ async def sock_recvfrom_into (self , sock , buf , nbytes = 0 ):
478
+ """Receive data from the socket.
479
+
480
+ The received data is written into *buf* (a writable buffer).
481
+ The return value is a tuple of (number of bytes written, address).
482
+ """
483
+ base_events ._check_ssl_socket (sock )
484
+ if self ._debug and sock .gettimeout () != 0 :
485
+ raise ValueError ("the socket must be non-blocking" )
486
+ if not nbytes :
487
+ nbytes = len (buf )
488
+
489
+ try :
490
+ return sock .recvfrom_into (buf , nbytes )
491
+ except (BlockingIOError , InterruptedError ):
492
+ pass
493
+ fut = self .create_future ()
494
+ fd = sock .fileno ()
495
+ self ._ensure_fd_no_transport (fd )
496
+ handle = self ._add_reader (fd , self ._sock_recvfrom_into , fut , sock , buf ,
497
+ nbytes )
498
+ fut .add_done_callback (
499
+ functools .partial (self ._sock_read_done , fd , handle = handle ))
500
+ return await fut
501
+
502
+ def _sock_recvfrom_into (self , fut , sock , buf , bufsize ):
503
+ # _sock_recv_into() can add itself as an I/O callback if the operation
504
+ # can't be done immediately. Don't use it directly, call
505
+ # sock_recv_into().
506
+ if fut .done ():
507
+ return
508
+ try :
509
+ result = sock .recvfrom_into (buf , bufsize )
510
+ except (BlockingIOError , InterruptedError ):
511
+ return # try again next time
512
+ except (SystemExit , KeyboardInterrupt ):
513
+ raise
514
+ except BaseException as exc :
515
+ fut .set_exception (exc )
516
+ else :
517
+ fut .set_result (result )
518
+
437
519
async def sock_sendall (self , sock , data ):
438
520
"""Send data to the socket.
439
521
@@ -487,6 +569,48 @@ def _sock_sendall(self, fut, sock, view, pos):
487
569
else :
488
570
pos [0 ] = start
489
571
572
+ async def sock_sendto (self , sock , data , address ):
573
+ """Send data to the socket.
574
+
575
+ The socket must be connected to a remote socket. This method continues
576
+ to send data from data until either all data has been sent or an
577
+ error occurs. None is returned on success. On error, an exception is
578
+ raised, and there is no way to determine how much data, if any, was
579
+ successfully processed by the receiving end of the connection.
580
+ """
581
+ base_events ._check_ssl_socket (sock )
582
+ if self ._debug and sock .gettimeout () != 0 :
583
+ raise ValueError ("the socket must be non-blocking" )
584
+ try :
585
+ return sock .sendto (data , address )
586
+ except (BlockingIOError , InterruptedError ):
587
+ pass
588
+
589
+ fut = self .create_future ()
590
+ fd = sock .fileno ()
591
+ self ._ensure_fd_no_transport (fd )
592
+ # use a trick with a list in closure to store a mutable state
593
+ handle = self ._add_writer (fd , self ._sock_sendto , fut , sock , data ,
594
+ address )
595
+ fut .add_done_callback (
596
+ functools .partial (self ._sock_write_done , fd , handle = handle ))
597
+ return await fut
598
+
599
+ def _sock_sendto (self , fut , sock , data , address ):
600
+ if fut .done ():
601
+ # Future cancellation can be scheduled on previous loop iteration
602
+ return
603
+ try :
604
+ n = sock .sendto (data , 0 , address )
605
+ except (BlockingIOError , InterruptedError ):
606
+ return
607
+ except (SystemExit , KeyboardInterrupt ):
608
+ raise
609
+ except BaseException as exc :
610
+ fut .set_exception (exc )
611
+ else :
612
+ fut .set_result (n )
613
+
490
614
async def sock_connect (self , sock , address ):
491
615
"""Connect to a remote socket at address.
492
616
0 commit comments