From 8bb5d21b46a73657069b33e6c556277e3d9e0c8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 2 Mar 2017 14:34:58 +0100 Subject: [PATCH 1/2] Use pip rather than easy_install in copy_joblib.sh Also need to remove joblib/testing.py to avoid pytest dependency --- sklearn/externals/copy_joblib.sh | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/sklearn/externals/copy_joblib.sh b/sklearn/externals/copy_joblib.sh index 6e5d2a5a3ec14..8b8de45ba42e9 100755 --- a/sklearn/externals/copy_joblib.sh +++ b/sklearn/externals/copy_joblib.sh @@ -1,20 +1,12 @@ #!/bin/sh # Script to do a local install of joblib export LC_ALL=C -rm -rf tmp joblib -PYTHON_VERSION=$(python -c 'import sys; print("{0[0]}.{0[1]}".format(sys.version_info))') -SITE_PACKAGES="$PWD/tmp/lib/python$PYTHON_VERSION/site-packages" +INSTALL_FOLDER=tmp/joblib_install +rm -rf joblib $INSTALL_FOLDER +pip install joblib --target $INSTALL_FOLDER +cp -r $INSTALL_FOLDER/joblib . +rm -rf $INSTALL_FOLDER -mkdir -p $SITE_PACKAGES -mkdir -p tmp/bin -export PYTHONPATH="$SITE_PACKAGES" -easy_install -Zeab tmp joblib - -cd tmp/joblib/ -python setup.py install --prefix $OLDPWD/tmp -cd $OLDPWD -cp -r $SITE_PACKAGES/joblib-*.egg/joblib . -rm -rf tmp # Needed to rewrite the doctests # Note: BSD sed -i needs an argument unders OSX # so first renaming to .bak and then deleting backup files @@ -25,4 +17,6 @@ find joblib -name "*.bak" | xargs rm # joblib is already tested on its own CI infrastructure upstream. rm -r joblib/test -chmod -x joblib/*.py +# Remove joblib/testing.py which is only used in tests and has a +# pytest dependency (needed until we drop nose) +rm joblib/testing.py From f0fe782662d64070722a8523b78990a82714dca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Est=C3=A8ve?= Date: Thu, 2 Mar 2017 14:35:19 +0100 Subject: [PATCH 2/2] Update joblib to 0.11 --- sklearn/externals/joblib/__init__.py | 25 +- sklearn/externals/joblib/_compat.py | 1 - sklearn/externals/joblib/_memory_helpers.py | 2 +- .../externals/joblib/_parallel_backends.py | 10 +- sklearn/externals/joblib/backports.py | 80 ++++++ sklearn/externals/joblib/format_stack.py | 30 +-- sklearn/externals/joblib/func_inspect.py | 21 +- sklearn/externals/joblib/hashing.py | 3 +- sklearn/externals/joblib/logger.py | 2 +- sklearn/externals/joblib/memory.py | 170 +++++++++--- sklearn/externals/joblib/numpy_pickle.py | 39 +-- .../externals/joblib/numpy_pickle_compat.py | 2 +- .../externals/joblib/numpy_pickle_utils.py | 135 ++++++---- sklearn/externals/joblib/parallel.py | 247 ++++++++++-------- sklearn/externals/joblib/pool.py | 29 +- sklearn/externals/joblib/testing.py | 93 ------- 16 files changed, 503 insertions(+), 386 deletions(-) create mode 100644 sklearn/externals/joblib/backports.py delete mode 100644 sklearn/externals/joblib/testing.py diff --git a/sklearn/externals/joblib/__init__.py b/sklearn/externals/joblib/__init__.py index ce1957d1def5a..3455b7d79b511 100644 --- a/sklearn/externals/joblib/__init__.py +++ b/sklearn/externals/joblib/__init__.py @@ -1,27 +1,27 @@ -""" Joblib is a set of tools to provide **lightweight pipelining in +"""Joblib is a set of tools to provide **lightweight pipelining in Python**. In particular, joblib offers: - 1. transparent disk-caching of the output values and lazy re-evaluation - (memoize pattern) +1. transparent disk-caching of the output values and lazy re-evaluation + (memoize pattern) - 2. easy simple parallel computing +2. easy simple parallel computing - 3. 
logging and tracing of the execution +3. logging and tracing of the execution Joblib is optimized to be **fast** and **robust** in particular on large data and has specific optimizations for `numpy` arrays. It is **BSD-licensed**. - ============================== ============================================ - **User documentation**: http://pythonhosted.org/joblib + ========================= ================================================ + **User documentation:** http://pythonhosted.org/joblib - **Download packages**: http://pypi.python.org/pypi/joblib#downloads + **Download packages:** http://pypi.python.org/pypi/joblib#downloads - **Source code**: http://github.com/joblib/joblib + **Source code:** http://github.com/joblib/joblib - **Report issues**: http://github.com/joblib/joblib/issues - ============================== ============================================ + **Report issues:** http://github.com/joblib/joblib/issues + ========================= ================================================ Vision @@ -115,8 +115,7 @@ # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer. # 'X.Y.dev0' is the canonical version of 'X.Y.dev' # - -__version__ = '0.10.3' +__version__ = '0.11' from .memory import Memory, MemorizedResult diff --git a/sklearn/externals/joblib/_compat.py b/sklearn/externals/joblib/_compat.py index 6309fa5281265..0c6e752478f01 100644 --- a/sklearn/externals/joblib/_compat.py +++ b/sklearn/externals/joblib/_compat.py @@ -4,7 +4,6 @@ import sys PY3_OR_LATER = sys.version_info[0] >= 3 -PY26 = sys.version_info[:2] == (2, 6) PY27 = sys.version_info[:2] == (2, 7) try: diff --git a/sklearn/externals/joblib/_memory_helpers.py b/sklearn/externals/joblib/_memory_helpers.py index 08a90de96d289..857ad29d79ad3 100644 --- a/sklearn/externals/joblib/_memory_helpers.py +++ b/sklearn/externals/joblib/_memory_helpers.py @@ -102,4 +102,4 @@ def open_py_source(filename): buffer.seek(0) text = TextIOWrapper(buffer, encoding, line_buffering=True) text.mode = 'r' - return text \ No newline at end of file + return text diff --git a/sklearn/externals/joblib/_parallel_backends.py b/sklearn/externals/joblib/_parallel_backends.py index cc4f221d21282..8f3e768abd441 100644 --- a/sklearn/externals/joblib/_parallel_backends.py +++ b/sklearn/externals/joblib/_parallel_backends.py @@ -21,6 +21,8 @@ class ParallelBackendBase(with_metaclass(ABCMeta)): """Helper abc which defines all methods a ParallelBackend must implement""" + supports_timeout = False + @abstractmethod def effective_n_jobs(self, n_jobs): """Determine the number of jobs that can actually run in parallel @@ -236,6 +238,8 @@ class ThreadingBackend(PoolManagerMixin, ParallelBackendBase): "with nogil" block or an expensive call to a library such as NumPy). """ + supports_timeout = True + def configure(self, n_jobs=1, parallel=None, **backend_args): """Build a process or thread pool and return the number of workers""" n_jobs = self.effective_n_jobs(n_jobs) @@ -259,6 +263,8 @@ class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin, # Environment variables to protect against bad situations when nesting JOBLIB_SPAWNED_PROCESS = "__JOBLIB_SPAWNED_PARALLEL__" + supports_timeout = True + def effective_n_jobs(self, n_jobs): """Determine the number of jobs which are going to run in parallel. 
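The ``supports_timeout`` class attribute introduced above replaces the old ``getfullargspec`` introspection of ``job.get`` in ``Parallel.retrieve`` (see the parallel.py hunk further down): a backend that can honour timeouts simply declares the flag, while a backend without it gets a warning when a timeout is set and ``job.get()`` is then called without one. A minimal sketch of a hypothetical custom backend using the public registration API (the backend name and class below are illustrative, not part of this patch):

    from sklearn.externals.joblib import Parallel, delayed
    from sklearn.externals.joblib.parallel import (parallel_backend,
                                                   register_parallel_backend)
    from sklearn.externals.joblib._parallel_backends import ThreadingBackend


    class MyThreadingBackend(ThreadingBackend):
        """Hypothetical backend: it inherits supports_timeout = True from
        ThreadingBackend, so Parallel forwards its ``timeout`` argument to
        ``job.get(timeout=...)`` when collecting results."""


    register_parallel_backend('my-threads', MyThreadingBackend)

    with parallel_backend('my-threads'):
        results = Parallel(n_jobs=2, timeout=10)(
            delayed(pow)(i, 2) for i in range(8))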
@@ -277,10 +283,10 @@ def effective_n_jobs(self, n_jobs): stacklevel=3) return 1 - elif threading.current_thread().name != 'MainThread': + if not isinstance(threading.current_thread(), threading._MainThread): # Prevent posix fork inside in non-main posix threads warnings.warn( - 'Multiprocessing backed parallel loops cannot be nested' + 'Multiprocessing-backed parallel loops cannot be nested' ' below threads, setting n_jobs=1', stacklevel=3) return 1 diff --git a/sklearn/externals/joblib/backports.py b/sklearn/externals/joblib/backports.py new file mode 100644 index 0000000000000..7dd3df16f165a --- /dev/null +++ b/sklearn/externals/joblib/backports.py @@ -0,0 +1,80 @@ +""" +Backports of fixes for joblib dependencies +""" +import os +import time +import ctypes +import sys + +from distutils.version import LooseVersion + +try: + import numpy as np + + def make_memmap(filename, dtype='uint8', mode='r+', offset=0, + shape=None, order='C'): + """Backport of numpy memmap offset fix. + + See https://github.com/numpy/numpy/pull/8443 for more details. + + The numpy fix will be available in numpy 1.13. + """ + mm = np.memmap(filename, dtype=dtype, mode=mode, offset=offset, + shape=shape, order=order) + if LooseVersion(np.__version__) < '1.13': + mm.offset = offset + return mm +except ImportError: + def make_memmap(filename, dtype='uint8', mode='r+', offset=0, + shape=None, order='C'): + raise NotImplementedError( + "'joblib.backports.make_memmap' should not be used " + 'if numpy is not installed.') + + +if os.name == 'nt': + error_access_denied = 5 + try: + from os import replace + except ImportError: + # Python 2.7 + def replace(src, dst): + if not isinstance(src, unicode): # noqa + src = unicode(src, sys.getfilesystemencoding()) # noqa + if not isinstance(dst, unicode): # noqa + dst = unicode(dst, sys.getfilesystemencoding()) # noqa + + movefile_replace_existing = 0x1 + return_value = ctypes.windll.kernel32.MoveFileExW( + src, dst, movefile_replace_existing) + if return_value == 0: + raise ctypes.WinError() + + def concurrency_safe_rename(src, dst): + """Renames ``src`` into ``dst`` overwriting ``dst`` if it exists. + + On Windows os.replace (or for Python 2.7 its implementation + through MoveFileExW) can yield permission errors if executed by + two different processes. 
+ """ + max_sleep_time = 1 + total_sleep_time = 0 + sleep_time = 0.001 + while total_sleep_time < max_sleep_time: + try: + replace(src, dst) + break + except Exception as exc: + if getattr(exc, 'winerror', None) == error_access_denied: + time.sleep(sleep_time) + total_sleep_time += sleep_time + sleep_time *= 2 + else: + raise + else: + raise +else: + try: + from os import replace as concurrency_safe_rename + except ImportError: + from os import rename as concurrency_safe_rename # noqa diff --git a/sklearn/externals/joblib/format_stack.py b/sklearn/externals/joblib/format_stack.py index 91eabeb0e1ada..4984ebb081323 100644 --- a/sklearn/externals/joblib/format_stack.py +++ b/sklearn/externals/joblib/format_stack.py @@ -135,15 +135,10 @@ def _fixed_getframes(etb, context=1, tb_offset=0): aux = traceback.extract_tb(etb) assert len(records) == len(aux) for i, (file, lnum, _, _) in enumerate(aux): - maybeStart = lnum - 1 - context // 2 - start = max(maybeStart, 0) + maybe_start = lnum - 1 - context // 2 + start = max(maybe_start, 0) end = start + context lines = linecache.getlines(file)[start:end] - # pad with empty lines if necessary - if maybeStart < 0: - lines = (['\n'] * -maybeStart) + lines - if len(lines) < context: - lines += ['\n'] * (context - len(lines)) buf = list(records[i]) buf[LNUM_POS] = lnum buf[INDEX_POS] = lnum - 1 - start @@ -355,13 +350,7 @@ def format_exc(etype, evalue, etb, context=5, tb_offset=0): pyver) # Drop topmost frames if requested - try: - records = _fixed_getframes(etb, context, tb_offset) - except: - raise - print('\nUnfortunately, your original traceback can not be ' - 'constructed.\n') - return '' + records = _fixed_getframes(etb, context, tb_offset) # Get (safely) a string form of the exception info try: @@ -397,18 +386,13 @@ def format_outer_frames(context=5, stack_start=None, stack_end=None, filename = filename[:-4] + '.py' if ignore_ipython: # Hack to avoid printing the internals of IPython - if (os.path.basename(filename) == 'iplib.py' - and func_name in ('safe_execfile', 'runcode')): + if (os.path.basename(filename) in ('iplib.py', 'py3compat.py') + and func_name in ('execfile', 'safe_execfile', 'runcode')): break - maybeStart = line_no - 1 - context // 2 - start = max(maybeStart, 0) + maybe_start = line_no - 1 - context // 2 + start = max(maybe_start, 0) end = start + context lines = linecache.getlines(filename)[start:end] - # pad with empty lines if necessary - if maybeStart < 0: - lines = (['\n'] * -maybeStart) + lines - if len(lines) < context: - lines += ['\n'] * (context - len(lines)) buf = list(records[i]) buf[LNUM_POS] = line_no buf[INDEX_POS] = line_no - 1 - start diff --git a/sklearn/externals/joblib/func_inspect.py b/sklearn/externals/joblib/func_inspect.py index ad5a548d38ded..30d1192b314ff 100644 --- a/sklearn/externals/joblib/func_inspect.py +++ b/sklearn/externals/joblib/func_inspect.py @@ -190,7 +190,7 @@ def _signature_str(function_name, arg_spec): arg_spec_for_format = arg_spec[:7 if PY3_OR_LATER else 4] arg_spec_str = inspect.formatargspec(*arg_spec_for_format) - return '{0}{1}'.format(function_name, arg_spec_str) + return '{}{}'.format(function_name, arg_spec_str) def _function_called_str(function_name, args, kwargs): @@ -316,6 +316,13 @@ def filter_args(func, ignore_lst, args=(), kwargs=dict()): return arg_dict +def _format_arg(arg): + formatted_arg = pformat(arg, indent=2) + if len(formatted_arg) > 1500: + formatted_arg = '%s...' 
% formatted_arg[:700] + return formatted_arg + + def format_signature(func, *args, **kwargs): # XXX: Should this use inspect.formatargvalues/formatargspec? module, name = get_func_name(func) @@ -328,14 +335,12 @@ def format_signature(func, *args, **kwargs): arg_str = list() previous_length = 0 for arg in args: - arg = pformat(arg, indent=2) - if len(arg) > 1500: - arg = '%s...' % arg[:700] + formatted_arg = _format_arg(arg) if previous_length > 80: - arg = '\n%s' % arg - previous_length = len(arg) - arg_str.append(arg) - arg_str.extend(['%s=%s' % (v, pformat(i)) for v, i in kwargs.items()]) + formatted_arg = '\n%s' % formatted_arg + previous_length = len(formatted_arg) + arg_str.append(formatted_arg) + arg_str.extend(['%s=%s' % (v, _format_arg(i)) for v, i in kwargs.items()]) arg_str = ', '.join(arg_str) signature = '%s(%s)' % (name, arg_str) diff --git a/sklearn/externals/joblib/hashing.py b/sklearn/externals/joblib/hashing.py index ced817be17cac..88bd6cfdefeab 100644 --- a/sklearn/externals/joblib/hashing.py +++ b/sklearn/externals/joblib/hashing.py @@ -13,6 +13,7 @@ import types import struct import io +import decimal from ._compat import _bytes_or_unicode, PY3_OR_LATER @@ -35,7 +36,7 @@ def __init__(self, set_sequence): # This fails on python 3 when elements are unorderable # but we keep it in a try as it's faster. self._sequence = sorted(set_sequence) - except TypeError: + except (TypeError, decimal.InvalidOperation): # If elements are unorderable, sorting them using their hash. # This is slower but works in any case. self._sequence = sorted((hash(e) for e in set_sequence)) diff --git a/sklearn/externals/joblib/logger.py b/sklearn/externals/joblib/logger.py index 41b586427dcef..82a53b16500ea 100644 --- a/sklearn/externals/joblib/logger.py +++ b/sklearn/externals/joblib/logger.py @@ -74,7 +74,7 @@ def __init__(self, depth=3): self.depth = depth def warn(self, msg): - logging.warn("[%s]: %s" % (self, msg)) + logging.warning("[%s]: %s" % (self, msg)) def debug(self, msg): # XXX: This conflicts with the debug flag used in children class diff --git a/sklearn/externals/joblib/memory.py b/sklearn/externals/joblib/memory.py index fff84ad7a2cf8..14d7552535bb0 100644 --- a/sklearn/externals/joblib/memory.py +++ b/sklearn/externals/joblib/memory.py @@ -15,11 +15,6 @@ import time import pydoc import re -import sys -try: - import cPickle as pickle -except ImportError: - import pickle import functools import traceback import warnings @@ -27,19 +22,28 @@ import json import weakref import io +import operator +import collections +import datetime +import threading # Local imports from . import hashing from .func_inspect import get_func_code, get_func_name, filter_args -from .func_inspect import format_signature, format_call +from .func_inspect import format_call +from .func_inspect import format_signature from ._memory_helpers import open_py_source from .logger import Logger, format_time, pformat from . import numpy_pickle -from .disk import mkdirp, rm_subdirs +from .disk import mkdirp, rm_subdirs, memstr_to_bytes from ._compat import _basestring, PY3_OR_LATER +from .backports import concurrency_safe_rename FIRST_LINE_TEXT = "# first line:" +CacheItemInfo = collections.namedtuple('CacheItemInfo', + 'path size last_access') + # TODO: The following object should have a data store object as a sub # object, and the interface to persist and query should be separated in # the data store. 
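The ``CacheItemInfo`` records, together with the new ``memstr_to_bytes`` and ``concurrency_safe_rename`` imports, support the cache size limit machinery added further down in this file. A small sketch of how a size limit is resolved and how cache entries are ranked for deletion, assuming ``joblib.disk.memstr_to_bytes`` accepts the usual 'K'/'M'/'G' suffixes (paths, sizes and dates below are made up):

    import collections
    import datetime
    import operator

    from sklearn.externals.joblib.disk import memstr_to_bytes

    CacheItemInfo = collections.namedtuple('CacheItemInfo',
                                           'path size last_access')

    bytes_limit = memstr_to_bytes('10M')   # 10 * 1024 ** 2 bytes

    items = [
        CacheItemInfo('/tmp/cache/old_item', size=6 * 1024 ** 2,
                      last_access=datetime.datetime(2017, 1, 1)),
        CacheItemInfo('/tmp/cache/new_item', size=8 * 1024 ** 2,
                      last_access=datetime.datetime(2017, 3, 1)),
    ]
    # The least recently accessed items are deleted first, until the
    # remaining cache fits under bytes_limit (here only 'old_item' would
    # go: 14 MiB total minus 6 MiB brings the cache under the 10 MiB cap).
    items.sort(key=operator.attrgetter('last_access'))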
@@ -130,7 +134,82 @@ def _load_output(output_dir, func_name, timestamp=None, metadata=None, raise KeyError( "Non-existing cache value (may have been cleared).\n" "File %s does not exist" % filename) - return numpy_pickle.load(filename, mmap_mode=mmap_mode) + result = numpy_pickle.load(filename, mmap_mode=mmap_mode) + + return result + + +def _get_cache_items(root_path): + """Get cache information for reducing the size of the cache.""" + cache_items = [] + + for dirpath, dirnames, filenames in os.walk(root_path): + is_cache_hash_dir = re.match('[a-f0-9]{32}', os.path.basename(dirpath)) + + if is_cache_hash_dir: + output_filename = os.path.join(dirpath, 'output.pkl') + try: + last_access = os.path.getatime(output_filename) + except OSError: + try: + last_access = os.path.getatime(dirpath) + except OSError: + # The directory has already been deleted + continue + + last_access = datetime.datetime.fromtimestamp(last_access) + try: + full_filenames = [os.path.join(dirpath, fn) + for fn in filenames] + dirsize = sum(os.path.getsize(fn) + for fn in full_filenames) + except OSError: + # Either output_filename or one of the files in + # dirpath does not exist any more. We assume this + # directory is being cleaned by another process already + continue + + cache_items.append(CacheItemInfo(dirpath, dirsize, last_access)) + + return cache_items + + +def _get_cache_items_to_delete(root_path, bytes_limit): + """Get cache items to delete to keep the cache under a size limit.""" + if isinstance(bytes_limit, _basestring): + bytes_limit = memstr_to_bytes(bytes_limit) + + cache_items = _get_cache_items(root_path) + cache_size = sum(item.size for item in cache_items) + + to_delete_size = cache_size - bytes_limit + if to_delete_size < 0: + return [] + + # We want to delete first the cache items that were accessed a + # long time ago + cache_items.sort(key=operator.attrgetter('last_access')) + + cache_items_to_delete = [] + size_so_far = 0 + + for item in cache_items: + if size_so_far > to_delete_size: + break + + cache_items_to_delete.append(item) + size_so_far += item.size + + return cache_items_to_delete + + +def concurrency_safe_write(to_write, filename, write_func): + """Writes an object into a file in a concurrency-safe way.""" + thread_id = id(threading.current_thread()) + temporary_filename = '{}.thread-{}-pid-{}'.format( + filename, thread_id, os.getpid()) + write_func(to_write, temporary_filename) + concurrency_safe_rename(temporary_filename, filename) # An in-memory store to avoid looking at the disk-based function @@ -419,9 +498,10 @@ def _cached_call(self, args, kwargs): # function code has changed output_dir, argument_hash = self._get_output_dir(*args, **kwargs) metadata = None + output_pickle_path = os.path.join(output_dir, 'output.pkl') # FIXME: The statements below should be try/excepted if not (self._check_previous_func_code(stacklevel=4) and - os.path.exists(output_dir)): + os.path.isfile(output_pickle_path)): if self._verbose > 10: _, name = get_func_name(self.func) self.warn('Computing func %s, argument hash %s in ' @@ -449,11 +529,10 @@ def _cached_call(self, args, kwargs): print(max(0, (80 - len(msg))) * '_' + msg) except Exception: # XXX: Should use an exception logger + _, signature = format_signature(self.func, *args, **kwargs) self.warn('Exception while loading results for ' - '(args=%s, kwargs=%s)\n %s' % - (args, kwargs, traceback.format_exc())) - - shutil.rmtree(output_dir, ignore_errors=True) + '{}\n {}'.format( + signature, traceback.format_exc())) out, metadata = self.call(*args, 
**kwargs) argument_hash = None return (out, argument_hash, metadata) @@ -490,16 +569,6 @@ def __reduce__(self): return (self.__class__, (self.func, self.cachedir, self.ignore, self.mmap_mode, self.compress, self._verbose)) - def format_signature(self, *args, **kwargs): - warnings.warn("MemorizedFunc.format_signature will be removed in a " - "future version of joblib.", DeprecationWarning) - return format_signature(self.func, *args, **kwargs) - - def format_call(self, *args, **kwargs): - warnings.warn("MemorizedFunc.format_call will be removed in a " - "future version of joblib.", DeprecationWarning) - return format_call(self.func, args, kwargs) - #------------------------------------------------------------------------- # Private interface #------------------------------------------------------------------------- @@ -688,9 +757,11 @@ def _persist_output(self, output, dir): """ Persist the given output tuple in the directory. """ try: - mkdirp(dir) filename = os.path.join(dir, 'output.pkl') - numpy_pickle.dump(output, filename, compress=self.compress) + mkdirp(dir) + write_func = functools.partial(numpy_pickle.dump, + compress=self.compress) + concurrency_safe_write(output, filename, write_func) if self._verbose > 10: print('Persisting in %s' % dir) except OSError: @@ -724,9 +795,14 @@ def _persist_input(self, output_dir, duration, args, kwargs, metadata = {"duration": duration, "input_args": input_repr} try: mkdirp(output_dir) - with open(os.path.join(output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - except: + filename = os.path.join(output_dir, 'metadata.json') + + def write_func(output, dest_filename): + with open(dest_filename, 'w') as f: + json.dump(output, f) + + concurrency_safe_write(metadata, filename, write_func) + except Exception: pass this_duration = time.time() - start_time @@ -750,21 +826,9 @@ def _persist_input(self, output_dir, duration, args, kwargs, % this_duration, stacklevel=5) return metadata - def load_output(self, output_dir): - """ Read the results of a previous calculation from the directory - it was cached in. - """ - warnings.warn("MemorizedFunc.load_output is deprecated and will be " - "removed in a future version\n" - "of joblib. A MemorizedResult provides similar features", - DeprecationWarning) - # No metadata available here. - return _load_output(output_dir, _get_func_fullname(self.func), - timestamp=self.timestamp, - mmap_mode=self.mmap_mode, verbose=self._verbose) - # XXX: Need a method to check if results are available. + #------------------------------------------------------------------------- # Private `object` interface #------------------------------------------------------------------------- @@ -793,7 +857,8 @@ class Memory(Logger): # Public interface #------------------------------------------------------------------------- - def __init__(self, cachedir, mmap_mode=None, compress=False, verbose=1): + def __init__(self, cachedir, mmap_mode=None, compress=False, verbose=1, + bytes_limit=None): """ Parameters ---------- @@ -813,6 +878,8 @@ def __init__(self, cachedir, mmap_mode=None, compress=False, verbose=1): verbose: int, optional Verbosity flag, controls the debug messages that are issued as functions are evaluated. 
+ bytes_limit: int, optional + Limit in bytes of the size of the cache """ # XXX: Bad explanation of the None value of cachedir Logger.__init__(self) @@ -820,6 +887,7 @@ def __init__(self, cachedir, mmap_mode=None, compress=False, verbose=1): self.mmap_mode = mmap_mode self.timestamp = time.time() self.compress = compress + self.bytes_limit = bytes_limit if compress and mmap_mode is not None: warnings.warn('Compressed results cannot be memmapped', stacklevel=2) @@ -884,6 +952,24 @@ def clear(self, warn=True): if self.cachedir is not None: rm_subdirs(self.cachedir) + def reduce_size(self): + """Remove cache folders to make cache size fit in ``bytes_limit``.""" + if self.cachedir is not None and self.bytes_limit is not None: + cache_items_to_delete = _get_cache_items_to_delete( + self.cachedir, self.bytes_limit) + + for cache_item in cache_items_to_delete: + if self._verbose > 10: + print('Deleting cache item {}'.format(cache_item)) + try: + shutil.rmtree(cache_item.path, ignore_errors=True) + except OSError: + # Even with ignore_errors=True can shutil.rmtree + # can raise OSErrror with [Errno 116] Stale file + # handle if another process has deleted the folder + # already. + pass + def eval(self, func, *args, **kwargs): """ Eval function func with arguments `*args` and `**kwargs`, in the context of the memory. diff --git a/sklearn/externals/joblib/numpy_pickle.py b/sklearn/externals/joblib/numpy_pickle.py index 0cf88a2bf3ab0..87a1a616cd54d 100644 --- a/sklearn/externals/joblib/numpy_pickle.py +++ b/sklearn/externals/joblib/numpy_pickle.py @@ -26,6 +26,7 @@ # which we don't care. from .numpy_pickle_compat import ZNDArrayWrapper # noqa from ._compat import _basestring, PY3_OR_LATER +from .backports import make_memmap ############################################################################### # Utility objects for persistence. @@ -151,12 +152,12 @@ def read_mmap(self, unpickler): if unpickler.mmap_mode == 'w+': unpickler.mmap_mode = 'r+' - marray = unpickler.np.memmap(unpickler.filename, - dtype=self.dtype, - shape=self.shape, - order=self.order, - mode=unpickler.mmap_mode, - offset=offset) + marray = make_memmap(unpickler.filename, + dtype=self.dtype, + shape=self.shape, + order=self.order, + mode=unpickler.mmap_mode, + offset=offset) # update the offset so that it corresponds to the end of the read array unpickler.file_handle.seek(offset + marray.nbytes) @@ -409,7 +410,7 @@ def dump(value, filename, compress=0, protocol=None, cache_size=None): if len(compress) != 2: raise ValueError( 'Compress argument tuple should contain exactly 2 elements: ' - '(compress method, compress level), you passed {0}' + '(compress method, compress level), you passed {}' .format(compress)) compress_method, compress_level = compress else: @@ -418,14 +419,14 @@ def dump(value, filename, compress=0, protocol=None, cache_size=None): if compress_level is not False and compress_level not in range(10): # Raising an error if a non valid compress level is given. raise ValueError( - 'Non valid compress level given: "{0}". Possible values are ' - '{1}.'.format(compress_level, list(range(10)))) + 'Non valid compress level given: "{}". Possible values are ' + '{}.'.format(compress_level, list(range(10)))) if compress_method not in _COMPRESSORS: # Raising an error if an unsupported compression method is given. raise ValueError( - 'Non valid compression method given: "{0}". Possible values are ' - '{1}.'.format(compress_method, _COMPRESSORS)) + 'Non valid compression method given: "{}". 
Possible values are ' + '{}.'.format(compress_method, _COMPRESSORS)) if not is_filename and not is_fileobj: # People keep inverting arguments, and the resulting error is @@ -461,17 +462,17 @@ def dump(value, filename, compress=0, protocol=None, cache_size=None): compress_level = 3 if not PY3_OR_LATER and compress_method in ('lzma', 'xz'): - raise NotImplementedError("{0} compression is only available for " + raise NotImplementedError("{} compression is only available for " "python version >= 3.3. You are using " - "{1}.{2}".format(compress_method, - sys.version_info[0], - sys.version_info[1])) + "{}.{}".format(compress_method, + sys.version_info[0], + sys.version_info[1])) if cache_size is not None: # Cache size is deprecated starting from version 0.10 warnings.warn("Please do not set 'cache_size' in joblib.dump, " "this parameter has no effect and will be removed. " - "You used 'cache_size={0}'".format(cache_size), + "You used 'cache_size={}'".format(cache_size), DeprecationWarning, stacklevel=2) if compress_level != 0: @@ -560,8 +561,10 @@ def load(filename, mmap_mode=None): if Path is not None and isinstance(filename, Path): filename = str(filename) - if hasattr(filename, "read") and hasattr(filename, "seek"): - with _read_fileobject(filename, "", mmap_mode) as fobj: + if hasattr(filename, "read"): + fobj = filename + filename = getattr(fobj, 'name', '') + with _read_fileobject(fobj, filename, mmap_mode) as fobj: obj = _unpickle(fobj) else: with open(filename, 'rb') as f: diff --git a/sklearn/externals/joblib/numpy_pickle_compat.py b/sklearn/externals/joblib/numpy_pickle_compat.py index 150d8f4e38450..ba8ab827914e0 100644 --- a/sklearn/externals/joblib/numpy_pickle_compat.py +++ b/sklearn/externals/joblib/numpy_pickle_compat.py @@ -12,7 +12,7 @@ def hex_str(an_int): """Convert an int to an hexadecimal string.""" - return '{0:#x}'.format(an_int) + return '{:#x}'.format(an_int) if PY3_OR_LATER: def asbytes(s): diff --git a/sklearn/externals/joblib/numpy_pickle_utils.py b/sklearn/externals/joblib/numpy_pickle_utils.py index 6f471073e672e..7196c0cbc85ce 100644 --- a/sklearn/externals/joblib/numpy_pickle_utils.py +++ b/sklearn/externals/joblib/numpy_pickle_utils.py @@ -9,12 +9,11 @@ import io import zlib import gzip -import bz2 import warnings import contextlib from contextlib import closing -from ._compat import PY3_OR_LATER, PY26, PY27, _basestring +from ._compat import PY3_OR_LATER, PY27, _basestring try: from threading import RLock @@ -40,6 +39,16 @@ lzma = None +try: + # The python standard library can be built without bz2 so we make bz2 + # usage optional. + # see https://github.com/scikit-learn/scikit-learn/issues/7526 for more + # details. + import bz2 +except ImportError: + bz2 = None + + # Magic numbers of supported compression file formats. ' _ZFILE_PREFIX = b'ZF' # used with pickle files created before 0.9.3. 
_ZLIB_PREFIX = b'\x78' @@ -50,7 +59,11 @@ # Supported compressors _COMPRESSORS = ('zlib', 'bz2', 'lzma', 'xz', 'gzip') -_COMPRESSOR_CLASSES = [gzip.GzipFile, bz2.BZ2File] +_COMPRESSOR_CLASSES = [gzip.GzipFile] + +if bz2 is not None: + _COMPRESSOR_CLASSES.append(bz2.BZ2File) + if lzma is not None: _COMPRESSOR_CLASSES.append(lzma.LZMAFile) @@ -63,6 +76,15 @@ _IO_BUFFER_SIZE = 1024 ** 2 +def _is_raw_file(fileobj): + """Check if fileobj is a raw file object, e.g created with open.""" + if PY3_OR_LATER: + fileobj = getattr(fileobj, 'raw', fileobj) + return isinstance(fileobj, io.FileIO) + else: + return isinstance(fileobj, file) # noqa + + ############################################################################### # Cache file utilities def _detect_compressor(fileobj): @@ -76,10 +98,15 @@ def _detect_compressor(fileobj): ------- str in {'zlib', 'gzip', 'bz2', 'lzma', 'xz', 'compat', 'not-compressed'} """ - # Ensure we read the first bytes. - fileobj.seek(0) - first_bytes = fileobj.read(_MAX_PREFIX_LEN) - fileobj.seek(0) + # Read the magic number in the first bytes of the file. + if hasattr(fileobj, 'peek'): + # Peek allows to read those bytes without moving the cursor in the + # file whic. + first_bytes = fileobj.peek(_MAX_PREFIX_LEN) + else: + # Fallback to seek if the fileobject is not peekable. + first_bytes = fileobj.read(_MAX_PREFIX_LEN) + fileobj.seek(0) if first_bytes.startswith(_ZLIB_PREFIX): return "zlib" @@ -99,8 +126,7 @@ def _detect_compressor(fileobj): def _buffered_read_file(fobj): """Return a buffered version of a read file object.""" - if PY26 or (PY27 and isinstance(fobj, bz2.BZ2File)): - # Python 2.6 doesn't fully support io.BufferedReader. + if PY27 and bz2 is not None and isinstance(fobj, bz2.BZ2File): # Python 2.7 doesn't work with BZ2File through a buffer: "no # attribute 'readable'" error. return fobj @@ -110,8 +136,7 @@ def _buffered_read_file(fobj): def _buffered_write_file(fobj): """Return a buffered version of a write file object.""" - if PY26 or (PY27 and isinstance(fobj, bz2.BZ2File)): - # Python 2.6 doesn't fully support io.BufferedWriter. + if PY27 and bz2 is not None and isinstance(fobj, bz2.BZ2File): # Python 2.7 doesn't work with BZ2File through a buffer: no attribute # 'writable'. # BZ2File doesn't implement the file object context manager in python 2 @@ -151,8 +176,7 @@ def _read_fileobject(fileobj, filename, mmap_mode=None): """ # Detect if the fileobj contains compressed data. compressor = _detect_compressor(fileobj) - if isinstance(fileobj, tuple(_COMPRESSOR_CLASSES)): - compressor = fileobj.__class__.__name__ + if compressor == 'compat': # Compatibility with old pickle mode: simply return the input # filename "as-is" and let the compatibility function be called by the @@ -163,52 +187,53 @@ def _read_fileobject(fileobj, filename, mmap_mode=None): DeprecationWarning, stacklevel=2) yield filename else: - # Checking if incompatible load parameters with the type of file: - # mmap_mode cannot be used with compressed file or in memory buffers - # such as io.BytesIO. - if ((compressor in _COMPRESSORS or - isinstance(fileobj, tuple(_COMPRESSOR_CLASSES))) and - mmap_mode is not None): - warnings.warn('File "%(filename)s" is compressed using ' - '"%(compressor)s" which is not compatible with ' - 'mmap_mode "%(mmap_mode)s" flag passed. mmap_mode ' - 'option will be ignored.' 
- % locals(), stacklevel=2) - if isinstance(fileobj, io.BytesIO) and mmap_mode is not None: - warnings.warn('In memory persistence is not compatible with ' - 'mmap_mode "%(mmap_mode)s" flag passed. mmap_mode ' - 'option will be ignored.' - % locals(), stacklevel=2) - - # if the passed fileobj is in the supported list of decompressor - # objects (GzipFile, BZ2File, LzmaFile), we simply return it. - if isinstance(fileobj, tuple(_COMPRESSOR_CLASSES)): - yield fileobj - # otherwise, based on the compressor detected in the file, we open the + # based on the compressor detected in the file, we open the # correct decompressor file object, wrapped in a buffer. - elif compressor == 'zlib': - yield _buffered_read_file(BinaryZlibFile(fileobj, 'rb')) + if compressor == 'zlib': + fileobj = _buffered_read_file(BinaryZlibFile(fileobj, 'rb')) elif compressor == 'gzip': - yield _buffered_read_file(BinaryGzipFile(fileobj, 'rb')) - elif compressor == 'bz2': + fileobj = _buffered_read_file(BinaryGzipFile(fileobj, 'rb')) + elif compressor == 'bz2' and bz2 is not None: if PY3_OR_LATER: - yield _buffered_read_file(bz2.BZ2File(fileobj, 'rb')) + fileobj = _buffered_read_file(bz2.BZ2File(fileobj, 'rb')) else: # In python 2, BZ2File doesn't support a fileobj opened in # binary mode. In this case, we pass the filename. - yield _buffered_read_file(bz2.BZ2File(fileobj.name, 'rb')) + fileobj = _buffered_read_file(bz2.BZ2File(fileobj.name, 'rb')) elif (compressor == 'lzma' or compressor == 'xz'): - if lzma is not None: - yield _buffered_read_file(lzma.LZMAFile(fileobj, 'rb')) + if PY3_OR_LATER and lzma is not None: + # We support lzma only in python 3 because in python 2 users + # may have installed the pyliblzma package, which also provides + # the lzma module, but that unfortunately doesn't fully support + # the buffer interface required by joblib. + # See https://github.com/joblib/joblib/issues/403 for details. + fileobj = _buffered_read_file(lzma.LZMAFile(fileobj, 'rb')) else: raise NotImplementedError("Lzma decompression is not " - "available for this version of " - "python ({0}.{1})" + "supported for this version of " + "python ({}.{})" .format(sys.version_info[0], sys.version_info[1])) - # No compression detected => returning the input file object (open) - else: - yield fileobj + # Checking if incompatible load parameters with the type of file: + # mmap_mode cannot be used with compressed file or in memory buffers + # such as io.BytesIO. + if mmap_mode is not None: + if isinstance(fileobj, io.BytesIO): + warnings.warn('In memory persistence is not compatible with ' + 'mmap_mode "%(mmap_mode)s" flag passed. ' + 'mmap_mode option will be ignored.' + % locals(), stacklevel=2) + elif compressor != 'not-compressed': + warnings.warn('mmap_mode "%(mmap_mode)s" is not compatible ' + 'with compressed file %(filename)s. ' + '"%(mmap_mode)s" flag will be ignored.' + % locals(), stacklevel=2) + elif not _is_raw_file(fileobj): + warnings.warn('"%(fileobj)r" is not a raw file, mmap_mode ' + '"%(mmap_mode)s" flag will be ignored.' 
+ % locals(), stacklevel=2) + + yield fileobj def _write_fileobject(filename, compress=("zlib", 3)): @@ -218,7 +243,7 @@ def _write_fileobject(filename, compress=("zlib", 3)): if compressmethod == "gzip": return _buffered_write_file(BinaryGzipFile(filename, 'wb', compresslevel=compresslevel)) - elif compressmethod == "bz2": + elif compressmethod == "bz2" and bz2 is not None: return _buffered_write_file(bz2.BZ2File(filename, 'wb', compresslevel=compresslevel)) elif lzma is not None and compressmethod == "xz": @@ -254,7 +279,7 @@ class BinaryZlibFile(io.BufferedIOBase): is returned as bytes, and data to be written should be given as bytes. This object is an adaptation of the BZ2File object and is compatible with - versions of python >= 2.6. + versions of python >= 2.7. If filename is a str or bytes object, it gives the name of the file to be opened. Otherwise, it should be a file object, @@ -280,8 +305,8 @@ def __init__(self, filename, mode="rb", compresslevel=9): self._size = -1 if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9): - raise ValueError("compresslevel must be between an integer " - "between 1 and 9, you gave {0}" + raise ValueError("'compresslevel' must be an integer " + "between 1 and 9. You provided 'compresslevel={}'" .format(compresslevel)) if mode == "rb": @@ -300,7 +325,7 @@ def __init__(self, filename, mode="rb", compresslevel=9): raise ValueError("Invalid mode: %r" % (mode,)) if isinstance(filename, _basestring): - self._fp = open(filename, mode) + self._fp = io.open(filename, mode) self._closefp = True self._mode = mode_code elif hasattr(filename, "read") or hasattr(filename, "write"): @@ -367,7 +392,7 @@ def _check_not_closed(self): fname = getattr(self._fp, 'name', None) msg = "I/O operation on closed file" if fname is not None: - msg += " {0}".format(fname) + msg += " {}".format(fname) msg += "." raise ValueError(msg) @@ -492,7 +517,7 @@ def write(self, data): with self._lock: self._check_can_write() # Convert data type if called by io.BufferedWriter. - if not PY26 and isinstance(data, memoryview): + if isinstance(data, memoryview): data = data.tobytes() compressed = self._compressor.compress(data) diff --git a/sklearn/externals/joblib/parallel.py b/sklearn/externals/joblib/parallel.py index 74bd18c4fb252..73e681b870dd5 100644 --- a/sklearn/externals/joblib/parallel.py +++ b/sklearn/externals/joblib/parallel.py @@ -16,9 +16,10 @@ import itertools from numbers import Integral from contextlib import contextmanager +import warnings try: import cPickle as pickle -except: +except ImportError: import pickle from ._multiprocessing_helpers import mp @@ -30,7 +31,6 @@ from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend, ThreadingBackend, SequentialBackend) from ._compat import _basestring -from .func_inspect import getfullargspec # Make sure that those two classes are part of the public joblib.parallel API # so that 3rd party backend implementers can import them from here. @@ -282,22 +282,24 @@ class Parallel(Logger): is used at all, which is useful for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used. - backend: str or None, default: 'multiprocessing' + backend: str, ParallelBackendBase instance or None, \ + default: 'multiprocessing' Specify the parallelization backend implementation. Supported backends are: - - "multiprocessing" used by default, can induce some - communication and memory overhead when exchanging input and - output data with the worker Python processes. 
- - "threading" is a very low-overhead backend but it suffers - from the Python Global Interpreter Lock if the called function - relies a lot on Python objects. "threading" is mostly useful - when the execution bottleneck is a compiled extension that - explicitly releases the GIL (for instance a Cython loop wrapped - in a "with nogil" block or an expensive call to a library such - as NumPy). - - finally, you can register backends by calling - register_parallel_backend. This will allow you to implement - a backend of your liking. + + - "multiprocessing" used by default, can induce some + communication and memory overhead when exchanging input and + output data with the worker Python processes. + - "threading" is a very low-overhead backend but it suffers + from the Python Global Interpreter Lock if the called function + relies a lot on Python objects. "threading" is mostly useful + when the execution bottleneck is a compiled extension that + explicitly releases the GIL (for instance a Cython loop wrapped + in a "with nogil" block or an expensive call to a library such + as NumPy). + - finally, you can register backends by calling + register_parallel_backend. This will allow you to implement + a backend of your liking. verbose: int, optional The verbosity level: if non zero, progress messages are printed. Above 50, the output is sent to stdout. @@ -327,12 +329,16 @@ class Parallel(Logger): Folder to be used by the pool for memmaping large arrays for sharing memory with worker processes. If None, this will try in order: - - a folder pointed by the JOBLIB_TEMP_FOLDER environment variable, - - /dev/shm if the folder exists and is writable: this is a RAMdisk - filesystem available by default on modern Linux distributions, - - the default system temporary folder that can be overridden - with TMP, TMPDIR or TEMP environment variables, typically /tmp - under Unix operating systems. + + - a folder pointed by the JOBLIB_TEMP_FOLDER environment + variable, + - /dev/shm if the folder exists and is writable: this is a + RAMdisk filesystem available by default on modern Linux + distributions, + - the default system temporary folder that can be + overridden with TMP, TMPDIR or TEMP environment + variables, typically /tmp under Unix operating systems. + Only active when backend="multiprocessing". max_nbytes int, str, or None, optional, 1M by default Threshold on the size of arrays passed to the workers that @@ -352,25 +358,25 @@ class Parallel(Logger): arguments. The main functionality it brings in addition to using the raw multiprocessing API are (see examples for details): - * More readable code, in particular since it avoids - constructing list of arguments. + * More readable code, in particular since it avoids + constructing list of arguments. - * Easier debugging: - - informative tracebacks even when the error happens on - the client side - - using 'n_jobs=1' enables to turn off parallel computing - for debugging without changing the codepath - - early capture of pickling errors + * Easier debugging: + - informative tracebacks even when the error happens on + the client side + - using 'n_jobs=1' enables to turn off parallel computing + for debugging without changing the codepath + - early capture of pickling errors - * An optional progress meter. + * An optional progress meter. - * Interruption of multiprocesses jobs with 'Ctrl-C' + * Interruption of multiprocesses jobs with 'Ctrl-C' - * Flexible pickling control for the communication to and from - the worker processes. 
+ * Flexible pickling control for the communication to and from + the worker processes. - * Ability to use shared memory efficiently with worker - processes for large numpy-based datastructures. + * Ability to use shared memory efficiently with worker + processes for large numpy-based datastructures. Examples -------- @@ -395,76 +401,74 @@ class Parallel(Logger): (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0) The progress meter: the higher the value of `verbose`, the more - messages:: + messages: - >>> from time import sleep - >>> from sklearn.externals.joblib import Parallel, delayed - >>> r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(10)) #doctest: +SKIP - [Parallel(n_jobs=2)]: Done 1 out of 10 | elapsed: 0.1s remaining: 0.9s - [Parallel(n_jobs=2)]: Done 3 out of 10 | elapsed: 0.2s remaining: 0.5s - [Parallel(n_jobs=2)]: Done 6 out of 10 | elapsed: 0.3s remaining: 0.2s - [Parallel(n_jobs=2)]: Done 9 out of 10 | elapsed: 0.5s remaining: 0.1s - [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 0.5s finished + >>> from time import sleep + >>> from sklearn.externals.joblib import Parallel, delayed + >>> r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(10)) #doctest: +SKIP + [Parallel(n_jobs=2)]: Done 1 out of 10 | elapsed: 0.1s remaining: 0.9s + [Parallel(n_jobs=2)]: Done 3 out of 10 | elapsed: 0.2s remaining: 0.5s + [Parallel(n_jobs=2)]: Done 6 out of 10 | elapsed: 0.3s remaining: 0.2s + [Parallel(n_jobs=2)]: Done 9 out of 10 | elapsed: 0.5s remaining: 0.1s + [Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 0.5s finished Traceback example, note how the line of the error is indicated as well as the values of the parameter passed to the function that triggered the exception, even though the traceback happens in the - child process:: - - >>> from heapq import nlargest - >>> from sklearn.externals.joblib import Parallel, delayed - >>> Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) #doctest: +SKIP - #... - --------------------------------------------------------------------------- - Sub-process traceback: - --------------------------------------------------------------------------- - TypeError Mon Nov 12 11:37:46 2012 - PID: 12934 Python 2.7.3: /usr/bin/python - ........................................................................... - /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None) - 419 if n >= size: - 420 return sorted(iterable, key=key, reverse=True)[:n] - 421 - 422 # When key is none, use simpler decoration - 423 if key is None: - --> 424 it = izip(iterable, count(0,-1)) # decorate - 425 result = _nlargest(n, it) - 426 return map(itemgetter(0), result) # undecorate - 427 - 428 # General case, slowest method + child process: + >>> from heapq import nlargest + >>> from sklearn.externals.joblib import Parallel, delayed + >>> Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) #doctest: +SKIP + #... + --------------------------------------------------------------------------- + Sub-process traceback: + --------------------------------------------------------------------------- + TypeError Mon Nov 12 11:37:46 2012 + PID: 12934 Python 2.7.3: /usr/bin/python + ........................................................................... 
+ /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None) + 419 if n >= size: + 420 return sorted(iterable, key=key, reverse=True)[:n] + 421 + 422 # When key is none, use simpler decoration + 423 if key is None: + --> 424 it = izip(iterable, count(0,-1)) # decorate + 425 result = _nlargest(n, it) + 426 return map(itemgetter(0), result) # undecorate + 427 + 428 # General case, slowest method TypeError: izip argument #1 must support iteration - ___________________________________________________________________________ + ___________________________________________________________________________ Using pre_dispatch in a producer/consumer situation, where the data is generated on the fly. Note how the producer is first - called a 3 times before the parallel loop is initiated, and then + called 3 times before the parallel loop is initiated, and then called to generate new data on the fly. In this case the total - number of iterations cannot be reported in the progress messages:: - - >>> from math import sqrt - >>> from sklearn.externals.joblib import Parallel, delayed - - >>> def producer(): - ... for i in range(6): - ... print('Produced %s' % i) - ... yield i - - >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')( - ... delayed(sqrt)(i) for i in producer()) #doctest: +SKIP - Produced 0 - Produced 1 - Produced 2 - [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s - Produced 3 - [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s - Produced 4 - [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s - Produced 5 - [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s - [Parallel(n_jobs=2)]: Done 5 out of 6 | elapsed: 0.0s remaining: 0.0s - [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished + number of iterations cannot be reported in the progress messages: + + >>> from math import sqrt + >>> from sklearn.externals.joblib import Parallel, delayed + >>> def producer(): + ... for i in range(6): + ... print('Produced %s' % i) + ... yield i + >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')( + ... 
delayed(sqrt)(i) for i in producer()) #doctest: +SKIP + Produced 0 + Produced 1 + Produced 2 + [Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s + Produced 3 + [Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s + Produced 4 + [Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s + Produced 5 + [Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s + [Parallel(n_jobs=2)]: Done 5 out of 6 | elapsed: 0.0s remaining: 0.0s + [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished + ''' def __init__(self, n_jobs=1, backend=None, verbose=0, timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', @@ -493,6 +497,9 @@ def __init__(self, n_jobs=1, backend=None, verbose=0, timeout=None, if backend is None: backend = active_backend + elif isinstance(backend, ParallelBackendBase): + # Use provided backend as is + pass elif hasattr(backend, 'Pool') and hasattr(backend, 'Lock'): # Make it possible to pass a custom multiprocessing context as # backend to change the start method to forkserver or spawn or @@ -536,12 +543,22 @@ def __exit__(self, exc_type, exc_value, traceback): def _initialize_backend(self): """Build a process or thread pool and return the number of workers""" try: - return self._backend.configure(n_jobs=self.n_jobs, parallel=self, - **self._backend_args) + n_jobs = self._backend.configure(n_jobs=self.n_jobs, parallel=self, + **self._backend_args) + if self.timeout is not None and not self._backend.supports_timeout: + warnings.warn( + 'The backend class {!r} does not support timeout. ' + "You have set 'timeout={}' in Parallel but " + "the 'timeout' parameter will not be used.".format( + self._backend.__class__.__name__, + self.timeout)) + except FallbackToBackend as e: # Recursively initialize the backend in case of requested fallback. self._backend = e.backend - return self._initialize_backend() + n_jobs = self._initialize_backend() + + return n_jobs def _effective_n_jobs(self): if self._backend: @@ -676,12 +693,13 @@ def retrieve(self): # the use of the lock with self._lock: job = self._jobs.pop(0) + try: - # check if timeout supported in backend future implementation - if 'timeout' in getfullargspec(job.get).args: + if getattr(self._backend, 'supports_timeout', False): self._output.extend(job.get(timeout=self.timeout)) else: self._output.extend(job.get()) + except BaseException as exception: # Note: we catch any BaseException instead of just Exception # instances to also include KeyboardInterrupt. @@ -689,7 +707,22 @@ def retrieve(self): # Stop dispatching any new job in the async callback thread self._aborting = True - if isinstance(exception, TransportableException): + # If the backend allows it, cancel or kill remaining running + # tasks without waiting for the results as we will raise + # the exception we got back to the caller instead of returning + # any result. + backend = self._backend + if (backend is not None and + hasattr(backend, 'abort_everything')): + # If the backend is managed externally we need to make sure + # to leave it in a working state to allow for future jobs + # scheduling. 
+ ensure_ready = self._managed_backend + backend.abort_everything(ensure_ready=ensure_ready) + + if not isinstance(exception, TransportableException): + raise + else: # Capture exception to add information on the local # stack in addition to the distant stack this_report = format_outer_frames(context=10, @@ -704,19 +737,7 @@ def retrieve(self): exception_type = _mk_exception(exception.etype)[0] exception = exception_type(report) - # If the backends allows it, cancel or kill remaining running - # tasks without waiting for the results as we will raise - # the exception we got back to the caller instead of returning - # any result. - backend = self._backend - if (backend is not None and - hasattr(backend, 'abort_everything')): - # If the backend is managed externally we need to make sure - # to leave it in a working state to allow for future jobs - # scheduling. - ensure_ready = self._managed_backend - backend.abort_everything(ensure_ready=ensure_ready) - raise exception + raise exception def __call__(self, iterable): if self._jobs: diff --git a/sklearn/externals/joblib/pool.py b/sklearn/externals/joblib/pool.py index e0682c1822314..c53a12dfa7686 100644 --- a/sklearn/externals/joblib/pool.py +++ b/sklearn/externals/joblib/pool.py @@ -28,7 +28,7 @@ try: WindowsError except NameError: - WindowsError = None + WindowsError = type(None) from pickle import whichmodule try: @@ -61,7 +61,7 @@ from .numpy_pickle import load from .numpy_pickle import dump from .hashing import hash - +from .backports import make_memmap # Some system have a ramdisk mounted by default, we can use it instead of /tmp # as the default folder to dump big arrays to share with subprocesses SYSTEM_SHARED_MEM_FS = '/dev/shm' @@ -107,13 +107,13 @@ def _strided_from_memmap(filename, dtype, mode, offset, order, shape, strides, if strides is None: # Simple, contiguous memmap - return np.memmap(filename, dtype=dtype, shape=shape, mode=mode, - offset=offset, order=order) + return make_memmap(filename, dtype=dtype, shape=shape, mode=mode, + offset=offset, order=order) else: # For non-contiguous data, memmap the total enclosing buffer and then # extract the non-contiguous view with the stride-tricks API - base = np.memmap(filename, dtype=dtype, shape=total_buffer_len, - mode=mode, offset=offset, order=order) + base = make_memmap(filename, dtype=dtype, shape=total_buffer_len, + mode=mode, offset=offset, order=order) return as_strided(base, shape=shape, strides=strides) @@ -279,7 +279,7 @@ class CustomizablePickler(Pickler): """ # We override the pure Python pickler as its the only way to be able to - # customize the dispatch table without side effects in Python 2.6 + # customize the dispatch table without side effects in Python 2.7 # to 3.2. For Python 3.3+ leverage the new dispatch_table # feature from http://bugs.python.org/issue14166 that makes it possible # to use the C implementation of the Pickler which is faster. @@ -605,11 +605,12 @@ def terminate(self): try: super(MemmapingPool, self).terminate() break - except WindowsError as e: - # Workaround occasional "[Error 5] Access is denied" issue - # when trying to terminate a process under windows. - sleep(0.1) - if i + 1 == n_retries: - warnings.warn("Failed to terminate worker processes in " - " multiprocessing pool: %r" % e) + except OSError as e: + if isinstance(e, WindowsError): + # Workaround occasional "[Error 5] Access is denied" issue + # when trying to terminate a process under windows. 
+ sleep(0.1) + if i + 1 == n_retries: + warnings.warn("Failed to terminate worker processes in" + " multiprocessing pool: %r" % e) delete_folder(self._temp_folder) diff --git a/sklearn/externals/joblib/testing.py b/sklearn/externals/joblib/testing.py deleted file mode 100644 index 94c023c3f0aed..0000000000000 --- a/sklearn/externals/joblib/testing.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -Helper for testing. -""" - -import sys -import warnings -import os.path -import re -import subprocess -import threading - -from sklearn.externals.joblib._compat import PY3_OR_LATER - - -def warnings_to_stdout(): - """ Redirect all warnings to stdout. - """ - showwarning_orig = warnings.showwarning - - def showwarning(msg, cat, fname, lno, file=None, line=0): - showwarning_orig(msg, cat, os.path.basename(fname), line, sys.stdout) - - warnings.showwarning = showwarning - #warnings.simplefilter('always') - - -try: - from nose.tools import assert_raises_regex -except ImportError: - # For Python 2.7 - try: - from nose.tools import assert_raises_regexp as assert_raises_regex - except ImportError: - # for Python 2.6 - def assert_raises_regex(expected_exception, expected_regexp, - callable_obj=None, *args, **kwargs): - """Helper function to check for message patterns in exceptions""" - - not_raised = False - try: - callable_obj(*args, **kwargs) - not_raised = True - except Exception as e: - error_message = str(e) - if not re.compile(expected_regexp).search(error_message): - raise AssertionError("Error message should match pattern " - "%r. %r does not." % - (expected_regexp, error_message)) - if not_raised: - raise AssertionError("Should have raised %r" % - expected_exception(expected_regexp)) - - -def check_subprocess_call(cmd, timeout=1, stdout_regex=None, - stderr_regex=None): - """Runs a command in a subprocess with timeout in seconds. - - Also checks returncode is zero, stdout if stdout_regex is set, and - stderr if stderr_regex is set. - """ - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - def kill_process(): - proc.kill() - - timer = threading.Timer(timeout, kill_process) - try: - timer.start() - stdout, stderr = proc.communicate() - - if PY3_OR_LATER: - stdout, stderr = stdout.decode(), stderr.decode() - if proc.returncode != 0: - message = ( - 'Non-zero return code: {0}.\nStdout:\n{1}\n' - 'Stderr:\n{2}').format( - proc.returncode, stdout, stderr) - raise ValueError(message) - - if (stdout_regex is not None and - not re.search(stdout_regex, stdout)): - raise ValueError( - "Unexpected stdout: {0!r} does not match:\n{1!r}".format( - stdout_regex, stdout)) - if (stderr_regex is not None and - not re.search(stderr_regex, stderr)): - raise ValueError( - "Unexpected stderr: {0!r} does not match:\n{1!r}".format( - stderr_regex, stderr)) - - finally: - timer.cancel()
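Taken together, the memory.py changes above expose the new cache size cap to users through ``Memory(..., bytes_limit=...)`` and the explicit ``Memory.reduce_size()`` call. A usage sketch (the cache directory and the cached function are illustrative):

    from sklearn.externals.joblib import Memory

    # bytes_limit accepts an int (in bytes) or a size string that is
    # resolved through memstr_to_bytes ('100M' is 100 * 1024 ** 2 bytes).
    memory = Memory(cachedir='/tmp/joblib_cache', bytes_limit='100M',
                    verbose=0)

    @memory.cache
    def expensive(x):
        return x ** 2

    for i in range(100):
        expensive(i)

    # Trimming is explicit: reduce_size() removes the least recently
    # accessed cache folders until the total size fits under bytes_limit.
    memory.reduce_size()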