diff --git a/sklearn/externals/joblib/__init__.py b/sklearn/externals/joblib/__init__.py index a42646eb4c754..0d008b560522e 100644 --- a/sklearn/externals/joblib/__init__.py +++ b/sklearn/externals/joblib/__init__.py @@ -106,7 +106,7 @@ # Dev branch marker is: 'X.Y.dev' or 'X.Y.devN' where N is an integer. # 'X.Y.dev0' is the canonical version of 'X.Y.dev' # -__version__ = '0.12.4' +__version__ = '0.12.5' from .memory import Memory, MemorizedResult, register_store_backend diff --git a/sklearn/externals/joblib/externals/cloudpickle/__init__.py b/sklearn/externals/joblib/externals/cloudpickle/__init__.py index df671a0f15696..8004dcde0b7de 100644 --- a/sklearn/externals/joblib/externals/cloudpickle/__init__.py +++ b/sklearn/externals/joblib/externals/cloudpickle/__init__.py @@ -2,4 +2,4 @@ from .cloudpickle import * -__version__ = '0.5.5' +__version__ = '0.5.6' diff --git a/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py b/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py index b1107ba92c1da..842723539d128 100644 --- a/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py +++ b/sklearn/externals/joblib/externals/cloudpickle/cloudpickle.py @@ -635,11 +635,12 @@ def extract_func_data(self, func): base_globals = self.globals_ref.get(id(func.__globals__), None) if base_globals is None: - # For functions defined in __main__, use vars(__main__) for - # base_global. This is necessary to share the global variables - # across multiple functions in this module. - if func.__module__ == "__main__": - base_globals = "__main__" + # For functions defined in a well behaved module use + # vars(func.__module__) for base_globals. This is necessary to + # share the global variables across multiple pickled functions from + # this module. 
+ if hasattr(func, '__module__') and func.__module__ is not None: + base_globals = func.__module__ else: base_globals = {} self.globals_ref[id(func.__globals__)] = base_globals @@ -934,7 +935,6 @@ def subimport(name): def dynamic_subimport(name, vars): mod = imp.new_module(name) mod.__dict__.update(vars) - sys.modules[name] = mod return mod @@ -1090,7 +1090,13 @@ def _make_skel_func(code, cell_count, base_globals=None): if base_globals is None: base_globals = {} elif isinstance(base_globals, str): - base_globals = vars(sys.modules[base_globals]) + if sys.modules.get(base_globals, None) is not None: + # this checks if we can import the previous environment the object + # lived in + base_globals = vars(sys.modules[base_globals]) + else: + base_globals = {} + base_globals['__builtins__'] = __builtins__ closure = ( diff --git a/sklearn/externals/joblib/externals/loky/__init__.py b/sklearn/externals/joblib/externals/loky/__init__.py index 18c01d0a6aa04..4f686454588a0 100644 --- a/sklearn/externals/joblib/externals/loky/__init__.py +++ b/sklearn/externals/joblib/externals/loky/__init__.py @@ -19,4 +19,4 @@ "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED", ] -__version__ = '2.3.0' +__version__ = '2.3.1' diff --git a/sklearn/externals/joblib/externals/loky/process_executor.py b/sklearn/externals/joblib/externals/loky/process_executor.py index 57a7617d9ab7e..cfdd37abce923 100644 --- a/sklearn/externals/joblib/externals/loky/process_executor.py +++ b/sklearn/externals/joblib/externals/loky/process_executor.py @@ -62,7 +62,6 @@ import os import gc import sys -import types import struct import weakref import warnings @@ -438,7 +437,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs, continue if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY: mem_usage = _get_memory_usage(pid) - print(mem_usage) _last_memory_leak_check = time() if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE: # Memory usage stays within bounds: everything 
is fine. @@ -618,34 +616,41 @@ def shutdown_all_workers(): worker_sentinels = [p.sentinel for p in processes.values()] ready = wait(readers + worker_sentinels) - broken = ("A process in the executor was terminated abruptly", None) + broken = ("A worker process managed by the executor was unexpectedly " + "terminated. This could be caused by a segmentation fault " + "while calling the function or by an excessive memory usage " + "causing the Operating System to kill the worker.", None, + TerminatedWorkerError) if result_reader in ready: try: result_item = result_reader.recv() broken = None if isinstance(result_item, _RemoteTraceback): - cause = result_item.tb - broken = ("A task has failed to un-serialize", cause) + broken = ("A task has failed to un-serialize. Please " + "ensure that the arguments of the function are " + "all picklable.", result_item.tb, + BrokenProcessPool) except BaseException as e: tb = getattr(e, "__traceback__", None) if tb is None: _, _, tb = sys.exc_info() - broken = ("A result has failed to un-serialize", - traceback.format_exception(type(e), e, tb)) + broken = ("A result has failed to un-serialize. Please " + "ensure that the objects returned by the function " + "are always picklable.", + traceback.format_exception(type(e), e, tb), + BrokenProcessPool) elif wakeup_reader in ready: broken = None result_item = None thread_wakeup.clear() - if broken: - msg, cause = broken - # Mark the process pool broken so that submits fail right now. - executor_flags.flag_as_broken( - msg + ", the pool is not usable anymore.") - bpe = BrokenProcessPool( - msg + " while the future was running or pending.") - if cause is not None: + if broken is not None: + msg, cause_tb, exc_type = broken + bpe = exc_type(msg) + if cause_tb is not None: bpe.__cause__ = _RemoteTraceback( - "\n'''\n{}'''".format(''.join(cause))) + "\n'''\n{}'''".format(''.join(cause_tb))) + # Mark the process pool broken so that submits fail right now. 
+ executor_flags.flag_as_broken(bpe) # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): @@ -808,6 +813,15 @@ class LokyRecursionError(RuntimeError): class BrokenProcessPool(_BPPException): + """ + Raised when the executor is broken while a future was in the running state. + The cause can be an error raised when unpickling the task in the worker + process or when unpickling the result value in the parent process. It can + also be caused by a worker process being terminated unexpectedly. + """ + + +class TerminatedWorkerError(BrokenProcessPool): """ Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. """ @@ -998,8 +1012,8 @@ def _ensure_executor_running(self): def submit(self, fn, *args, **kwargs): with self._flags.shutdown_lock: - if self._flags.broken: - raise BrokenProcessPool(self._flags.broken) + if self._flags.broken is not None: + raise self._flags.broken if self._flags.shutdown: raise ShutdownExecutorError( 'cannot schedule new futures after shutdown')