diff --git a/doc/whats_new.rst b/doc/whats_new.rst index 9c27f3444e0b5..d8d893483bf57 100644 --- a/doc/whats_new.rst +++ b/doc/whats_new.rst @@ -124,6 +124,25 @@ API changes summary - ``residual_metric`` has been deprecated in :class:`linear_model.RANSACRegressor`. Use ``loss`` instead. By `Manoj Kumar`_. + +.. _changes_0_17_1: + +Version 0.17.1 +============== + +Changelog +--------- + +Bug fixes +......... + + + - Upgrade vendored joblib to version 0.9.4 that fixes an important bug in + ``joblib.Parallel`` that can silently yield to wrong results when working + on datasets larger than 1MB: + https://github.com/joblib/joblib/blob/0.9.4/CHANGES.rst + + .. _changes_0_17: Version 0.17 diff --git a/sklearn/externals/joblib/__init__.py b/sklearn/externals/joblib/__init__.py index 69fbd7014d183..b9ee52871a7b7 100644 --- a/sklearn/externals/joblib/__init__.py +++ b/sklearn/externals/joblib/__init__.py @@ -74,7 +74,7 @@ >>> c = square(a) >>> # The above call did not trigger an evaluation -2) **Embarrassingly parallel helper:** to make is easy to write readable +2) **Embarrassingly parallel helper:** to make it easy to write readable parallel code and debug it quickly:: >>> from sklearn.externals.joblib import Parallel, delayed @@ -86,7 +86,7 @@ 3) **Logging/tracing:** The different functionalities will progressively acquire better logging mechanism to help track what has been ran, and capture I/O easily. In addition, Joblib will - provide a few I/O primitives, to easily define define logging and + provide a few I/O primitives, to easily define logging and display streams, and provide a way of compiling a report. We want to be able to quickly inspect what has been run. @@ -115,7 +115,7 @@ # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer. # 'X.Y.dev0' is the canonical version of 'X.Y.dev' # -__version__ = '0.9.3' +__version__ = '0.9.4' from .memory import Memory, MemorizedResult diff --git a/sklearn/externals/joblib/_compat.py b/sklearn/externals/joblib/_compat.py index 81457a9cb93b8..70a1f43ee3f3d 100644 --- a/sklearn/externals/joblib/_compat.py +++ b/sklearn/externals/joblib/_compat.py @@ -1,10 +1,13 @@ """ Compatibility layer for Python 3/Python 2 single codebase """ +import sys + +PY3_OR_LATER = sys.version_info[0] >= 3 try: _basestring = basestring _bytes_or_unicode = (str, unicode) except NameError: _basestring = str - _bytes_or_unicode = (bytes, str) \ No newline at end of file + _bytes_or_unicode = (bytes, str) diff --git a/sklearn/externals/joblib/format_stack.py b/sklearn/externals/joblib/format_stack.py index bef5c3bd0a2fb..ad28a86c2496d 100644 --- a/sklearn/externals/joblib/format_stack.py +++ b/sklearn/externals/joblib/format_stack.py @@ -28,16 +28,14 @@ import time import tokenize import traceback -import types + try: # Python 2 generate_tokens = tokenize.generate_tokens except AttributeError: # Python 3 generate_tokens = tokenize.tokenize -PY3 = (sys.version[0] == '3') INDENT = ' ' * 8 -from ._compat import _basestring ############################################################################### # some internal-use functions @@ -195,14 +193,13 @@ def format_records(records): # , print_globals=False): # the abspath call will throw an OSError. Just ignore it and # keep the original file string. pass + + if file.endswith('.pyc'): + file = file[:-4] + '.py' + link = file - try: - args, varargs, varkw, locals = inspect.getargvalues(frame) - except: - # This can happen due to a bug in python2.3. 
We should be - # able to remove this try/except when 2.4 becomes a - # requirement. Bug details at http://python.org/sf/1005466 - print("\nJoblib's exception reporting continues...\n") + + args, varargs, varkw, locals = inspect.getargvalues(frame) if func == '?': call = '' @@ -350,13 +347,11 @@ def format_exc(etype, evalue, etb, context=5, tb_offset=0): date = time.ctime(time.time()) pid = 'PID: %i' % os.getpid() - head = '%s%s%s\n%s%s%s' % (etype, ' ' * (75 - len(str(etype)) - len(date)), - date, pid, ' ' * (75 - len(str(pid)) - len(pyver)), - pyver) + head = '%s%s%s\n%s%s%s' % ( + etype, ' ' * (75 - len(str(etype)) - len(date)), + date, pid, ' ' * (75 - len(str(pid)) - len(pyver)), + pyver) - # Flush cache before calling inspect. This helps alleviate some of the - # problems with python 2.3's inspect.py. - linecache.checkcache() # Drop topmost frames if requested try: records = _fixed_getframes(etb, context, tb_offset) diff --git a/sklearn/externals/joblib/func_inspect.py b/sklearn/externals/joblib/func_inspect.py index e7b28faa094cb..9fb67f0241651 100644 --- a/sklearn/externals/joblib/func_inspect.py +++ b/sklearn/externals/joblib/func_inspect.py @@ -15,6 +15,8 @@ from ._compat import _basestring from .logger import pformat from ._memory_helpers import open_py_source +from ._compat import PY3_OR_LATER + def get_func_code(func): """ Attempts to retrieve a reliable function code hash. @@ -156,6 +158,53 @@ def get_func_name(func, resolv_alias=True, win_characters=True): return module, name +def getfullargspec(func): + """Compatibility function to provide inspect.getfullargspec in Python 2 + + This should be rewritten using a backport of Python 3 signature + once we drop support for Python 2.6. We went for a simpler + approach at the time of writing because signature uses OrderedDict + which is not available in Python 2.6. + """ + try: + return inspect.getfullargspec(func) + except AttributeError: + arg_spec = inspect.getargspec(func) + import collections + tuple_fields = ('args varargs varkw defaults kwonlyargs ' + 'kwonlydefaults annotations') + tuple_type = collections.namedtuple('FullArgSpec', tuple_fields) + + return tuple_type(args=arg_spec.args, + varargs=arg_spec.varargs, + varkw=arg_spec.keywords, + defaults=arg_spec.defaults, + kwonlyargs=[], + kwonlydefaults=None, + annotations={}) + + +def _signature_str(function_name, arg_spec): + """Helper function to output a function signature""" + # inspect.formatargspec can not deal with the same + # number of arguments in python 2 and 3 + arg_spec_for_format = arg_spec[:7 if PY3_OR_LATER else 4] + + arg_spec_str = inspect.formatargspec(*arg_spec_for_format) + return '{0}{1}'.format(function_name, arg_spec_str) + + +def _function_called_str(function_name, args, kwargs): + """Helper function to output a function call""" + template_str = '{0}({1}, {2})' + + args_str = repr(args)[1:-1] + kwargs_str = ', '.join('%s=%s' % (k, v) + for k, v in kwargs.items()) + return template_str.format(function_name, args_str, + kwargs_str) + + def filter_args(func, ignore_lst, args=(), kwargs=dict()): """ Filters the given args and kwargs using a list of arguments to ignore, and a function specification. 
@@ -180,24 +229,23 @@ def filter_args(func, ignore_lst, args=(), kwargs=dict()): args = list(args) if isinstance(ignore_lst, _basestring): # Catch a common mistake - raise ValueError('ignore_lst must be a list of parameters to ignore ' + raise ValueError( + 'ignore_lst must be a list of parameters to ignore ' '%s (type %s) was given' % (ignore_lst, type(ignore_lst))) # Special case for functools.partial objects if (not inspect.ismethod(func) and not inspect.isfunction(func)): if ignore_lst: warnings.warn('Cannot inspect object %s, ignore list will ' - 'not work.' % func, stacklevel=2) + 'not work.' % func, stacklevel=2) return {'*': args, '**': kwargs} - arg_spec = inspect.getargspec(func) - # We need to if/them to account for different versions of Python - if hasattr(arg_spec, 'args'): - arg_names = arg_spec.args - arg_defaults = arg_spec.defaults - arg_keywords = arg_spec.keywords - arg_varargs = arg_spec.varargs - else: - arg_names, arg_varargs, arg_keywords, arg_defaults = arg_spec - arg_defaults = arg_defaults or {} + arg_spec = getfullargspec(func) + arg_names = arg_spec.args + arg_spec.kwonlyargs + arg_defaults = arg_spec.defaults or () + arg_defaults = arg_defaults + tuple(arg_spec.kwonlydefaults[k] + for k in arg_spec.kwonlyargs) + arg_varargs = arg_spec.varargs + arg_varkw = arg_spec.varkw + if inspect.ismethod(func): # First argument is 'self', it has been removed by Python # we need to add it back: @@ -211,7 +259,18 @@ def filter_args(func, ignore_lst, args=(), kwargs=dict()): for arg_position, arg_name in enumerate(arg_names): if arg_position < len(args): # Positional argument or keyword argument given as positional - arg_dict[arg_name] = args[arg_position] + if arg_name not in arg_spec.kwonlyargs: + arg_dict[arg_name] = args[arg_position] + else: + raise ValueError( + "Keyword-only parameter '%s' was passed as " + 'positional parameter for %s:\n' + ' %s was called.' + % (arg_name, + _signature_str(name, arg_spec), + _function_called_str(name, args, kwargs)) + ) + else: position = arg_position - len(arg_names) if arg_name in kwargs: @@ -221,28 +280,24 @@ def filter_args(func, ignore_lst, args=(), kwargs=dict()): arg_dict[arg_name] = arg_defaults[position] except (IndexError, KeyError): # Missing argument - raise ValueError('Wrong number of arguments for %s%s:\n' - ' %s(%s, %s) was called.' - % (name, - inspect.formatargspec(*inspect.getargspec(func)), - name, - repr(args)[1:-1], - ', '.join('%s=%s' % (k, v) - for k, v in kwargs.items()) - ) - ) + raise ValueError( + 'Wrong number of arguments for %s:\n' + ' %s was called.' + % (_signature_str(name, arg_spec), + _function_called_str(name, args, kwargs)) + ) varkwargs = dict() for arg_name, arg_value in sorted(kwargs.items()): if arg_name in arg_dict: arg_dict[arg_name] = arg_value - elif arg_keywords is not None: + elif arg_varkw is not None: varkwargs[arg_name] = arg_value else: raise TypeError("Ignore list for %s() contains an unexpected " "keyword argument '%s'" % (name, arg_name)) - if arg_keywords is not None: + if arg_varkw is not None: arg_dict['**'] = varkwargs if arg_varargs is not None: varargs = args[arg_position + 1:] @@ -254,13 +309,10 @@ def filter_args(func, ignore_lst, args=(), kwargs=dict()): arg_dict.pop(item) else: raise ValueError("Ignore list: argument '%s' is not defined for " - "function %s%s" % - (item, name, - inspect.formatargspec(arg_names, - arg_varargs, - arg_keywords, - arg_defaults, - ))) + "function %s" + % (item, + _signature_str(name, arg_spec)) + ) # XXX: Return a sorted list of pairs? 
     return arg_dict
diff --git a/sklearn/externals/joblib/hashing.py b/sklearn/externals/joblib/hashing.py
index 0214c1e3098b1..f9e20b13d4c21 100644
--- a/sklearn/externals/joblib/hashing.py
+++ b/sklearn/externals/joblib/hashing.py
@@ -7,19 +7,17 @@
 # Copyright (c) 2009 Gael Varoquaux
 # License: BSD Style, 3 clauses.
 
-import warnings
 import pickle
 import hashlib
 import sys
 import types
 import struct
 
-from ._compat import _bytes_or_unicode
-
 import io
 
-PY3 = sys.version[0] == '3'
+from ._compat import _bytes_or_unicode, PY3_OR_LATER
+
 
-if PY3:
+if PY3_OR_LATER:
     Pickler = pickle._Pickler
 else:
     Pickler = pickle.Pickler
@@ -30,7 +28,17 @@ class _ConsistentSet(object):
         whatever the order of its items.
     """
     def __init__(self, set_sequence):
-        self._sequence = sorted(set_sequence)
+        # Forces order of elements in set to ensure consistent hash.
+        try:
+            # Trying first to order the set assuming the type of elements is
+            # consistent and orderable.
+            # This fails on python 3 when elements are unorderable
+            # but we keep it in a try as it's faster.
+            self._sequence = sorted(set_sequence)
+        except TypeError:
+            # If elements are unorderable, sorting them using their hash.
+            # This is slower but works in any case.
+            self._sequence = sorted((hash(e) for e in set_sequence))
 
 
 class _MyHash(object):
@@ -49,7 +57,7 @@ def __init__(self, hash_name='md5'):
         self.stream = io.BytesIO()
         # By default we want a pickle protocol that only changes with
         # the major python version and not the minor one
-        protocol = (pickle.DEFAULT_PROTOCOL if PY3
+        protocol = (pickle.DEFAULT_PROTOCOL if PY3_OR_LATER
                     else pickle.HIGHEST_PROTOCOL)
         Pickler.__init__(self, self.stream, protocol=protocol)
         # Initialise the hash obj
@@ -59,7 +67,8 @@ def hash(self, obj, return_digest=True):
         try:
             self.dump(obj)
         except pickle.PicklingError as e:
-            warnings.warn('PicklingError while hashing %r: %r' % (obj, e))
+            e.args += ('PicklingError while hashing %r: %r' % (obj, e),)
+            raise
         dumps = self.stream.getvalue()
         self._hash.update(dumps)
         if return_digest:
@@ -128,8 +137,18 @@ def save_global(self, obj, name=None, pack=struct.pack):
     dispatch[type(pickle.dump)] = save_global
 
     def _batch_setitems(self, items):
-        # forces order of keys in dict to ensure consistent hash
-        Pickler._batch_setitems(self, iter(sorted(items)))
+        # forces order of keys in dict to ensure consistent hash.
+        try:
+            # Trying first to compare dict assuming the type of keys is
+            # consistent and orderable.
+            # This fails on python 3 when keys are unorderable
+            # but we keep it in a try as it's faster.
+            Pickler._batch_setitems(self, iter(sorted(items)))
+        except TypeError:
+            # If keys are unorderable, sorting them using their hash. This is
+            # slower but works in any case.
+            Pickler._batch_setitems(self, iter(sorted((hash(k), v)
+                                                      for k, v in items)))
 
     def save_set(self, set_items):
         # forces order of items in Set to ensure consistent hash
@@ -182,7 +201,7 @@ def save(self, obj):
                 #   the array is Fortran rather than C contiguous
                 except (ValueError, BufferError):
                     # Cater for non-single-segment arrays: this creates a
-                    # copy, and thus aleviates this issue.
+                    # copy, and thus alleviates this issue.
                    # XXX: There might be a more efficient way of doing this
                     obj_bytes_view = obj.flatten().view(self.np.uint8)
             self._hash.update(self._getbuffer(obj_bytes_view))
diff --git a/sklearn/externals/joblib/memory.py b/sklearn/externals/joblib/memory.py
index 78be63c532cf2..fff84ad7a2cf8 100644
--- a/sklearn/externals/joblib/memory.py
+++ b/sklearn/externals/joblib/memory.py
@@ -36,7 +36,7 @@
 from .logger import Logger, format_time, pformat
 from . import numpy_pickle
 from .disk import mkdirp, rm_subdirs
-from ._compat import _basestring
+from ._compat import _basestring, PY3_OR_LATER
 
 
 FIRST_LINE_TEXT = "# first line:"
@@ -547,7 +547,7 @@ def _write_func_code(self, filename, func_code, first_line):
             out.write(func_code)
         # Also store in the in-memory store of function hashes
         is_named_callable = False
-        if sys.version_info[0] > 2:
+        if PY3_OR_LATER:
             is_named_callable = (hasattr(self.func, '__name__') and
                                  self.func.__name__ != '<lambda>')
         else:
diff --git a/sklearn/externals/joblib/my_exceptions.py b/sklearn/externals/joblib/my_exceptions.py
index ce9a2bf5692dd..9d26cb57d9fcc 100644
--- a/sklearn/externals/joblib/my_exceptions.py
+++ b/sklearn/externals/joblib/my_exceptions.py
@@ -7,26 +7,30 @@
 
 import sys
 
+from ._compat import PY3_OR_LATER
 
 
 class JoblibException(Exception):
     """A simple exception with an error message that you can get to."""
     def __init__(self, *args):
-        self.args = args
-
-    def __reduce__(self):
-        # For pickling
-        return self.__class__, self.args, {}
+        # We need to implement __init__ so that it is picked in the
+        # multiple heritance hierarchy in the class created in
+        # _mk_exception. Note: in Python 2, if you implement __init__
+        # in your exception class you need to set .args correctly,
+        # otherwise you can dump an exception instance with pickle but
+        # not load it (at load time an empty .args will be passed to
+        # the constructor). Also we want to be explicit and not use
+        # 'super' here. Using 'super' can cause a sibling class method
+        # to be called and we have no control the sibling class method
+        # constructor signature in the exception returned by
+        # _mk_exception.
+        Exception.__init__(self, *args)
 
     def __repr__(self):
-        if hasattr(self, 'args'):
+        if hasattr(self, 'args') and len(self.args) > 0:
             message = self.args[0]
         else:
-            # Python 2 compat: instances of JoblibException can be created
-            # without calling JoblibException __init__ in case of
-            # multi-inheritance: in that case the message is stored as an
-            # explicit attribute under Python 2 (only)
-            message = self.message
+            message = ''
+
         name = self.__class__.__name__
         return '%s\n%s\n%s\n%s' % (name, 75 * '_', message, 75 * '_')
@@ -39,13 +43,12 @@ class TransportableException(JoblibException):
     """
 
     def __init__(self, message, etype):
+        # The next line set the .args correctly. This is needed to
+        # make the exception loadable with pickle
+        JoblibException.__init__(self, message, etype)
         self.message = message
         self.etype = etype
 
-    def __reduce__(self):
-        # For pickling
-        return self.__class__, (self.message, self.etype), {}
-
 
 _exception_mapping = dict()
 
@@ -61,20 +64,26 @@ def _mk_exception(exception, name=None):
         this_exception = _exception_mapping[this_name]
     else:
         if exception is Exception:
-            # We cannot create a subclass: we are already a trivial
-            # subclass
+            # JoblibException is already a subclass of Exception.
No + # need to use multiple inheritance return JoblibException, this_name - this_exception = type(this_name, (exception, JoblibException), - dict(__repr__=JoblibException.__repr__, - __str__=JoblibException.__str__), - ) - _exception_mapping[this_name] = this_exception + try: + this_exception = type( + this_name, (JoblibException, exception), {}) + _exception_mapping[this_name] = this_exception + except TypeError: + # This happens if "Cannot create a consistent method + # resolution order", e.g. because 'exception' is a + # subclass of JoblibException or 'exception' is not an + # acceptable base class + this_exception = JoblibException + return this_exception, this_name def _mk_common_exceptions(): namespace = dict() - if sys.version_info[0] == 3: + if PY3_OR_LATER: import builtins as _builtin_exceptions common_exceptions = filter( lambda x: x.endswith('Error'), @@ -86,14 +95,8 @@ def _mk_common_exceptions(): for name in common_exceptions: obj = getattr(_builtin_exceptions, name) if isinstance(obj, type) and issubclass(obj, BaseException): - try: - this_obj, this_name = _mk_exception(obj, name=name) - namespace[this_name] = this_obj - except TypeError: - # Cannot create a consistent method resolution order: - # a class that we can't subclass properly, probably - # BaseException - pass + this_obj, this_name = _mk_exception(obj, name=name) + namespace[this_name] = this_obj return namespace diff --git a/sklearn/externals/joblib/numpy_pickle.py b/sklearn/externals/joblib/numpy_pickle.py index 6d727d669ec04..79170d80fdfd0 100644 --- a/sklearn/externals/joblib/numpy_pickle.py +++ b/sklearn/externals/joblib/numpy_pickle.py @@ -8,20 +8,15 @@ import pickle import traceback -import sys import os import zlib import warnings -import struct -import codecs - -from ._compat import _basestring - from io import BytesIO -PY3 = sys.version_info[0] >= 3 +from ._compat import _basestring, PY3_OR_LATER + -if PY3: +if PY3_OR_LATER: Unpickler = pickle._Unpickler Pickler = pickle._Pickler @@ -215,11 +210,11 @@ def __init__(self, filename, compress=0, cache_size=10, protocol=None): else: self.file = BytesIO() # Count the number of npy files that we have created: - self._npy_counter = 0 + self._npy_counter = 1 # By default we want a pickle protocol that only changes with # the major python version and not the minor one if protocol is None: - protocol = (pickle.DEFAULT_PROTOCOL if PY3 + protocol = (pickle.DEFAULT_PROTOCOL if PY3_OR_LATER else pickle.HIGHEST_PROTOCOL) Pickler.__init__(self, self.file, @@ -257,8 +252,8 @@ def save(self, obj): files, rather than pickling them. Of course, this is a total abuse of the Pickler class. 
""" - if self.np is not None and type(obj) in (self.np.ndarray, - self.np.matrix, self.np.memmap): + if (self.np is not None and type(obj) in + (self.np.ndarray, self.np.matrix, self.np.memmap)): size = obj.size * obj.itemsize if self.compress and size < self.cache_size * _MEGA: # When compressing, as we are not writing directly to the @@ -267,19 +262,21 @@ def save(self, obj): # Pickling doesn't work with memmaped arrays obj = self.np.asarray(obj) return Pickler.save(self, obj) - self._npy_counter += 1 - try: - filename = '%s_%02i.npy' % (self._filename, - self._npy_counter) - # This converts the array in a container - obj, filename = self._write_array(obj, filename) - self._filenames.append(filename) - except: - self._npy_counter -= 1 - # XXX: We should have a logging mechanism - print('Failed to save %s to .npy file:\n%s' % ( + + if not obj.dtype.hasobject: + try: + filename = '%s_%02i.npy' % (self._filename, + self._npy_counter) + # This converts the array in a container + obj, filename = self._write_array(obj, filename) + self._filenames.append(filename) + self._npy_counter += 1 + except Exception: + # XXX: We should have a logging mechanism + print('Failed to save %s to .npy file:\n%s' % ( type(obj), traceback.format_exc())) + return Pickler.save(self, obj) def close(self): @@ -326,7 +323,7 @@ def load_build(self): self.stack.append(array) # Be careful to register our new method. - if PY3: + if PY3_OR_LATER: dispatch[pickle.BUILD[0]] = load_build else: dispatch[pickle.BUILD] = load_build @@ -462,7 +459,7 @@ def load(filename, mmap_mode=None): obj = unpickler.load() except UnicodeDecodeError as exc: # More user-friendly error message - if PY3: + if PY3_OR_LATER: new_exc = ValueError( 'You may be trying to read with ' 'python 3 a joblib pickle generated with python 2. ' diff --git a/sklearn/externals/joblib/parallel.py b/sklearn/externals/joblib/parallel.py index 5ee20a6b8e929..d32a0d7d44122 100644 --- a/sklearn/externals/joblib/parallel.py +++ b/sklearn/externals/joblib/parallel.py @@ -53,7 +53,7 @@ # can cause third party libraries to crash. Under Python 3.4+ it is possible # to set an environment variable to switch the default start method from # 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost -# of causing semantic changes and some additional pool instantiation overhead. +# of causing semantic changes and some additional pool instanciation overhead. if hasattr(mp, 'get_context'): method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None DEFAULT_MP_CONTEXT = mp.get_context(method=method) @@ -137,10 +137,7 @@ def __call__(self, *args, **kwargs): e_type, e_value, e_tb = sys.exc_info() text = format_exc(e_type, e_value, e_tb, context=10, tb_offset=1) - if issubclass(e_type, TransportableException): - raise - else: - raise TransportableException(text, e_type) + raise TransportableException(text, e_type) ############################################################################### @@ -262,7 +259,7 @@ class Parallel(Logger): pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'} The number of batches (of tasks) to be pre-dispatched. Default is '2*n_jobs'. When batch_size="auto" this is reasonable - default and the multiprocessing workers should never starve. + default and the multiprocessing workers shoud never starve. batch_size: int or 'auto', default: 'auto' The number of atomic tasks to dispatch at once to each worker. 
            When individual evaluations are very fast, multiprocessing
@@ -533,7 +530,6 @@ def _initialize_pool(self):
                 mmap_mode=self._mmap_mode,
                 temp_folder=self._temp_folder,
                 verbose=max(0, self.verbose - 50),
-                context_id=0,  # the pool is used only for one call
             )
             if self._mp_context is not None:
                 # Use Python 3.4+ multiprocessing context isolation
@@ -613,13 +609,14 @@ def dispatch_one_batch(self, iterator):
         elif self.batch_size == 'auto':
             old_batch_size = self._effective_batch_size
             batch_duration = self._smoothed_batch_duration
-            if (0 < batch_duration < MIN_IDEAL_BATCH_DURATION):
+            if (batch_duration > 0 and
+                    batch_duration < MIN_IDEAL_BATCH_DURATION):
                 # The current batch size is too small: the duration of the
                 # processing of a batch of task is not large enough to hide
                 # the scheduling overhead.
                 ideal_batch_size = int(
                     old_batch_size * MIN_IDEAL_BATCH_DURATION / batch_duration)
-                # Multiply by two to limit oscilations between min and max.
+                # Multiply by two to limit oscillations between min and max.
                 batch_size = max(2 * ideal_batch_size, 1)
                 self._effective_batch_size = batch_size
                 if self.verbose >= 10:
@@ -750,14 +747,13 @@ def retrieve(self):
                     # Kill remaining running processes without waiting for
                     # the results as we will raise the exception we got back
                     # to the caller instead of returning any result.
-                    with self._lock:
-                        self._terminate_pool()
-                        if self._managed_pool:
-                            # In case we had to terminate a managed pool, let
-                            # us start a new one to ensure that subsequent calls
-                            # to __call__ on the same Parallel instance will get
-                            # a working pool as they expect.
-                            self._initialize_pool()
+                    self._terminate_pool()
+                    if self._managed_pool:
+                        # In case we had to terminate a managed pool, let
+                        # us start a new one to ensure that subsequent calls
+                        # to __call__ on the same Parallel instance will get
+                        # a working pool as they expect.
+                        self._initialize_pool()
                 raise exception
 
     def __call__(self, iterable):
@@ -798,10 +794,13 @@ def __call__(self, iterable):
             self.n_completed_tasks = 0
             self._smoothed_batch_duration = 0.0
             try:
-                self._iterating = True
-
+                # Only set self._iterating to True if at least a batch
+                # was dispatched. In particular this covers the edge
+                # case of Parallel used with an exhausted iterator.
                 while self.dispatch_one_batch(iterator):
-                    pass
+                    self._iterating = True
+                else:
+                    self._iterating = False
 
             if pre_dispatch == "all" or n_jobs == 1:
                 # The iterable was consumed all at once by the above for loop.
diff --git a/sklearn/externals/joblib/pool.py b/sklearn/externals/joblib/pool.py
index 92edeb5dd9595..54fdac50fb5f1 100644
--- a/sklearn/externals/joblib/pool.py
+++ b/sklearn/externals/joblib/pool.py
@@ -22,6 +22,13 @@
 import atexit
 import tempfile
 import shutil
+import warnings
+from time import sleep
+
+try:
+    WindowsError
+except NameError:
+    WindowsError = None
 
 try:
     # Python 2 compat
@@ -92,7 +99,7 @@ def has_shareable_memory(a):
 
 def _strided_from_memmap(filename, dtype, mode, offset, order, shape,
                          strides, total_buffer_len):
-    """Reconstruct an array view on a memmory mapped file"""
+    """Reconstruct an array view on a memory mapped file"""
     if mode == 'w+':
         # Do not zero the original data when unpickling
         mode = 'r+'
@@ -177,13 +184,6 @@ class ArrayMemmapReducer(object):
         If verbose > 0, memmap creations are logged.
         If verbose > 1, both memmap creations, reuse and array pickling are
         logged.
-    context_id: int, optional, None by default
-        Set to a value identifying a call context to spare costly hashing of
-        the content of the input arrays when it is safe to assume that each
-        array will not be mutated by the parent process for the duration of the
-        dispatch process. This is the case when using the high level Parallel
-        API. It might not be the case when using the MemmapingPool API
-        directly.
     prewarm: bool, optional, False by default.
         Force a read on newly memmaped array to make sure that OS pre-cache it
         memory. This can be useful to avoid concurrent disk access when the
@@ -196,8 +196,11 @@ def __init__(self, max_nbytes, temp_folder, mmap_mode, verbose=0,
         self._temp_folder = temp_folder
         self._mmap_mode = mmap_mode
         self.verbose = int(verbose)
-        self._context_id = context_id
         self._prewarm = prewarm
+        if context_id is not None:
+            warnings.warn('context_id is deprecated and ignored in joblib'
+                          ' 0.9.4 and will be removed in 0.11',
+                          DeprecationWarning)
 
     def __call__(self, a):
         m = _get_backing_memmap(a)
@@ -219,12 +222,8 @@ def __call__(self, a):
 
             # Find a unique, concurrent safe filename for writing the
             # content of this array only once.
-            if self._context_id is not None:
-                marker = self._context_id
-            else:
-                marker = hash(a)
-            basename = "%d-%d-%d-%s.pkl" % (
-                os.getpid(), id(threading.current_thread()), id(a), marker)
+            basename = "%d-%d-%s.pkl" % (
+                os.getpid(), id(threading.current_thread()), hash(a))
             filename = os.path.join(self._temp_folder, basename)
 
             # In case the same array with the same content is passed several
@@ -248,8 +247,8 @@ def __call__(self, a):
                 print("Memmaping (shape=%s, dtype=%s) to old file %s" % (
                     a.shape, a.dtype, filename))
 
-            # Let's use the memmap reducer
-            return reduce_memmap(load(filename, mmap_mode=self._mmap_mode))
+            # The worker process will use joblib.load to memmap the data
+            return (load, (filename, self._mmap_mode))
         else:
             # do not convert a into memmap, let pickler do its usual copy with
             # the default system pickler
@@ -317,7 +316,7 @@ class CustomizablePicklingQueue(object):
     This class is an alternative to the multiprocessing implementation
     of SimpleQueue in order to make it possible to pass custom
     pickling reducers, for instance to avoid memory copy when passing
-    memmory mapped datastructures.
+    memory mapped datastructures.
 
     `reducers` is expected to be a dictionary with key/values
     being `(type, callable)` pairs where `callable` is a function that
@@ -430,8 +429,11 @@ def _setup_queues(self):
 
 def delete_folder(folder_path):
     """Utility function to cleanup a temporary folder if still existing"""
-    if os.path.exists(folder_path):
-        shutil.rmtree(folder_path)
+    try:
+        if os.path.exists(folder_path):
+            shutil.rmtree(folder_path)
+    except WindowsError:
+        warnings.warn("Failed to clean temporary folder: %s" % folder_path)
 
 
 class MemmapingPool(PicklingPool):
@@ -486,12 +488,6 @@ class MemmapingPool(PicklingPool):
     verbose: int, optional
         Make it possible to monitor how the communication of numpy arrays
         with the subprocess is handled (pickling or memmaping)
-    context_id: int, optional, None by default
-        Set to a value identifying a call context to spare costly hashing of
-        the content of the input arrays when it is safe to assume that each
-        array will not be mutated by the parent process for the duration of the
-        dispatch process. This is the case when using the high level Parallel
-        API.
     prewarm: bool or str, optional, "auto" by default.
         If True, force a read on newly memmaped array to make sure that OS pre-
         cache it in memory.
This can be useful to avoid concurrent disk access @@ -516,6 +512,10 @@ def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6, forward_reducers = dict() if backward_reducers is None: backward_reducers = dict() + if context_id is not None: + warnings.warn('context_id is deprecated and ignored in joblib' + ' 0.9.4 and will be removed in 0.11', + DeprecationWarning) # Prepare a sub-folder name for the serialization of this particular # pool instance (do not create in advance to spare FS write access if @@ -559,7 +559,7 @@ def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6, prewarm = not use_shared_mem forward_reduce_ndarray = ArrayMemmapReducer( max_nbytes, pool_folder, mmap_mode, verbose, - context_id=context_id, prewarm=prewarm) + prewarm=prewarm) forward_reducers[np.ndarray] = forward_reduce_ndarray forward_reducers[np.memmap] = reduce_memmap diff --git a/sklearn/externals/joblib/testing.py b/sklearn/externals/joblib/testing.py index 8555a5acdc2d9..21dfbc8689af4 100644 --- a/sklearn/externals/joblib/testing.py +++ b/sklearn/externals/joblib/testing.py @@ -5,6 +5,11 @@ import sys import warnings import os.path +import re +import subprocess +import threading + +from sklearn.externals.joblib._compat import PY3_OR_LATER def warnings_to_stdout(): @@ -17,3 +22,64 @@ def showwarning(msg, cat, fname, lno, file=None, line=0): warnings.showwarning = showwarning #warnings.simplefilter('always') + + +try: + from nose.tools import assert_raises_regex +except ImportError: + # For Python 2.7 + try: + from nose.tools import assert_raises_regexp as assert_raises_regex + except ImportError: + # for Python 2.6 + def assert_raises_regex(expected_exception, expected_regexp, + callable_obj=None, *args, **kwargs): + """Helper function to check for message patterns in exceptions""" + + not_raised = False + try: + callable_obj(*args, **kwargs) + not_raised = True + except Exception as e: + error_message = str(e) + if not re.compile(expected_regexp).search(error_message): + raise AssertionError("Error message should match pattern " + "%r. %r does not." % + (expected_regexp, error_message)) + if not_raised: + raise AssertionError("Should have raised %r" % + expected_exception(expected_regexp)) + + +def check_subprocess_call(cmd, timeout=1, stdout_regex=None): + """Runs a command in a subprocess with timeout in seconds. + + Also checks returncode is zero and stdout if stdout_regex is set. + """ + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + def kill_process(): + proc.kill() + + timer = threading.Timer(timeout, kill_process) + try: + timer.start() + stdout, stderr = proc.communicate() + + if PY3_OR_LATER: + stdout, stderr = stdout.decode(), stderr.decode() + if proc.returncode != 0: + message = ( + 'Non-zero return code: {0}.\nStdout:\n{1}\n' + 'Stderr:\n{2}').format( + proc.returncode, stdout, stderr) + raise ValueError(message) + + if (stdout_regex is not None and + not re.search(stdout_regex, stdout)): + raise ValueError( + "Unexpected output: '{0!r}' does not match:\n{1!r}".format( + stdout_regex, stdout)) + finally: + timer.cancel()
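
For context, and not part of the committed diff: a minimal sketch of the joblib.Parallel usage pattern affected by the bug this upgrade fixes. The array sizes and the chunking below are illustrative assumptions. Arrays larger than max_nbytes (1e6 bytes by default, see MemmapingPool above) are dumped to temporary memmap files before being dispatched to worker processes, and with the pool.py changes above the file name is derived from the array content hash rather than a per-call context_id, so distinct arrays can no longer collide on the same file and silently yield wrong results:

    # Illustrative sketch only -- sizes and chunking are assumptions, not part
    # of the diff. It exercises the auto-memmapping code path fixed in 0.9.4.
    import numpy as np
    from sklearn.externals.joblib import Parallel, delayed

    data = np.random.rand(2000, 1000)   # ~16 MB, well above the 1 MB threshold
    chunks = np.array_split(data, 4)    # each chunk is still larger than 1 MB

    # With a multiprocessing backend each chunk is written to its own temporary
    # memmap file (named after its content hash) before being sent to a worker.
    sums = Parallel(n_jobs=2)(delayed(np.sum)(chunk) for chunk in chunks)
    assert np.isclose(sum(sums), data.sum())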