diff --git a/Include/internal/pycore_atomic.h b/Include/internal/pycore_atomic.h index 1d5c5621677eb3..27172cbe49fbab 100644 --- a/Include/internal/pycore_atomic.h +++ b/Include/internal/pycore_atomic.h @@ -62,6 +62,12 @@ typedef struct _Py_atomic_int { #define _Py_atomic_load_explicit(ATOMIC_VAL, ORDER) \ atomic_load_explicit(&((ATOMIC_VAL)->_value), ORDER) +#define _Py_atomic_fetch_add_explicit(ATOMIC_VAL, VAL, ORDER) \ + atomic_fetch_add_explicit(&((ATOMIC_VAL)->_value), VAL, ORDER) + +#define _Py_atomic_fetch_sub_explicit(ATOMIC_VAL, VAL, ORDER) \ + atomic_fetch_sub_explicit(&((ATOMIC_VAL)->_value), VAL, ORDER) + /* Use builtin atomic operations in GCC >= 4.7 */ #elif defined(HAVE_BUILTIN_ATOMIC) @@ -100,6 +106,20 @@ typedef struct _Py_atomic_int { || (ORDER) == __ATOMIC_CONSUME), \ __atomic_load_n(&((ATOMIC_VAL)->_value), ORDER)) +#define _Py_atomic_fetch_add_explicit(ATOMIC_VAL, VAL, ORDER) \ + (assert((ORDER) == __ATOMIC_RELAXED \ + || (ORDER) == __ATOMIC_SEQ_CST \ + || (ORDER) == __ATOMIC_ACQUIRE \ + || (ORDER) == __ATOMIC_CONSUME), \ + __atomic_fetch_add(&((ATOMIC_VAL)->_value), VAL, ORDER)) + +#define _Py_atomic_fetch_sub_explicit(ATOMIC_VAL, VAL, ORDER) \ + (assert((ORDER) == __ATOMIC_RELAXED \ + || (ORDER) == __ATOMIC_SEQ_CST \ + || (ORDER) == __ATOMIC_ACQUIRE \ + || (ORDER) == __ATOMIC_CONSUME), \ + __atomic_fetch_sub(&((ATOMIC_VAL)->_value), VAL, ORDER)) + /* Only support GCC (for expression statements) and x86 (for simple * atomic semantics) and MSVC x86/x64/ARM */ #elif defined(__GNUC__) && (defined(__i386__) || defined(__amd64)) @@ -551,6 +571,12 @@ typedef struct _Py_atomic_int { #define _Py_atomic_load_relaxed(ATOMIC_VAL) \ _Py_atomic_load_explicit((ATOMIC_VAL), _Py_memory_order_relaxed) +#define _Py_atomic_fetch_add_relaxed(ATOMIC_VAL, VAL) \ + _Py_atomic_fetch_add_explicit((ATOMIC_VAL), (VAL), _Py_memory_order_relaxed) + +#define _Py_atomic_fetch_sub_relaxed(ATOMIC_VAL, VAL) \ + _Py_atomic_fetch_sub_explicit((ATOMIC_VAL), (VAL), _Py_memory_order_relaxed) + #ifdef __cplusplus } #endif diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index c9bfa9b82b6e6c..e8a33e4418b3f4 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -36,10 +36,11 @@ if os.name == 'posix': import _multiprocessing import _posixshmem + from multiprocessing.shared_memory import cleanup_shared_memory, shm_inc_refcount _CLEANUP_FUNCS.update({ 'semaphore': _multiprocessing.sem_unlink, - 'shared_memory': _posixshmem.shm_unlink, + 'shared_memory': cleanup_shared_memory, }) @@ -196,7 +197,9 @@ def main(fd): f'unknown resource type {rtype}') if cmd == 'REGISTER': - cache[rtype].add(name) + if rtype == "shared_memory" and name not in cache[rtype]: + cache[rtype].add(name) + shm_inc_refcount(name) elif cmd == 'UNREGISTER': cache[rtype].remove(name) elif cmd == 'PROBE': diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index 122b3fcebf3fed..813cd2f722071a 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -70,8 +70,9 @@ class SharedMemory: _flags = os.O_RDWR _mode = 0o600 _prepend_leading_slash = True if _USE_POSIX else False + _track_resource = True - def __init__(self, name=None, create=False, size=0): + def __init__(self, name=None, create=False, size=0, track_resource=True): if not size >= 0: raise ValueError("'size' must be a positive integer") if create: @@ -81,6 +82,7 @@ def __init__(self, name=None, create=False, size=0): if name is None and not self._flags & os.O_EXCL: raise ValueError("'name' can only be None if create=True") + self._track_resource = track_resource if _USE_POSIX: # POSIX Shared Memory @@ -108,6 +110,7 @@ def __init__(self, name=None, create=False, size=0): self._name = name try: if create and size: + size += _posixshmem.REFCOUNT_SIZE os.ftruncate(self._fd, size) stats = os.fstat(self._fd) size = stats.st_size @@ -116,8 +119,13 @@ def __init__(self, name=None, create=False, size=0): self.unlink() raise - from .resource_tracker import register - register(self._name, "shared_memory") + self._size = size + self._refcount = memoryview(self._mmap)[0:_posixshmem.REFCOUNT_SIZE] + self._buf = memoryview(self._mmap)[_posixshmem.REFCOUNT_SIZE:] + + if self._track_resource: + from .resource_tracker import register + register(self.name, "shared_memory") else: @@ -176,8 +184,9 @@ def __init__(self, name=None, create=False, size=0): size = _winapi.VirtualQuerySize(p_buf) self._mmap = mmap.mmap(-1, size, tagname=name) - self._size = size - self._buf = memoryview(self._mmap) + self._size = size + self._buf = memoryview(self._mmap) + self._refcount = None def __del__(self): try: @@ -215,11 +224,17 @@ def name(self): @property def size(self): "Size in bytes." - return self._size + if _USE_POSIX: + return self._size - _posixshmem.REFCOUNT_SIZE + else: + return self._size def close(self): """Closes access to the shared memory from this instance but does not destroy the shared memory block.""" + if self._refcount is not None: + self._refcount.release() + self._refcount = None if self._buf is not None: self._buf.release() self._buf = None @@ -237,10 +252,25 @@ def unlink(self): called once (and only once) across all processes which have access to the shared memory block.""" if _USE_POSIX and self._name: - from .resource_tracker import unregister _posixshmem.shm_unlink(self._name) - unregister(self._name, "shared_memory") - + if self._track_resource: + from .resource_tracker import unregister + unregister(self.name, "shared_memory") + +def cleanup_shared_memory(name): + try: + shm = SharedMemory(name, track_resource=False) + refcount = _posixshmem.shm_dec_refcount(shm._refcount) + if refcount == 0: shm.unlink() + except FileNotFoundError: # Segment with name has already been unlinked + pass + +def shm_inc_refcount(name): + try: + shm = SharedMemory(name, track_resource=False) + _posixshmem.shm_inc_refcount(shm._refcount) + except FileNotFoundError: # Segment with name has already been unlinked + pass _encoding = "utf8" diff --git a/Misc/NEWS.d/next/Library/2020-11-05-16-29-50.bpo-38119.jwEVdy.rst b/Misc/NEWS.d/next/Library/2020-11-05-16-29-50.bpo-38119.jwEVdy.rst new file mode 100644 index 00000000000000..ec4e4676fcc56f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-11-05-16-29-50.bpo-38119.jwEVdy.rst @@ -0,0 +1 @@ +Fix shared_memory's resource tracking by using reference counting diff --git a/Modules/Setup b/Modules/Setup index a5fbaf6381be5d..cf4373c2a9ff17 100644 --- a/Modules/Setup +++ b/Modules/Setup @@ -118,6 +118,7 @@ _signal -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal signalmodule.c _stat _stat.c # stat.h interface time -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal timemodule.c # -lm # time operations and variables _thread -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal _threadmodule.c # low-level threading interface +_posixshmem -DPy_BUILD_CORE_BUILTIN -I$(srcdir)/Include/internal _multiprocessing/posixshmem.c # shared memory interface dependent on 'pycore_atomic.h' # access to ISO C locale support _locale -DPy_BUILD_CORE_BUILTIN _localemodule.c # -lintl diff --git a/Modules/_multiprocessing/clinic/posixshmem.c.h b/Modules/_multiprocessing/clinic/posixshmem.c.h index 3424b10a569f8d..31bd155d5f4586 100644 --- a/Modules/_multiprocessing/clinic/posixshmem.c.h +++ b/Modules/_multiprocessing/clinic/posixshmem.c.h @@ -113,6 +113,80 @@ _posixshmem_shm_unlink(PyObject *module, PyObject *const *args, Py_ssize_t nargs #endif /* defined(HAVE_SHM_UNLINK) */ +PyDoc_STRVAR(_posixshmem_shm_inc_refcount__doc__, +"shm_inc_refcount($module, /, ptr)\n" +"--\n" +"\n" +"Increment Reference Count of the memoryview object"); + +#define _POSIXSHMEM_SHM_INC_REFCOUNT_METHODDEF \ + {"shm_inc_refcount", (PyCFunction)(void(*)(void))_posixshmem_shm_inc_refcount, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_inc_refcount__doc__}, + +static int +_posixshmem_shm_inc_refcount_impl(PyObject *module, PyObject *ptr); + +static PyObject * +_posixshmem_shm_inc_refcount(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"ptr", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "shm_inc_refcount", 0}; + PyObject *argsbuf[1]; + PyObject *ptr; + int _return_value; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + ptr = args[0]; + _return_value = _posixshmem_shm_inc_refcount_impl(module, ptr); + if ((_return_value == -1) && PyErr_Occurred()) { + goto exit; + } + return_value = PyLong_FromLong((long)_return_value); + +exit: + return return_value; +} + +PyDoc_STRVAR(_posixshmem_shm_dec_refcount__doc__, +"shm_dec_refcount($module, /, ptr)\n" +"--\n" +"\n" +"Decrement Reference Count of the memoryview object"); + +#define _POSIXSHMEM_SHM_DEC_REFCOUNT_METHODDEF \ + {"shm_dec_refcount", (PyCFunction)(void(*)(void))_posixshmem_shm_dec_refcount, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_dec_refcount__doc__}, + +static int +_posixshmem_shm_dec_refcount_impl(PyObject *module, PyObject *ptr); + +static PyObject * +_posixshmem_shm_dec_refcount(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"ptr", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "shm_dec_refcount", 0}; + PyObject *argsbuf[1]; + PyObject *ptr; + int _return_value; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + ptr = args[0]; + _return_value = _posixshmem_shm_dec_refcount_impl(module, ptr); + if ((_return_value == -1) && PyErr_Occurred()) { + goto exit; + } + return_value = PyLong_FromLong((long)_return_value); + +exit: + return return_value; +} + #ifndef _POSIXSHMEM_SHM_OPEN_METHODDEF #define _POSIXSHMEM_SHM_OPEN_METHODDEF #endif /* !defined(_POSIXSHMEM_SHM_OPEN_METHODDEF) */ @@ -120,4 +194,4 @@ _posixshmem_shm_unlink(PyObject *module, PyObject *const *args, Py_ssize_t nargs #ifndef _POSIXSHMEM_SHM_UNLINK_METHODDEF #define _POSIXSHMEM_SHM_UNLINK_METHODDEF #endif /* !defined(_POSIXSHMEM_SHM_UNLINK_METHODDEF) */ -/*[clinic end generated code: output=bca8e78d0f43ef1a input=a9049054013a1b77]*/ +/*[clinic end generated code: output=cd4bd3692d1ce532 input=a9049054013a1b77]*/ diff --git a/Modules/_multiprocessing/posixshmem.c b/Modules/_multiprocessing/posixshmem.c index 436ac6d6b39f49..78471fb6288497 100644 --- a/Modules/_multiprocessing/posixshmem.c +++ b/Modules/_multiprocessing/posixshmem.c @@ -5,6 +5,8 @@ posixshmem - A Python extension that provides shm_open() and shm_unlink() #define PY_SSIZE_T_CLEAN #include +#include +#include "pycore_atomic.h" // for shm_open() and shm_unlink() #ifdef HAVE_SYS_MMAN_H @@ -101,11 +103,44 @@ _posixshmem_shm_unlink_impl(PyObject *module, PyObject *path) } #endif /* HAVE_SHM_UNLINK */ +/*[clinic input] +_posixshmem.shm_inc_refcount -> int + ptr: object + +Increment Reference Count of the memoryview object + +[clinic start generated code]*/ + +static int +_posixshmem_shm_inc_refcount_impl(PyObject *module, PyObject *ptr) +/*[clinic end generated code: output=9ed5b4d016975d06 input=b7c1fe6ce39b7bb4]*/ +{ + Py_buffer *buf = PyMemoryView_GET_BUFFER(ptr); + return _Py_atomic_fetch_add_relaxed((_Py_atomic_int*)(buf->buf), 1) + 1; +} +/*[clinic input] +_posixshmem.shm_dec_refcount -> int + ptr: object + +Decrement Reference Count of the memoryview object + +[clinic start generated code]*/ + +static int +_posixshmem_shm_dec_refcount_impl(PyObject *module, PyObject *ptr) +/*[clinic end generated code: output=16ab284487281c72 input=0aab6ded127aa5c3]*/ +{ + Py_buffer *buf = PyMemoryView_GET_BUFFER(ptr); + return _Py_atomic_fetch_sub_relaxed((_Py_atomic_int*)(buf->buf), 1) - 1; +} + #include "clinic/posixshmem.c.h" static PyMethodDef module_methods[ ] = { _POSIXSHMEM_SHM_OPEN_METHODDEF _POSIXSHMEM_SHM_UNLINK_METHODDEF + _POSIXSHMEM_SHM_INC_REFCOUNT_METHODDEF + _POSIXSHMEM_SHM_DEC_REFCOUNT_METHODDEF {NULL} /* Sentinel */ }; @@ -118,6 +153,8 @@ static struct PyModuleDef this_module = { module_methods, // m_methods }; +const char *NAME_REFCOUNT_SIZE = "REFCOUNT_SIZE"; + /* Module init function */ PyMODINIT_FUNC PyInit__posixshmem(void) { @@ -126,5 +163,9 @@ PyInit__posixshmem(void) { if (!module) { return NULL; } + if (PyModule_AddIntConstant(module, NAME_REFCOUNT_SIZE, sizeof(_Py_atomic_int))) { + Py_XDECREF(module); + return NULL; + } return module; } diff --git a/Tools/c-analyzer/cpython/_parser.py b/Tools/c-analyzer/cpython/_parser.py index 7c8c2966653989..1a119365e6bef4 100644 --- a/Tools/c-analyzer/cpython/_parser.py +++ b/Tools/c-analyzer/cpython/_parser.py @@ -150,6 +150,7 @@ def clean_lines(text): Modules/main.c Py_BUILD_CORE 1 Modules/posixmodule.c Py_BUILD_CORE 1 Modules/signalmodule.c Py_BUILD_CORE 1 +Modules/_multiprocessing/posixshmem.c Py_BUILD_CORE 1 Modules/_threadmodule.c Py_BUILD_CORE 1 Modules/_tracemalloc.c Py_BUILD_CORE 1 Modules/_asynciomodule.c Py_BUILD_CORE 1