diff --git a/Doc/library/os.rst b/Doc/library/os.rst
index b93b06d4e72afc..57d07d035887b9 100644
--- a/Doc/library/os.rst
+++ b/Doc/library/os.rst
@@ -3862,6 +3862,158 @@ features:
.. versionadded:: 3.10
+.. _os-copying-files:
+
+Copying Files
+~~~~~~~~~~~~~
+
+Functions involving a file copy (:func:`copyfile` and :func:`copy`) may use
+platform-specific "fast-copy" syscalls in order to copy the file more
+efficiently (see :issue:`33671`).
+"fast-copy" means that the copying operation occurs within the kernel, avoiding
+the use of userspace buffers in Python as in "``outfd.write(infd.read())``".
+
+On macOS `fcopyfile `_ is used to
+copy the file content (not metadata).
+
+On Linux :func:`os.sendfile` is used.
+
+On Windows :func:`copyfile` uses a bigger default buffer size (1 MiB
+instead of 64 KiB) and a :func:`memoryview`-based variant of
+:func:`copyfileobj` is used.
+
+If the fast-copy operation fails and no data was written in the destination
+file then shutil will silently fallback on using less efficient
+:func:`copyfileobj` function internally.
+
+
+.. exception:: SameFileError
+
+ This exception is raised if source and destination in :func:`copyfile`
+ are the same file.
+
+ .. versionadded:: 3.14
+
+
+.. function:: copyfileobj(fsrc, fdst[, length])
+
+ Copy the contents of the :term:`file-like object ` *fsrc* to
+ the file-like object *fdst*. The integer *length*, if given, is the buffer
+ size. In particular, a negative *length* value means to copy the data
+ without looping over the source data in chunks; by default the data is read
+ in chunks to avoid uncontrolled memory consumption. Note that if the
+ current file position of the *fsrc* object is not 0, only the contents from
+ the current file position to the end of the file will be copied.
+
+ .. versionadded:: 3.14
+
+
+.. function:: copyfile(src, dst, *, follow_symlinks=True)
+
+ Copy the contents (no metadata) of the file named *src* to a file named
+ *dst* and return *dst* in the most efficient way possible.
+ *src* and *dst* are :term:`path-like objects ` or path
+ names given as strings.
+
+ *dst* must be the complete target file name; look at :func:`shutil.copy`
+ for a copy that accepts a target directory path. If *src* and *dst*
+ specify the same file, :exc:`SameFileError` is raised.
+
+ The destination location must be writable; otherwise, an :exc:`OSError`
+ exception will be raised. If *dst* already exists, it will be replaced.
+ Special files such as character or block devices and pipes cannot be
+ copied with this function.
+
+ If *follow_symlinks* is false and *src* is a symbolic link,
+ a new symbolic link will be created instead of copying the
+ file *src* points to.
+
+ .. audit-event:: os.copyfile src,dst os.copyfile
+
+ .. versionadded:: 3.14
+
+
+.. function:: copymode(src, dst, *, follow_symlinks=True)
+
+ Copy the permission bits from *src* to *dst*. The file contents, owner, and
+ group are unaffected. *src* and *dst* are
+ :term:`path-like objects ` or path names given as
+ strings. If *follow_symlinks* is false, and both *src* and *dst* are
+ symbolic links, :func:`copymode` will attempt to modify the mode of *dst*
+ itself (rather than the file it points to). This functionality is not
+ available on every platform; please see :func:`copystat` for more
+ information. If :func:`copymode` cannot modify symbolic links on the local
+ platform, and it is asked to do so, it will do nothing and return.
+
+ .. audit-event:: os.copymode src,dst os.copymode
+
+ .. versionadded:: 3.14
+
+
+.. function:: copystat(src, dst, *, follow_symlinks=True)
+
+ Copy the permission bits, last access time, last modification time, and
+ flags from *src* to *dst*. On Linux, :func:`copystat` also copies the
+ "extended attributes" where possible. The file contents, owner, and group
+ are unaffected. *src* and *dst* are
+ :term:`path-like objects ` or path names given as
+ strings.
+
+ If *follow_symlinks* is false, and *src* and *dst* both refer to symbolic
+ links, :func:`copystat` will operate on the symbolic links themselves
+ rather than the files the symbolic links refer to—reading the information
+ from the *src* symbolic link, and writing the information to the *dst*
+ symbolic link.
+
+ .. note::
+
+ Not all platforms provide the ability to examine and modify symbolic
+ links. Python itself can tell you what functionality is locally
+ available.
+
+ * If ``os.chmod in os.supports_follow_symlinks`` is ``True``,
+ :func:`copystat` can modify the permission bits of a symbolic link.
+
+ * If ``os.utime in os.supports_follow_symlinks`` is ``True``,
+ :func:`copystat` can modify the last access and modification times of
+ a symbolic link.
+
+ * If ``os.chflags in os.supports_follow_symlinks`` is ``True``,
+ :func:`copystat` can modify the flags of a symbolic link.
+ (``os.chflags`` is not available on all platforms.)
+
+ On platforms where some or all of this functionality is unavailable,
+ when asked to modify a symbolic link, :func:`copystat` will copy
+ everything it can. :func:`copystat` never returns failure.
+
+ Please see :data:`supports_follow_symlinks` for more information.
+
+ .. audit-event:: os.copystat src,dst os.copystat
+
+ .. versionadded:: 3.14
+
+
+.. function:: copy(src, dst, *, follow_symlinks=True)
+
+ Copies the file *src* to the file *dst*. *src* and *dst* should be
+ :term:`path-like objects ` or strings. If *dst* specifies
+ a file that already exists, it will be replaced.
+
+ When *follow_symlinks* is false, and *src* is a symbolic link, :func:`copy`
+ attempts to copy all metadata from the *src* symbolic link to the newly
+ created *dst* symbolic link. However, this functionality is not available
+ on all platforms. On platforms where some or all of this functionality is
+ unavailable, :func:`copy` will preserve all the metadata it can;
+ :func:`copy` never raises an exception because it cannot preserve file
+ metadata.
+
+ :func:`copy` uses :func:`copystat` to copy the file metadata. Please see
+ :func:`copystat` for more information about platform support for modifying
+ symbolic link metadata.
+
+ .. versionadded:: 3.14
+
+
Timer File Descriptors
~~~~~~~~~~~~~~~~~~~~~~
diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst
index fd32479195eca8..e913a7e17214a8 100644
--- a/Doc/library/shutil.rst
+++ b/Doc/library/shutil.rst
@@ -39,35 +39,12 @@ Directory and files operations
.. function:: copyfileobj(fsrc, fdst[, length])
- Copy the contents of the :term:`file-like object ` *fsrc* to the file-like object *fdst*.
- The integer *length*, if given, is the buffer size. In particular, a negative
- *length* value means to copy the data without looping over the source data in
- chunks; by default the data is read in chunks to avoid uncontrolled memory
- consumption. Note that if the current file position of the *fsrc* object is not
- 0, only the contents from the current file position to the end of the file will
- be copied.
+ Alias of :func:`os.copyfileobj`.
.. function:: copyfile(src, dst, *, follow_symlinks=True)
- Copy the contents (no metadata) of the file named *src* to a file named
- *dst* and return *dst* in the most efficient way possible.
- *src* and *dst* are :term:`path-like objects ` or path names given as strings.
-
- *dst* must be the complete target file name; look at :func:`~shutil.copy`
- for a copy that accepts a target directory path. If *src* and *dst*
- specify the same file, :exc:`SameFileError` is raised.
-
- The destination location must be writable; otherwise, an :exc:`OSError`
- exception will be raised. If *dst* already exists, it will be replaced.
- Special files such as character or block devices and pipes cannot be
- copied with this function.
-
- If *follow_symlinks* is false and *src* is a symbolic link,
- a new symbolic link will be created instead of copying the
- file *src* points to.
-
- .. audit-event:: shutil.copyfile src,dst shutil.copyfile
+ Alias of :func:`os.copyfile`.
.. versionchanged:: 3.3
:exc:`IOError` used to be raised instead of :exc:`OSError`.
@@ -80,77 +57,25 @@ Directory and files operations
.. versionchanged:: 3.8
Platform-specific fast-copy syscalls may be used internally in order to
- copy the file more efficiently. See
- :ref:`shutil-platform-dependent-efficient-copy-operations` section.
+ copy the file more efficiently. See :ref:`os-copying-files`.
.. exception:: SameFileError
- This exception is raised if source and destination in :func:`copyfile`
- are the same file.
+ Alias of :exc:`os.SameFileError`.
.. versionadded:: 3.4
.. function:: copymode(src, dst, *, follow_symlinks=True)
- Copy the permission bits from *src* to *dst*. The file contents, owner, and
- group are unaffected. *src* and *dst* are :term:`path-like objects ` or path names
- given as strings.
- If *follow_symlinks* is false, and both *src* and *dst* are symbolic links,
- :func:`copymode` will attempt to modify the mode of *dst* itself (rather
- than the file it points to). This functionality is not available on every
- platform; please see :func:`copystat` for more information. If
- :func:`copymode` cannot modify symbolic links on the local platform, and it
- is asked to do so, it will do nothing and return.
-
- .. audit-event:: shutil.copymode src,dst shutil.copymode
+ Alias of :func:`os.copymode`.
.. versionchanged:: 3.3
Added *follow_symlinks* argument.
.. function:: copystat(src, dst, *, follow_symlinks=True)
- Copy the permission bits, last access time, last modification time, and
- flags from *src* to *dst*. On Linux, :func:`copystat` also copies the
- "extended attributes" where possible. The file contents, owner, and
- group are unaffected. *src* and *dst* are :term:`path-like objects ` or path
- names given as strings.
-
- If *follow_symlinks* is false, and *src* and *dst* both
- refer to symbolic links, :func:`copystat` will operate on
- the symbolic links themselves rather than the files the
- symbolic links refer to—reading the information from the
- *src* symbolic link, and writing the information to the
- *dst* symbolic link.
-
- .. note::
-
- Not all platforms provide the ability to examine and
- modify symbolic links. Python itself can tell you what
- functionality is locally available.
-
- * If ``os.chmod in os.supports_follow_symlinks`` is
- ``True``, :func:`copystat` can modify the permission
- bits of a symbolic link.
-
- * If ``os.utime in os.supports_follow_symlinks`` is
- ``True``, :func:`copystat` can modify the last access
- and modification times of a symbolic link.
-
- * If ``os.chflags in os.supports_follow_symlinks`` is
- ``True``, :func:`copystat` can modify the flags of
- a symbolic link. (``os.chflags`` is not available on
- all platforms.)
-
- On platforms where some or all of this functionality
- is unavailable, when asked to modify a symbolic link,
- :func:`copystat` will copy everything it can.
- :func:`copystat` never returns failure.
-
- Please see :data:`os.supports_follow_symlinks`
- for more information.
-
- .. audit-event:: shutil.copystat src,dst shutil.copystat
+ Alias of :func:`os.copystat`.
.. versionchanged:: 3.3
Added *follow_symlinks* argument and support for Linux extended attributes.
@@ -184,13 +109,13 @@ Directory and files operations
.. versionchanged:: 3.8
Platform-specific fast-copy syscalls may be used internally in order to
- copy the file more efficiently. See
- :ref:`shutil-platform-dependent-efficient-copy-operations` section.
+ copy the file more efficiently. See :ref:`os-copying-files`.
.. function:: copy2(src, dst, *, follow_symlinks=True)
Identical to :func:`~shutil.copy` except that :func:`copy2`
- also attempts to preserve file metadata.
+ also attempts to preserve file metadata. Identical to
+ :func:`os.copy` except that *dst* may be a directory.
When *follow_symlinks* is false, and *src* is a symbolic
link, :func:`copy2` attempts to copy all metadata from the
@@ -216,8 +141,7 @@ Directory and files operations
.. versionchanged:: 3.8
Platform-specific fast-copy syscalls may be used internally in order to
- copy the file more efficiently. See
- :ref:`shutil-platform-dependent-efficient-copy-operations` section.
+ copy the file more efficiently. See :ref:`os-copying-files`.
.. function:: ignore_patterns(*patterns)
@@ -286,8 +210,7 @@ Directory and files operations
.. versionchanged:: 3.8
Platform-specific fast-copy syscalls may be used internally in order to
- copy the file more efficiently. See
- :ref:`shutil-platform-dependent-efficient-copy-operations` section.
+ copy the file more efficiently. See :ref:`os-copying-files`.
.. versionchanged:: 3.8
Added the *dirs_exist_ok* parameter.
@@ -395,8 +318,7 @@ Directory and files operations
.. versionchanged:: 3.8
Platform-specific fast-copy syscalls may be used internally in order to
- copy the file more efficiently. See
- :ref:`shutil-platform-dependent-efficient-copy-operations` section.
+ copy the file more efficiently. See :ref:`os-copying-files`.
.. versionchanged:: 3.9
Accepts a :term:`path-like object` for both *src* and *dst*.
@@ -502,32 +424,6 @@ Directory and files operations
operation. For :func:`copytree`, the exception argument is a list of 3-tuples
(*srcname*, *dstname*, *exception*).
-.. _shutil-platform-dependent-efficient-copy-operations:
-
-Platform-dependent efficient copy operations
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Starting from Python 3.8, all functions involving a file copy
-(:func:`copyfile`, :func:`~shutil.copy`, :func:`copy2`,
-:func:`copytree`, and :func:`move`) may use
-platform-specific "fast-copy" syscalls in order to copy the file more
-efficiently (see :issue:`33671`).
-"fast-copy" means that the copying operation occurs within the kernel, avoiding
-the use of userspace buffers in Python as in "``outfd.write(infd.read())``".
-
-On macOS `fcopyfile`_ is used to copy the file content (not metadata).
-
-On Linux :func:`os.sendfile` is used.
-
-On Windows :func:`shutil.copyfile` uses a bigger default buffer size (1 MiB
-instead of 64 KiB) and a :func:`memoryview`-based variant of
-:func:`shutil.copyfileobj` is used.
-
-If the fast-copy operation fails and no data was written in the destination
-file then shutil will silently fallback on using less efficient
-:func:`copyfileobj` function internally.
-
-.. versionchanged:: 3.8
.. _shutil-copytree-example:
diff --git a/Doc/whatsnew/3.8.rst b/Doc/whatsnew/3.8.rst
index 1356f24547b424..d3d8ab841b6d66 100644
--- a/Doc/whatsnew/3.8.rst
+++ b/Doc/whatsnew/3.8.rst
@@ -1481,7 +1481,7 @@ Optimizations
The speedup for copying a 512 MiB file within the same partition is about
+26% on Linux, +50% on macOS and +40% on Windows. Also, much less CPU cycles
are consumed.
- See :ref:`shutil-platform-dependent-efficient-copy-operations` section.
+ See :ref:`os-copying-files`.
(Contributed by Giampaolo Rodolà in :issue:`33671`.)
* :func:`shutil.copytree` uses :func:`os.scandir` function and all copy
@@ -1921,8 +1921,7 @@ Changes in the Python API
* :func:`shutil.copyfile`, :func:`shutil.copy`, :func:`shutil.copy2`,
:func:`shutil.copytree` and :func:`shutil.move` use platform-specific
- "fast-copy" syscalls (see
- :ref:`shutil-platform-dependent-efficient-copy-operations` section).
+ "fast-copy" syscalls (see :ref:`os-copying-files`).
* :func:`shutil.copyfile` default buffer size on Windows was changed from
16 KiB to 1 MiB.
diff --git a/Lib/os.py b/Lib/os.py
index 7661ce68ca3be2..49ba577e393cfb 100644
--- a/Lib/os.py
+++ b/Lib/os.py
@@ -23,6 +23,7 @@
#'
import abc
+import errno
import sys
import stat as st
@@ -65,6 +66,7 @@ def _get_exports_list(module):
except ImportError:
pass
+ _winapi = None
import posix
__all__.extend(_get_exports_list(posix))
del posix
@@ -80,6 +82,7 @@ def _get_exports_list(module):
pass
import ntpath as path
+ import _winapi
import nt
__all__.extend(_get_exports_list(nt))
del nt
@@ -1152,3 +1155,383 @@ def process_cpu_count():
else:
# Just an alias to cpu_count() (same docstring)
process_cpu_count = cpu_count
+
+
+# This should never be removed, see rationale in:
+# https://bugs.python.org/issue43743#msg393429
+_USE_CP_SENDFILE = (_exists("sendfile")
+ and sys.platform.startswith(("linux", "android")))
+_HAS_FCOPYFILE = _exists("_fcopyfile") # macOS
+COPY_BUFSIZE = 1024 * 1024 if name == 'nt' else 64 * 1024
+
+class CopyError(OSError):
+ pass
+
+class SameFileError(CopyError):
+ """Raised when source and destination are the same file."""
+
+class SpecialFileError(OSError):
+ """Raised when trying to do a kind of operation (e.g. copying) which is
+ not supported on a special file (e.g. a named pipe)"""
+
+class _GiveupOnFastCopy(Exception):
+ """Raised as a signal to fallback on using raw read()/write()
+ file copy when fast-copy functions fail to do so.
+ """
+
+def _fastcopy_fcopyfile(fsrc, fdst, flags):
+ """Copy a regular file content or metadata by using high-performance
+ fcopyfile(3) syscall (macOS).
+ """
+ try:
+ infd = fsrc.fileno()
+ outfd = fdst.fileno()
+ except Exception as err:
+ raise _GiveupOnFastCopy(err) # not a regular file
+
+ try:
+ _fcopyfile(infd, outfd, flags)
+ except OSError as err:
+ err.filename = fsrc.name
+ err.filename2 = fdst.name
+ if err.errno in {errno.EINVAL, errno.ENOTSUP}:
+ raise _GiveupOnFastCopy(err)
+ else:
+ raise err from None
+
+def _fastcopy_sendfile(fsrc, fdst):
+ """Copy data from one regular mmap-like fd to another by using
+ high-performance sendfile(2) syscall.
+ This should work on Linux >= 2.6.33 only.
+ """
+ # Note: copyfileobj() is left alone in order to not introduce any
+ # unexpected breakage. Possible risks by using zero-copy calls
+ # in copyfileobj() are:
+ # - fdst cannot be open in "a"(ppend) mode
+ # - fsrc and fdst may be open in "t"(ext) mode
+ # - fsrc may be a BufferedReader (which hides unread data in a buffer),
+ # GzipFile (which decompresses data), HTTPResponse (which decodes
+ # chunks).
+ # - possibly others (e.g. encrypted fs/partition?)
+ global _USE_CP_SENDFILE
+ try:
+ infd = fsrc.fileno()
+ outfd = fdst.fileno()
+ except Exception as err:
+ raise _GiveupOnFastCopy(err) # not a regular file
+
+ # Hopefully the whole file will be copied in a single call.
+ # sendfile() is called in a loop 'till EOF is reached (0 return)
+ # so a bufsize smaller or bigger than the actual file size
+ # should not make any difference, also in case the file content
+ # changes while being copied.
+ try:
+ blocksize = max(fstat(infd).st_size, 2 ** 23) # min 8MiB
+ except OSError:
+ blocksize = 2 ** 27 # 128MiB
+ # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
+ # see bpo-38319.
+ if sys.maxsize < 2 ** 32:
+ blocksize = min(blocksize, 2 ** 30)
+
+ offset = 0
+ while True:
+ try:
+ sent = sendfile(outfd, infd, offset, blocksize)
+ except OSError as err:
+ # ...in oder to have a more informative exception.
+ err.filename = fsrc.name
+ err.filename2 = fdst.name
+
+ if err.errno == errno.ENOTSOCK:
+ # sendfile() on this platform (probably Linux < 2.6.33)
+ # does not support copies between regular files (only
+ # sockets).
+ _USE_CP_SENDFILE = False
+ raise _GiveupOnFastCopy(err)
+
+ if err.errno == errno.ENOSPC: # filesystem is full
+ raise err from None
+
+ # Give up on first call and if no data was copied.
+ if offset == 0 and lseek(outfd, 0, SEEK_CUR) == 0:
+ raise _GiveupOnFastCopy(err)
+
+ raise err
+ else:
+ if sent == 0:
+ break # EOF
+ offset += sent
+
+def _fastcopy_readinto(fsrc, fdst, length=COPY_BUFSIZE):
+ """readinto()/memoryview() based variant of copyfileobj().
+ *fsrc* must support readinto() method and both files must be
+ open in binary mode.
+ """
+ # Localize variable access to minimize overhead.
+ fsrc_readinto = fsrc.readinto
+ fdst_write = fdst.write
+ with memoryview(bytearray(length)) as mv:
+ while True:
+ n = fsrc_readinto(mv)
+ if not n:
+ break
+ elif n < length:
+ with mv[:n] as smv:
+ fdst_write(smv)
+ break
+ else:
+ fdst_write(mv)
+
+def _fastcopy(fsrc, fdst, file_size):
+ # macOS
+ if _HAS_FCOPYFILE:
+ try:
+ _fastcopy_fcopyfile(fsrc, fdst, _COPYFILE_DATA)
+ return
+ except _GiveupOnFastCopy:
+ pass
+ # Linux
+ elif _USE_CP_SENDFILE:
+ try:
+ _fastcopy_sendfile(fsrc, fdst)
+ return
+ except _GiveupOnFastCopy:
+ pass
+ # Windows, see:
+ # https://github.com/python/cpython/pull/7160#discussion_r195405230
+ elif name == 'nt' and file_size > 0:
+ _fastcopy_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
+ return
+
+ copyfileobj(fsrc, fdst)
+
+def copyfileobj(fsrc, fdst, length=0):
+ """copy data from file-like object fsrc to file-like object fdst"""
+ if not length:
+ length = COPY_BUFSIZE
+ # Localize variable access to minimize overhead.
+ fsrc_read = fsrc.read
+ fdst_write = fdst.write
+ while buf := fsrc_read(length):
+ fdst_write(buf)
+
+def _samefile(src, dst):
+ # Macintosh, Unix.
+ if isinstance(src, DirEntry):
+ try:
+ return path.samestat(src.stat(), stat(dst))
+ except OSError:
+ return False
+
+ try:
+ return path.samefile(src, dst)
+ except OSError:
+ return False
+
+def _stat(fn):
+ return fn.stat() if isinstance(fn, DirEntry) else stat(fn)
+
+def _islink(fn):
+ return fn.is_symlink() if isinstance(fn, DirEntry) else path.islink(fn)
+
+def copyfile(src, dst, *, follow_symlinks=True):
+ """Copy data from src to dst in the most efficient way possible.
+
+ If follow_symlinks is not set and src is a symbolic link, a new
+ symlink will be created instead of copying the file it points to.
+
+ """
+ sys.audit("os.copyfile", src, dst)
+
+ if _samefile(src, dst):
+ raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
+
+ file_size = 0
+ for i, fn in enumerate([src, dst]):
+ try:
+ stat_result = _stat(fn)
+ except OSError:
+ # File most likely does not exist
+ pass
+ else:
+ # XXX What about other special files? (sockets, devices...)
+ if st.S_ISFIFO(stat_result.st_mode):
+ fn = fn.path if isinstance(fn, DirEntry) else fn
+ raise SpecialFileError("`%s` is a named pipe" % fn)
+ if name == 'nt' and i == 0:
+ file_size = stat_result.st_size
+
+ if not follow_symlinks and _islink(src):
+ symlink(readlink(src), dst)
+ else:
+ import io
+ with io.open(src, 'rb') as fsrc:
+ try:
+ with io.open(dst, 'wb') as fdst:
+ _fastcopy(fsrc, fdst, file_size)
+
+ # Issue 43219, raise a less confusing exception
+ except IsADirectoryError as e:
+ if not path.exists(dst):
+ raise FileNotFoundError(f'Directory does not exist: {dst}') from e
+ else:
+ raise
+
+ return dst
+
+def copymode(src, dst, *, follow_symlinks=True):
+ """Copy mode bits from src to dst.
+
+ If follow_symlinks is not set, symlinks aren't followed if and only
+ if both `src` and `dst` are symlinks. If `lchmod` isn't available
+ (e.g. Linux) this method does nothing.
+
+ """
+ sys.audit("os.copymode", src, dst)
+
+ if not follow_symlinks and _islink(src) and path.islink(dst):
+ if _exists('lchmod'):
+ stat_func, chmod_func = lstat, lchmod
+ else:
+ return
+ else:
+ stat_func = _stat
+ if name == 'nt' and path.islink(dst):
+ def chmod_func(*args):
+ chmod(*args, follow_symlinks=True)
+ else:
+ chmod_func = chmod
+
+ stat_result = stat_func(src)
+ chmod_func(dst, st.S_IMODE(stat_result.st_mode))
+
+if _exists('listxattr'):
+ def _copyxattr(src, dst, *, follow_symlinks=True):
+ """Copy extended filesystem attributes from `src` to `dst`.
+
+ Overwrite existing attributes.
+
+ If `follow_symlinks` is false, symlinks won't be followed.
+
+ """
+
+ try:
+ names = listxattr(src, follow_symlinks=follow_symlinks)
+ except OSError as e:
+ if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
+ raise
+ return
+ for name in names:
+ try:
+ value = getxattr(src, name, follow_symlinks=follow_symlinks)
+ setxattr(dst, name, value, follow_symlinks=follow_symlinks)
+ except OSError as e:
+ if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
+ errno.EINVAL, errno.EACCES):
+ raise
+else:
+ def _copyxattr(*args, **kwargs):
+ pass
+
+def copystat(src, dst, *, follow_symlinks=True):
+ """Copy file metadata
+
+ Copy the permission bits, last access time, last modification time, and
+ flags from `src` to `dst`. On Linux, copystat() also copies the "extended
+ attributes" where possible. The file contents, owner, and group are
+ unaffected. `src` and `dst` are path-like objects or path names given as
+ strings.
+
+ If the optional flag `follow_symlinks` is not set, symlinks aren't
+ followed if and only if both `src` and `dst` are symlinks.
+ """
+ sys.audit("os.copystat", src, dst)
+
+ def _nop(*args, ns=None, follow_symlinks=None):
+ pass
+
+ # follow symlinks (aka don't not follow symlinks)
+ follow = follow_symlinks or not (_islink(src) and path.islink(dst))
+ if follow:
+ # use the real function if it exists
+ def lookup(name):
+ return globals().get(name, _nop)
+ else:
+ # use the real function only if it exists
+ # *and* it supports follow_symlinks
+ def lookup(name):
+ fn = globals().get(name, _nop)
+ if fn in supports_follow_symlinks:
+ return fn
+ return _nop
+
+ if isinstance(src, DirEntry):
+ stat_result = src.stat(follow_symlinks=follow)
+ else:
+ stat_result = lookup("stat")(src, follow_symlinks=follow)
+ mode = st.S_IMODE(stat_result.st_mode)
+ lookup("utime")(dst, ns=(stat_result.st_atime_ns, stat_result.st_mtime_ns),
+ follow_symlinks=follow)
+ # We must copy extended attributes before the file is (potentially)
+ # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+ _copyxattr(src, dst, follow_symlinks=follow)
+ try:
+ lookup("chmod")(dst, mode, follow_symlinks=follow)
+ except NotImplementedError:
+ # if we got a NotImplementedError, it's because
+ # * follow_symlinks=False,
+ # * lchown() is unavailable, and
+ # * either
+ # * fchownat() is unavailable or
+ # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+ # (it returned ENOSUP.)
+ # therefore we're out of options--we simply cannot chown the
+ # symlink. give up, suppress the error.
+ # (which is what shutil always did in this circumstance.)
+ pass
+ if hasattr(st, 'st_flags'):
+ try:
+ lookup("chflags")(dst, stat_result.st_flags, follow_symlinks=follow)
+ except OSError as why:
+ for err in 'EOPNOTSUPP', 'ENOTSUP':
+ if hasattr(errno, err) and why.errno == getattr(errno, err):
+ break
+ else:
+ raise
+
+def copy(src, dst, *, follow_symlinks=True):
+ """Copy data and metadata.
+
+ Metadata is copied with copystat(). Please see the copystat function
+ for more information.
+
+ If follow_symlinks is false, symlinks won't be followed. This
+ resembles GNU's "cp -P src dst".
+ """
+ if hasattr(_winapi, "CopyFile2"):
+ src_ = fsdecode(src)
+ dst_ = fsdecode(dst)
+ flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION # for compat
+ if not follow_symlinks:
+ flags |= _winapi.COPY_FILE_COPY_SYMLINK
+ try:
+ _winapi.CopyFile2(src_, dst_, flags)
+ return
+ except OSError as exc:
+ if (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD
+ and not follow_symlinks):
+ # Likely encountered a symlink we aren't allowed to create.
+ # Fall back on the old code
+ pass
+ elif exc.winerror == _winapi.ERROR_ACCESS_DENIED:
+ # Possibly encountered a hidden or readonly file we can't
+ # overwrite. Fall back on old code
+ pass
+ else:
+ raise
+
+ copyfile(src, dst, follow_symlinks=follow_symlinks)
+ copystat(src, dst, follow_symlinks=follow_symlinks)
+
+__all__.extend(["copyfileobj", "copyfile", "copymode", "copystat", "copy"])
diff --git a/Lib/shutil.py b/Lib/shutil.py
index c9b4da34b1e19b..b04cb7ebbcbb2d 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -33,10 +33,8 @@
_LZMA_SUPPORTED = False
_WINDOWS = os.name == 'nt'
-posix = nt = None
-if os.name == 'posix':
- import posix
-elif _WINDOWS:
+nt = None
+if _WINDOWS:
import nt
if sys.platform == 'win32':
@@ -44,12 +42,14 @@
else:
_winapi = None
-COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
-# This should never be removed, see rationale in:
-# https://bugs.python.org/issue43743#msg393429
-_USE_CP_SENDFILE = (hasattr(os, "sendfile")
- and sys.platform.startswith(("linux", "android")))
-_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS
+COPY_BUFSIZE = os.COPY_BUFSIZE
+Error = os.CopyError
+SameFileError = os.SameFileError
+SpecialFileError = os.SpecialFileError
+copyfileobj = os.copyfileobj
+copyfile = os.copyfile
+copymode = os.copymode
+copystat = os.copystat
# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
@@ -64,16 +64,6 @@
"SameFileError"]
# disk_usage is added later, if available on the platform
-class Error(OSError):
- pass
-
-class SameFileError(Error):
- """Raised when source and destination are the same file."""
-
-class SpecialFileError(OSError):
- """Raised when trying to do a kind of operation (e.g. copying) which is
- not supported on a special file (e.g. a named pipe)"""
-
class ExecError(OSError):
"""Raised when a command could not be executed"""
@@ -84,333 +74,6 @@ class RegistryError(Exception):
"""Raised when a registry operation with the archiving
and unpacking registries fails"""
-class _GiveupOnFastCopy(Exception):
- """Raised as a signal to fallback on using raw read()/write()
- file copy when fast-copy functions fail to do so.
- """
-
-def _fastcopy_fcopyfile(fsrc, fdst, flags):
- """Copy a regular file content or metadata by using high-performance
- fcopyfile(3) syscall (macOS).
- """
- try:
- infd = fsrc.fileno()
- outfd = fdst.fileno()
- except Exception as err:
- raise _GiveupOnFastCopy(err) # not a regular file
-
- try:
- posix._fcopyfile(infd, outfd, flags)
- except OSError as err:
- err.filename = fsrc.name
- err.filename2 = fdst.name
- if err.errno in {errno.EINVAL, errno.ENOTSUP}:
- raise _GiveupOnFastCopy(err)
- else:
- raise err from None
-
-def _fastcopy_sendfile(fsrc, fdst):
- """Copy data from one regular mmap-like fd to another by using
- high-performance sendfile(2) syscall.
- This should work on Linux >= 2.6.33 only.
- """
- # Note: copyfileobj() is left alone in order to not introduce any
- # unexpected breakage. Possible risks by using zero-copy calls
- # in copyfileobj() are:
- # - fdst cannot be open in "a"(ppend) mode
- # - fsrc and fdst may be open in "t"(ext) mode
- # - fsrc may be a BufferedReader (which hides unread data in a buffer),
- # GzipFile (which decompresses data), HTTPResponse (which decodes
- # chunks).
- # - possibly others (e.g. encrypted fs/partition?)
- global _USE_CP_SENDFILE
- try:
- infd = fsrc.fileno()
- outfd = fdst.fileno()
- except Exception as err:
- raise _GiveupOnFastCopy(err) # not a regular file
-
- # Hopefully the whole file will be copied in a single call.
- # sendfile() is called in a loop 'till EOF is reached (0 return)
- # so a bufsize smaller or bigger than the actual file size
- # should not make any difference, also in case the file content
- # changes while being copied.
- try:
- blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB
- except OSError:
- blocksize = 2 ** 27 # 128MiB
- # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
- # see bpo-38319.
- if sys.maxsize < 2 ** 32:
- blocksize = min(blocksize, 2 ** 30)
-
- offset = 0
- while True:
- try:
- sent = os.sendfile(outfd, infd, offset, blocksize)
- except OSError as err:
- # ...in oder to have a more informative exception.
- err.filename = fsrc.name
- err.filename2 = fdst.name
-
- if err.errno == errno.ENOTSOCK:
- # sendfile() on this platform (probably Linux < 2.6.33)
- # does not support copies between regular files (only
- # sockets).
- _USE_CP_SENDFILE = False
- raise _GiveupOnFastCopy(err)
-
- if err.errno == errno.ENOSPC: # filesystem is full
- raise err from None
-
- # Give up on first call and if no data was copied.
- if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
- raise _GiveupOnFastCopy(err)
-
- raise err
- else:
- if sent == 0:
- break # EOF
- offset += sent
-
-def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
- """readinto()/memoryview() based variant of copyfileobj().
- *fsrc* must support readinto() method and both files must be
- open in binary mode.
- """
- # Localize variable access to minimize overhead.
- fsrc_readinto = fsrc.readinto
- fdst_write = fdst.write
- with memoryview(bytearray(length)) as mv:
- while True:
- n = fsrc_readinto(mv)
- if not n:
- break
- elif n < length:
- with mv[:n] as smv:
- fdst_write(smv)
- break
- else:
- fdst_write(mv)
-
-def copyfileobj(fsrc, fdst, length=0):
- """copy data from file-like object fsrc to file-like object fdst"""
- if not length:
- length = COPY_BUFSIZE
- # Localize variable access to minimize overhead.
- fsrc_read = fsrc.read
- fdst_write = fdst.write
- while buf := fsrc_read(length):
- fdst_write(buf)
-
-def _samefile(src, dst):
- # Macintosh, Unix.
- if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
- try:
- return os.path.samestat(src.stat(), os.stat(dst))
- except OSError:
- return False
-
- if hasattr(os.path, 'samefile'):
- try:
- return os.path.samefile(src, dst)
- except OSError:
- return False
-
- # All other platforms: check for same pathname.
- return (os.path.normcase(os.path.abspath(src)) ==
- os.path.normcase(os.path.abspath(dst)))
-
-def _stat(fn):
- return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
-
-def _islink(fn):
- return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
-
-def copyfile(src, dst, *, follow_symlinks=True):
- """Copy data from src to dst in the most efficient way possible.
-
- If follow_symlinks is not set and src is a symbolic link, a new
- symlink will be created instead of copying the file it points to.
-
- """
- sys.audit("shutil.copyfile", src, dst)
-
- if _samefile(src, dst):
- raise SameFileError("{!r} and {!r} are the same file".format(src, dst))
-
- file_size = 0
- for i, fn in enumerate([src, dst]):
- try:
- st = _stat(fn)
- except OSError:
- # File most likely does not exist
- pass
- else:
- # XXX What about other special files? (sockets, devices...)
- if stat.S_ISFIFO(st.st_mode):
- fn = fn.path if isinstance(fn, os.DirEntry) else fn
- raise SpecialFileError("`%s` is a named pipe" % fn)
- if _WINDOWS and i == 0:
- file_size = st.st_size
-
- if not follow_symlinks and _islink(src):
- os.symlink(os.readlink(src), dst)
- else:
- with open(src, 'rb') as fsrc:
- try:
- with open(dst, 'wb') as fdst:
- # macOS
- if _HAS_FCOPYFILE:
- try:
- _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
- return dst
- except _GiveupOnFastCopy:
- pass
- # Linux
- elif _USE_CP_SENDFILE:
- try:
- _fastcopy_sendfile(fsrc, fdst)
- return dst
- except _GiveupOnFastCopy:
- pass
- # Windows, see:
- # https://github.com/python/cpython/pull/7160#discussion_r195405230
- elif _WINDOWS and file_size > 0:
- _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
- return dst
-
- copyfileobj(fsrc, fdst)
-
- # Issue 43219, raise a less confusing exception
- except IsADirectoryError as e:
- if not os.path.exists(dst):
- raise FileNotFoundError(f'Directory does not exist: {dst}') from e
- else:
- raise
-
- return dst
-
-def copymode(src, dst, *, follow_symlinks=True):
- """Copy mode bits from src to dst.
-
- If follow_symlinks is not set, symlinks aren't followed if and only
- if both `src` and `dst` are symlinks. If `lchmod` isn't available
- (e.g. Linux) this method does nothing.
-
- """
- sys.audit("shutil.copymode", src, dst)
-
- if not follow_symlinks and _islink(src) and os.path.islink(dst):
- if hasattr(os, 'lchmod'):
- stat_func, chmod_func = os.lstat, os.lchmod
- else:
- return
- else:
- stat_func = _stat
- if os.name == 'nt' and os.path.islink(dst):
- def chmod_func(*args):
- os.chmod(*args, follow_symlinks=True)
- else:
- chmod_func = os.chmod
-
- st = stat_func(src)
- chmod_func(dst, stat.S_IMODE(st.st_mode))
-
-if hasattr(os, 'listxattr'):
- def _copyxattr(src, dst, *, follow_symlinks=True):
- """Copy extended filesystem attributes from `src` to `dst`.
-
- Overwrite existing attributes.
-
- If `follow_symlinks` is false, symlinks won't be followed.
-
- """
-
- try:
- names = os.listxattr(src, follow_symlinks=follow_symlinks)
- except OSError as e:
- if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
- raise
- return
- for name in names:
- try:
- value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
- os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
- except OSError as e:
- if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
- errno.EINVAL, errno.EACCES):
- raise
-else:
- def _copyxattr(*args, **kwargs):
- pass
-
-def copystat(src, dst, *, follow_symlinks=True):
- """Copy file metadata
-
- Copy the permission bits, last access time, last modification time, and
- flags from `src` to `dst`. On Linux, copystat() also copies the "extended
- attributes" where possible. The file contents, owner, and group are
- unaffected. `src` and `dst` are path-like objects or path names given as
- strings.
-
- If the optional flag `follow_symlinks` is not set, symlinks aren't
- followed if and only if both `src` and `dst` are symlinks.
- """
- sys.audit("shutil.copystat", src, dst)
-
- def _nop(*args, ns=None, follow_symlinks=None):
- pass
-
- # follow symlinks (aka don't not follow symlinks)
- follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
- if follow:
- # use the real function if it exists
- def lookup(name):
- return getattr(os, name, _nop)
- else:
- # use the real function only if it exists
- # *and* it supports follow_symlinks
- def lookup(name):
- fn = getattr(os, name, _nop)
- if fn in os.supports_follow_symlinks:
- return fn
- return _nop
-
- if isinstance(src, os.DirEntry):
- st = src.stat(follow_symlinks=follow)
- else:
- st = lookup("stat")(src, follow_symlinks=follow)
- mode = stat.S_IMODE(st.st_mode)
- lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
- follow_symlinks=follow)
- # We must copy extended attributes before the file is (potentially)
- # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
- _copyxattr(src, dst, follow_symlinks=follow)
- try:
- lookup("chmod")(dst, mode, follow_symlinks=follow)
- except NotImplementedError:
- # if we got a NotImplementedError, it's because
- # * follow_symlinks=False,
- # * lchown() is unavailable, and
- # * either
- # * fchownat() is unavailable or
- # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
- # (it returned ENOSUP.)
- # therefore we're out of options--we simply cannot chown the
- # symlink. give up, suppress the error.
- # (which is what shutil always did in this circumstance.)
- pass
- if hasattr(st, 'st_flags'):
- try:
- lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
- except OSError as why:
- for err in 'EOPNOTSUPP', 'ENOTSUP':
- if hasattr(errno, err) and why.errno == getattr(errno, err):
- break
- else:
- raise
-
def copy(src, dst, *, follow_symlinks=True):
"""Copy data and mode bits ("cp src dst"). Return the file's destination.
@@ -425,8 +88,8 @@ def copy(src, dst, *, follow_symlinks=True):
"""
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
- copyfile(src, dst, follow_symlinks=follow_symlinks)
- copymode(src, dst, follow_symlinks=follow_symlinks)
+ os.copyfile(src, dst, follow_symlinks=follow_symlinks)
+ os.copymode(src, dst, follow_symlinks=follow_symlinks)
return dst
def copy2(src, dst, *, follow_symlinks=True):
@@ -442,31 +105,7 @@ def copy2(src, dst, *, follow_symlinks=True):
"""
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
-
- if hasattr(_winapi, "CopyFile2"):
- src_ = os.fsdecode(src)
- dst_ = os.fsdecode(dst)
- flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION # for compat
- if not follow_symlinks:
- flags |= _winapi.COPY_FILE_COPY_SYMLINK
- try:
- _winapi.CopyFile2(src_, dst_, flags)
- return dst
- except OSError as exc:
- if (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD
- and not follow_symlinks):
- # Likely encountered a symlink we aren't allowed to create.
- # Fall back on the old code
- pass
- elif exc.winerror == _winapi.ERROR_ACCESS_DENIED:
- # Possibly encountered a hidden or readonly file we can't
- # overwrite. Fall back on old code
- pass
- else:
- raise
-
- copyfile(src, dst, follow_symlinks=follow_symlinks)
- copystat(src, dst, follow_symlinks=follow_symlinks)
+ os.copy(src, dst, follow_symlinks=follow_symlinks)
return dst
def ignore_patterns(*patterns):
@@ -886,7 +525,7 @@ def move(src, dst, copy_function=copy2):
sys.audit("shutil.move", src, dst)
real_dst = dst
if os.path.isdir(dst):
- if _samefile(src, dst) and not os.path.islink(src):
+ if os._samefile(src, dst) and not os.path.islink(src):
# We might be on a case insensitive filesystem,
# perform the rename anyway.
os.rename(src, dst)
@@ -933,7 +572,7 @@ def _destinsrc(src, dst):
return dst.startswith(src)
def _is_immutable(src):
- st = _stat(src)
+ st = os._stat(src)
immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
return hasattr(st, 'st_flags') and st.st_flags in immutable_states
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 941fa2b2c5c87f..a5192c013839e7 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -10,16 +10,19 @@
import fnmatch
import fractions
import itertools
+import io
import locale
import os
import pickle
import platform
+import random
import select
import selectors
import shutil
import signal
import socket
import stat
+import string
import struct
import subprocess
import sys
@@ -29,6 +32,7 @@
import time
import types
import unittest
+import unittest.mock
import uuid
import warnings
from test import support
@@ -70,7 +74,7 @@
from test.support.script_helper import assert_python_ok
from test.support import unix_shell
-from test.support.os_helper import FakePath
+from test.support.os_helper import FakePath, TESTFN
root_in_posix = False
@@ -5411,6 +5415,946 @@ class TestPEP519PurePython(TestPEP519):
fspath = staticmethod(os._fspath)
+def write_file(path, content, binary=False):
+ """Write *content* to a file located at *path*.
+
+ If *path* is a tuple instead of a string, os.path.join will be used to
+ make a path. If *binary* is true, the file will be opened in binary
+ mode.
+ """
+ if isinstance(path, tuple):
+ path = os.path.join(*path)
+ mode = 'wb' if binary else 'w'
+ encoding = None if binary else "utf-8"
+ with open(path, mode, encoding=encoding) as fp:
+ fp.write(content)
+
+def write_test_file(path, size):
+ """Create a test file with an arbitrary size and random text content."""
+ def chunks(total, step):
+ assert total >= step
+ while total > step:
+ yield step
+ total -= step
+ if total:
+ yield total
+
+ bufsize = min(size, 8192)
+ chunk = b"".join([random.choice(string.ascii_letters).encode()
+ for i in range(bufsize)])
+ with open(path, 'wb') as f:
+ for csize in chunks(size, bufsize):
+ f.write(chunk)
+ assert os.path.getsize(path) == size
+
+def read_file(path, binary=False):
+ """Return contents from a file located at *path*.
+
+ If *path* is a tuple instead of a string, os.path.join will be used to
+ make a path. If *binary* is true, the file will be opened in binary
+ mode.
+ """
+ if isinstance(path, tuple):
+ path = os.path.join(*path)
+ mode = 'rb' if binary else 'r'
+ encoding = None if binary else "utf-8"
+ with open(path, mode, encoding=encoding) as fp:
+ return fp.read()
+
+def supports_file2file_sendfile():
+ # ...apparently Linux and Solaris are the only ones
+ if not hasattr(os, "sendfile"):
+ return False
+ srcname = None
+ dstname = None
+ try:
+ with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
+ srcname = f.name
+ f.write(b"0123456789")
+
+ with open(srcname, "rb") as src:
+ with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
+ dstname = dst.name
+ infd = src.fileno()
+ outfd = dst.fileno()
+ try:
+ os.sendfile(outfd, infd, 0, 2)
+ except OSError:
+ return False
+ else:
+ return True
+ finally:
+ if srcname is not None:
+ os_helper.unlink(srcname)
+ if dstname is not None:
+ os_helper.unlink(dstname)
+
+SUPPORTS_SENDFILE = supports_file2file_sendfile()
+TESTFN2 = TESTFN + "2"
+MACOS = sys.platform.startswith("darwin")
+SOLARIS = sys.platform.startswith("sunos")
+AIX = sys.platform[:3] == 'aix'
+
+
+class _CopyTest:
+ def mkdtemp(self, prefix=None):
+ """Create a temporary directory that will be cleaned up.
+
+ Returns the path of the directory.
+ """
+ d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
+ self.addCleanup(os_helper.rmtree, d)
+ return d
+
+
+class TestCopyMode(_CopyTest, unittest.TestCase):
+ @os_helper.skip_unless_symlink
+ def test_copymode_follow_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'quux')
+ write_file(src, 'foo')
+ write_file(dst, 'foo')
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
+ # file to file
+ os.chmod(dst, stat.S_IRWXO)
+ self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ os.copymode(src, dst)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # On Windows, os.chmod does not follow symlinks (issue #15411)
+ # follow src link
+ os.chmod(dst, stat.S_IRWXO)
+ os.copymode(src_link, dst)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # follow dst link
+ os.chmod(dst, stat.S_IRWXO)
+ os.copymode(src, dst_link)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # follow both links
+ os.chmod(dst, stat.S_IRWXO)
+ os.copymode(src_link, dst_link)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+
+ @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
+ @os_helper.skip_unless_symlink
+ def test_copymode_symlink_to_symlink(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'quux')
+ write_file(src, 'foo')
+ write_file(dst, 'foo')
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
+ os.chmod(dst, stat.S_IRWXU)
+ os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
+ # link to link
+ os.lchmod(dst_link, stat.S_IRWXO)
+ old_mode = os.stat(dst).st_mode
+ os.copymode(src_link, dst_link, follow_symlinks=False)
+ self.assertEqual(os.lstat(src_link).st_mode,
+ os.lstat(dst_link).st_mode)
+ self.assertEqual(os.stat(dst).st_mode, old_mode)
+ # src link - use chmod
+ os.lchmod(dst_link, stat.S_IRWXO)
+ os.copymode(src_link, dst, follow_symlinks=False)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # dst link - use chmod
+ os.lchmod(dst_link, stat.S_IRWXO)
+ os.copymode(src, dst_link, follow_symlinks=False)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+
+ @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
+ @os_helper.skip_unless_symlink
+ def test_copymode_symlink_to_symlink_wo_lchmod(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'quux')
+ write_file(src, 'foo')
+ write_file(dst, 'foo')
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ os.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
+
+class TestCopyStat(_CopyTest, unittest.TestCase):
+
+ @os_helper.skip_unless_symlink
+ def test_copystat_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'qux')
+ write_file(src, 'foo')
+ src_stat = os.stat(src)
+ os.utime(src, (src_stat.st_atime,
+ src_stat.st_mtime - 42.0)) # ensure different mtimes
+ write_file(dst, 'bar')
+ self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ if hasattr(os, 'lchmod'):
+ os.lchmod(src_link, stat.S_IRWXO)
+ if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
+ os.lchflags(src_link, stat.UF_NODUMP)
+ src_link_stat = os.lstat(src_link)
+ # follow
+ if hasattr(os, 'lchmod'):
+ os.copystat(src_link, dst_link, follow_symlinks=True)
+ self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
+ # don't follow
+ os.copystat(src_link, dst_link, follow_symlinks=False)
+ dst_link_stat = os.lstat(dst_link)
+ if os.utime in os.supports_follow_symlinks:
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(src_link_stat, attr),
+ getattr(dst_link_stat, attr) + 1)
+ if hasattr(os, 'lchmod'):
+ self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
+ if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
+ self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
+ # tell to follow but dst is not a link
+ os.copystat(src_link, dst, follow_symlinks=False)
+ self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
+ 00000.1)
+
+ @unittest.skipUnless(hasattr(os, 'chflags') and
+ hasattr(errno, 'EOPNOTSUPP') and
+ hasattr(errno, 'ENOTSUP'),
+ "requires os.chflags, EOPNOTSUPP & ENOTSUP")
+ def test_copystat_handles_harmless_chflags_errors(self):
+ tmpdir = self.mkdtemp()
+ file1 = os.path.join(tmpdir, 'file1')
+ file2 = os.path.join(tmpdir, 'file2')
+ write_file(file1, 'xxx')
+ write_file(file2, 'xxx')
+
+ def make_chflags_raiser(err):
+ ex = OSError()
+
+ def _chflags_raiser(path, flags, *, follow_symlinks=True):
+ ex.errno = err
+ raise ex
+ return _chflags_raiser
+ old_chflags = os.chflags
+ try:
+ for err in errno.EOPNOTSUPP, errno.ENOTSUP:
+ os.chflags = make_chflags_raiser(err)
+ os.copystat(file1, file2)
+ # assert others errors break it
+ os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
+ self.assertRaises(OSError, os.copystat, file1, file2)
+ finally:
+ os.chflags = old_chflags
+
+class TestCopyXAttr(_CopyTest, unittest.TestCase):
+
+ @os_helper.skip_unless_xattr
+ def test_copyxattr(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ write_file(src, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ write_file(dst, 'bar')
+
+ # no xattr == no problem
+ os._copyxattr(src, dst)
+ # common case
+ os.setxattr(src, 'user.foo', b'42')
+ os.setxattr(src, 'user.bar', b'43')
+ os._copyxattr(src, dst)
+ self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
+ self.assertEqual(
+ os.getxattr(src, 'user.foo'),
+ os.getxattr(dst, 'user.foo'))
+ # check errors don't affect other attrs
+ os.remove(dst)
+ write_file(dst, 'bar')
+ os_error = OSError(errno.EPERM, 'EPERM')
+
+ def _raise_on_user_foo(fname, attr, val, **kwargs):
+ if attr == 'user.foo':
+ raise os_error
+ else:
+ orig_setxattr(fname, attr, val, **kwargs)
+ try:
+ orig_setxattr = os.setxattr
+ os.setxattr = _raise_on_user_foo
+ os._copyxattr(src, dst)
+ self.assertIn('user.bar', os.listxattr(dst))
+ finally:
+ os.setxattr = orig_setxattr
+ # the source filesystem not supporting xattrs should be ok, too.
+ def _raise_on_src(fname, *, follow_symlinks=True):
+ if fname == src:
+ raise OSError(errno.ENOTSUP, 'Operation not supported')
+ return orig_listxattr(fname, follow_symlinks=follow_symlinks)
+ try:
+ orig_listxattr = os.listxattr
+ os.listxattr = _raise_on_src
+ os._copyxattr(src, dst)
+ finally:
+ os.listxattr = orig_listxattr
+
+ # test that os.copystat copies xattrs
+ src = os.path.join(tmp_dir, 'the_original')
+ srcro = os.path.join(tmp_dir, 'the_original_ro')
+ write_file(src, src)
+ write_file(srcro, srcro)
+ os.setxattr(src, 'user.the_value', b'fiddly')
+ os.setxattr(srcro, 'user.the_value', b'fiddly')
+ os.chmod(srcro, 0o444)
+ dst = os.path.join(tmp_dir, 'the_copy')
+ dstro = os.path.join(tmp_dir, 'the_copy_ro')
+ write_file(dst, dst)
+ write_file(dstro, dstro)
+ os.copystat(src, dst)
+ os.copystat(srcro, dstro)
+ self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
+ self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
+
+ @os_helper.skip_unless_symlink
+ @os_helper.skip_unless_xattr
+ @os_helper.skip_unless_dac_override
+ def test_copyxattr_symlinks(self):
+ # On Linux, it's only possible to access non-user xattr for symlinks;
+ # which in turn require root privileges. This test should be expanded
+ # as soon as other platforms gain support for extended attributes.
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ src_link = os.path.join(tmp_dir, 'baz')
+ write_file(src, 'foo')
+ os.symlink(src, src_link)
+ os.setxattr(src, 'trusted.foo', b'42')
+ os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
+ dst = os.path.join(tmp_dir, 'bar')
+ dst_link = os.path.join(tmp_dir, 'qux')
+ write_file(dst, 'bar')
+ os.symlink(dst, dst_link)
+ os._copyxattr(src_link, dst_link, follow_symlinks=False)
+ self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
+ self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
+ os._copyxattr(src_link, dst, follow_symlinks=False)
+ self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
+
+
+class TestCopyFile(_CopyTest, unittest.TestCase):
+ @os_helper.skip_unless_symlink
+ def test_copyfile_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'src')
+ dst = os.path.join(tmp_dir, 'dst')
+ dst_link = os.path.join(tmp_dir, 'dst_link')
+ link = os.path.join(tmp_dir, 'link')
+ write_file(src, 'foo')
+ os.symlink(src, link)
+ # don't follow
+ os.copyfile(link, dst_link, follow_symlinks=False)
+ self.assertTrue(os.path.islink(dst_link))
+ self.assertEqual(os.readlink(link), os.readlink(dst_link))
+ # follow
+ os.copyfile(link, dst)
+ self.assertFalse(os.path.islink(dst))
+
+ @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
+ def test_dont_copy_file_onto_link_to_itself(self):
+ # bug 851123.
+ os.mkdir(TESTFN)
+ src = os.path.join(TESTFN, 'cheese')
+ dst = os.path.join(TESTFN, 'shop')
+ try:
+ with open(src, 'w', encoding='utf-8') as f:
+ f.write('cheddar')
+ try:
+ os.link(src, dst)
+ except PermissionError as e:
+ self.skipTest('os.link(): %s' % e)
+ self.assertRaises(os.SameFileError, os.copyfile, src, dst)
+ with open(src, 'r', encoding='utf-8') as f:
+ self.assertEqual(f.read(), 'cheddar')
+ os.remove(dst)
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
+
+ @os_helper.skip_unless_symlink
+ def test_dont_copy_file_onto_symlink_to_itself(self):
+ # bug 851123.
+ os.mkdir(TESTFN)
+ src = os.path.join(TESTFN, 'cheese')
+ dst = os.path.join(TESTFN, 'shop')
+ try:
+ with open(src, 'w', encoding='utf-8') as f:
+ f.write('cheddar')
+ # Using `src` here would mean we end up with a symlink pointing
+ # to TESTFN/TESTFN/cheese, while it should point at
+ # TESTFN/cheese.
+ os.symlink('cheese', dst)
+ self.assertRaises(os.SameFileError, os.copyfile, src, dst)
+ with open(src, 'r', encoding='utf-8') as f:
+ self.assertEqual(f.read(), 'cheddar')
+ os.remove(dst)
+ finally:
+ shutil.rmtree(TESTFN, ignore_errors=True)
+
+ # Issue #3002: copyfile and copytree block indefinitely on named pipes
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_copyfile_named_pipe(self):
+ try:
+ os.mkfifo(TESTFN)
+ except PermissionError as e:
+ self.skipTest('os.mkfifo(): %s' % e)
+ try:
+ self.assertRaises(os.SpecialFileError,
+ os.copyfile, TESTFN, TESTFN2)
+ self.assertRaises(os.SpecialFileError,
+ os.copyfile, __file__, TESTFN)
+ finally:
+ os.remove(TESTFN)
+
+ def test_copyfile_return_value(self):
+ # copytree returns its destination path.
+ src_dir = self.mkdtemp()
+ dst_dir = self.mkdtemp()
+ dst_file = os.path.join(dst_dir, 'bar')
+ src_file = os.path.join(src_dir, 'foo')
+ write_file(src_file, 'foo')
+ rv = os.copyfile(src_file, dst_file)
+ self.assertTrue(os.path.exists(rv))
+ self.assertEqual(read_file(src_file), read_file(dst_file))
+
+ def test_copyfile_same_file(self):
+ # copyfile() should raise SameFileError if the source and destination
+ # are the same.
+ src_dir = self.mkdtemp()
+ src_file = os.path.join(src_dir, 'foo')
+ write_file(src_file, 'foo')
+ self.assertRaises(os.SameFileError, os.copyfile, src_file, src_file)
+ # But Error should work too, to stay backward compatible.
+ self.assertRaises(os.CopyError, os.copyfile, src_file, src_file)
+ # Make sure file is not corrupted.
+ self.assertEqual(read_file(src_file), 'foo')
+
+ @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)')
+ # gh-92670: The test uses a trailing slash to force the OS consider
+ # the path as a directory, but on AIX the trailing slash has no effect
+ # and is considered as a file.
+ @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670')
+ def test_copyfile_nonexistent_dir(self):
+ # Issue 43219
+ src_dir = self.mkdtemp()
+ src_file = os.path.join(src_dir, 'foo')
+ dst = os.path.join(src_dir, 'does_not_exist/')
+ write_file(src_file, 'foo')
+ self.assertRaises(FileNotFoundError, os.copyfile, src_file, dst)
+
+ def test_copyfile_copy_dir(self):
+ # Issue 45234
+ # test copy() and copyfile() raising proper exceptions when src and/or
+ # dst are directories
+ src_dir = self.mkdtemp()
+ src_file = os.path.join(src_dir, 'foo')
+ dir2 = self.mkdtemp()
+ dst = os.path.join(src_dir, 'does_not_exist/')
+ write_file(src_file, 'foo')
+ if sys.platform == "win32":
+ err = PermissionError
+ else:
+ err = IsADirectoryError
+
+ self.assertRaises(err, os.copyfile, src_dir, dst)
+ self.assertRaises(err, os.copyfile, src_file, src_dir)
+ self.assertRaises(err, os.copyfile, dir2, src_dir)
+
+ class Faux(object):
+ _entered = False
+ _exited_with = None
+ _raised = False
+ def __init__(self, raise_in_exit=False, suppress_at_exit=True):
+ self._raise_in_exit = raise_in_exit
+ self._suppress_at_exit = suppress_at_exit
+ def read(self, *args):
+ return ''
+ def __enter__(self):
+ self._entered = True
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._exited_with = exc_type, exc_val, exc_tb
+ if self._raise_in_exit:
+ self._raised = True
+ raise OSError("Cannot close")
+ return self._suppress_at_exit
+
+ def test_w_source_open_fails(self):
+ def _open(filename, mode='r'):
+ if filename == 'srcfile':
+ raise OSError('Cannot open "srcfile"')
+ assert 0 # shouldn't reach here.
+
+ with support.swap_attr(io, 'open', _open):
+ with self.assertRaises(OSError):
+ os.copyfile('srcfile', 'destfile')
+
+ @unittest.skipIf(MACOS, "skipped on macOS")
+ def test_w_dest_open_fails(self):
+ srcfile = self.Faux()
+
+ def _open(filename, mode='r'):
+ if filename == 'srcfile':
+ return srcfile
+ if filename == 'destfile':
+ raise OSError('Cannot open "destfile"')
+ assert 0 # shouldn't reach here.
+
+ with support.swap_attr(io, 'open', _open):
+ os.copyfile('srcfile', 'destfile')
+ self.assertTrue(srcfile._entered)
+ self.assertTrue(srcfile._exited_with[0] is OSError)
+ self.assertEqual(srcfile._exited_with[1].args,
+ ('Cannot open "destfile"',))
+
+ @unittest.skipIf(MACOS, "skipped on macOS")
+ def test_w_dest_close_fails(self):
+ srcfile = self.Faux()
+ destfile = self.Faux(True)
+
+ def _open(filename, mode='r'):
+ if filename == 'srcfile':
+ return srcfile
+ if filename == 'destfile':
+ return destfile
+ assert 0 # shouldn't reach here.
+
+ with support.swap_attr(io, 'open', _open):
+ os.copyfile('srcfile', 'destfile')
+ self.assertTrue(srcfile._entered)
+ self.assertTrue(destfile._entered)
+ self.assertTrue(destfile._raised)
+ self.assertTrue(srcfile._exited_with[0] is OSError)
+ self.assertEqual(srcfile._exited_with[1].args,
+ ('Cannot close',))
+
+ @unittest.skipIf(MACOS, "skipped on macOS")
+ def test_w_source_close_fails(self):
+
+ srcfile = self.Faux(True)
+ destfile = self.Faux()
+
+ def _open(filename, mode='r'):
+ if filename == 'srcfile':
+ return srcfile
+ if filename == 'destfile':
+ return destfile
+ assert 0 # shouldn't reach here.
+
+ with support.swap_attr(io, 'open', _open):
+ with self.assertRaises(OSError):
+ os.copyfile('srcfile', 'destfile')
+ self.assertTrue(srcfile._entered)
+ self.assertTrue(destfile._entered)
+ self.assertFalse(destfile._raised)
+ self.assertTrue(srcfile._exited_with[0] is None)
+ self.assertTrue(srcfile._raised)
+
+
+class TestCopy(_CopyTest, unittest.TestCase):
+ @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
+ def test_copy(self):
+ # Ensure that the copied file exists and has the same mode and
+ # modification time bits.
+ fname = 'test.txt'
+ tmpdir = self.mkdtemp()
+ write_file((tmpdir, fname), 'xxx')
+ file1 = os.path.join(tmpdir, fname)
+ tmpdir2 = self.mkdtemp()
+ file2 = os.path.join(tmpdir2, fname)
+ os.copy(file1, file2)
+ self.assertTrue(os.path.exists(file2))
+ file1_stat = os.stat(file1)
+ file2_stat = os.stat(file2)
+ self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(file1_stat, attr),
+ getattr(file2_stat, attr) + 1)
+ if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
+ self.assertEqual(getattr(file1_stat, 'st_flags'),
+ getattr(file2_stat, 'st_flags'))
+
+ @os_helper.skip_unless_symlink
+ def test_copy_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ write_file(src, 'foo')
+ os.symlink(src, src_link)
+ if hasattr(os, 'lchmod'):
+ os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
+ if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
+ os.lchflags(src_link, stat.UF_NODUMP)
+ src_stat = os.stat(src)
+ src_link_stat = os.lstat(src_link)
+ # follow
+ os.copy(src_link, dst, follow_symlinks=True)
+ self.assertFalse(os.path.islink(dst))
+ self.assertEqual(read_file(src), read_file(dst))
+ os.remove(dst)
+ # don't follow
+ os.copy(src_link, dst, follow_symlinks=False)
+ self.assertTrue(os.path.islink(dst))
+ self.assertEqual(os.readlink(dst), os.readlink(src_link))
+ dst_stat = os.lstat(dst)
+ if os.utime in os.supports_follow_symlinks:
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(src_link_stat, attr),
+ getattr(dst_stat, attr) + 1)
+ if hasattr(os, 'lchmod'):
+ self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
+ self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
+ if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
+ self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
+
+ @os_helper.skip_unless_xattr
+ def test_copy_xattr(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ write_file(src, 'foo')
+ os.setxattr(src, 'user.foo', b'42')
+ os.copy(src, dst)
+ self.assertEqual(
+ os.getxattr(src, 'user.foo'),
+ os.getxattr(dst, 'user.foo'))
+ os.remove(dst)
+
+ def test_copy_dir(self):
+ src_dir = self.mkdtemp()
+ src_file = os.path.join(src_dir, 'foo')
+ dir2 = self.mkdtemp()
+ dst = os.path.join(src_dir, 'does_not_exist/')
+ write_file(src_file, 'foo')
+ if sys.platform == "win32":
+ err = PermissionError
+ else:
+ err = IsADirectoryError
+ self.assertRaises(err, os.copy, dir2, src_dir)
+
+ # raise *err* because of src rather than FileNotFoundError because of dst
+ self.assertRaises(err, os.copy, dir2, dst)
+ os.copy(src_file, os.path.join(dir2, 'bar')) # should not raise exceptions
+
+
+class TestCopyFileObj(unittest.TestCase):
+ FILESIZE = 2 * 1024 * 1024
+
+ @classmethod
+ def setUpClass(cls):
+ write_test_file(TESTFN, cls.FILESIZE)
+
+ @classmethod
+ def tearDownClass(cls):
+ os_helper.unlink(TESTFN)
+ os_helper.unlink(TESTFN2)
+
+ def tearDown(self):
+ os_helper.unlink(TESTFN2)
+
+ @contextlib.contextmanager
+ def get_files(self):
+ with open(TESTFN, "rb") as src:
+ with open(TESTFN2, "wb") as dst:
+ yield (src, dst)
+
+ def assert_files_eq(self, src, dst):
+ with open(src, 'rb') as fsrc:
+ with open(dst, 'rb') as fdst:
+ self.assertEqual(fsrc.read(), fdst.read())
+
+ def test_content(self):
+ with self.get_files() as (src, dst):
+ os.copyfileobj(src, dst)
+ self.assert_files_eq(TESTFN, TESTFN2)
+
+ def test_file_not_closed(self):
+ with self.get_files() as (src, dst):
+ os.copyfileobj(src, dst)
+ assert not src.closed
+ assert not dst.closed
+
+ def test_file_offset(self):
+ with self.get_files() as (src, dst):
+ os.copyfileobj(src, dst)
+ self.assertEqual(src.tell(), self.FILESIZE)
+ self.assertEqual(dst.tell(), self.FILESIZE)
+
+ @unittest.skipIf(os.name != 'nt', "Windows only")
+ def test_win_impl(self):
+ # Make sure alternate Windows implementation is called.
+ with unittest.mock.patch("os._fastcopy_readinto") as m:
+ os.copyfile(TESTFN, TESTFN2)
+ assert m.called
+
+ # File size is 2 MiB but max buf size should be 1 MiB.
+ self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
+
+ # If file size < 1 MiB memoryview() length must be equal to
+ # the actual file size.
+ with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
+ f.write(b'foo')
+ fname = f.name
+ self.addCleanup(os_helper.unlink, fname)
+ with unittest.mock.patch("os._fastcopy_readinto") as m:
+ os.copyfile(fname, TESTFN2)
+ self.assertEqual(m.call_args[0][2], 3)
+
+ # Empty files should not rely on readinto() variant.
+ with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
+ pass
+ fname = f.name
+ self.addCleanup(os_helper.unlink, fname)
+ with unittest.mock.patch("os._fastcopy_readinto") as m:
+ os.copyfile(fname, TESTFN2)
+ assert not m.called
+ self.assert_files_eq(fname, TESTFN2)
+
+
+class _ZeroCopyFileTest(object):
+ """Tests common to all zero-copy APIs."""
+ FILESIZE = (10 * 1024 * 1024) # 10 MiB
+ FILEDATA = b""
+ PATCHPOINT = ""
+
+ @classmethod
+ def setUpClass(cls):
+ write_test_file(TESTFN, cls.FILESIZE)
+ with open(TESTFN, 'rb') as f:
+ cls.FILEDATA = f.read()
+ assert len(cls.FILEDATA) == cls.FILESIZE
+
+ @classmethod
+ def tearDownClass(cls):
+ os_helper.unlink(TESTFN)
+
+ def tearDown(self):
+ os_helper.unlink(TESTFN2)
+
+ @contextlib.contextmanager
+ def get_files(self):
+ with open(TESTFN, "rb") as src:
+ with open(TESTFN2, "wb") as dst:
+ yield (src, dst)
+
+ def zerocopy_fun(self, *args, **kwargs):
+ raise NotImplementedError("must be implemented in subclass")
+
+ def reset(self):
+ self.tearDown()
+ self.tearDownClass()
+ self.setUpClass()
+ self.setUp()
+
+ # ---
+
+ def test_regular_copy(self):
+ with self.get_files() as (src, dst):
+ self.zerocopy_fun(src, dst)
+ self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
+ # Make sure the fallback function is not called.
+ with self.get_files() as (src, dst):
+ with unittest.mock.patch('os.copyfileobj') as m:
+ os.copyfile(TESTFN, TESTFN2)
+ assert not m.called
+
+ def test_same_file(self):
+ self.addCleanup(self.reset)
+ with self.get_files() as (src, dst):
+ with self.assertRaises((OSError, os._GiveupOnFastCopy)):
+ self.zerocopy_fun(src, src)
+ # Make sure src file is not corrupted.
+ self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
+
+ def test_non_existent_src(self):
+ name = tempfile.mktemp(dir=os.getcwd())
+ with self.assertRaises(FileNotFoundError) as cm:
+ os.copyfile(name, "new")
+ self.assertEqual(cm.exception.filename, name)
+
+ def test_empty_file(self):
+ srcname = TESTFN + 'src'
+ dstname = TESTFN + 'dst'
+ self.addCleanup(lambda: os_helper.unlink(srcname))
+ self.addCleanup(lambda: os_helper.unlink(dstname))
+ with open(srcname, "wb"):
+ pass
+
+ with open(srcname, "rb") as src:
+ with open(dstname, "wb") as dst:
+ self.zerocopy_fun(src, dst)
+
+ self.assertEqual(read_file(dstname, binary=True), b"")
+
+ def test_unhandled_exception(self):
+ with unittest.mock.patch(self.PATCHPOINT,
+ side_effect=ZeroDivisionError):
+ self.assertRaises(ZeroDivisionError,
+ os.copyfile, TESTFN, TESTFN2)
+
+ def test_exception_on_first_call(self):
+ # Emulate a case where the first call to the zero-copy
+ # function raises an exception in which case the function is
+ # supposed to give up immediately.
+ with unittest.mock.patch(self.PATCHPOINT,
+ side_effect=OSError(errno.EINVAL, "yo")):
+ with self.get_files() as (src, dst):
+ with self.assertRaises(os._GiveupOnFastCopy):
+ self.zerocopy_fun(src, dst)
+
+ def test_filesystem_full(self):
+ # Emulate a case where filesystem is full and sendfile() fails
+ # on first call.
+ with unittest.mock.patch(self.PATCHPOINT,
+ side_effect=OSError(errno.ENOSPC, "yo")):
+ with self.get_files() as (src, dst):
+ self.assertRaises(OSError, self.zerocopy_fun, src, dst)
+
+
+@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
+class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
+ PATCHPOINT = "os.sendfile"
+
+ def zerocopy_fun(self, fsrc, fdst):
+ return os._fastcopy_sendfile(fsrc, fdst)
+
+ def test_non_regular_file_src(self):
+ with io.BytesIO(self.FILEDATA) as src:
+ with open(TESTFN2, "wb") as dst:
+ with self.assertRaises(os._GiveupOnFastCopy):
+ self.zerocopy_fun(src, dst)
+ os.copyfileobj(src, dst)
+
+ self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
+
+ def test_non_regular_file_dst(self):
+ with open(TESTFN, "rb") as src:
+ with io.BytesIO() as dst:
+ with self.assertRaises(os._GiveupOnFastCopy):
+ self.zerocopy_fun(src, dst)
+ os.copyfileobj(src, dst)
+ dst.seek(0)
+ self.assertEqual(dst.read(), self.FILEDATA)
+
+ def test_exception_on_second_call(self):
+ def sendfile(*args, **kwargs):
+ if not flag:
+ flag.append(None)
+ return orig_sendfile(*args, **kwargs)
+ else:
+ raise OSError(errno.EBADF, "yo")
+
+ flag = []
+ orig_sendfile = os.sendfile
+ with unittest.mock.patch('os.sendfile', create=True,
+ side_effect=sendfile):
+ with self.get_files() as (src, dst):
+ with self.assertRaises(OSError) as cm:
+ os._fastcopy_sendfile(src, dst)
+ assert flag
+ self.assertEqual(cm.exception.errno, errno.EBADF)
+
+ def test_cant_get_size(self):
+ # Emulate a case where src file size cannot be determined.
+ # Internally bufsize will be set to a small value and
+ # sendfile() will be called repeatedly.
+ with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
+ with self.get_files() as (src, dst):
+ os._fastcopy_sendfile(src, dst)
+ assert m.called
+ self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
+
+ def test_small_chunks(self):
+ # Force internal file size detection to be smaller than the
+ # actual file size. We want to force sendfile() to be called
+ # multiple times, also in order to emulate a src fd which gets
+ # bigger while it is being copied.
+ mock = unittest.mock.Mock()
+ mock.st_size = 65536 + 1
+ with unittest.mock.patch('os.fstat', return_value=mock) as m:
+ with self.get_files() as (src, dst):
+ os._fastcopy_sendfile(src, dst)
+ assert m.called
+ self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
+
+ def test_big_chunk(self):
+ # Force internal file size detection to be +100MB bigger than
+ # the actual file size. Make sure sendfile() does not rely on
+ # file size value except for (maybe) a better throughput /
+ # performance.
+ mock = unittest.mock.Mock()
+ mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
+ with unittest.mock.patch('os.fstat', return_value=mock) as m:
+ with self.get_files() as (src, dst):
+ os._fastcopy_sendfile(src, dst)
+ assert m.called
+ self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
+
+ def test_blocksize_arg(self):
+ with unittest.mock.patch('os.sendfile',
+ side_effect=ZeroDivisionError) as m:
+ self.assertRaises(ZeroDivisionError,
+ os.copyfile, TESTFN, TESTFN2)
+ blocksize = m.call_args[0][3]
+ # Make sure file size and the block size arg passed to
+ # sendfile() are the same.
+ self.assertEqual(blocksize, os.path.getsize(TESTFN))
+ # ...unless we're dealing with a small file.
+ os_helper.unlink(TESTFN2)
+ write_file(TESTFN2, b"hello", binary=True)
+ self.addCleanup(os_helper.unlink, TESTFN2 + '3')
+ self.assertRaises(ZeroDivisionError,
+ os.copyfile, TESTFN2, TESTFN2 + '3')
+ blocksize = m.call_args[0][3]
+ self.assertEqual(blocksize, 2 ** 23)
+
+ def test_file2file_not_supported(self):
+ # Emulate a case where sendfile() only support file->socket
+ # fds. In such a case copyfile() is supposed to skip the
+ # fast-copy attempt from then on.
+ assert os._USE_CP_SENDFILE
+ try:
+ with unittest.mock.patch(
+ self.PATCHPOINT,
+ side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
+ with self.get_files() as (src, dst):
+ with self.assertRaises(os._GiveupOnFastCopy):
+ os._fastcopy_sendfile(src, dst)
+ assert m.called
+ assert not os._USE_CP_SENDFILE
+
+ with unittest.mock.patch(self.PATCHPOINT) as m:
+ os.copyfile(TESTFN, TESTFN2)
+ assert not m.called
+ finally:
+ os._USE_CP_SENDFILE = True
+
+
+@unittest.skipIf(not MACOS, 'macOS only')
+class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
+ PATCHPOINT = "os._fcopyfile"
+
+ def zerocopy_fun(self, src, dst):
+ return os._fastcopy_fcopyfile(src, dst, os._COPYFILE_DATA)
+
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 5b0aac67a0adeb..38b0a6663e49df 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -12,16 +12,12 @@
import functools
import pathlib
import subprocess
-import random
-import string
-import contextlib
-import io
from shutil import (make_archive,
register_archive_format, unregister_archive_format,
get_archive_formats, Error, unpack_archive,
register_unpack_format, RegistryError,
unregister_unpack_format, get_unpack_formats,
- SameFileError, _GiveupOnFastCopy)
+ SameFileError)
import tarfile
import zipfile
try:
@@ -33,12 +29,12 @@
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
from test.support import warnings_helper
+from test.test_os import read_file, write_file
TESTFN2 = TESTFN + "2"
TESTFN_SRC = TESTFN + "_SRC"
TESTFN_DST = TESTFN + "_DST"
MACOS = sys.platform.startswith("darwin")
-SOLARIS = sys.platform.startswith("sunos")
AIX = sys.platform[:3] == 'aix'
try:
import grp
@@ -70,52 +66,6 @@ def wrap(*args, **kwargs):
os.rename = builtin_rename
return wrap
-def write_file(path, content, binary=False):
- """Write *content* to a file located at *path*.
-
- If *path* is a tuple instead of a string, os.path.join will be used to
- make a path. If *binary* is true, the file will be opened in binary
- mode.
- """
- if isinstance(path, tuple):
- path = os.path.join(*path)
- mode = 'wb' if binary else 'w'
- encoding = None if binary else "utf-8"
- with open(path, mode, encoding=encoding) as fp:
- fp.write(content)
-
-def write_test_file(path, size):
- """Create a test file with an arbitrary size and random text content."""
- def chunks(total, step):
- assert total >= step
- while total > step:
- yield step
- total -= step
- if total:
- yield total
-
- bufsize = min(size, 8192)
- chunk = b"".join([random.choice(string.ascii_letters).encode()
- for i in range(bufsize)])
- with open(path, 'wb') as f:
- for csize in chunks(size, bufsize):
- f.write(chunk)
- assert os.path.getsize(path) == size
-
-def read_file(path, binary=False):
- """Return contents from a file located at *path*.
-
- If *path* is a tuple instead of a string, os.path.join will be used to
- make a path. If *binary* is true, the file will be opened in binary
- mode.
- """
- if isinstance(path, tuple):
- path = os.path.join(*path)
- mode = 'rb' if binary else 'r'
- encoding = None if binary else "utf-8"
- with open(path, mode, encoding=encoding) as fp:
- return fp.read()
-
def rlistdir(path):
res = []
for name in sorted(os.listdir(path)):
@@ -128,36 +78,6 @@ def rlistdir(path):
res.append(name)
return res
-def supports_file2file_sendfile():
- # ...apparently Linux and Solaris are the only ones
- if not hasattr(os, "sendfile"):
- return False
- srcname = None
- dstname = None
- try:
- with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
- srcname = f.name
- f.write(b"0123456789")
-
- with open(srcname, "rb") as src:
- with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
- dstname = dst.name
- infd = src.fileno()
- outfd = dst.fileno()
- try:
- os.sendfile(outfd, infd, 0, 2)
- except OSError:
- return False
- else:
- return True
- finally:
- if srcname is not None:
- os_helper.unlink(srcname)
- if dstname is not None:
- os_helper.unlink(dstname)
-
-
-SUPPORTS_SENDFILE = supports_file2file_sendfile()
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
# The AIX command 'dump -o program' gives XCOFF header information
@@ -1097,247 +1017,6 @@ def test_copytree_subdirectory(self):
self.assertEqual(['pol'], os.listdir(rv))
class TestCopy(BaseTest, unittest.TestCase):
-
- ### shutil.copymode
-
- @os_helper.skip_unless_symlink
- def test_copymode_follow_symlinks(self):
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'foo')
- dst = os.path.join(tmp_dir, 'bar')
- src_link = os.path.join(tmp_dir, 'baz')
- dst_link = os.path.join(tmp_dir, 'quux')
- write_file(src, 'foo')
- write_file(dst, 'foo')
- os.symlink(src, src_link)
- os.symlink(dst, dst_link)
- os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
- # file to file
- os.chmod(dst, stat.S_IRWXO)
- self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
- shutil.copymode(src, dst)
- self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
- # On Windows, os.chmod does not follow symlinks (issue #15411)
- # follow src link
- os.chmod(dst, stat.S_IRWXO)
- shutil.copymode(src_link, dst)
- self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
- # follow dst link
- os.chmod(dst, stat.S_IRWXO)
- shutil.copymode(src, dst_link)
- self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
- # follow both links
- os.chmod(dst, stat.S_IRWXO)
- shutil.copymode(src_link, dst_link)
- self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
-
- @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
- @os_helper.skip_unless_symlink
- def test_copymode_symlink_to_symlink(self):
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'foo')
- dst = os.path.join(tmp_dir, 'bar')
- src_link = os.path.join(tmp_dir, 'baz')
- dst_link = os.path.join(tmp_dir, 'quux')
- write_file(src, 'foo')
- write_file(dst, 'foo')
- os.symlink(src, src_link)
- os.symlink(dst, dst_link)
- os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
- os.chmod(dst, stat.S_IRWXU)
- os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
- # link to link
- os.lchmod(dst_link, stat.S_IRWXO)
- old_mode = os.stat(dst).st_mode
- shutil.copymode(src_link, dst_link, follow_symlinks=False)
- self.assertEqual(os.lstat(src_link).st_mode,
- os.lstat(dst_link).st_mode)
- self.assertEqual(os.stat(dst).st_mode, old_mode)
- # src link - use chmod
- os.lchmod(dst_link, stat.S_IRWXO)
- shutil.copymode(src_link, dst, follow_symlinks=False)
- self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
- # dst link - use chmod
- os.lchmod(dst_link, stat.S_IRWXO)
- shutil.copymode(src, dst_link, follow_symlinks=False)
- self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
-
- @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
- @os_helper.skip_unless_symlink
- def test_copymode_symlink_to_symlink_wo_lchmod(self):
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'foo')
- dst = os.path.join(tmp_dir, 'bar')
- src_link = os.path.join(tmp_dir, 'baz')
- dst_link = os.path.join(tmp_dir, 'quux')
- write_file(src, 'foo')
- write_file(dst, 'foo')
- os.symlink(src, src_link)
- os.symlink(dst, dst_link)
- shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
-
- ### shutil.copystat
-
- @os_helper.skip_unless_symlink
- def test_copystat_symlinks(self):
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'foo')
- dst = os.path.join(tmp_dir, 'bar')
- src_link = os.path.join(tmp_dir, 'baz')
- dst_link = os.path.join(tmp_dir, 'qux')
- write_file(src, 'foo')
- src_stat = os.stat(src)
- os.utime(src, (src_stat.st_atime,
- src_stat.st_mtime - 42.0)) # ensure different mtimes
- write_file(dst, 'bar')
- self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
- os.symlink(src, src_link)
- os.symlink(dst, dst_link)
- if hasattr(os, 'lchmod'):
- os.lchmod(src_link, stat.S_IRWXO)
- if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
- os.lchflags(src_link, stat.UF_NODUMP)
- src_link_stat = os.lstat(src_link)
- # follow
- if hasattr(os, 'lchmod'):
- shutil.copystat(src_link, dst_link, follow_symlinks=True)
- self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
- # don't follow
- shutil.copystat(src_link, dst_link, follow_symlinks=False)
- dst_link_stat = os.lstat(dst_link)
- if os.utime in os.supports_follow_symlinks:
- for attr in 'st_atime', 'st_mtime':
- # The modification times may be truncated in the new file.
- self.assertLessEqual(getattr(src_link_stat, attr),
- getattr(dst_link_stat, attr) + 1)
- if hasattr(os, 'lchmod'):
- self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
- if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
- self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
- # tell to follow but dst is not a link
- shutil.copystat(src_link, dst, follow_symlinks=False)
- self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
- 00000.1)
-
- @unittest.skipUnless(hasattr(os, 'chflags') and
- hasattr(errno, 'EOPNOTSUPP') and
- hasattr(errno, 'ENOTSUP'),
- "requires os.chflags, EOPNOTSUPP & ENOTSUP")
- def test_copystat_handles_harmless_chflags_errors(self):
- tmpdir = self.mkdtemp()
- file1 = os.path.join(tmpdir, 'file1')
- file2 = os.path.join(tmpdir, 'file2')
- write_file(file1, 'xxx')
- write_file(file2, 'xxx')
-
- def make_chflags_raiser(err):
- ex = OSError()
-
- def _chflags_raiser(path, flags, *, follow_symlinks=True):
- ex.errno = err
- raise ex
- return _chflags_raiser
- old_chflags = os.chflags
- try:
- for err in errno.EOPNOTSUPP, errno.ENOTSUP:
- os.chflags = make_chflags_raiser(err)
- shutil.copystat(file1, file2)
- # assert others errors break it
- os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
- self.assertRaises(OSError, shutil.copystat, file1, file2)
- finally:
- os.chflags = old_chflags
-
- ### shutil.copyxattr
-
- @os_helper.skip_unless_xattr
- def test_copyxattr(self):
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'foo')
- write_file(src, 'foo')
- dst = os.path.join(tmp_dir, 'bar')
- write_file(dst, 'bar')
-
- # no xattr == no problem
- shutil._copyxattr(src, dst)
- # common case
- os.setxattr(src, 'user.foo', b'42')
- os.setxattr(src, 'user.bar', b'43')
- shutil._copyxattr(src, dst)
- self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
- self.assertEqual(
- os.getxattr(src, 'user.foo'),
- os.getxattr(dst, 'user.foo'))
- # check errors don't affect other attrs
- os.remove(dst)
- write_file(dst, 'bar')
- os_error = OSError(errno.EPERM, 'EPERM')
-
- def _raise_on_user_foo(fname, attr, val, **kwargs):
- if attr == 'user.foo':
- raise os_error
- else:
- orig_setxattr(fname, attr, val, **kwargs)
- try:
- orig_setxattr = os.setxattr
- os.setxattr = _raise_on_user_foo
- shutil._copyxattr(src, dst)
- self.assertIn('user.bar', os.listxattr(dst))
- finally:
- os.setxattr = orig_setxattr
- # the source filesystem not supporting xattrs should be ok, too.
- def _raise_on_src(fname, *, follow_symlinks=True):
- if fname == src:
- raise OSError(errno.ENOTSUP, 'Operation not supported')
- return orig_listxattr(fname, follow_symlinks=follow_symlinks)
- try:
- orig_listxattr = os.listxattr
- os.listxattr = _raise_on_src
- shutil._copyxattr(src, dst)
- finally:
- os.listxattr = orig_listxattr
-
- # test that shutil.copystat copies xattrs
- src = os.path.join(tmp_dir, 'the_original')
- srcro = os.path.join(tmp_dir, 'the_original_ro')
- write_file(src, src)
- write_file(srcro, srcro)
- os.setxattr(src, 'user.the_value', b'fiddly')
- os.setxattr(srcro, 'user.the_value', b'fiddly')
- os.chmod(srcro, 0o444)
- dst = os.path.join(tmp_dir, 'the_copy')
- dstro = os.path.join(tmp_dir, 'the_copy_ro')
- write_file(dst, dst)
- write_file(dstro, dstro)
- shutil.copystat(src, dst)
- shutil.copystat(srcro, dstro)
- self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
- self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
-
- @os_helper.skip_unless_symlink
- @os_helper.skip_unless_xattr
- @os_helper.skip_unless_dac_override
- def test_copyxattr_symlinks(self):
- # On Linux, it's only possible to access non-user xattr for symlinks;
- # which in turn require root privileges. This test should be expanded
- # as soon as other platforms gain support for extended attributes.
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'foo')
- src_link = os.path.join(tmp_dir, 'baz')
- write_file(src, 'foo')
- os.symlink(src, src_link)
- os.setxattr(src, 'trusted.foo', b'42')
- os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
- dst = os.path.join(tmp_dir, 'bar')
- dst_link = os.path.join(tmp_dir, 'qux')
- write_file(dst, 'bar')
- os.symlink(dst, dst_link)
- shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
- self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
- self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
- shutil._copyxattr(src_link, dst, follow_symlinks=False)
- self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
-
### shutil.copy
def _copy_file(self, method):
@@ -1480,136 +1159,6 @@ def _test_copy_dir(self, copy_func):
self.assertRaises(err, copy_func, dir2, dst)
copy_func(src_file, dir2) # should not raise exceptions
- ### shutil.copyfile
-
- @os_helper.skip_unless_symlink
- def test_copyfile_symlinks(self):
- tmp_dir = self.mkdtemp()
- src = os.path.join(tmp_dir, 'src')
- dst = os.path.join(tmp_dir, 'dst')
- dst_link = os.path.join(tmp_dir, 'dst_link')
- link = os.path.join(tmp_dir, 'link')
- write_file(src, 'foo')
- os.symlink(src, link)
- # don't follow
- shutil.copyfile(link, dst_link, follow_symlinks=False)
- self.assertTrue(os.path.islink(dst_link))
- self.assertEqual(os.readlink(link), os.readlink(dst_link))
- # follow
- shutil.copyfile(link, dst)
- self.assertFalse(os.path.islink(dst))
-
- @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
- def test_dont_copy_file_onto_link_to_itself(self):
- # bug 851123.
- os.mkdir(TESTFN)
- src = os.path.join(TESTFN, 'cheese')
- dst = os.path.join(TESTFN, 'shop')
- try:
- with open(src, 'w', encoding='utf-8') as f:
- f.write('cheddar')
- try:
- os.link(src, dst)
- except PermissionError as e:
- self.skipTest('os.link(): %s' % e)
- self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
- with open(src, 'r', encoding='utf-8') as f:
- self.assertEqual(f.read(), 'cheddar')
- os.remove(dst)
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
-
- @os_helper.skip_unless_symlink
- def test_dont_copy_file_onto_symlink_to_itself(self):
- # bug 851123.
- os.mkdir(TESTFN)
- src = os.path.join(TESTFN, 'cheese')
- dst = os.path.join(TESTFN, 'shop')
- try:
- with open(src, 'w', encoding='utf-8') as f:
- f.write('cheddar')
- # Using `src` here would mean we end up with a symlink pointing
- # to TESTFN/TESTFN/cheese, while it should point at
- # TESTFN/cheese.
- os.symlink('cheese', dst)
- self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
- with open(src, 'r', encoding='utf-8') as f:
- self.assertEqual(f.read(), 'cheddar')
- os.remove(dst)
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
-
- # Issue #3002: copyfile and copytree block indefinitely on named pipes
- @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
- @unittest.skipIf(sys.platform == "vxworks",
- "fifo requires special path on VxWorks")
- def test_copyfile_named_pipe(self):
- try:
- os.mkfifo(TESTFN)
- except PermissionError as e:
- self.skipTest('os.mkfifo(): %s' % e)
- try:
- self.assertRaises(shutil.SpecialFileError,
- shutil.copyfile, TESTFN, TESTFN2)
- self.assertRaises(shutil.SpecialFileError,
- shutil.copyfile, __file__, TESTFN)
- finally:
- os.remove(TESTFN)
-
- def test_copyfile_return_value(self):
- # copytree returns its destination path.
- src_dir = self.mkdtemp()
- dst_dir = self.mkdtemp()
- dst_file = os.path.join(dst_dir, 'bar')
- src_file = os.path.join(src_dir, 'foo')
- write_file(src_file, 'foo')
- rv = shutil.copyfile(src_file, dst_file)
- self.assertTrue(os.path.exists(rv))
- self.assertEqual(read_file(src_file), read_file(dst_file))
-
- def test_copyfile_same_file(self):
- # copyfile() should raise SameFileError if the source and destination
- # are the same.
- src_dir = self.mkdtemp()
- src_file = os.path.join(src_dir, 'foo')
- write_file(src_file, 'foo')
- self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
- # But Error should work too, to stay backward compatible.
- self.assertRaises(Error, shutil.copyfile, src_file, src_file)
- # Make sure file is not corrupted.
- self.assertEqual(read_file(src_file), 'foo')
-
- @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)')
- # gh-92670: The test uses a trailing slash to force the OS consider
- # the path as a directory, but on AIX the trailing slash has no effect
- # and is considered as a file.
- @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670')
- def test_copyfile_nonexistent_dir(self):
- # Issue 43219
- src_dir = self.mkdtemp()
- src_file = os.path.join(src_dir, 'foo')
- dst = os.path.join(src_dir, 'does_not_exist/')
- write_file(src_file, 'foo')
- self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst)
-
- def test_copyfile_copy_dir(self):
- # Issue 45234
- # test copy() and copyfile() raising proper exceptions when src and/or
- # dst are directories
- src_dir = self.mkdtemp()
- src_file = os.path.join(src_dir, 'foo')
- dir2 = self.mkdtemp()
- dst = os.path.join(src_dir, 'does_not_exist/')
- write_file(src_file, 'foo')
- if sys.platform == "win32":
- err = PermissionError
- else:
- err = IsADirectoryError
-
- self.assertRaises(err, shutil.copyfile, src_dir, dst)
- self.assertRaises(err, shutil.copyfile, src_file, src_dir)
- self.assertRaises(err, shutil.copyfile, dir2, src_dir)
-
class TestArchives(BaseTest, unittest.TestCase):
@@ -2908,399 +2457,6 @@ def test_move_dir_permission_denied(self):
os_helper.rmtree(TESTFN_DST)
-class TestCopyFile(unittest.TestCase):
-
- class Faux(object):
- _entered = False
- _exited_with = None
- _raised = False
- def __init__(self, raise_in_exit=False, suppress_at_exit=True):
- self._raise_in_exit = raise_in_exit
- self._suppress_at_exit = suppress_at_exit
- def read(self, *args):
- return ''
- def __enter__(self):
- self._entered = True
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._exited_with = exc_type, exc_val, exc_tb
- if self._raise_in_exit:
- self._raised = True
- raise OSError("Cannot close")
- return self._suppress_at_exit
-
- def test_w_source_open_fails(self):
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- raise OSError('Cannot open "srcfile"')
- assert 0 # shouldn't reach here.
-
- with support.swap_attr(shutil, 'open', _open):
- with self.assertRaises(OSError):
- shutil.copyfile('srcfile', 'destfile')
-
- @unittest.skipIf(MACOS, "skipped on macOS")
- def test_w_dest_open_fails(self):
- srcfile = self.Faux()
-
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- return srcfile
- if filename == 'destfile':
- raise OSError('Cannot open "destfile"')
- assert 0 # shouldn't reach here.
-
- with support.swap_attr(shutil, 'open', _open):
- shutil.copyfile('srcfile', 'destfile')
- self.assertTrue(srcfile._entered)
- self.assertTrue(srcfile._exited_with[0] is OSError)
- self.assertEqual(srcfile._exited_with[1].args,
- ('Cannot open "destfile"',))
-
- @unittest.skipIf(MACOS, "skipped on macOS")
- def test_w_dest_close_fails(self):
- srcfile = self.Faux()
- destfile = self.Faux(True)
-
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- return srcfile
- if filename == 'destfile':
- return destfile
- assert 0 # shouldn't reach here.
-
- with support.swap_attr(shutil, 'open', _open):
- shutil.copyfile('srcfile', 'destfile')
- self.assertTrue(srcfile._entered)
- self.assertTrue(destfile._entered)
- self.assertTrue(destfile._raised)
- self.assertTrue(srcfile._exited_with[0] is OSError)
- self.assertEqual(srcfile._exited_with[1].args,
- ('Cannot close',))
-
- @unittest.skipIf(MACOS, "skipped on macOS")
- def test_w_source_close_fails(self):
-
- srcfile = self.Faux(True)
- destfile = self.Faux()
-
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- return srcfile
- if filename == 'destfile':
- return destfile
- assert 0 # shouldn't reach here.
-
- with support.swap_attr(shutil, 'open', _open):
- with self.assertRaises(OSError):
- shutil.copyfile('srcfile', 'destfile')
- self.assertTrue(srcfile._entered)
- self.assertTrue(destfile._entered)
- self.assertFalse(destfile._raised)
- self.assertTrue(srcfile._exited_with[0] is None)
- self.assertTrue(srcfile._raised)
-
-
-class TestCopyFileObj(unittest.TestCase):
- FILESIZE = 2 * 1024 * 1024
-
- @classmethod
- def setUpClass(cls):
- write_test_file(TESTFN, cls.FILESIZE)
-
- @classmethod
- def tearDownClass(cls):
- os_helper.unlink(TESTFN)
- os_helper.unlink(TESTFN2)
-
- def tearDown(self):
- os_helper.unlink(TESTFN2)
-
- @contextlib.contextmanager
- def get_files(self):
- with open(TESTFN, "rb") as src:
- with open(TESTFN2, "wb") as dst:
- yield (src, dst)
-
- def assert_files_eq(self, src, dst):
- with open(src, 'rb') as fsrc:
- with open(dst, 'rb') as fdst:
- self.assertEqual(fsrc.read(), fdst.read())
-
- def test_content(self):
- with self.get_files() as (src, dst):
- shutil.copyfileobj(src, dst)
- self.assert_files_eq(TESTFN, TESTFN2)
-
- def test_file_not_closed(self):
- with self.get_files() as (src, dst):
- shutil.copyfileobj(src, dst)
- assert not src.closed
- assert not dst.closed
-
- def test_file_offset(self):
- with self.get_files() as (src, dst):
- shutil.copyfileobj(src, dst)
- self.assertEqual(src.tell(), self.FILESIZE)
- self.assertEqual(dst.tell(), self.FILESIZE)
-
- @unittest.skipIf(os.name != 'nt', "Windows only")
- def test_win_impl(self):
- # Make sure alternate Windows implementation is called.
- with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
- shutil.copyfile(TESTFN, TESTFN2)
- assert m.called
-
- # File size is 2 MiB but max buf size should be 1 MiB.
- self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
-
- # If file size < 1 MiB memoryview() length must be equal to
- # the actual file size.
- with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
- f.write(b'foo')
- fname = f.name
- self.addCleanup(os_helper.unlink, fname)
- with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
- shutil.copyfile(fname, TESTFN2)
- self.assertEqual(m.call_args[0][2], 3)
-
- # Empty files should not rely on readinto() variant.
- with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
- pass
- fname = f.name
- self.addCleanup(os_helper.unlink, fname)
- with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
- shutil.copyfile(fname, TESTFN2)
- assert not m.called
- self.assert_files_eq(fname, TESTFN2)
-
-
-class _ZeroCopyFileTest(object):
- """Tests common to all zero-copy APIs."""
- FILESIZE = (10 * 1024 * 1024) # 10 MiB
- FILEDATA = b""
- PATCHPOINT = ""
-
- @classmethod
- def setUpClass(cls):
- write_test_file(TESTFN, cls.FILESIZE)
- with open(TESTFN, 'rb') as f:
- cls.FILEDATA = f.read()
- assert len(cls.FILEDATA) == cls.FILESIZE
-
- @classmethod
- def tearDownClass(cls):
- os_helper.unlink(TESTFN)
-
- def tearDown(self):
- os_helper.unlink(TESTFN2)
-
- @contextlib.contextmanager
- def get_files(self):
- with open(TESTFN, "rb") as src:
- with open(TESTFN2, "wb") as dst:
- yield (src, dst)
-
- def zerocopy_fun(self, *args, **kwargs):
- raise NotImplementedError("must be implemented in subclass")
-
- def reset(self):
- self.tearDown()
- self.tearDownClass()
- self.setUpClass()
- self.setUp()
-
- # ---
-
- def test_regular_copy(self):
- with self.get_files() as (src, dst):
- self.zerocopy_fun(src, dst)
- self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
- # Make sure the fallback function is not called.
- with self.get_files() as (src, dst):
- with unittest.mock.patch('shutil.copyfileobj') as m:
- shutil.copyfile(TESTFN, TESTFN2)
- assert not m.called
-
- def test_same_file(self):
- self.addCleanup(self.reset)
- with self.get_files() as (src, dst):
- with self.assertRaises((OSError, _GiveupOnFastCopy)):
- self.zerocopy_fun(src, src)
- # Make sure src file is not corrupted.
- self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
-
- def test_non_existent_src(self):
- name = tempfile.mktemp(dir=os.getcwd())
- with self.assertRaises(FileNotFoundError) as cm:
- shutil.copyfile(name, "new")
- self.assertEqual(cm.exception.filename, name)
-
- def test_empty_file(self):
- srcname = TESTFN + 'src'
- dstname = TESTFN + 'dst'
- self.addCleanup(lambda: os_helper.unlink(srcname))
- self.addCleanup(lambda: os_helper.unlink(dstname))
- with open(srcname, "wb"):
- pass
-
- with open(srcname, "rb") as src:
- with open(dstname, "wb") as dst:
- self.zerocopy_fun(src, dst)
-
- self.assertEqual(read_file(dstname, binary=True), b"")
-
- def test_unhandled_exception(self):
- with unittest.mock.patch(self.PATCHPOINT,
- side_effect=ZeroDivisionError):
- self.assertRaises(ZeroDivisionError,
- shutil.copyfile, TESTFN, TESTFN2)
-
- def test_exception_on_first_call(self):
- # Emulate a case where the first call to the zero-copy
- # function raises an exception in which case the function is
- # supposed to give up immediately.
- with unittest.mock.patch(self.PATCHPOINT,
- side_effect=OSError(errno.EINVAL, "yo")):
- with self.get_files() as (src, dst):
- with self.assertRaises(_GiveupOnFastCopy):
- self.zerocopy_fun(src, dst)
-
- def test_filesystem_full(self):
- # Emulate a case where filesystem is full and sendfile() fails
- # on first call.
- with unittest.mock.patch(self.PATCHPOINT,
- side_effect=OSError(errno.ENOSPC, "yo")):
- with self.get_files() as (src, dst):
- self.assertRaises(OSError, self.zerocopy_fun, src, dst)
-
-
-@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
-class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
- PATCHPOINT = "os.sendfile"
-
- def zerocopy_fun(self, fsrc, fdst):
- return shutil._fastcopy_sendfile(fsrc, fdst)
-
- def test_non_regular_file_src(self):
- with io.BytesIO(self.FILEDATA) as src:
- with open(TESTFN2, "wb") as dst:
- with self.assertRaises(_GiveupOnFastCopy):
- self.zerocopy_fun(src, dst)
- shutil.copyfileobj(src, dst)
-
- self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
-
- def test_non_regular_file_dst(self):
- with open(TESTFN, "rb") as src:
- with io.BytesIO() as dst:
- with self.assertRaises(_GiveupOnFastCopy):
- self.zerocopy_fun(src, dst)
- shutil.copyfileobj(src, dst)
- dst.seek(0)
- self.assertEqual(dst.read(), self.FILEDATA)
-
- def test_exception_on_second_call(self):
- def sendfile(*args, **kwargs):
- if not flag:
- flag.append(None)
- return orig_sendfile(*args, **kwargs)
- else:
- raise OSError(errno.EBADF, "yo")
-
- flag = []
- orig_sendfile = os.sendfile
- with unittest.mock.patch('os.sendfile', create=True,
- side_effect=sendfile):
- with self.get_files() as (src, dst):
- with self.assertRaises(OSError) as cm:
- shutil._fastcopy_sendfile(src, dst)
- assert flag
- self.assertEqual(cm.exception.errno, errno.EBADF)
-
- def test_cant_get_size(self):
- # Emulate a case where src file size cannot be determined.
- # Internally bufsize will be set to a small value and
- # sendfile() will be called repeatedly.
- with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
- with self.get_files() as (src, dst):
- shutil._fastcopy_sendfile(src, dst)
- assert m.called
- self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
-
- def test_small_chunks(self):
- # Force internal file size detection to be smaller than the
- # actual file size. We want to force sendfile() to be called
- # multiple times, also in order to emulate a src fd which gets
- # bigger while it is being copied.
- mock = unittest.mock.Mock()
- mock.st_size = 65536 + 1
- with unittest.mock.patch('os.fstat', return_value=mock) as m:
- with self.get_files() as (src, dst):
- shutil._fastcopy_sendfile(src, dst)
- assert m.called
- self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
-
- def test_big_chunk(self):
- # Force internal file size detection to be +100MB bigger than
- # the actual file size. Make sure sendfile() does not rely on
- # file size value except for (maybe) a better throughput /
- # performance.
- mock = unittest.mock.Mock()
- mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
- with unittest.mock.patch('os.fstat', return_value=mock) as m:
- with self.get_files() as (src, dst):
- shutil._fastcopy_sendfile(src, dst)
- assert m.called
- self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
-
- def test_blocksize_arg(self):
- with unittest.mock.patch('os.sendfile',
- side_effect=ZeroDivisionError) as m:
- self.assertRaises(ZeroDivisionError,
- shutil.copyfile, TESTFN, TESTFN2)
- blocksize = m.call_args[0][3]
- # Make sure file size and the block size arg passed to
- # sendfile() are the same.
- self.assertEqual(blocksize, os.path.getsize(TESTFN))
- # ...unless we're dealing with a small file.
- os_helper.unlink(TESTFN2)
- write_file(TESTFN2, b"hello", binary=True)
- self.addCleanup(os_helper.unlink, TESTFN2 + '3')
- self.assertRaises(ZeroDivisionError,
- shutil.copyfile, TESTFN2, TESTFN2 + '3')
- blocksize = m.call_args[0][3]
- self.assertEqual(blocksize, 2 ** 23)
-
- def test_file2file_not_supported(self):
- # Emulate a case where sendfile() only support file->socket
- # fds. In such a case copyfile() is supposed to skip the
- # fast-copy attempt from then on.
- assert shutil._USE_CP_SENDFILE
- try:
- with unittest.mock.patch(
- self.PATCHPOINT,
- side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
- with self.get_files() as (src, dst):
- with self.assertRaises(_GiveupOnFastCopy):
- shutil._fastcopy_sendfile(src, dst)
- assert m.called
- assert not shutil._USE_CP_SENDFILE
-
- with unittest.mock.patch(self.PATCHPOINT) as m:
- shutil.copyfile(TESTFN, TESTFN2)
- assert not m.called
- finally:
- shutil._USE_CP_SENDFILE = True
-
-
-@unittest.skipIf(not MACOS, 'macOS only')
-class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
- PATCHPOINT = "posix._fcopyfile"
-
- def zerocopy_fun(self, src, dst):
- return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
-
-
class TestGetTerminalSize(unittest.TestCase):
def test_does_not_crash(self):
"""Check if get_terminal_size() returns a meaningful value.