@@ -13,6 +13,8 @@
 import atexit
 import tempfile
 import warnings
+import weakref
+from uuid import uuid4
 
 try:
     WindowsError
@@ -28,7 +30,7 @@
     from pickle import loads
     from pickle import dumps
 
-from pickle import HIGHEST_PROTOCOL
+from pickle import HIGHEST_PROTOCOL, PicklingError
 
 try:
     import numpy as np
@@ -38,20 +40,64 @@
 
 from .numpy_pickle import load
 from .numpy_pickle import dump
-from .hashing import hash
 from .backports import make_memmap
 from .disk import delete_folder
 
 # Some systems have a ramdisk mounted by default, we can use it instead of /tmp
-# as the default folder to dump big arrays to share with subprocesses
+# as the default folder to dump big arrays to share with subprocesses.
 SYSTEM_SHARED_MEM_FS = '/dev/shm'
 
+# Minimal number of bytes available on SYSTEM_SHARED_MEM_FS to consider using
+# it as the default folder to dump big arrays to share with subprocesses.
+SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9)
+
 # Folder and file permissions to chmod temporary files generated by the
 # memmapping pool. Only the owner of the Python process can access the
 # temporary files and folder.
 FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
 FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR
 
+
+class _WeakArrayKeyMap:
+    """A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.
+
+    This data structure will be used with numpy arrays as obj keys, therefore
+    we do not use the __get__ / __set__ methods to avoid any conflict with
+    the numpy fancy indexing syntax.
+    """
+
+    def __init__(self):
+        self._data = {}
+
+    def get(self, obj):
+        ref, val = self._data[id(obj)]
+        if ref() is not obj:
+            # In case of race condition with on_destroy: could never be
+            # triggered by the joblib tests with CPython.
+            raise KeyError(obj)
+        return val
+
+    def set(self, obj, value):
+        key = id(obj)
+        try:
+            ref, _ = self._data[key]
+            if ref() is not obj:
+                # In case of race condition with on_destroy: could never be
+                # triggered by the joblib tests with CPython.
+                raise KeyError(obj)
+        except KeyError:
+            # Insert the new entry in the mapping along with a weakref
+            # callback to automatically delete the entry from the mapping
+            # as soon as the object used as key is garbage collected.
+            def on_destroy(_):
+                del self._data[key]
+            ref = weakref.ref(obj, on_destroy)
+        self._data[key] = ref, value
+
+    def __getstate__(self):
+        raise PicklingError("_WeakArrayKeyMap is not pickleable")
+
+
 ###############################################################################
 # Support for efficient transient pickling of numpy data structures
 
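The new _WeakArrayKeyMap keys entries by id(obj) and guards each one with a weak reference, because numpy arrays are unhashable and therefore cannot be used with weakref.WeakKeyDictionary directly. A minimal sketch of the intended behavior, assuming the class above is in scope (the array and value names are illustrative):

import gc
import numpy as np

m = _WeakArrayKeyMap()
a = np.zeros(10)
m.set(a, 'some-basename.pkl')
assert m.get(a) == 'some-basename.pkl'   # same object -> same value

b = np.zeros(10)   # equal content but a distinct object: not a hit,
                   # m.get(b) raises KeyError (the map is identity-based)

del a              # on CPython the weakref callback fires immediately
gc.collect()       # gc.collect() covers other interpreters
assert len(m._data) == 0   # the stale entry was purged automatically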
@@ -109,12 +155,18 @@ def _get_temp_dir(pool_folder_name, temp_folder=None):
     if temp_folder is None:
         if os.path.exists(SYSTEM_SHARED_MEM_FS):
             try:
-                temp_folder = SYSTEM_SHARED_MEM_FS
-                pool_folder = os.path.join(temp_folder, pool_folder_name)
-                if not os.path.exists(pool_folder):
-                    os.makedirs(pool_folder)
-                use_shared_mem = True
-            except IOError:
+                shm_stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
+                available_nbytes = shm_stats.f_bsize * shm_stats.f_bavail
+                if available_nbytes > SYSTEM_SHARED_MEM_FS_MIN_SIZE:
+                    # Try to see if we have write access to the shared mem
+                    # folder only if it is reasonably large (that is 2GB or
+                    # more).
+                    temp_folder = SYSTEM_SHARED_MEM_FS
+                    pool_folder = os.path.join(temp_folder, pool_folder_name)
+                    if not os.path.exists(pool_folder):
+                        os.makedirs(pool_folder)
+                    use_shared_mem = True
+            except (IOError, OSError):
                 # Missing rights in the /dev/shm partition,
                 # fall back to regular temp folder.
                 temp_folder = None
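The free-space probe is plain os.statvfs arithmetic: f_bsize * f_bavail approximates the number of bytes available to an unprivileged process (strictly, f_frsize is the size POSIX says to multiply by, but the two match on Linux tmpfs, which is what this check targets). A standalone sketch of the same test, POSIX only, with a helper name of our own:

import os

SYSTEM_SHARED_MEM_FS = '/dev/shm'
SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9)

def shm_looks_usable():
    """Return True when /dev/shm exists and has roughly 2GB or more free."""
    if not os.path.exists(SYSTEM_SHARED_MEM_FS):
        return False
    stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
    # f_bsize: preferred block size; f_bavail: blocks free for non-root users.
    return stats.f_bsize * stats.f_bavail > SYSTEM_SHARED_MEM_FS_MIN_SIZE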
@@ -231,6 +283,19 @@ def __init__(self, max_nbytes, temp_folder, mmap_mode, verbose=0,
         self._mmap_mode = mmap_mode
         self.verbose = int(verbose)
         self._prewarm = prewarm
+        self._memmaped_arrays = _WeakArrayKeyMap()
+
+    def __reduce__(self):
+        # The ArrayMemmapReducer is passed to the children processes: it needs
+        # to be pickled but the _WeakArrayKeyMap needs to be skipped as it's
+        # only guaranteed to be consistent with the parent process memory
+        # garbage collection.
+        args = (self._max_nbytes, self._temp_folder, self._mmap_mode)
+        kwargs = {
+            'verbose': self.verbose,
+            'prewarm': self._prewarm,
+        }
+        return ArrayMemmapReducer, args, kwargs
 
     def __call__(self, a):
         m = _get_backing_memmap(a)
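Because __reduce__ rebuilds the reducer in the child from its constructor arguments alone, each worker starts with a fresh, empty _WeakArrayKeyMap: the parent's id()-keyed cache would be meaningless in another address space, and __getstate__ raising PicklingError is the safety net for any code path that tries to pickle the map directly. One subtlety: pickle treats the third element of the returned tuple as a state dict, applied via __dict__.update since there is no __setstate__, so the 'verbose' entry restores self.verbose directly while 'prewarm' lands as a prewarm attribute distinct from the _prewarm set in __init__; whether that matters cannot be told from this hunk alone. A quick round-trip check, assuming ArrayMemmapReducer is importable (the joblib.pool path reflects where this code lived at the time):

import pickle
from joblib.pool import ArrayMemmapReducer   # import path is an assumption

reducer = ArrayMemmapReducer(int(1e6), '/tmp/joblib_demo', 'r')
clone = pickle.loads(pickle.dumps(reducer))

assert clone._max_nbytes == reducer._max_nbytes                # config survives
assert clone._memmaped_arrays is not reducer._memmaped_arrays  # cache does not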
@@ -249,10 +314,16 @@ def __call__(self, a):
             if e.errno != errno.EEXIST:
                 raise e
 
-        # Find a unique, concurrent safe filename for writing the
-        # content of this array only once.
-        basename = "{}-{}-{}.pkl".format(
-            os.getpid(), id(threading.current_thread()), hash(a))
+        try:
+            basename = self._memmaped_arrays.get(a)
+        except KeyError:
+            # Generate a new unique random filename. The process and thread
+            # ids are only useful for debugging purposes and to make it
+            # easier to clean up orphaned files in case of hard process
+            # kill (e.g. by "kill -9" or segfault).
+            basename = "{}-{}-{}.pkl".format(
+                os.getpid(), id(threading.current_thread()), uuid4().hex)
+            self._memmaped_arrays.set(a, basename)
         filename = os.path.join(self._temp_folder, basename)
 
         # In case the same array with the same content is passed several
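The net effect of the last two hunks: the old scheme hashed the full array buffer on every call to derive a stable filename, while the new one draws a random name once per array object and reuses it through the _WeakArrayKeyMap, turning an O(n) content hash into an O(1) identity lookup when the same array is dispatched repeatedly. A sketch of the new naming scheme in isolation:

import os
import threading
from uuid import uuid4

# uuid4 alone provides the uniqueness; the pid and thread id only make
# orphaned files (left behind after a "kill -9" or a segfault) easy to
# attribute and clean up by hand.
basename = '{}-{}-{}.pkl'.format(
    os.getpid(), id(threading.current_thread()), uuid4().hex)

Note that two distinct array objects with identical content now map to two different files, where the hash-based scheme deduplicated them; the truncated context line above suggests that case is handled just below this hunk.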