diff --git a/Doc/library/os.rst b/Doc/library/os.rst index b93b06d4e72afc..57d07d035887b9 100644 --- a/Doc/library/os.rst +++ b/Doc/library/os.rst @@ -3862,6 +3862,158 @@ features: .. versionadded:: 3.10 +.. _os-copying-files: + +Copying Files +~~~~~~~~~~~~~ + +Functions involving a file copy (:func:`copyfile` and :func:`copy`) may use +platform-specific "fast-copy" syscalls in order to copy the file more +efficiently (see :issue:`33671`). +"fast-copy" means that the copying operation occurs within the kernel, avoiding +the use of userspace buffers in Python as in "``outfd.write(infd.read())``". + +On macOS `fcopyfile `_ is used to +copy the file content (not metadata). + +On Linux :func:`os.sendfile` is used. + +On Windows :func:`copyfile` uses a bigger default buffer size (1 MiB +instead of 64 KiB) and a :func:`memoryview`-based variant of +:func:`copyfileobj` is used. + +If the fast-copy operation fails and no data was written in the destination +file then shutil will silently fallback on using less efficient +:func:`copyfileobj` function internally. + + +.. exception:: SameFileError + + This exception is raised if source and destination in :func:`copyfile` + are the same file. + + .. versionadded:: 3.14 + + +.. function:: copyfileobj(fsrc, fdst[, length]) + + Copy the contents of the :term:`file-like object ` *fsrc* to + the file-like object *fdst*. The integer *length*, if given, is the buffer + size. In particular, a negative *length* value means to copy the data + without looping over the source data in chunks; by default the data is read + in chunks to avoid uncontrolled memory consumption. Note that if the + current file position of the *fsrc* object is not 0, only the contents from + the current file position to the end of the file will be copied. + + .. versionadded:: 3.14 + + +.. function:: copyfile(src, dst, *, follow_symlinks=True) + + Copy the contents (no metadata) of the file named *src* to a file named + *dst* and return *dst* in the most efficient way possible. + *src* and *dst* are :term:`path-like objects ` or path + names given as strings. + + *dst* must be the complete target file name; look at :func:`shutil.copy` + for a copy that accepts a target directory path. If *src* and *dst* + specify the same file, :exc:`SameFileError` is raised. + + The destination location must be writable; otherwise, an :exc:`OSError` + exception will be raised. If *dst* already exists, it will be replaced. + Special files such as character or block devices and pipes cannot be + copied with this function. + + If *follow_symlinks* is false and *src* is a symbolic link, + a new symbolic link will be created instead of copying the + file *src* points to. + + .. audit-event:: os.copyfile src,dst os.copyfile + + .. versionadded:: 3.14 + + +.. function:: copymode(src, dst, *, follow_symlinks=True) + + Copy the permission bits from *src* to *dst*. The file contents, owner, and + group are unaffected. *src* and *dst* are + :term:`path-like objects ` or path names given as + strings. If *follow_symlinks* is false, and both *src* and *dst* are + symbolic links, :func:`copymode` will attempt to modify the mode of *dst* + itself (rather than the file it points to). This functionality is not + available on every platform; please see :func:`copystat` for more + information. If :func:`copymode` cannot modify symbolic links on the local + platform, and it is asked to do so, it will do nothing and return. + + .. audit-event:: os.copymode src,dst os.copymode + + .. versionadded:: 3.14 + + +.. function:: copystat(src, dst, *, follow_symlinks=True) + + Copy the permission bits, last access time, last modification time, and + flags from *src* to *dst*. On Linux, :func:`copystat` also copies the + "extended attributes" where possible. The file contents, owner, and group + are unaffected. *src* and *dst* are + :term:`path-like objects ` or path names given as + strings. + + If *follow_symlinks* is false, and *src* and *dst* both refer to symbolic + links, :func:`copystat` will operate on the symbolic links themselves + rather than the files the symbolic links refer to—reading the information + from the *src* symbolic link, and writing the information to the *dst* + symbolic link. + + .. note:: + + Not all platforms provide the ability to examine and modify symbolic + links. Python itself can tell you what functionality is locally + available. + + * If ``os.chmod in os.supports_follow_symlinks`` is ``True``, + :func:`copystat` can modify the permission bits of a symbolic link. + + * If ``os.utime in os.supports_follow_symlinks`` is ``True``, + :func:`copystat` can modify the last access and modification times of + a symbolic link. + + * If ``os.chflags in os.supports_follow_symlinks`` is ``True``, + :func:`copystat` can modify the flags of a symbolic link. + (``os.chflags`` is not available on all platforms.) + + On platforms where some or all of this functionality is unavailable, + when asked to modify a symbolic link, :func:`copystat` will copy + everything it can. :func:`copystat` never returns failure. + + Please see :data:`supports_follow_symlinks` for more information. + + .. audit-event:: os.copystat src,dst os.copystat + + .. versionadded:: 3.14 + + +.. function:: copy(src, dst, *, follow_symlinks=True) + + Copies the file *src* to the file *dst*. *src* and *dst* should be + :term:`path-like objects ` or strings. If *dst* specifies + a file that already exists, it will be replaced. + + When *follow_symlinks* is false, and *src* is a symbolic link, :func:`copy` + attempts to copy all metadata from the *src* symbolic link to the newly + created *dst* symbolic link. However, this functionality is not available + on all platforms. On platforms where some or all of this functionality is + unavailable, :func:`copy` will preserve all the metadata it can; + :func:`copy` never raises an exception because it cannot preserve file + metadata. + + :func:`copy` uses :func:`copystat` to copy the file metadata. Please see + :func:`copystat` for more information about platform support for modifying + symbolic link metadata. + + .. versionadded:: 3.14 + + Timer File Descriptors ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst index fd32479195eca8..e913a7e17214a8 100644 --- a/Doc/library/shutil.rst +++ b/Doc/library/shutil.rst @@ -39,35 +39,12 @@ Directory and files operations .. function:: copyfileobj(fsrc, fdst[, length]) - Copy the contents of the :term:`file-like object ` *fsrc* to the file-like object *fdst*. - The integer *length*, if given, is the buffer size. In particular, a negative - *length* value means to copy the data without looping over the source data in - chunks; by default the data is read in chunks to avoid uncontrolled memory - consumption. Note that if the current file position of the *fsrc* object is not - 0, only the contents from the current file position to the end of the file will - be copied. + Alias of :func:`os.copyfileobj`. .. function:: copyfile(src, dst, *, follow_symlinks=True) - Copy the contents (no metadata) of the file named *src* to a file named - *dst* and return *dst* in the most efficient way possible. - *src* and *dst* are :term:`path-like objects ` or path names given as strings. - - *dst* must be the complete target file name; look at :func:`~shutil.copy` - for a copy that accepts a target directory path. If *src* and *dst* - specify the same file, :exc:`SameFileError` is raised. - - The destination location must be writable; otherwise, an :exc:`OSError` - exception will be raised. If *dst* already exists, it will be replaced. - Special files such as character or block devices and pipes cannot be - copied with this function. - - If *follow_symlinks* is false and *src* is a symbolic link, - a new symbolic link will be created instead of copying the - file *src* points to. - - .. audit-event:: shutil.copyfile src,dst shutil.copyfile + Alias of :func:`os.copyfile`. .. versionchanged:: 3.3 :exc:`IOError` used to be raised instead of :exc:`OSError`. @@ -80,77 +57,25 @@ Directory and files operations .. versionchanged:: 3.8 Platform-specific fast-copy syscalls may be used internally in order to - copy the file more efficiently. See - :ref:`shutil-platform-dependent-efficient-copy-operations` section. + copy the file more efficiently. See :ref:`os-copying-files`. .. exception:: SameFileError - This exception is raised if source and destination in :func:`copyfile` - are the same file. + Alias of :exc:`os.SameFileError`. .. versionadded:: 3.4 .. function:: copymode(src, dst, *, follow_symlinks=True) - Copy the permission bits from *src* to *dst*. The file contents, owner, and - group are unaffected. *src* and *dst* are :term:`path-like objects ` or path names - given as strings. - If *follow_symlinks* is false, and both *src* and *dst* are symbolic links, - :func:`copymode` will attempt to modify the mode of *dst* itself (rather - than the file it points to). This functionality is not available on every - platform; please see :func:`copystat` for more information. If - :func:`copymode` cannot modify symbolic links on the local platform, and it - is asked to do so, it will do nothing and return. - - .. audit-event:: shutil.copymode src,dst shutil.copymode + Alias of :func:`os.copymode`. .. versionchanged:: 3.3 Added *follow_symlinks* argument. .. function:: copystat(src, dst, *, follow_symlinks=True) - Copy the permission bits, last access time, last modification time, and - flags from *src* to *dst*. On Linux, :func:`copystat` also copies the - "extended attributes" where possible. The file contents, owner, and - group are unaffected. *src* and *dst* are :term:`path-like objects ` or path - names given as strings. - - If *follow_symlinks* is false, and *src* and *dst* both - refer to symbolic links, :func:`copystat` will operate on - the symbolic links themselves rather than the files the - symbolic links refer to—reading the information from the - *src* symbolic link, and writing the information to the - *dst* symbolic link. - - .. note:: - - Not all platforms provide the ability to examine and - modify symbolic links. Python itself can tell you what - functionality is locally available. - - * If ``os.chmod in os.supports_follow_symlinks`` is - ``True``, :func:`copystat` can modify the permission - bits of a symbolic link. - - * If ``os.utime in os.supports_follow_symlinks`` is - ``True``, :func:`copystat` can modify the last access - and modification times of a symbolic link. - - * If ``os.chflags in os.supports_follow_symlinks`` is - ``True``, :func:`copystat` can modify the flags of - a symbolic link. (``os.chflags`` is not available on - all platforms.) - - On platforms where some or all of this functionality - is unavailable, when asked to modify a symbolic link, - :func:`copystat` will copy everything it can. - :func:`copystat` never returns failure. - - Please see :data:`os.supports_follow_symlinks` - for more information. - - .. audit-event:: shutil.copystat src,dst shutil.copystat + Alias of :func:`os.copystat`. .. versionchanged:: 3.3 Added *follow_symlinks* argument and support for Linux extended attributes. @@ -184,13 +109,13 @@ Directory and files operations .. versionchanged:: 3.8 Platform-specific fast-copy syscalls may be used internally in order to - copy the file more efficiently. See - :ref:`shutil-platform-dependent-efficient-copy-operations` section. + copy the file more efficiently. See :ref:`os-copying-files`. .. function:: copy2(src, dst, *, follow_symlinks=True) Identical to :func:`~shutil.copy` except that :func:`copy2` - also attempts to preserve file metadata. + also attempts to preserve file metadata. Identical to + :func:`os.copy` except that *dst* may be a directory. When *follow_symlinks* is false, and *src* is a symbolic link, :func:`copy2` attempts to copy all metadata from the @@ -216,8 +141,7 @@ Directory and files operations .. versionchanged:: 3.8 Platform-specific fast-copy syscalls may be used internally in order to - copy the file more efficiently. See - :ref:`shutil-platform-dependent-efficient-copy-operations` section. + copy the file more efficiently. See :ref:`os-copying-files`. .. function:: ignore_patterns(*patterns) @@ -286,8 +210,7 @@ Directory and files operations .. versionchanged:: 3.8 Platform-specific fast-copy syscalls may be used internally in order to - copy the file more efficiently. See - :ref:`shutil-platform-dependent-efficient-copy-operations` section. + copy the file more efficiently. See :ref:`os-copying-files`. .. versionchanged:: 3.8 Added the *dirs_exist_ok* parameter. @@ -395,8 +318,7 @@ Directory and files operations .. versionchanged:: 3.8 Platform-specific fast-copy syscalls may be used internally in order to - copy the file more efficiently. See - :ref:`shutil-platform-dependent-efficient-copy-operations` section. + copy the file more efficiently. See :ref:`os-copying-files`. .. versionchanged:: 3.9 Accepts a :term:`path-like object` for both *src* and *dst*. @@ -502,32 +424,6 @@ Directory and files operations operation. For :func:`copytree`, the exception argument is a list of 3-tuples (*srcname*, *dstname*, *exception*). -.. _shutil-platform-dependent-efficient-copy-operations: - -Platform-dependent efficient copy operations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Starting from Python 3.8, all functions involving a file copy -(:func:`copyfile`, :func:`~shutil.copy`, :func:`copy2`, -:func:`copytree`, and :func:`move`) may use -platform-specific "fast-copy" syscalls in order to copy the file more -efficiently (see :issue:`33671`). -"fast-copy" means that the copying operation occurs within the kernel, avoiding -the use of userspace buffers in Python as in "``outfd.write(infd.read())``". - -On macOS `fcopyfile`_ is used to copy the file content (not metadata). - -On Linux :func:`os.sendfile` is used. - -On Windows :func:`shutil.copyfile` uses a bigger default buffer size (1 MiB -instead of 64 KiB) and a :func:`memoryview`-based variant of -:func:`shutil.copyfileobj` is used. - -If the fast-copy operation fails and no data was written in the destination -file then shutil will silently fallback on using less efficient -:func:`copyfileobj` function internally. - -.. versionchanged:: 3.8 .. _shutil-copytree-example: diff --git a/Doc/whatsnew/3.8.rst b/Doc/whatsnew/3.8.rst index 1356f24547b424..d3d8ab841b6d66 100644 --- a/Doc/whatsnew/3.8.rst +++ b/Doc/whatsnew/3.8.rst @@ -1481,7 +1481,7 @@ Optimizations The speedup for copying a 512 MiB file within the same partition is about +26% on Linux, +50% on macOS and +40% on Windows. Also, much less CPU cycles are consumed. - See :ref:`shutil-platform-dependent-efficient-copy-operations` section. + See :ref:`os-copying-files`. (Contributed by Giampaolo Rodolà in :issue:`33671`.) * :func:`shutil.copytree` uses :func:`os.scandir` function and all copy @@ -1921,8 +1921,7 @@ Changes in the Python API * :func:`shutil.copyfile`, :func:`shutil.copy`, :func:`shutil.copy2`, :func:`shutil.copytree` and :func:`shutil.move` use platform-specific - "fast-copy" syscalls (see - :ref:`shutil-platform-dependent-efficient-copy-operations` section). + "fast-copy" syscalls (see :ref:`os-copying-files`). * :func:`shutil.copyfile` default buffer size on Windows was changed from 16 KiB to 1 MiB. diff --git a/Lib/os.py b/Lib/os.py index 7661ce68ca3be2..49ba577e393cfb 100644 --- a/Lib/os.py +++ b/Lib/os.py @@ -23,6 +23,7 @@ #' import abc +import errno import sys import stat as st @@ -65,6 +66,7 @@ def _get_exports_list(module): except ImportError: pass + _winapi = None import posix __all__.extend(_get_exports_list(posix)) del posix @@ -80,6 +82,7 @@ def _get_exports_list(module): pass import ntpath as path + import _winapi import nt __all__.extend(_get_exports_list(nt)) del nt @@ -1152,3 +1155,383 @@ def process_cpu_count(): else: # Just an alias to cpu_count() (same docstring) process_cpu_count = cpu_count + + +# This should never be removed, see rationale in: +# https://bugs.python.org/issue43743#msg393429 +_USE_CP_SENDFILE = (_exists("sendfile") + and sys.platform.startswith(("linux", "android"))) +_HAS_FCOPYFILE = _exists("_fcopyfile") # macOS +COPY_BUFSIZE = 1024 * 1024 if name == 'nt' else 64 * 1024 + +class CopyError(OSError): + pass + +class SameFileError(CopyError): + """Raised when source and destination are the same file.""" + +class SpecialFileError(OSError): + """Raised when trying to do a kind of operation (e.g. copying) which is + not supported on a special file (e.g. a named pipe)""" + +class _GiveupOnFastCopy(Exception): + """Raised as a signal to fallback on using raw read()/write() + file copy when fast-copy functions fail to do so. + """ + +def _fastcopy_fcopyfile(fsrc, fdst, flags): + """Copy a regular file content or metadata by using high-performance + fcopyfile(3) syscall (macOS). + """ + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + try: + _fcopyfile(infd, outfd, flags) + except OSError as err: + err.filename = fsrc.name + err.filename2 = fdst.name + if err.errno in {errno.EINVAL, errno.ENOTSUP}: + raise _GiveupOnFastCopy(err) + else: + raise err from None + +def _fastcopy_sendfile(fsrc, fdst): + """Copy data from one regular mmap-like fd to another by using + high-performance sendfile(2) syscall. + This should work on Linux >= 2.6.33 only. + """ + # Note: copyfileobj() is left alone in order to not introduce any + # unexpected breakage. Possible risks by using zero-copy calls + # in copyfileobj() are: + # - fdst cannot be open in "a"(ppend) mode + # - fsrc and fdst may be open in "t"(ext) mode + # - fsrc may be a BufferedReader (which hides unread data in a buffer), + # GzipFile (which decompresses data), HTTPResponse (which decodes + # chunks). + # - possibly others (e.g. encrypted fs/partition?) + global _USE_CP_SENDFILE + try: + infd = fsrc.fileno() + outfd = fdst.fileno() + except Exception as err: + raise _GiveupOnFastCopy(err) # not a regular file + + # Hopefully the whole file will be copied in a single call. + # sendfile() is called in a loop 'till EOF is reached (0 return) + # so a bufsize smaller or bigger than the actual file size + # should not make any difference, also in case the file content + # changes while being copied. + try: + blocksize = max(fstat(infd).st_size, 2 ** 23) # min 8MiB + except OSError: + blocksize = 2 ** 27 # 128MiB + # On 32-bit architectures truncate to 1GiB to avoid OverflowError, + # see bpo-38319. + if sys.maxsize < 2 ** 32: + blocksize = min(blocksize, 2 ** 30) + + offset = 0 + while True: + try: + sent = sendfile(outfd, infd, offset, blocksize) + except OSError as err: + # ...in oder to have a more informative exception. + err.filename = fsrc.name + err.filename2 = fdst.name + + if err.errno == errno.ENOTSOCK: + # sendfile() on this platform (probably Linux < 2.6.33) + # does not support copies between regular files (only + # sockets). + _USE_CP_SENDFILE = False + raise _GiveupOnFastCopy(err) + + if err.errno == errno.ENOSPC: # filesystem is full + raise err from None + + # Give up on first call and if no data was copied. + if offset == 0 and lseek(outfd, 0, SEEK_CUR) == 0: + raise _GiveupOnFastCopy(err) + + raise err + else: + if sent == 0: + break # EOF + offset += sent + +def _fastcopy_readinto(fsrc, fdst, length=COPY_BUFSIZE): + """readinto()/memoryview() based variant of copyfileobj(). + *fsrc* must support readinto() method and both files must be + open in binary mode. + """ + # Localize variable access to minimize overhead. + fsrc_readinto = fsrc.readinto + fdst_write = fdst.write + with memoryview(bytearray(length)) as mv: + while True: + n = fsrc_readinto(mv) + if not n: + break + elif n < length: + with mv[:n] as smv: + fdst_write(smv) + break + else: + fdst_write(mv) + +def _fastcopy(fsrc, fdst, file_size): + # macOS + if _HAS_FCOPYFILE: + try: + _fastcopy_fcopyfile(fsrc, fdst, _COPYFILE_DATA) + return + except _GiveupOnFastCopy: + pass + # Linux + elif _USE_CP_SENDFILE: + try: + _fastcopy_sendfile(fsrc, fdst) + return + except _GiveupOnFastCopy: + pass + # Windows, see: + # https://github.com/python/cpython/pull/7160#discussion_r195405230 + elif name == 'nt' and file_size > 0: + _fastcopy_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) + return + + copyfileobj(fsrc, fdst) + +def copyfileobj(fsrc, fdst, length=0): + """copy data from file-like object fsrc to file-like object fdst""" + if not length: + length = COPY_BUFSIZE + # Localize variable access to minimize overhead. + fsrc_read = fsrc.read + fdst_write = fdst.write + while buf := fsrc_read(length): + fdst_write(buf) + +def _samefile(src, dst): + # Macintosh, Unix. + if isinstance(src, DirEntry): + try: + return path.samestat(src.stat(), stat(dst)) + except OSError: + return False + + try: + return path.samefile(src, dst) + except OSError: + return False + +def _stat(fn): + return fn.stat() if isinstance(fn, DirEntry) else stat(fn) + +def _islink(fn): + return fn.is_symlink() if isinstance(fn, DirEntry) else path.islink(fn) + +def copyfile(src, dst, *, follow_symlinks=True): + """Copy data from src to dst in the most efficient way possible. + + If follow_symlinks is not set and src is a symbolic link, a new + symlink will be created instead of copying the file it points to. + + """ + sys.audit("os.copyfile", src, dst) + + if _samefile(src, dst): + raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) + + file_size = 0 + for i, fn in enumerate([src, dst]): + try: + stat_result = _stat(fn) + except OSError: + # File most likely does not exist + pass + else: + # XXX What about other special files? (sockets, devices...) + if st.S_ISFIFO(stat_result.st_mode): + fn = fn.path if isinstance(fn, DirEntry) else fn + raise SpecialFileError("`%s` is a named pipe" % fn) + if name == 'nt' and i == 0: + file_size = stat_result.st_size + + if not follow_symlinks and _islink(src): + symlink(readlink(src), dst) + else: + import io + with io.open(src, 'rb') as fsrc: + try: + with io.open(dst, 'wb') as fdst: + _fastcopy(fsrc, fdst, file_size) + + # Issue 43219, raise a less confusing exception + except IsADirectoryError as e: + if not path.exists(dst): + raise FileNotFoundError(f'Directory does not exist: {dst}') from e + else: + raise + + return dst + +def copymode(src, dst, *, follow_symlinks=True): + """Copy mode bits from src to dst. + + If follow_symlinks is not set, symlinks aren't followed if and only + if both `src` and `dst` are symlinks. If `lchmod` isn't available + (e.g. Linux) this method does nothing. + + """ + sys.audit("os.copymode", src, dst) + + if not follow_symlinks and _islink(src) and path.islink(dst): + if _exists('lchmod'): + stat_func, chmod_func = lstat, lchmod + else: + return + else: + stat_func = _stat + if name == 'nt' and path.islink(dst): + def chmod_func(*args): + chmod(*args, follow_symlinks=True) + else: + chmod_func = chmod + + stat_result = stat_func(src) + chmod_func(dst, st.S_IMODE(stat_result.st_mode)) + +if _exists('listxattr'): + def _copyxattr(src, dst, *, follow_symlinks=True): + """Copy extended filesystem attributes from `src` to `dst`. + + Overwrite existing attributes. + + If `follow_symlinks` is false, symlinks won't be followed. + + """ + + try: + names = listxattr(src, follow_symlinks=follow_symlinks) + except OSError as e: + if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): + raise + return + for name in names: + try: + value = getxattr(src, name, follow_symlinks=follow_symlinks) + setxattr(dst, name, value, follow_symlinks=follow_symlinks) + except OSError as e: + if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, + errno.EINVAL, errno.EACCES): + raise +else: + def _copyxattr(*args, **kwargs): + pass + +def copystat(src, dst, *, follow_symlinks=True): + """Copy file metadata + + Copy the permission bits, last access time, last modification time, and + flags from `src` to `dst`. On Linux, copystat() also copies the "extended + attributes" where possible. The file contents, owner, and group are + unaffected. `src` and `dst` are path-like objects or path names given as + strings. + + If the optional flag `follow_symlinks` is not set, symlinks aren't + followed if and only if both `src` and `dst` are symlinks. + """ + sys.audit("os.copystat", src, dst) + + def _nop(*args, ns=None, follow_symlinks=None): + pass + + # follow symlinks (aka don't not follow symlinks) + follow = follow_symlinks or not (_islink(src) and path.islink(dst)) + if follow: + # use the real function if it exists + def lookup(name): + return globals().get(name, _nop) + else: + # use the real function only if it exists + # *and* it supports follow_symlinks + def lookup(name): + fn = globals().get(name, _nop) + if fn in supports_follow_symlinks: + return fn + return _nop + + if isinstance(src, DirEntry): + stat_result = src.stat(follow_symlinks=follow) + else: + stat_result = lookup("stat")(src, follow_symlinks=follow) + mode = st.S_IMODE(stat_result.st_mode) + lookup("utime")(dst, ns=(stat_result.st_atime_ns, stat_result.st_mtime_ns), + follow_symlinks=follow) + # We must copy extended attributes before the file is (potentially) + # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. + _copyxattr(src, dst, follow_symlinks=follow) + try: + lookup("chmod")(dst, mode, follow_symlinks=follow) + except NotImplementedError: + # if we got a NotImplementedError, it's because + # * follow_symlinks=False, + # * lchown() is unavailable, and + # * either + # * fchownat() is unavailable or + # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. + # (it returned ENOSUP.) + # therefore we're out of options--we simply cannot chown the + # symlink. give up, suppress the error. + # (which is what shutil always did in this circumstance.) + pass + if hasattr(st, 'st_flags'): + try: + lookup("chflags")(dst, stat_result.st_flags, follow_symlinks=follow) + except OSError as why: + for err in 'EOPNOTSUPP', 'ENOTSUP': + if hasattr(errno, err) and why.errno == getattr(errno, err): + break + else: + raise + +def copy(src, dst, *, follow_symlinks=True): + """Copy data and metadata. + + Metadata is copied with copystat(). Please see the copystat function + for more information. + + If follow_symlinks is false, symlinks won't be followed. This + resembles GNU's "cp -P src dst". + """ + if hasattr(_winapi, "CopyFile2"): + src_ = fsdecode(src) + dst_ = fsdecode(dst) + flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION # for compat + if not follow_symlinks: + flags |= _winapi.COPY_FILE_COPY_SYMLINK + try: + _winapi.CopyFile2(src_, dst_, flags) + return + except OSError as exc: + if (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD + and not follow_symlinks): + # Likely encountered a symlink we aren't allowed to create. + # Fall back on the old code + pass + elif exc.winerror == _winapi.ERROR_ACCESS_DENIED: + # Possibly encountered a hidden or readonly file we can't + # overwrite. Fall back on old code + pass + else: + raise + + copyfile(src, dst, follow_symlinks=follow_symlinks) + copystat(src, dst, follow_symlinks=follow_symlinks) + +__all__.extend(["copyfileobj", "copyfile", "copymode", "copystat", "copy"]) diff --git a/Lib/shutil.py b/Lib/shutil.py index c9b4da34b1e19b..b04cb7ebbcbb2d 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -33,10 +33,8 @@ _LZMA_SUPPORTED = False _WINDOWS = os.name == 'nt' -posix = nt = None -if os.name == 'posix': - import posix -elif _WINDOWS: +nt = None +if _WINDOWS: import nt if sys.platform == 'win32': @@ -44,12 +42,14 @@ else: _winapi = None -COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 -# This should never be removed, see rationale in: -# https://bugs.python.org/issue43743#msg393429 -_USE_CP_SENDFILE = (hasattr(os, "sendfile") - and sys.platform.startswith(("linux", "android"))) -_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS +COPY_BUFSIZE = os.COPY_BUFSIZE +Error = os.CopyError +SameFileError = os.SameFileError +SpecialFileError = os.SpecialFileError +copyfileobj = os.copyfileobj +copyfile = os.copyfile +copymode = os.copymode +copystat = os.copystat # CMD defaults in Windows 10 _WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" @@ -64,16 +64,6 @@ "SameFileError"] # disk_usage is added later, if available on the platform -class Error(OSError): - pass - -class SameFileError(Error): - """Raised when source and destination are the same file.""" - -class SpecialFileError(OSError): - """Raised when trying to do a kind of operation (e.g. copying) which is - not supported on a special file (e.g. a named pipe)""" - class ExecError(OSError): """Raised when a command could not be executed""" @@ -84,333 +74,6 @@ class RegistryError(Exception): """Raised when a registry operation with the archiving and unpacking registries fails""" -class _GiveupOnFastCopy(Exception): - """Raised as a signal to fallback on using raw read()/write() - file copy when fast-copy functions fail to do so. - """ - -def _fastcopy_fcopyfile(fsrc, fdst, flags): - """Copy a regular file content or metadata by using high-performance - fcopyfile(3) syscall (macOS). - """ - try: - infd = fsrc.fileno() - outfd = fdst.fileno() - except Exception as err: - raise _GiveupOnFastCopy(err) # not a regular file - - try: - posix._fcopyfile(infd, outfd, flags) - except OSError as err: - err.filename = fsrc.name - err.filename2 = fdst.name - if err.errno in {errno.EINVAL, errno.ENOTSUP}: - raise _GiveupOnFastCopy(err) - else: - raise err from None - -def _fastcopy_sendfile(fsrc, fdst): - """Copy data from one regular mmap-like fd to another by using - high-performance sendfile(2) syscall. - This should work on Linux >= 2.6.33 only. - """ - # Note: copyfileobj() is left alone in order to not introduce any - # unexpected breakage. Possible risks by using zero-copy calls - # in copyfileobj() are: - # - fdst cannot be open in "a"(ppend) mode - # - fsrc and fdst may be open in "t"(ext) mode - # - fsrc may be a BufferedReader (which hides unread data in a buffer), - # GzipFile (which decompresses data), HTTPResponse (which decodes - # chunks). - # - possibly others (e.g. encrypted fs/partition?) - global _USE_CP_SENDFILE - try: - infd = fsrc.fileno() - outfd = fdst.fileno() - except Exception as err: - raise _GiveupOnFastCopy(err) # not a regular file - - # Hopefully the whole file will be copied in a single call. - # sendfile() is called in a loop 'till EOF is reached (0 return) - # so a bufsize smaller or bigger than the actual file size - # should not make any difference, also in case the file content - # changes while being copied. - try: - blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB - except OSError: - blocksize = 2 ** 27 # 128MiB - # On 32-bit architectures truncate to 1GiB to avoid OverflowError, - # see bpo-38319. - if sys.maxsize < 2 ** 32: - blocksize = min(blocksize, 2 ** 30) - - offset = 0 - while True: - try: - sent = os.sendfile(outfd, infd, offset, blocksize) - except OSError as err: - # ...in oder to have a more informative exception. - err.filename = fsrc.name - err.filename2 = fdst.name - - if err.errno == errno.ENOTSOCK: - # sendfile() on this platform (probably Linux < 2.6.33) - # does not support copies between regular files (only - # sockets). - _USE_CP_SENDFILE = False - raise _GiveupOnFastCopy(err) - - if err.errno == errno.ENOSPC: # filesystem is full - raise err from None - - # Give up on first call and if no data was copied. - if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: - raise _GiveupOnFastCopy(err) - - raise err - else: - if sent == 0: - break # EOF - offset += sent - -def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): - """readinto()/memoryview() based variant of copyfileobj(). - *fsrc* must support readinto() method and both files must be - open in binary mode. - """ - # Localize variable access to minimize overhead. - fsrc_readinto = fsrc.readinto - fdst_write = fdst.write - with memoryview(bytearray(length)) as mv: - while True: - n = fsrc_readinto(mv) - if not n: - break - elif n < length: - with mv[:n] as smv: - fdst_write(smv) - break - else: - fdst_write(mv) - -def copyfileobj(fsrc, fdst, length=0): - """copy data from file-like object fsrc to file-like object fdst""" - if not length: - length = COPY_BUFSIZE - # Localize variable access to minimize overhead. - fsrc_read = fsrc.read - fdst_write = fdst.write - while buf := fsrc_read(length): - fdst_write(buf) - -def _samefile(src, dst): - # Macintosh, Unix. - if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): - try: - return os.path.samestat(src.stat(), os.stat(dst)) - except OSError: - return False - - if hasattr(os.path, 'samefile'): - try: - return os.path.samefile(src, dst) - except OSError: - return False - - # All other platforms: check for same pathname. - return (os.path.normcase(os.path.abspath(src)) == - os.path.normcase(os.path.abspath(dst))) - -def _stat(fn): - return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) - -def _islink(fn): - return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) - -def copyfile(src, dst, *, follow_symlinks=True): - """Copy data from src to dst in the most efficient way possible. - - If follow_symlinks is not set and src is a symbolic link, a new - symlink will be created instead of copying the file it points to. - - """ - sys.audit("shutil.copyfile", src, dst) - - if _samefile(src, dst): - raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) - - file_size = 0 - for i, fn in enumerate([src, dst]): - try: - st = _stat(fn) - except OSError: - # File most likely does not exist - pass - else: - # XXX What about other special files? (sockets, devices...) - if stat.S_ISFIFO(st.st_mode): - fn = fn.path if isinstance(fn, os.DirEntry) else fn - raise SpecialFileError("`%s` is a named pipe" % fn) - if _WINDOWS and i == 0: - file_size = st.st_size - - if not follow_symlinks and _islink(src): - os.symlink(os.readlink(src), dst) - else: - with open(src, 'rb') as fsrc: - try: - with open(dst, 'wb') as fdst: - # macOS - if _HAS_FCOPYFILE: - try: - _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) - return dst - except _GiveupOnFastCopy: - pass - # Linux - elif _USE_CP_SENDFILE: - try: - _fastcopy_sendfile(fsrc, fdst) - return dst - except _GiveupOnFastCopy: - pass - # Windows, see: - # https://github.com/python/cpython/pull/7160#discussion_r195405230 - elif _WINDOWS and file_size > 0: - _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) - return dst - - copyfileobj(fsrc, fdst) - - # Issue 43219, raise a less confusing exception - except IsADirectoryError as e: - if not os.path.exists(dst): - raise FileNotFoundError(f'Directory does not exist: {dst}') from e - else: - raise - - return dst - -def copymode(src, dst, *, follow_symlinks=True): - """Copy mode bits from src to dst. - - If follow_symlinks is not set, symlinks aren't followed if and only - if both `src` and `dst` are symlinks. If `lchmod` isn't available - (e.g. Linux) this method does nothing. - - """ - sys.audit("shutil.copymode", src, dst) - - if not follow_symlinks and _islink(src) and os.path.islink(dst): - if hasattr(os, 'lchmod'): - stat_func, chmod_func = os.lstat, os.lchmod - else: - return - else: - stat_func = _stat - if os.name == 'nt' and os.path.islink(dst): - def chmod_func(*args): - os.chmod(*args, follow_symlinks=True) - else: - chmod_func = os.chmod - - st = stat_func(src) - chmod_func(dst, stat.S_IMODE(st.st_mode)) - -if hasattr(os, 'listxattr'): - def _copyxattr(src, dst, *, follow_symlinks=True): - """Copy extended filesystem attributes from `src` to `dst`. - - Overwrite existing attributes. - - If `follow_symlinks` is false, symlinks won't be followed. - - """ - - try: - names = os.listxattr(src, follow_symlinks=follow_symlinks) - except OSError as e: - if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): - raise - return - for name in names: - try: - value = os.getxattr(src, name, follow_symlinks=follow_symlinks) - os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) - except OSError as e: - if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, - errno.EINVAL, errno.EACCES): - raise -else: - def _copyxattr(*args, **kwargs): - pass - -def copystat(src, dst, *, follow_symlinks=True): - """Copy file metadata - - Copy the permission bits, last access time, last modification time, and - flags from `src` to `dst`. On Linux, copystat() also copies the "extended - attributes" where possible. The file contents, owner, and group are - unaffected. `src` and `dst` are path-like objects or path names given as - strings. - - If the optional flag `follow_symlinks` is not set, symlinks aren't - followed if and only if both `src` and `dst` are symlinks. - """ - sys.audit("shutil.copystat", src, dst) - - def _nop(*args, ns=None, follow_symlinks=None): - pass - - # follow symlinks (aka don't not follow symlinks) - follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) - if follow: - # use the real function if it exists - def lookup(name): - return getattr(os, name, _nop) - else: - # use the real function only if it exists - # *and* it supports follow_symlinks - def lookup(name): - fn = getattr(os, name, _nop) - if fn in os.supports_follow_symlinks: - return fn - return _nop - - if isinstance(src, os.DirEntry): - st = src.stat(follow_symlinks=follow) - else: - st = lookup("stat")(src, follow_symlinks=follow) - mode = stat.S_IMODE(st.st_mode) - lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), - follow_symlinks=follow) - # We must copy extended attributes before the file is (potentially) - # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. - _copyxattr(src, dst, follow_symlinks=follow) - try: - lookup("chmod")(dst, mode, follow_symlinks=follow) - except NotImplementedError: - # if we got a NotImplementedError, it's because - # * follow_symlinks=False, - # * lchown() is unavailable, and - # * either - # * fchownat() is unavailable or - # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. - # (it returned ENOSUP.) - # therefore we're out of options--we simply cannot chown the - # symlink. give up, suppress the error. - # (which is what shutil always did in this circumstance.) - pass - if hasattr(st, 'st_flags'): - try: - lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) - except OSError as why: - for err in 'EOPNOTSUPP', 'ENOTSUP': - if hasattr(errno, err) and why.errno == getattr(errno, err): - break - else: - raise - def copy(src, dst, *, follow_symlinks=True): """Copy data and mode bits ("cp src dst"). Return the file's destination. @@ -425,8 +88,8 @@ def copy(src, dst, *, follow_symlinks=True): """ if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) - copyfile(src, dst, follow_symlinks=follow_symlinks) - copymode(src, dst, follow_symlinks=follow_symlinks) + os.copyfile(src, dst, follow_symlinks=follow_symlinks) + os.copymode(src, dst, follow_symlinks=follow_symlinks) return dst def copy2(src, dst, *, follow_symlinks=True): @@ -442,31 +105,7 @@ def copy2(src, dst, *, follow_symlinks=True): """ if os.path.isdir(dst): dst = os.path.join(dst, os.path.basename(src)) - - if hasattr(_winapi, "CopyFile2"): - src_ = os.fsdecode(src) - dst_ = os.fsdecode(dst) - flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION # for compat - if not follow_symlinks: - flags |= _winapi.COPY_FILE_COPY_SYMLINK - try: - _winapi.CopyFile2(src_, dst_, flags) - return dst - except OSError as exc: - if (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD - and not follow_symlinks): - # Likely encountered a symlink we aren't allowed to create. - # Fall back on the old code - pass - elif exc.winerror == _winapi.ERROR_ACCESS_DENIED: - # Possibly encountered a hidden or readonly file we can't - # overwrite. Fall back on old code - pass - else: - raise - - copyfile(src, dst, follow_symlinks=follow_symlinks) - copystat(src, dst, follow_symlinks=follow_symlinks) + os.copy(src, dst, follow_symlinks=follow_symlinks) return dst def ignore_patterns(*patterns): @@ -886,7 +525,7 @@ def move(src, dst, copy_function=copy2): sys.audit("shutil.move", src, dst) real_dst = dst if os.path.isdir(dst): - if _samefile(src, dst) and not os.path.islink(src): + if os._samefile(src, dst) and not os.path.islink(src): # We might be on a case insensitive filesystem, # perform the rename anyway. os.rename(src, dst) @@ -933,7 +572,7 @@ def _destinsrc(src, dst): return dst.startswith(src) def _is_immutable(src): - st = _stat(src) + st = os._stat(src) immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] return hasattr(st, 'st_flags') and st.st_flags in immutable_states diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 941fa2b2c5c87f..a5192c013839e7 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -10,16 +10,19 @@ import fnmatch import fractions import itertools +import io import locale import os import pickle import platform +import random import select import selectors import shutil import signal import socket import stat +import string import struct import subprocess import sys @@ -29,6 +32,7 @@ import time import types import unittest +import unittest.mock import uuid import warnings from test import support @@ -70,7 +74,7 @@ from test.support.script_helper import assert_python_ok from test.support import unix_shell -from test.support.os_helper import FakePath +from test.support.os_helper import FakePath, TESTFN root_in_posix = False @@ -5411,6 +5415,946 @@ class TestPEP519PurePython(TestPEP519): fspath = staticmethod(os._fspath) +def write_file(path, content, binary=False): + """Write *content* to a file located at *path*. + + If *path* is a tuple instead of a string, os.path.join will be used to + make a path. If *binary* is true, the file will be opened in binary + mode. + """ + if isinstance(path, tuple): + path = os.path.join(*path) + mode = 'wb' if binary else 'w' + encoding = None if binary else "utf-8" + with open(path, mode, encoding=encoding) as fp: + fp.write(content) + +def write_test_file(path, size): + """Create a test file with an arbitrary size and random text content.""" + def chunks(total, step): + assert total >= step + while total > step: + yield step + total -= step + if total: + yield total + + bufsize = min(size, 8192) + chunk = b"".join([random.choice(string.ascii_letters).encode() + for i in range(bufsize)]) + with open(path, 'wb') as f: + for csize in chunks(size, bufsize): + f.write(chunk) + assert os.path.getsize(path) == size + +def read_file(path, binary=False): + """Return contents from a file located at *path*. + + If *path* is a tuple instead of a string, os.path.join will be used to + make a path. If *binary* is true, the file will be opened in binary + mode. + """ + if isinstance(path, tuple): + path = os.path.join(*path) + mode = 'rb' if binary else 'r' + encoding = None if binary else "utf-8" + with open(path, mode, encoding=encoding) as fp: + return fp.read() + +def supports_file2file_sendfile(): + # ...apparently Linux and Solaris are the only ones + if not hasattr(os, "sendfile"): + return False + srcname = None + dstname = None + try: + with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f: + srcname = f.name + f.write(b"0123456789") + + with open(srcname, "rb") as src: + with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst: + dstname = dst.name + infd = src.fileno() + outfd = dst.fileno() + try: + os.sendfile(outfd, infd, 0, 2) + except OSError: + return False + else: + return True + finally: + if srcname is not None: + os_helper.unlink(srcname) + if dstname is not None: + os_helper.unlink(dstname) + +SUPPORTS_SENDFILE = supports_file2file_sendfile() +TESTFN2 = TESTFN + "2" +MACOS = sys.platform.startswith("darwin") +SOLARIS = sys.platform.startswith("sunos") +AIX = sys.platform[:3] == 'aix' + + +class _CopyTest: + def mkdtemp(self, prefix=None): + """Create a temporary directory that will be cleaned up. + + Returns the path of the directory. + """ + d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd()) + self.addCleanup(os_helper.rmtree, d) + return d + + +class TestCopyMode(_CopyTest, unittest.TestCase): + @os_helper.skip_unless_symlink + def test_copymode_follow_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) + # file to file + os.chmod(dst, stat.S_IRWXO) + self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + os.copymode(src, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # On Windows, os.chmod does not follow symlinks (issue #15411) + # follow src link + os.chmod(dst, stat.S_IRWXO) + os.copymode(src_link, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow dst link + os.chmod(dst, stat.S_IRWXO) + os.copymode(src, dst_link) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow both links + os.chmod(dst, stat.S_IRWXO) + os.copymode(src_link, dst_link) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + + @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') + @os_helper.skip_unless_symlink + def test_copymode_symlink_to_symlink(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) + os.chmod(dst, stat.S_IRWXU) + os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) + # link to link + os.lchmod(dst_link, stat.S_IRWXO) + old_mode = os.stat(dst).st_mode + os.copymode(src_link, dst_link, follow_symlinks=False) + self.assertEqual(os.lstat(src_link).st_mode, + os.lstat(dst_link).st_mode) + self.assertEqual(os.stat(dst).st_mode, old_mode) + # src link - use chmod + os.lchmod(dst_link, stat.S_IRWXO) + os.copymode(src_link, dst, follow_symlinks=False) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # dst link - use chmod + os.lchmod(dst_link, stat.S_IRWXO) + os.copymode(src, dst_link, follow_symlinks=False) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + + @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') + @os_helper.skip_unless_symlink + def test_copymode_symlink_to_symlink_wo_lchmod(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.copymode(src_link, dst_link, follow_symlinks=False) # silent fail + +class TestCopyStat(_CopyTest, unittest.TestCase): + + @os_helper.skip_unless_symlink + def test_copystat_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'qux') + write_file(src, 'foo') + src_stat = os.stat(src) + os.utime(src, (src_stat.st_atime, + src_stat.st_mtime - 42.0)) # ensure different mtimes + write_file(dst, 'bar') + self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) + os.symlink(src, src_link) + os.symlink(dst, dst_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXO) + if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): + os.lchflags(src_link, stat.UF_NODUMP) + src_link_stat = os.lstat(src_link) + # follow + if hasattr(os, 'lchmod'): + os.copystat(src_link, dst_link, follow_symlinks=True) + self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) + # don't follow + os.copystat(src_link, dst_link, follow_symlinks=False) + dst_link_stat = os.lstat(dst_link) + if os.utime in os.supports_follow_symlinks: + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(src_link_stat, attr), + getattr(dst_link_stat, attr) + 1) + if hasattr(os, 'lchmod'): + self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) + if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): + self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) + # tell to follow but dst is not a link + os.copystat(src_link, dst, follow_symlinks=False) + self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < + 00000.1) + + @unittest.skipUnless(hasattr(os, 'chflags') and + hasattr(errno, 'EOPNOTSUPP') and + hasattr(errno, 'ENOTSUP'), + "requires os.chflags, EOPNOTSUPP & ENOTSUP") + def test_copystat_handles_harmless_chflags_errors(self): + tmpdir = self.mkdtemp() + file1 = os.path.join(tmpdir, 'file1') + file2 = os.path.join(tmpdir, 'file2') + write_file(file1, 'xxx') + write_file(file2, 'xxx') + + def make_chflags_raiser(err): + ex = OSError() + + def _chflags_raiser(path, flags, *, follow_symlinks=True): + ex.errno = err + raise ex + return _chflags_raiser + old_chflags = os.chflags + try: + for err in errno.EOPNOTSUPP, errno.ENOTSUP: + os.chflags = make_chflags_raiser(err) + os.copystat(file1, file2) + # assert others errors break it + os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) + self.assertRaises(OSError, os.copystat, file1, file2) + finally: + os.chflags = old_chflags + +class TestCopyXAttr(_CopyTest, unittest.TestCase): + + @os_helper.skip_unless_xattr + def test_copyxattr(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + write_file(src, 'foo') + dst = os.path.join(tmp_dir, 'bar') + write_file(dst, 'bar') + + # no xattr == no problem + os._copyxattr(src, dst) + # common case + os.setxattr(src, 'user.foo', b'42') + os.setxattr(src, 'user.bar', b'43') + os._copyxattr(src, dst) + self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) + self.assertEqual( + os.getxattr(src, 'user.foo'), + os.getxattr(dst, 'user.foo')) + # check errors don't affect other attrs + os.remove(dst) + write_file(dst, 'bar') + os_error = OSError(errno.EPERM, 'EPERM') + + def _raise_on_user_foo(fname, attr, val, **kwargs): + if attr == 'user.foo': + raise os_error + else: + orig_setxattr(fname, attr, val, **kwargs) + try: + orig_setxattr = os.setxattr + os.setxattr = _raise_on_user_foo + os._copyxattr(src, dst) + self.assertIn('user.bar', os.listxattr(dst)) + finally: + os.setxattr = orig_setxattr + # the source filesystem not supporting xattrs should be ok, too. + def _raise_on_src(fname, *, follow_symlinks=True): + if fname == src: + raise OSError(errno.ENOTSUP, 'Operation not supported') + return orig_listxattr(fname, follow_symlinks=follow_symlinks) + try: + orig_listxattr = os.listxattr + os.listxattr = _raise_on_src + os._copyxattr(src, dst) + finally: + os.listxattr = orig_listxattr + + # test that os.copystat copies xattrs + src = os.path.join(tmp_dir, 'the_original') + srcro = os.path.join(tmp_dir, 'the_original_ro') + write_file(src, src) + write_file(srcro, srcro) + os.setxattr(src, 'user.the_value', b'fiddly') + os.setxattr(srcro, 'user.the_value', b'fiddly') + os.chmod(srcro, 0o444) + dst = os.path.join(tmp_dir, 'the_copy') + dstro = os.path.join(tmp_dir, 'the_copy_ro') + write_file(dst, dst) + write_file(dstro, dstro) + os.copystat(src, dst) + os.copystat(srcro, dstro) + self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') + self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') + + @os_helper.skip_unless_symlink + @os_helper.skip_unless_xattr + @os_helper.skip_unless_dac_override + def test_copyxattr_symlinks(self): + # On Linux, it's only possible to access non-user xattr for symlinks; + # which in turn require root privileges. This test should be expanded + # as soon as other platforms gain support for extended attributes. + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + os.setxattr(src, 'trusted.foo', b'42') + os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) + dst = os.path.join(tmp_dir, 'bar') + dst_link = os.path.join(tmp_dir, 'qux') + write_file(dst, 'bar') + os.symlink(dst, dst_link) + os._copyxattr(src_link, dst_link, follow_symlinks=False) + self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') + self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') + os._copyxattr(src_link, dst, follow_symlinks=False) + self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') + + +class TestCopyFile(_CopyTest, unittest.TestCase): + @os_helper.skip_unless_symlink + def test_copyfile_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'src') + dst = os.path.join(tmp_dir, 'dst') + dst_link = os.path.join(tmp_dir, 'dst_link') + link = os.path.join(tmp_dir, 'link') + write_file(src, 'foo') + os.symlink(src, link) + # don't follow + os.copyfile(link, dst_link, follow_symlinks=False) + self.assertTrue(os.path.islink(dst_link)) + self.assertEqual(os.readlink(link), os.readlink(dst_link)) + # follow + os.copyfile(link, dst) + self.assertFalse(os.path.islink(dst)) + + @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') + def test_dont_copy_file_onto_link_to_itself(self): + # bug 851123. + os.mkdir(TESTFN) + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + try: + with open(src, 'w', encoding='utf-8') as f: + f.write('cheddar') + try: + os.link(src, dst) + except PermissionError as e: + self.skipTest('os.link(): %s' % e) + self.assertRaises(os.SameFileError, os.copyfile, src, dst) + with open(src, 'r', encoding='utf-8') as f: + self.assertEqual(f.read(), 'cheddar') + os.remove(dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + + @os_helper.skip_unless_symlink + def test_dont_copy_file_onto_symlink_to_itself(self): + # bug 851123. + os.mkdir(TESTFN) + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + try: + with open(src, 'w', encoding='utf-8') as f: + f.write('cheddar') + # Using `src` here would mean we end up with a symlink pointing + # to TESTFN/TESTFN/cheese, while it should point at + # TESTFN/cheese. + os.symlink('cheese', dst) + self.assertRaises(os.SameFileError, os.copyfile, src, dst) + with open(src, 'r', encoding='utf-8') as f: + self.assertEqual(f.read(), 'cheddar') + os.remove(dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + + # Issue #3002: copyfile and copytree block indefinitely on named pipes + @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') + @unittest.skipIf(sys.platform == "vxworks", + "fifo requires special path on VxWorks") + def test_copyfile_named_pipe(self): + try: + os.mkfifo(TESTFN) + except PermissionError as e: + self.skipTest('os.mkfifo(): %s' % e) + try: + self.assertRaises(os.SpecialFileError, + os.copyfile, TESTFN, TESTFN2) + self.assertRaises(os.SpecialFileError, + os.copyfile, __file__, TESTFN) + finally: + os.remove(TESTFN) + + def test_copyfile_return_value(self): + # copytree returns its destination path. + src_dir = self.mkdtemp() + dst_dir = self.mkdtemp() + dst_file = os.path.join(dst_dir, 'bar') + src_file = os.path.join(src_dir, 'foo') + write_file(src_file, 'foo') + rv = os.copyfile(src_file, dst_file) + self.assertTrue(os.path.exists(rv)) + self.assertEqual(read_file(src_file), read_file(dst_file)) + + def test_copyfile_same_file(self): + # copyfile() should raise SameFileError if the source and destination + # are the same. + src_dir = self.mkdtemp() + src_file = os.path.join(src_dir, 'foo') + write_file(src_file, 'foo') + self.assertRaises(os.SameFileError, os.copyfile, src_file, src_file) + # But Error should work too, to stay backward compatible. + self.assertRaises(os.CopyError, os.copyfile, src_file, src_file) + # Make sure file is not corrupted. + self.assertEqual(read_file(src_file), 'foo') + + @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)') + # gh-92670: The test uses a trailing slash to force the OS consider + # the path as a directory, but on AIX the trailing slash has no effect + # and is considered as a file. + @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670') + def test_copyfile_nonexistent_dir(self): + # Issue 43219 + src_dir = self.mkdtemp() + src_file = os.path.join(src_dir, 'foo') + dst = os.path.join(src_dir, 'does_not_exist/') + write_file(src_file, 'foo') + self.assertRaises(FileNotFoundError, os.copyfile, src_file, dst) + + def test_copyfile_copy_dir(self): + # Issue 45234 + # test copy() and copyfile() raising proper exceptions when src and/or + # dst are directories + src_dir = self.mkdtemp() + src_file = os.path.join(src_dir, 'foo') + dir2 = self.mkdtemp() + dst = os.path.join(src_dir, 'does_not_exist/') + write_file(src_file, 'foo') + if sys.platform == "win32": + err = PermissionError + else: + err = IsADirectoryError + + self.assertRaises(err, os.copyfile, src_dir, dst) + self.assertRaises(err, os.copyfile, src_file, src_dir) + self.assertRaises(err, os.copyfile, dir2, src_dir) + + class Faux(object): + _entered = False + _exited_with = None + _raised = False + def __init__(self, raise_in_exit=False, suppress_at_exit=True): + self._raise_in_exit = raise_in_exit + self._suppress_at_exit = suppress_at_exit + def read(self, *args): + return '' + def __enter__(self): + self._entered = True + def __exit__(self, exc_type, exc_val, exc_tb): + self._exited_with = exc_type, exc_val, exc_tb + if self._raise_in_exit: + self._raised = True + raise OSError("Cannot close") + return self._suppress_at_exit + + def test_w_source_open_fails(self): + def _open(filename, mode='r'): + if filename == 'srcfile': + raise OSError('Cannot open "srcfile"') + assert 0 # shouldn't reach here. + + with support.swap_attr(io, 'open', _open): + with self.assertRaises(OSError): + os.copyfile('srcfile', 'destfile') + + @unittest.skipIf(MACOS, "skipped on macOS") + def test_w_dest_open_fails(self): + srcfile = self.Faux() + + def _open(filename, mode='r'): + if filename == 'srcfile': + return srcfile + if filename == 'destfile': + raise OSError('Cannot open "destfile"') + assert 0 # shouldn't reach here. + + with support.swap_attr(io, 'open', _open): + os.copyfile('srcfile', 'destfile') + self.assertTrue(srcfile._entered) + self.assertTrue(srcfile._exited_with[0] is OSError) + self.assertEqual(srcfile._exited_with[1].args, + ('Cannot open "destfile"',)) + + @unittest.skipIf(MACOS, "skipped on macOS") + def test_w_dest_close_fails(self): + srcfile = self.Faux() + destfile = self.Faux(True) + + def _open(filename, mode='r'): + if filename == 'srcfile': + return srcfile + if filename == 'destfile': + return destfile + assert 0 # shouldn't reach here. + + with support.swap_attr(io, 'open', _open): + os.copyfile('srcfile', 'destfile') + self.assertTrue(srcfile._entered) + self.assertTrue(destfile._entered) + self.assertTrue(destfile._raised) + self.assertTrue(srcfile._exited_with[0] is OSError) + self.assertEqual(srcfile._exited_with[1].args, + ('Cannot close',)) + + @unittest.skipIf(MACOS, "skipped on macOS") + def test_w_source_close_fails(self): + + srcfile = self.Faux(True) + destfile = self.Faux() + + def _open(filename, mode='r'): + if filename == 'srcfile': + return srcfile + if filename == 'destfile': + return destfile + assert 0 # shouldn't reach here. + + with support.swap_attr(io, 'open', _open): + with self.assertRaises(OSError): + os.copyfile('srcfile', 'destfile') + self.assertTrue(srcfile._entered) + self.assertTrue(destfile._entered) + self.assertFalse(destfile._raised) + self.assertTrue(srcfile._exited_with[0] is None) + self.assertTrue(srcfile._raised) + + +class TestCopy(_CopyTest, unittest.TestCase): + @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') + def test_copy(self): + # Ensure that the copied file exists and has the same mode and + # modification time bits. + fname = 'test.txt' + tmpdir = self.mkdtemp() + write_file((tmpdir, fname), 'xxx') + file1 = os.path.join(tmpdir, fname) + tmpdir2 = self.mkdtemp() + file2 = os.path.join(tmpdir2, fname) + os.copy(file1, file2) + self.assertTrue(os.path.exists(file2)) + file1_stat = os.stat(file1) + file2_stat = os.stat(file2) + self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(file1_stat, attr), + getattr(file2_stat, attr) + 1) + if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): + self.assertEqual(getattr(file1_stat, 'st_flags'), + getattr(file2_stat, 'st_flags')) + + @os_helper.skip_unless_symlink + def test_copy_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): + os.lchflags(src_link, stat.UF_NODUMP) + src_stat = os.stat(src) + src_link_stat = os.lstat(src_link) + # follow + os.copy(src_link, dst, follow_symlinks=True) + self.assertFalse(os.path.islink(dst)) + self.assertEqual(read_file(src), read_file(dst)) + os.remove(dst) + # don't follow + os.copy(src_link, dst, follow_symlinks=False) + self.assertTrue(os.path.islink(dst)) + self.assertEqual(os.readlink(dst), os.readlink(src_link)) + dst_stat = os.lstat(dst) + if os.utime in os.supports_follow_symlinks: + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(src_link_stat, attr), + getattr(dst_stat, attr) + 1) + if hasattr(os, 'lchmod'): + self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) + self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) + if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): + self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) + + @os_helper.skip_unless_xattr + def test_copy_xattr(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + write_file(src, 'foo') + os.setxattr(src, 'user.foo', b'42') + os.copy(src, dst) + self.assertEqual( + os.getxattr(src, 'user.foo'), + os.getxattr(dst, 'user.foo')) + os.remove(dst) + + def test_copy_dir(self): + src_dir = self.mkdtemp() + src_file = os.path.join(src_dir, 'foo') + dir2 = self.mkdtemp() + dst = os.path.join(src_dir, 'does_not_exist/') + write_file(src_file, 'foo') + if sys.platform == "win32": + err = PermissionError + else: + err = IsADirectoryError + self.assertRaises(err, os.copy, dir2, src_dir) + + # raise *err* because of src rather than FileNotFoundError because of dst + self.assertRaises(err, os.copy, dir2, dst) + os.copy(src_file, os.path.join(dir2, 'bar')) # should not raise exceptions + + +class TestCopyFileObj(unittest.TestCase): + FILESIZE = 2 * 1024 * 1024 + + @classmethod + def setUpClass(cls): + write_test_file(TESTFN, cls.FILESIZE) + + @classmethod + def tearDownClass(cls): + os_helper.unlink(TESTFN) + os_helper.unlink(TESTFN2) + + def tearDown(self): + os_helper.unlink(TESTFN2) + + @contextlib.contextmanager + def get_files(self): + with open(TESTFN, "rb") as src: + with open(TESTFN2, "wb") as dst: + yield (src, dst) + + def assert_files_eq(self, src, dst): + with open(src, 'rb') as fsrc: + with open(dst, 'rb') as fdst: + self.assertEqual(fsrc.read(), fdst.read()) + + def test_content(self): + with self.get_files() as (src, dst): + os.copyfileobj(src, dst) + self.assert_files_eq(TESTFN, TESTFN2) + + def test_file_not_closed(self): + with self.get_files() as (src, dst): + os.copyfileobj(src, dst) + assert not src.closed + assert not dst.closed + + def test_file_offset(self): + with self.get_files() as (src, dst): + os.copyfileobj(src, dst) + self.assertEqual(src.tell(), self.FILESIZE) + self.assertEqual(dst.tell(), self.FILESIZE) + + @unittest.skipIf(os.name != 'nt', "Windows only") + def test_win_impl(self): + # Make sure alternate Windows implementation is called. + with unittest.mock.patch("os._fastcopy_readinto") as m: + os.copyfile(TESTFN, TESTFN2) + assert m.called + + # File size is 2 MiB but max buf size should be 1 MiB. + self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) + + # If file size < 1 MiB memoryview() length must be equal to + # the actual file size. + with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: + f.write(b'foo') + fname = f.name + self.addCleanup(os_helper.unlink, fname) + with unittest.mock.patch("os._fastcopy_readinto") as m: + os.copyfile(fname, TESTFN2) + self.assertEqual(m.call_args[0][2], 3) + + # Empty files should not rely on readinto() variant. + with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: + pass + fname = f.name + self.addCleanup(os_helper.unlink, fname) + with unittest.mock.patch("os._fastcopy_readinto") as m: + os.copyfile(fname, TESTFN2) + assert not m.called + self.assert_files_eq(fname, TESTFN2) + + +class _ZeroCopyFileTest(object): + """Tests common to all zero-copy APIs.""" + FILESIZE = (10 * 1024 * 1024) # 10 MiB + FILEDATA = b"" + PATCHPOINT = "" + + @classmethod + def setUpClass(cls): + write_test_file(TESTFN, cls.FILESIZE) + with open(TESTFN, 'rb') as f: + cls.FILEDATA = f.read() + assert len(cls.FILEDATA) == cls.FILESIZE + + @classmethod + def tearDownClass(cls): + os_helper.unlink(TESTFN) + + def tearDown(self): + os_helper.unlink(TESTFN2) + + @contextlib.contextmanager + def get_files(self): + with open(TESTFN, "rb") as src: + with open(TESTFN2, "wb") as dst: + yield (src, dst) + + def zerocopy_fun(self, *args, **kwargs): + raise NotImplementedError("must be implemented in subclass") + + def reset(self): + self.tearDown() + self.tearDownClass() + self.setUpClass() + self.setUp() + + # --- + + def test_regular_copy(self): + with self.get_files() as (src, dst): + self.zerocopy_fun(src, dst) + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + # Make sure the fallback function is not called. + with self.get_files() as (src, dst): + with unittest.mock.patch('os.copyfileobj') as m: + os.copyfile(TESTFN, TESTFN2) + assert not m.called + + def test_same_file(self): + self.addCleanup(self.reset) + with self.get_files() as (src, dst): + with self.assertRaises((OSError, os._GiveupOnFastCopy)): + self.zerocopy_fun(src, src) + # Make sure src file is not corrupted. + self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) + + def test_non_existent_src(self): + name = tempfile.mktemp(dir=os.getcwd()) + with self.assertRaises(FileNotFoundError) as cm: + os.copyfile(name, "new") + self.assertEqual(cm.exception.filename, name) + + def test_empty_file(self): + srcname = TESTFN + 'src' + dstname = TESTFN + 'dst' + self.addCleanup(lambda: os_helper.unlink(srcname)) + self.addCleanup(lambda: os_helper.unlink(dstname)) + with open(srcname, "wb"): + pass + + with open(srcname, "rb") as src: + with open(dstname, "wb") as dst: + self.zerocopy_fun(src, dst) + + self.assertEqual(read_file(dstname, binary=True), b"") + + def test_unhandled_exception(self): + with unittest.mock.patch(self.PATCHPOINT, + side_effect=ZeroDivisionError): + self.assertRaises(ZeroDivisionError, + os.copyfile, TESTFN, TESTFN2) + + def test_exception_on_first_call(self): + # Emulate a case where the first call to the zero-copy + # function raises an exception in which case the function is + # supposed to give up immediately. + with unittest.mock.patch(self.PATCHPOINT, + side_effect=OSError(errno.EINVAL, "yo")): + with self.get_files() as (src, dst): + with self.assertRaises(os._GiveupOnFastCopy): + self.zerocopy_fun(src, dst) + + def test_filesystem_full(self): + # Emulate a case where filesystem is full and sendfile() fails + # on first call. + with unittest.mock.patch(self.PATCHPOINT, + side_effect=OSError(errno.ENOSPC, "yo")): + with self.get_files() as (src, dst): + self.assertRaises(OSError, self.zerocopy_fun, src, dst) + + +@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') +class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): + PATCHPOINT = "os.sendfile" + + def zerocopy_fun(self, fsrc, fdst): + return os._fastcopy_sendfile(fsrc, fdst) + + def test_non_regular_file_src(self): + with io.BytesIO(self.FILEDATA) as src: + with open(TESTFN2, "wb") as dst: + with self.assertRaises(os._GiveupOnFastCopy): + self.zerocopy_fun(src, dst) + os.copyfileobj(src, dst) + + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_non_regular_file_dst(self): + with open(TESTFN, "rb") as src: + with io.BytesIO() as dst: + with self.assertRaises(os._GiveupOnFastCopy): + self.zerocopy_fun(src, dst) + os.copyfileobj(src, dst) + dst.seek(0) + self.assertEqual(dst.read(), self.FILEDATA) + + def test_exception_on_second_call(self): + def sendfile(*args, **kwargs): + if not flag: + flag.append(None) + return orig_sendfile(*args, **kwargs) + else: + raise OSError(errno.EBADF, "yo") + + flag = [] + orig_sendfile = os.sendfile + with unittest.mock.patch('os.sendfile', create=True, + side_effect=sendfile): + with self.get_files() as (src, dst): + with self.assertRaises(OSError) as cm: + os._fastcopy_sendfile(src, dst) + assert flag + self.assertEqual(cm.exception.errno, errno.EBADF) + + def test_cant_get_size(self): + # Emulate a case where src file size cannot be determined. + # Internally bufsize will be set to a small value and + # sendfile() will be called repeatedly. + with unittest.mock.patch('os.fstat', side_effect=OSError) as m: + with self.get_files() as (src, dst): + os._fastcopy_sendfile(src, dst) + assert m.called + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_small_chunks(self): + # Force internal file size detection to be smaller than the + # actual file size. We want to force sendfile() to be called + # multiple times, also in order to emulate a src fd which gets + # bigger while it is being copied. + mock = unittest.mock.Mock() + mock.st_size = 65536 + 1 + with unittest.mock.patch('os.fstat', return_value=mock) as m: + with self.get_files() as (src, dst): + os._fastcopy_sendfile(src, dst) + assert m.called + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_big_chunk(self): + # Force internal file size detection to be +100MB bigger than + # the actual file size. Make sure sendfile() does not rely on + # file size value except for (maybe) a better throughput / + # performance. + mock = unittest.mock.Mock() + mock.st_size = self.FILESIZE + (100 * 1024 * 1024) + with unittest.mock.patch('os.fstat', return_value=mock) as m: + with self.get_files() as (src, dst): + os._fastcopy_sendfile(src, dst) + assert m.called + self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) + + def test_blocksize_arg(self): + with unittest.mock.patch('os.sendfile', + side_effect=ZeroDivisionError) as m: + self.assertRaises(ZeroDivisionError, + os.copyfile, TESTFN, TESTFN2) + blocksize = m.call_args[0][3] + # Make sure file size and the block size arg passed to + # sendfile() are the same. + self.assertEqual(blocksize, os.path.getsize(TESTFN)) + # ...unless we're dealing with a small file. + os_helper.unlink(TESTFN2) + write_file(TESTFN2, b"hello", binary=True) + self.addCleanup(os_helper.unlink, TESTFN2 + '3') + self.assertRaises(ZeroDivisionError, + os.copyfile, TESTFN2, TESTFN2 + '3') + blocksize = m.call_args[0][3] + self.assertEqual(blocksize, 2 ** 23) + + def test_file2file_not_supported(self): + # Emulate a case where sendfile() only support file->socket + # fds. In such a case copyfile() is supposed to skip the + # fast-copy attempt from then on. + assert os._USE_CP_SENDFILE + try: + with unittest.mock.patch( + self.PATCHPOINT, + side_effect=OSError(errno.ENOTSOCK, "yo")) as m: + with self.get_files() as (src, dst): + with self.assertRaises(os._GiveupOnFastCopy): + os._fastcopy_sendfile(src, dst) + assert m.called + assert not os._USE_CP_SENDFILE + + with unittest.mock.patch(self.PATCHPOINT) as m: + os.copyfile(TESTFN, TESTFN2) + assert not m.called + finally: + os._USE_CP_SENDFILE = True + + +@unittest.skipIf(not MACOS, 'macOS only') +class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): + PATCHPOINT = "os._fcopyfile" + + def zerocopy_fun(self, src, dst): + return os._fastcopy_fcopyfile(src, dst, os._COPYFILE_DATA) + if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 5b0aac67a0adeb..38b0a6663e49df 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -12,16 +12,12 @@ import functools import pathlib import subprocess -import random -import string -import contextlib -import io from shutil import (make_archive, register_archive_format, unregister_archive_format, get_archive_formats, Error, unpack_archive, register_unpack_format, RegistryError, unregister_unpack_format, get_unpack_formats, - SameFileError, _GiveupOnFastCopy) + SameFileError) import tarfile import zipfile try: @@ -33,12 +29,12 @@ from test.support import os_helper from test.support.os_helper import TESTFN, FakePath from test.support import warnings_helper +from test.test_os import read_file, write_file TESTFN2 = TESTFN + "2" TESTFN_SRC = TESTFN + "_SRC" TESTFN_DST = TESTFN + "_DST" MACOS = sys.platform.startswith("darwin") -SOLARIS = sys.platform.startswith("sunos") AIX = sys.platform[:3] == 'aix' try: import grp @@ -70,52 +66,6 @@ def wrap(*args, **kwargs): os.rename = builtin_rename return wrap -def write_file(path, content, binary=False): - """Write *content* to a file located at *path*. - - If *path* is a tuple instead of a string, os.path.join will be used to - make a path. If *binary* is true, the file will be opened in binary - mode. - """ - if isinstance(path, tuple): - path = os.path.join(*path) - mode = 'wb' if binary else 'w' - encoding = None if binary else "utf-8" - with open(path, mode, encoding=encoding) as fp: - fp.write(content) - -def write_test_file(path, size): - """Create a test file with an arbitrary size and random text content.""" - def chunks(total, step): - assert total >= step - while total > step: - yield step - total -= step - if total: - yield total - - bufsize = min(size, 8192) - chunk = b"".join([random.choice(string.ascii_letters).encode() - for i in range(bufsize)]) - with open(path, 'wb') as f: - for csize in chunks(size, bufsize): - f.write(chunk) - assert os.path.getsize(path) == size - -def read_file(path, binary=False): - """Return contents from a file located at *path*. - - If *path* is a tuple instead of a string, os.path.join will be used to - make a path. If *binary* is true, the file will be opened in binary - mode. - """ - if isinstance(path, tuple): - path = os.path.join(*path) - mode = 'rb' if binary else 'r' - encoding = None if binary else "utf-8" - with open(path, mode, encoding=encoding) as fp: - return fp.read() - def rlistdir(path): res = [] for name in sorted(os.listdir(path)): @@ -128,36 +78,6 @@ def rlistdir(path): res.append(name) return res -def supports_file2file_sendfile(): - # ...apparently Linux and Solaris are the only ones - if not hasattr(os, "sendfile"): - return False - srcname = None - dstname = None - try: - with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f: - srcname = f.name - f.write(b"0123456789") - - with open(srcname, "rb") as src: - with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst: - dstname = dst.name - infd = src.fileno() - outfd = dst.fileno() - try: - os.sendfile(outfd, infd, 0, 2) - except OSError: - return False - else: - return True - finally: - if srcname is not None: - os_helper.unlink(srcname) - if dstname is not None: - os_helper.unlink(dstname) - - -SUPPORTS_SENDFILE = supports_file2file_sendfile() # AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test # The AIX command 'dump -o program' gives XCOFF header information @@ -1097,247 +1017,6 @@ def test_copytree_subdirectory(self): self.assertEqual(['pol'], os.listdir(rv)) class TestCopy(BaseTest, unittest.TestCase): - - ### shutil.copymode - - @os_helper.skip_unless_symlink - def test_copymode_follow_symlinks(self): - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'foo') - dst = os.path.join(tmp_dir, 'bar') - src_link = os.path.join(tmp_dir, 'baz') - dst_link = os.path.join(tmp_dir, 'quux') - write_file(src, 'foo') - write_file(dst, 'foo') - os.symlink(src, src_link) - os.symlink(dst, dst_link) - os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) - # file to file - os.chmod(dst, stat.S_IRWXO) - self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - shutil.copymode(src, dst) - self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - # On Windows, os.chmod does not follow symlinks (issue #15411) - # follow src link - os.chmod(dst, stat.S_IRWXO) - shutil.copymode(src_link, dst) - self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - # follow dst link - os.chmod(dst, stat.S_IRWXO) - shutil.copymode(src, dst_link) - self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - # follow both links - os.chmod(dst, stat.S_IRWXO) - shutil.copymode(src_link, dst_link) - self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - - @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') - @os_helper.skip_unless_symlink - def test_copymode_symlink_to_symlink(self): - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'foo') - dst = os.path.join(tmp_dir, 'bar') - src_link = os.path.join(tmp_dir, 'baz') - dst_link = os.path.join(tmp_dir, 'quux') - write_file(src, 'foo') - write_file(dst, 'foo') - os.symlink(src, src_link) - os.symlink(dst, dst_link) - os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) - os.chmod(dst, stat.S_IRWXU) - os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) - # link to link - os.lchmod(dst_link, stat.S_IRWXO) - old_mode = os.stat(dst).st_mode - shutil.copymode(src_link, dst_link, follow_symlinks=False) - self.assertEqual(os.lstat(src_link).st_mode, - os.lstat(dst_link).st_mode) - self.assertEqual(os.stat(dst).st_mode, old_mode) - # src link - use chmod - os.lchmod(dst_link, stat.S_IRWXO) - shutil.copymode(src_link, dst, follow_symlinks=False) - self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - # dst link - use chmod - os.lchmod(dst_link, stat.S_IRWXO) - shutil.copymode(src, dst_link, follow_symlinks=False) - self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) - - @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') - @os_helper.skip_unless_symlink - def test_copymode_symlink_to_symlink_wo_lchmod(self): - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'foo') - dst = os.path.join(tmp_dir, 'bar') - src_link = os.path.join(tmp_dir, 'baz') - dst_link = os.path.join(tmp_dir, 'quux') - write_file(src, 'foo') - write_file(dst, 'foo') - os.symlink(src, src_link) - os.symlink(dst, dst_link) - shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail - - ### shutil.copystat - - @os_helper.skip_unless_symlink - def test_copystat_symlinks(self): - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'foo') - dst = os.path.join(tmp_dir, 'bar') - src_link = os.path.join(tmp_dir, 'baz') - dst_link = os.path.join(tmp_dir, 'qux') - write_file(src, 'foo') - src_stat = os.stat(src) - os.utime(src, (src_stat.st_atime, - src_stat.st_mtime - 42.0)) # ensure different mtimes - write_file(dst, 'bar') - self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) - os.symlink(src, src_link) - os.symlink(dst, dst_link) - if hasattr(os, 'lchmod'): - os.lchmod(src_link, stat.S_IRWXO) - if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): - os.lchflags(src_link, stat.UF_NODUMP) - src_link_stat = os.lstat(src_link) - # follow - if hasattr(os, 'lchmod'): - shutil.copystat(src_link, dst_link, follow_symlinks=True) - self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) - # don't follow - shutil.copystat(src_link, dst_link, follow_symlinks=False) - dst_link_stat = os.lstat(dst_link) - if os.utime in os.supports_follow_symlinks: - for attr in 'st_atime', 'st_mtime': - # The modification times may be truncated in the new file. - self.assertLessEqual(getattr(src_link_stat, attr), - getattr(dst_link_stat, attr) + 1) - if hasattr(os, 'lchmod'): - self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) - if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): - self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) - # tell to follow but dst is not a link - shutil.copystat(src_link, dst, follow_symlinks=False) - self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < - 00000.1) - - @unittest.skipUnless(hasattr(os, 'chflags') and - hasattr(errno, 'EOPNOTSUPP') and - hasattr(errno, 'ENOTSUP'), - "requires os.chflags, EOPNOTSUPP & ENOTSUP") - def test_copystat_handles_harmless_chflags_errors(self): - tmpdir = self.mkdtemp() - file1 = os.path.join(tmpdir, 'file1') - file2 = os.path.join(tmpdir, 'file2') - write_file(file1, 'xxx') - write_file(file2, 'xxx') - - def make_chflags_raiser(err): - ex = OSError() - - def _chflags_raiser(path, flags, *, follow_symlinks=True): - ex.errno = err - raise ex - return _chflags_raiser - old_chflags = os.chflags - try: - for err in errno.EOPNOTSUPP, errno.ENOTSUP: - os.chflags = make_chflags_raiser(err) - shutil.copystat(file1, file2) - # assert others errors break it - os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) - self.assertRaises(OSError, shutil.copystat, file1, file2) - finally: - os.chflags = old_chflags - - ### shutil.copyxattr - - @os_helper.skip_unless_xattr - def test_copyxattr(self): - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'foo') - write_file(src, 'foo') - dst = os.path.join(tmp_dir, 'bar') - write_file(dst, 'bar') - - # no xattr == no problem - shutil._copyxattr(src, dst) - # common case - os.setxattr(src, 'user.foo', b'42') - os.setxattr(src, 'user.bar', b'43') - shutil._copyxattr(src, dst) - self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) - self.assertEqual( - os.getxattr(src, 'user.foo'), - os.getxattr(dst, 'user.foo')) - # check errors don't affect other attrs - os.remove(dst) - write_file(dst, 'bar') - os_error = OSError(errno.EPERM, 'EPERM') - - def _raise_on_user_foo(fname, attr, val, **kwargs): - if attr == 'user.foo': - raise os_error - else: - orig_setxattr(fname, attr, val, **kwargs) - try: - orig_setxattr = os.setxattr - os.setxattr = _raise_on_user_foo - shutil._copyxattr(src, dst) - self.assertIn('user.bar', os.listxattr(dst)) - finally: - os.setxattr = orig_setxattr - # the source filesystem not supporting xattrs should be ok, too. - def _raise_on_src(fname, *, follow_symlinks=True): - if fname == src: - raise OSError(errno.ENOTSUP, 'Operation not supported') - return orig_listxattr(fname, follow_symlinks=follow_symlinks) - try: - orig_listxattr = os.listxattr - os.listxattr = _raise_on_src - shutil._copyxattr(src, dst) - finally: - os.listxattr = orig_listxattr - - # test that shutil.copystat copies xattrs - src = os.path.join(tmp_dir, 'the_original') - srcro = os.path.join(tmp_dir, 'the_original_ro') - write_file(src, src) - write_file(srcro, srcro) - os.setxattr(src, 'user.the_value', b'fiddly') - os.setxattr(srcro, 'user.the_value', b'fiddly') - os.chmod(srcro, 0o444) - dst = os.path.join(tmp_dir, 'the_copy') - dstro = os.path.join(tmp_dir, 'the_copy_ro') - write_file(dst, dst) - write_file(dstro, dstro) - shutil.copystat(src, dst) - shutil.copystat(srcro, dstro) - self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') - self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') - - @os_helper.skip_unless_symlink - @os_helper.skip_unless_xattr - @os_helper.skip_unless_dac_override - def test_copyxattr_symlinks(self): - # On Linux, it's only possible to access non-user xattr for symlinks; - # which in turn require root privileges. This test should be expanded - # as soon as other platforms gain support for extended attributes. - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'foo') - src_link = os.path.join(tmp_dir, 'baz') - write_file(src, 'foo') - os.symlink(src, src_link) - os.setxattr(src, 'trusted.foo', b'42') - os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) - dst = os.path.join(tmp_dir, 'bar') - dst_link = os.path.join(tmp_dir, 'qux') - write_file(dst, 'bar') - os.symlink(dst, dst_link) - shutil._copyxattr(src_link, dst_link, follow_symlinks=False) - self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') - self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') - shutil._copyxattr(src_link, dst, follow_symlinks=False) - self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') - ### shutil.copy def _copy_file(self, method): @@ -1480,136 +1159,6 @@ def _test_copy_dir(self, copy_func): self.assertRaises(err, copy_func, dir2, dst) copy_func(src_file, dir2) # should not raise exceptions - ### shutil.copyfile - - @os_helper.skip_unless_symlink - def test_copyfile_symlinks(self): - tmp_dir = self.mkdtemp() - src = os.path.join(tmp_dir, 'src') - dst = os.path.join(tmp_dir, 'dst') - dst_link = os.path.join(tmp_dir, 'dst_link') - link = os.path.join(tmp_dir, 'link') - write_file(src, 'foo') - os.symlink(src, link) - # don't follow - shutil.copyfile(link, dst_link, follow_symlinks=False) - self.assertTrue(os.path.islink(dst_link)) - self.assertEqual(os.readlink(link), os.readlink(dst_link)) - # follow - shutil.copyfile(link, dst) - self.assertFalse(os.path.islink(dst)) - - @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') - def test_dont_copy_file_onto_link_to_itself(self): - # bug 851123. - os.mkdir(TESTFN) - src = os.path.join(TESTFN, 'cheese') - dst = os.path.join(TESTFN, 'shop') - try: - with open(src, 'w', encoding='utf-8') as f: - f.write('cheddar') - try: - os.link(src, dst) - except PermissionError as e: - self.skipTest('os.link(): %s' % e) - self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) - with open(src, 'r', encoding='utf-8') as f: - self.assertEqual(f.read(), 'cheddar') - os.remove(dst) - finally: - shutil.rmtree(TESTFN, ignore_errors=True) - - @os_helper.skip_unless_symlink - def test_dont_copy_file_onto_symlink_to_itself(self): - # bug 851123. - os.mkdir(TESTFN) - src = os.path.join(TESTFN, 'cheese') - dst = os.path.join(TESTFN, 'shop') - try: - with open(src, 'w', encoding='utf-8') as f: - f.write('cheddar') - # Using `src` here would mean we end up with a symlink pointing - # to TESTFN/TESTFN/cheese, while it should point at - # TESTFN/cheese. - os.symlink('cheese', dst) - self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) - with open(src, 'r', encoding='utf-8') as f: - self.assertEqual(f.read(), 'cheddar') - os.remove(dst) - finally: - shutil.rmtree(TESTFN, ignore_errors=True) - - # Issue #3002: copyfile and copytree block indefinitely on named pipes - @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') - @unittest.skipIf(sys.platform == "vxworks", - "fifo requires special path on VxWorks") - def test_copyfile_named_pipe(self): - try: - os.mkfifo(TESTFN) - except PermissionError as e: - self.skipTest('os.mkfifo(): %s' % e) - try: - self.assertRaises(shutil.SpecialFileError, - shutil.copyfile, TESTFN, TESTFN2) - self.assertRaises(shutil.SpecialFileError, - shutil.copyfile, __file__, TESTFN) - finally: - os.remove(TESTFN) - - def test_copyfile_return_value(self): - # copytree returns its destination path. - src_dir = self.mkdtemp() - dst_dir = self.mkdtemp() - dst_file = os.path.join(dst_dir, 'bar') - src_file = os.path.join(src_dir, 'foo') - write_file(src_file, 'foo') - rv = shutil.copyfile(src_file, dst_file) - self.assertTrue(os.path.exists(rv)) - self.assertEqual(read_file(src_file), read_file(dst_file)) - - def test_copyfile_same_file(self): - # copyfile() should raise SameFileError if the source and destination - # are the same. - src_dir = self.mkdtemp() - src_file = os.path.join(src_dir, 'foo') - write_file(src_file, 'foo') - self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) - # But Error should work too, to stay backward compatible. - self.assertRaises(Error, shutil.copyfile, src_file, src_file) - # Make sure file is not corrupted. - self.assertEqual(read_file(src_file), 'foo') - - @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)') - # gh-92670: The test uses a trailing slash to force the OS consider - # the path as a directory, but on AIX the trailing slash has no effect - # and is considered as a file. - @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670') - def test_copyfile_nonexistent_dir(self): - # Issue 43219 - src_dir = self.mkdtemp() - src_file = os.path.join(src_dir, 'foo') - dst = os.path.join(src_dir, 'does_not_exist/') - write_file(src_file, 'foo') - self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst) - - def test_copyfile_copy_dir(self): - # Issue 45234 - # test copy() and copyfile() raising proper exceptions when src and/or - # dst are directories - src_dir = self.mkdtemp() - src_file = os.path.join(src_dir, 'foo') - dir2 = self.mkdtemp() - dst = os.path.join(src_dir, 'does_not_exist/') - write_file(src_file, 'foo') - if sys.platform == "win32": - err = PermissionError - else: - err = IsADirectoryError - - self.assertRaises(err, shutil.copyfile, src_dir, dst) - self.assertRaises(err, shutil.copyfile, src_file, src_dir) - self.assertRaises(err, shutil.copyfile, dir2, src_dir) - class TestArchives(BaseTest, unittest.TestCase): @@ -2908,399 +2457,6 @@ def test_move_dir_permission_denied(self): os_helper.rmtree(TESTFN_DST) -class TestCopyFile(unittest.TestCase): - - class Faux(object): - _entered = False - _exited_with = None - _raised = False - def __init__(self, raise_in_exit=False, suppress_at_exit=True): - self._raise_in_exit = raise_in_exit - self._suppress_at_exit = suppress_at_exit - def read(self, *args): - return '' - def __enter__(self): - self._entered = True - def __exit__(self, exc_type, exc_val, exc_tb): - self._exited_with = exc_type, exc_val, exc_tb - if self._raise_in_exit: - self._raised = True - raise OSError("Cannot close") - return self._suppress_at_exit - - def test_w_source_open_fails(self): - def _open(filename, mode='r'): - if filename == 'srcfile': - raise OSError('Cannot open "srcfile"') - assert 0 # shouldn't reach here. - - with support.swap_attr(shutil, 'open', _open): - with self.assertRaises(OSError): - shutil.copyfile('srcfile', 'destfile') - - @unittest.skipIf(MACOS, "skipped on macOS") - def test_w_dest_open_fails(self): - srcfile = self.Faux() - - def _open(filename, mode='r'): - if filename == 'srcfile': - return srcfile - if filename == 'destfile': - raise OSError('Cannot open "destfile"') - assert 0 # shouldn't reach here. - - with support.swap_attr(shutil, 'open', _open): - shutil.copyfile('srcfile', 'destfile') - self.assertTrue(srcfile._entered) - self.assertTrue(srcfile._exited_with[0] is OSError) - self.assertEqual(srcfile._exited_with[1].args, - ('Cannot open "destfile"',)) - - @unittest.skipIf(MACOS, "skipped on macOS") - def test_w_dest_close_fails(self): - srcfile = self.Faux() - destfile = self.Faux(True) - - def _open(filename, mode='r'): - if filename == 'srcfile': - return srcfile - if filename == 'destfile': - return destfile - assert 0 # shouldn't reach here. - - with support.swap_attr(shutil, 'open', _open): - shutil.copyfile('srcfile', 'destfile') - self.assertTrue(srcfile._entered) - self.assertTrue(destfile._entered) - self.assertTrue(destfile._raised) - self.assertTrue(srcfile._exited_with[0] is OSError) - self.assertEqual(srcfile._exited_with[1].args, - ('Cannot close',)) - - @unittest.skipIf(MACOS, "skipped on macOS") - def test_w_source_close_fails(self): - - srcfile = self.Faux(True) - destfile = self.Faux() - - def _open(filename, mode='r'): - if filename == 'srcfile': - return srcfile - if filename == 'destfile': - return destfile - assert 0 # shouldn't reach here. - - with support.swap_attr(shutil, 'open', _open): - with self.assertRaises(OSError): - shutil.copyfile('srcfile', 'destfile') - self.assertTrue(srcfile._entered) - self.assertTrue(destfile._entered) - self.assertFalse(destfile._raised) - self.assertTrue(srcfile._exited_with[0] is None) - self.assertTrue(srcfile._raised) - - -class TestCopyFileObj(unittest.TestCase): - FILESIZE = 2 * 1024 * 1024 - - @classmethod - def setUpClass(cls): - write_test_file(TESTFN, cls.FILESIZE) - - @classmethod - def tearDownClass(cls): - os_helper.unlink(TESTFN) - os_helper.unlink(TESTFN2) - - def tearDown(self): - os_helper.unlink(TESTFN2) - - @contextlib.contextmanager - def get_files(self): - with open(TESTFN, "rb") as src: - with open(TESTFN2, "wb") as dst: - yield (src, dst) - - def assert_files_eq(self, src, dst): - with open(src, 'rb') as fsrc: - with open(dst, 'rb') as fdst: - self.assertEqual(fsrc.read(), fdst.read()) - - def test_content(self): - with self.get_files() as (src, dst): - shutil.copyfileobj(src, dst) - self.assert_files_eq(TESTFN, TESTFN2) - - def test_file_not_closed(self): - with self.get_files() as (src, dst): - shutil.copyfileobj(src, dst) - assert not src.closed - assert not dst.closed - - def test_file_offset(self): - with self.get_files() as (src, dst): - shutil.copyfileobj(src, dst) - self.assertEqual(src.tell(), self.FILESIZE) - self.assertEqual(dst.tell(), self.FILESIZE) - - @unittest.skipIf(os.name != 'nt', "Windows only") - def test_win_impl(self): - # Make sure alternate Windows implementation is called. - with unittest.mock.patch("shutil._copyfileobj_readinto") as m: - shutil.copyfile(TESTFN, TESTFN2) - assert m.called - - # File size is 2 MiB but max buf size should be 1 MiB. - self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) - - # If file size < 1 MiB memoryview() length must be equal to - # the actual file size. - with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: - f.write(b'foo') - fname = f.name - self.addCleanup(os_helper.unlink, fname) - with unittest.mock.patch("shutil._copyfileobj_readinto") as m: - shutil.copyfile(fname, TESTFN2) - self.assertEqual(m.call_args[0][2], 3) - - # Empty files should not rely on readinto() variant. - with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: - pass - fname = f.name - self.addCleanup(os_helper.unlink, fname) - with unittest.mock.patch("shutil._copyfileobj_readinto") as m: - shutil.copyfile(fname, TESTFN2) - assert not m.called - self.assert_files_eq(fname, TESTFN2) - - -class _ZeroCopyFileTest(object): - """Tests common to all zero-copy APIs.""" - FILESIZE = (10 * 1024 * 1024) # 10 MiB - FILEDATA = b"" - PATCHPOINT = "" - - @classmethod - def setUpClass(cls): - write_test_file(TESTFN, cls.FILESIZE) - with open(TESTFN, 'rb') as f: - cls.FILEDATA = f.read() - assert len(cls.FILEDATA) == cls.FILESIZE - - @classmethod - def tearDownClass(cls): - os_helper.unlink(TESTFN) - - def tearDown(self): - os_helper.unlink(TESTFN2) - - @contextlib.contextmanager - def get_files(self): - with open(TESTFN, "rb") as src: - with open(TESTFN2, "wb") as dst: - yield (src, dst) - - def zerocopy_fun(self, *args, **kwargs): - raise NotImplementedError("must be implemented in subclass") - - def reset(self): - self.tearDown() - self.tearDownClass() - self.setUpClass() - self.setUp() - - # --- - - def test_regular_copy(self): - with self.get_files() as (src, dst): - self.zerocopy_fun(src, dst) - self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) - # Make sure the fallback function is not called. - with self.get_files() as (src, dst): - with unittest.mock.patch('shutil.copyfileobj') as m: - shutil.copyfile(TESTFN, TESTFN2) - assert not m.called - - def test_same_file(self): - self.addCleanup(self.reset) - with self.get_files() as (src, dst): - with self.assertRaises((OSError, _GiveupOnFastCopy)): - self.zerocopy_fun(src, src) - # Make sure src file is not corrupted. - self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) - - def test_non_existent_src(self): - name = tempfile.mktemp(dir=os.getcwd()) - with self.assertRaises(FileNotFoundError) as cm: - shutil.copyfile(name, "new") - self.assertEqual(cm.exception.filename, name) - - def test_empty_file(self): - srcname = TESTFN + 'src' - dstname = TESTFN + 'dst' - self.addCleanup(lambda: os_helper.unlink(srcname)) - self.addCleanup(lambda: os_helper.unlink(dstname)) - with open(srcname, "wb"): - pass - - with open(srcname, "rb") as src: - with open(dstname, "wb") as dst: - self.zerocopy_fun(src, dst) - - self.assertEqual(read_file(dstname, binary=True), b"") - - def test_unhandled_exception(self): - with unittest.mock.patch(self.PATCHPOINT, - side_effect=ZeroDivisionError): - self.assertRaises(ZeroDivisionError, - shutil.copyfile, TESTFN, TESTFN2) - - def test_exception_on_first_call(self): - # Emulate a case where the first call to the zero-copy - # function raises an exception in which case the function is - # supposed to give up immediately. - with unittest.mock.patch(self.PATCHPOINT, - side_effect=OSError(errno.EINVAL, "yo")): - with self.get_files() as (src, dst): - with self.assertRaises(_GiveupOnFastCopy): - self.zerocopy_fun(src, dst) - - def test_filesystem_full(self): - # Emulate a case where filesystem is full and sendfile() fails - # on first call. - with unittest.mock.patch(self.PATCHPOINT, - side_effect=OSError(errno.ENOSPC, "yo")): - with self.get_files() as (src, dst): - self.assertRaises(OSError, self.zerocopy_fun, src, dst) - - -@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') -class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): - PATCHPOINT = "os.sendfile" - - def zerocopy_fun(self, fsrc, fdst): - return shutil._fastcopy_sendfile(fsrc, fdst) - - def test_non_regular_file_src(self): - with io.BytesIO(self.FILEDATA) as src: - with open(TESTFN2, "wb") as dst: - with self.assertRaises(_GiveupOnFastCopy): - self.zerocopy_fun(src, dst) - shutil.copyfileobj(src, dst) - - self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) - - def test_non_regular_file_dst(self): - with open(TESTFN, "rb") as src: - with io.BytesIO() as dst: - with self.assertRaises(_GiveupOnFastCopy): - self.zerocopy_fun(src, dst) - shutil.copyfileobj(src, dst) - dst.seek(0) - self.assertEqual(dst.read(), self.FILEDATA) - - def test_exception_on_second_call(self): - def sendfile(*args, **kwargs): - if not flag: - flag.append(None) - return orig_sendfile(*args, **kwargs) - else: - raise OSError(errno.EBADF, "yo") - - flag = [] - orig_sendfile = os.sendfile - with unittest.mock.patch('os.sendfile', create=True, - side_effect=sendfile): - with self.get_files() as (src, dst): - with self.assertRaises(OSError) as cm: - shutil._fastcopy_sendfile(src, dst) - assert flag - self.assertEqual(cm.exception.errno, errno.EBADF) - - def test_cant_get_size(self): - # Emulate a case where src file size cannot be determined. - # Internally bufsize will be set to a small value and - # sendfile() will be called repeatedly. - with unittest.mock.patch('os.fstat', side_effect=OSError) as m: - with self.get_files() as (src, dst): - shutil._fastcopy_sendfile(src, dst) - assert m.called - self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) - - def test_small_chunks(self): - # Force internal file size detection to be smaller than the - # actual file size. We want to force sendfile() to be called - # multiple times, also in order to emulate a src fd which gets - # bigger while it is being copied. - mock = unittest.mock.Mock() - mock.st_size = 65536 + 1 - with unittest.mock.patch('os.fstat', return_value=mock) as m: - with self.get_files() as (src, dst): - shutil._fastcopy_sendfile(src, dst) - assert m.called - self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) - - def test_big_chunk(self): - # Force internal file size detection to be +100MB bigger than - # the actual file size. Make sure sendfile() does not rely on - # file size value except for (maybe) a better throughput / - # performance. - mock = unittest.mock.Mock() - mock.st_size = self.FILESIZE + (100 * 1024 * 1024) - with unittest.mock.patch('os.fstat', return_value=mock) as m: - with self.get_files() as (src, dst): - shutil._fastcopy_sendfile(src, dst) - assert m.called - self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) - - def test_blocksize_arg(self): - with unittest.mock.patch('os.sendfile', - side_effect=ZeroDivisionError) as m: - self.assertRaises(ZeroDivisionError, - shutil.copyfile, TESTFN, TESTFN2) - blocksize = m.call_args[0][3] - # Make sure file size and the block size arg passed to - # sendfile() are the same. - self.assertEqual(blocksize, os.path.getsize(TESTFN)) - # ...unless we're dealing with a small file. - os_helper.unlink(TESTFN2) - write_file(TESTFN2, b"hello", binary=True) - self.addCleanup(os_helper.unlink, TESTFN2 + '3') - self.assertRaises(ZeroDivisionError, - shutil.copyfile, TESTFN2, TESTFN2 + '3') - blocksize = m.call_args[0][3] - self.assertEqual(blocksize, 2 ** 23) - - def test_file2file_not_supported(self): - # Emulate a case where sendfile() only support file->socket - # fds. In such a case copyfile() is supposed to skip the - # fast-copy attempt from then on. - assert shutil._USE_CP_SENDFILE - try: - with unittest.mock.patch( - self.PATCHPOINT, - side_effect=OSError(errno.ENOTSOCK, "yo")) as m: - with self.get_files() as (src, dst): - with self.assertRaises(_GiveupOnFastCopy): - shutil._fastcopy_sendfile(src, dst) - assert m.called - assert not shutil._USE_CP_SENDFILE - - with unittest.mock.patch(self.PATCHPOINT) as m: - shutil.copyfile(TESTFN, TESTFN2) - assert not m.called - finally: - shutil._USE_CP_SENDFILE = True - - -@unittest.skipIf(not MACOS, 'macOS only') -class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): - PATCHPOINT = "posix._fcopyfile" - - def zerocopy_fun(self, src, dst): - return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA) - - class TestGetTerminalSize(unittest.TestCase): def test_does_not_crash(self): """Check if get_terminal_size() returns a meaningful value.