8000 Merge pull request #5263 from youknowone/multiprocessing · RustPython/RustPython@3d78ca8 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3d78ca8

Browse files
authored
Merge pull request #5263 from youknowone/multiprocessing
Update multiprocessing from CPython 3.12.3
2 parents 1cec856 + 46410ff commit 3d78ca8

31 files changed

+7148
-99
lines changed

Lib/multiprocessing/connection.py

Lines changed: 227 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
1111

12+
import errno
1213
import io
1314
import os
1415
import sys
@@ -73,11 +74,6 @@ def arbitrary_address(family):
7374
if family == 'AF_INET':
7475
return ('localhost', 0)
7576
elif family == 'AF_UNIX':
76-
# Prefer abstract sockets if possible to avoid problems with the address
77-
# size. When coding portable applications, some implementations have
78-
# sun_path as short as 92 bytes in the sockaddr_un struct.
79-
if util.abstract_sockets_supported:
80-
return f"\0listener-{os.getpid()}-{next(_mmap_counter)}"
8177
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
8278
elif family == 'AF_PIPE':
8379
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
@@ -188,10 +184,9 @@ def send_bytes(self, buf, offset=0, size=None):
188184
self._check_closed()
189185
self._check_writable()
190186
m = memoryview(buf)
191-
# HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
192187
if m.itemsize > 1:
193-
m = memoryview(bytes(m))
194-
n = len(m)
188+
m = m.cast('B')
189+
n = m.nbytes
195190
if offset < 0:
196191
raise ValueError("offset is negative")
197192
if n < offset:
@@ -277,12 +272,22 @@ class PipeConnection(_ConnectionBase):
277272
with FILE_FLAG_OVERLAPPED.
278273
"""
279274
_got_empty_message = False
275+
_send_ov = None
280276

281277
def _close(self, _CloseHandle=_winapi.CloseHandle):
278+
ov = self._send_ov
279+
if ov is not None:
280+
# Interrupt WaitForMultipleObjects() in _send_bytes()
281+
ov.cancel()
282282
_CloseHandle(self._handle)
283283

284284
def _send_bytes(self, buf):
285+
if self._send_ov is not None:
286+
# A connection should only be used by a single thread
287+
raise ValueError("concurrent send_bytes() calls "
288+
"are not supported")
285289
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
290+
self._send_ov = ov
286291
try:
287292
if err == _winapi.ERROR_IO_PENDING:
288293
waitres = _winapi.WaitForMultipleObjects(
@@ -292,7 +297,13 @@ def _send_bytes(self, buf):
292297
ov.cancel()
293298
raise
294299
finally:
300+
self._send_ov = None
295301
nwritten, err = ov.GetOverlappedResult(True)
302+
if err == _winapi.ERROR_OPERATION_ABORTED:
303+
# close() was called by another thread while
304+
# WaitForMultipleObjects() was waiting for the overlapped
305+
# operation.
306+
raise OSError(errno.EPIPE, "handle is closed")
296307
assert err == 0
297308
assert nwritten == len(buf)
298309

@@ -465,8 +476,9 @@ def accept(self):
465476
'''
466477
if self._listener is None:
467478
raise OSError('listener is closed')
479+
468480
c = self._listener.accept()
469-
if self._authkey:
481+
if self._authkey is not None:
470482
deliver_challenge(c, self._authkey)
471483
answer_challenge(c, self._authkey)
472484
return c
@@ -728,39 +740,227 @@ def PipeClient(address):
728740
# Authentication stuff
729741
#
730742

731-
MESSAGE_LENGTH = 20
743+
MESSAGE_LENGTH = 40 # MUST be > 20
732744

733-
CHALLENGE = b'#CHALLENGE#'
734-
WELCOME = b'#WELCOME#'
735-
FAILURE = b'#FAILURE#'
745+
_CHALLENGE = b'#CHALLENGE#'
746+
_WELCOME = b'#WELCOME#'
747+
_FAILURE = b'#FAILURE#'
736748

737-
def deliver_challenge(connection, authkey):
749+
# multiprocessing.connection Authentication Handshake Protocol Description
750+
# (as documented for reference after reading the existing code)
751+
# =============================================================================
752+
#
753+
# On Windows: native pipes with "overlapped IO" are used to send the bytes,
754+
# instead of the length prefix SIZE scheme described below. (ie: the OS deals
755+
# with message sizes for us)
756+
#
757+
# Protocol error behaviors:
758+
#
759+
# On POSIX, any failure to receive the length prefix into SIZE, for SIZE greater
760+
# than the requested maxsize to receive, or receiving fewer than SIZE bytes
761+
# results in the connection being closed and auth to fail.
762+
#
763+
# On Windows, receiving too few bytes is never a low level _recv_bytes read
764+
# error, receiving too many will trigger an error only if receive maxsize
765+
# value was larger than 128 OR the if the data arrived in smaller pieces.
766+
#
767+
# Serving side Client side
768+
# ------------------------------ ---------------------------------------
769+
# 0. Open a connection on the pipe.
770+
# 1. Accept connection.
771+
# 2. Random 20+ bytes -> MESSAGE
772+
# Modern servers always send
773+
# more than 20 bytes and include
774+
# a {digest} prefix on it with
775+
# their preferred HMAC digest.
776+
# Legacy ones send ==20 bytes.
777+
# 3. send 4 byte length (net order)
778+
# prefix followed by:
779+
# b'#CHALLENGE#' + MESSAGE
780+
# 4. Receive 4 bytes, parse as network byte
781+
# order integer. If it is -1, receive an
782+
# additional 8 bytes, parse that as network
783+
# byte order. The result is the length of
784+
# the data that follows -> SIZE.
785+
# 5. Receive min(SIZE, 256) bytes -> M1
786+
# 6. Assert that M1 starts with:
787+
# b'#CHALLENGE#'
788+
# 7. Strip that prefix from M1 into -> M2
789+
# 7.1. Parse M2: if it is exactly 20 bytes in
790+
# F438 length this indicates a legacy server
791+
# supporting only HMAC-MD5. Otherwise the
792+
# 7.2. preferred digest is looked up from an
793+
# expected "{digest}" prefix on M2. No prefix
794+
# or unsupported digest? <- AuthenticationError
795+
# 7.3. Put divined algorithm name in -> D_NAME
796+
# 8. Compute HMAC-D_NAME of AUTHKEY, M2 -> C_DIGEST
797+
# 9. Send 4 byte length prefix (net order)
798+
# followed by C_DIGEST bytes.
799+
# 10. Receive 4 or 4+8 byte length
800+
# prefix (#4 dance) -> SIZE.
801+
# 11. Receive min(SIZE, 256) -> C_D.
802+
# 11.1. Parse C_D: legacy servers
803+
# accept it as is, "md5" -> D_NAME
804+
# 11.2. modern servers check the length
805+
# of C_D, IF it is 16 bytes?
806+
# 11.2.1. "md5" -> D_NAME
807+
# and skip to step 12.
808+
# 11.3. longer? expect and parse a "{digest}"
809+
# prefix into -> D_NAME.
810+
# Strip the prefix and store remaining
811+
# bytes in -> C_D.
812+
# 11.4. Don't like D_NAME? <- AuthenticationError
813+
# 12. Compute HMAC-D_NAME of AUTHKEY,
814+
# MESSAGE into -> M_DIGEST.
815+
# 13. Compare M_DIGEST == C_D:
816+
# 14a: Match? Send length prefix &
817+
# b'#WELCOME#'
818+
# <- RETURN
819+
# 14b: Mismatch? Send len prefix &
820+
# b'#FAILURE#'
821+
# <- CLOSE & AuthenticationError
822+
# 15. Receive 4 or 4+8 byte length prefix (net
823+
# order) again as in #4 into -> SIZE.
824+
# 16. Receive min(SIZE, 256) bytes -> M3.
825+
# 17. Compare M3 == b'#WELCOME#':
826+
# 17a. Match? <- RETURN
827+
# 17b. Mismatch? <- CLOSE & AuthenticationError
828+
#
829+
# If this RETURNed, the connection remains open: it has been authenticated.
830+
#
831+
# Length prefixes are used consistently. Even on the legacy protocol, this
832+
# was good fortune and allowed us to evolve the protocol by using the length
833+
# of the opening challenge or length of the returned digest as a signal as
834+
# to which protocol the other end supports.
835+
836+
_ALLOWED_DIGESTS = frozenset(
837+
{b'md5', b'sha256', b'sha384', b'sha3_256', b'sha3_384'})
838+
_MAX_DIGEST_LEN = max(len(_) for _ in _ALLOWED_DIGESTS)
839+
840+
# Old hmac-md5 only server versions from Python <=3.11 sent a message of this
841+
# length. It happens to not match the length of any supported digest so we can
842+
# use a message of this length to indicate that we should work in backwards
843+
# compatible md5-only mode without a {digest_name} prefix on our response.
844+
_MD5ONLY_MESSAGE_LENGTH = 20
845+
_MD5_DIGEST_LEN = 16
846+
_LEGACY_LENGTHS = (_MD5ONLY_MESSAGE_LENGTH, _MD5_DIGEST_LEN)
847+
848+
849+
def _get_digest_name_and_payload(message: bytes) -> (str, bytes):
850+
"""Returns a digest name and the payload for a response hash.
851+
852+
If a legacy protocol is detected based on the message length
853+
or contents the digest name returned will be empty to indicate
854+
legacy mode where MD5 and no digest prefix should be sent.
855+
"""
856+
# modern message format: b"{digest}payload" longer than 20 bytes
857+
# legacy message format: 16 or 20 byte b"payload"
858+
if len(message) in _LEGACY_LENGTHS:
859+
# Either this was a legacy server challenge, or we're processing
860+
# a reply from a legacy client that sent an unprefixed 16-byte
861+
# HMAC-MD5 response. All messages using the modern protocol will
862+
# be longer than either of these lengths.
863+
return '', message
864+
if (message.startswith(b'{') and
865+
(curly := message.find(b'}', 1, _MAX_DIGEST_LEN+2)) > 0):
866+
digest = message[1:curly]
867+
if digest in _ALLOWED_DIGESTS:
868+
payload = message[curly+1:]
869+
return digest.decode('ascii'), payload
870+
raise AuthenticationError(
871+
'unsupported message length, missing digest prefix, '
872+
f'or unsupported digest: {message=}')
873+
874+
875+
def _create_response(authkey, message):
876+
"""Create a MAC based on authkey and message
877+
878+
The MAC algorithm defaults to HMAC-MD5, unless MD5 is not available or
879+
the message has a '{digest_name}' prefix. For legacy HMAC-MD5, the response
880+
is the raw MAC, otherwise the response is prefixed with '{digest_name}',
881+
e.g. b'{sha256}abcdefg...'
882+
883+
Note: The MAC protects the entire message including the digest_name prefix.
884+
"""
738885
import hmac
886+
digest_name = _get_digest_name_and_payload(message)[0]
887+
# The MAC protects the entire message: digest header and payload.
888+
if not digest_name:
889+
# Legacy server without a {digest} prefix on message.
890+
# Generate a legacy non-prefixed HMAC-MD5 reply.
891+
try:
892+
return hmac.new(authkey, message, 'md5').digest()
893+
except ValueError:
894+
# HMAC-MD5 is not available (FIPS mode?), fall back to
895+
# HMAC-SHA2-256 modern protocol. The legacy server probably
896+
# doesn't support it and will reject us anyways. :shrug:
897+
digest_name = 'sha256'
898+
# Modern protocol, indicate the digest used in the reply.
899+
response = hmac.new(authkey, message, digest_name).digest()
900+
return b'{%s}%s' % (digest_name.encode('ascii'), response)
901+
902+
903+
def _verify_challenge(authkey, message, response):
904+
"""Verify MAC challenge
905+
906+
If our message did not include a digest_name prefix, the client is allowed
907+
to select a stronger digest_name from _ALLOWED_DIGESTS.
908+
909+
In case our message is prefixed, a client cannot downgrade to a weaker
910+
algorithm, because the MAC is calculated over the entire message
911+
including the '{digest_name}' prefix.
912+
"""
913+
import hmac
914+
response_digest, response_mac = _get_digest_name_and_payload(response)
915+
response_digest = response_digest or 'md5'
916+
try:
917+
expected = hmac.new(authkey, message, response_digest).digest()
918+
except ValueError:
919+
raise AuthenticationError(f'{response_digest=} unsupported')
920+
if len(expected) != len(response_mac):
921+
raise AuthenticationError(
922+
f'expected {response_digest!r} of length {len(expected)} '
923+
f'got {len(response_mac)}')
924+
if not hmac.compare_digest(expected, response_mac):
925+
raise AuthenticationError('digest received was wrong')
926+
927+
928+
def deliver_challenge(connection, authkey: bytes, digest_name='sha256'):
739929
if not isinstance(authkey, bytes):
740930
raise ValueError(
741931
"Authkey must be bytes, not {0!s}".format(type(authkey)))
932+
assert MESSAGE_LENGTH > _MD5ONLY_MESSAGE_LENGTH, "protocol constraint"
742933
message = os.urandom(MESSAGE_LENGTH)
743-
connection.send_bytes(CHALLENGE + message)
744-
digest = hmac.new(authkey, message, 'md5').digest()
934+
message = b'{%s}%s' % (digest_name.encode('ascii'), message)
935+
# Even when sending a challenge to a legacy client that does not support
936+
# digest prefixes, they'll take the entire thing as a challenge and
937+
# respond to it with a raw HMAC-MD5.
938+
connection.send_bytes(_CHALLENGE + message)
745939
response = connection.recv_bytes(256) # reject large message
746-
if response == digest:
747-
connection.send_bytes(WELCOME)
940+
try:
941+
_verify_challenge(authkey, message, response)
942+
except AuthenticationError:
943+
connection.send_bytes(_FAILURE)
944+
raise
748945
else:
749-
connection.send_bytes(FAILURE)
750-
raise AuthenticationError('digest received was wrong')
946+
connection.send_bytes(_WELCOME)
751947

752-
def answer_challenge(connection, authkey):
753-
import hmac
948+
949+
def answer_challenge(connection, authkey: bytes):
754950
if not isinstance(authkey, bytes):
755951
raise ValueError(
756952
"Authkey must be bytes, not {0!s}".format(type(authkey)))
757953
message = connection.recv_bytes(256) # reject large message
758-
assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
759-
message = message[len(CHALLENGE):]
760-
digest = hmac.new(authkey, message, 'md5').digest()
954+
if not message.startswith(_CHALLENGE):
955+
raise AuthenticationError(
956+
f'Protocol error, expected challenge: {message=}')
957+
message = message[len(_CHALLENGE):]
958+
if len(message) < _MD5ONLY_MESSAGE_LENGTH:
959+
raise AuthenticationError('challenge too short: {len(message)} bytes')
960+
digest = _create_response(authkey, message)
761961
connection.send_bytes(digest)
762962
response = connection.recv_bytes(256) # reject large message
763-
if response != WELCOME:
963+
if response != _WELCOME:
764964
raise AuthenticationError('digest sent was rejected')
765965

766966
#
@@ -943,7 +1143,7 @@ def wait(object_list, timeout=None):
9431143
return ready
9441144

9451145
#
946-
# Make connection and socket objects sharable if possible
1146+
# Make connection and socket objects shareable if possible
9471147
#
9481148

9491149
if sys.platform == 'win32':

Lib/multiprocessing/context.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ class Process(process.BaseProcess):
223223
def _Popen(process_obj):
224224
return _default_context.get_context().Process._Popen(process_obj)
225225

226+
@staticmethod
227+
def _after_fork():
228+
return _default_context.get_context().Process._after_fork()
229+
226230
class DefaultContext(BaseContext):
227231
Process = Process
228232

@@ -254,6 +258,7 @@ def get_start_method(self, allow_none=False):
254258
return self._actual_context._name
255259

256260
def get_all_start_methods(self):
261+
"""Returns a list of the supported start methods, default first."""
257262
if sys.platform == 'win32':
258263
return ['spawn']
259264
else:
@@ -283,6 +288,11 @@ def _Popen(process_obj):
283288
from .popen_spawn_posix import Popen
284289
return Popen(process_obj)
285290

291+
@staticmethod
292+
def _after_fork():
293+
# process is spawned, nothing to do
294+
pass
295+
286296
class ForkServerProcess(process.BaseProcess):
287297
_start_method = 'forkserver'
288298
@staticmethod
@@ -326,6 +336,11 @@ def _Popen(process_obj):
326336
from .popen_spawn_win32 import Popen
327337
return Popen(process_obj)
328338

339+
@staticmethod
340+
def _after_fork():
341+
# process is spawned, nothing to do
342+
pass
343+
329344
class SpawnContext(BaseContext):
330345
_name = 'spawn'
331346
Process = SpawnProcess

Lib/multiprocessing/forkserver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def _stop_unlocked(self):
6161

6262
def set_forkserver_preload(self, modules_names):
6363
'''Set list of module names to try to load in forkserver process.'''
64-
if not all(type(mod) is str for mod in self._preload_modules):
64+
if not all(type(mod) is str for mod in modules_names):
6565
raise TypeError('module_names must be a list of strings')
6666
self._preload_modules = modules_names
6767

0 commit comments

Comments
 (0)
0