8000 Update to loky 1.1.4 with Windows 64-bit fix · scikit-learn/scikit-learn@3324bec · GitHub
[go: up one dir, main page]

Skip to content

Commit 3324bec

Browse files
committed
Update to loky 1.1.4 with Windows 64-bit fix
1 parent 3e0916c commit 3324bec

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

sklearn/externals/joblib/externals/loky/backend/compat_win32.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# flake8: noqa: F401
22
import sys
3+
import numbers
34

45
if sys.platform == "win32":
56
# Avoid import error by code introspection tools such as test runners
@@ -47,8 +48,11 @@ def CreatePipe(*args):
4748

4849
@staticmethod
4950
def CloseHandle(h):
51+
if isinstance(h, numbers.Integral):
52+
# Cast long to int for 64-bit Python 2.7 under Windows
53+
h = int(h)
5054
if sys.version_info[:2] < (3, 3):
51-
if type(h) != int:
55+
if not isinstance(h, int):
5256
h = h.Detach()
5357
win32.CloseHandle(h)
5458
else:

sklearn/externals/joblib/externals/loky/process_executor.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ def _process_worker(call_queue, result_queue, processes_management_lock,
345345
processes_management_lock.release()
346346
call_item = None
347347
else:
348+
mp.util.info("Could not acquire processes_management_lock")
348349
continue
349350
except BaseException as e:
350351
traceback.print_exc()
@@ -465,7 +466,8 @@ def shutdown_all_workers():
465466
executor_flags.flag_as_shutting_down()
466467
# This is an upper bound
467468
with processes_management_lock:
468-
nb_children_alive = sum(p.is_alive() for p in processes.values())
469+
nb_children_alive = sum(p.is_alive()
470+
for p in list(processes.values()))
469471
for i in range(0, nb_children_alive):
470472
call_queue.put(None)
471473

@@ -480,7 +482,7 @@ def shutdown_all_workers():
480482
_, p = processes.popitem()
481483
p.join()
482484
mp.util.debug("queue management thread clean shutdown of worker "
483-
"processes: {}".format(processes))
485+
"processes: {}".format(list(processes)))
484486

485487
result_reader = result_queue._reader
486488
_poll_timeout = .001
@@ -491,10 +493,18 @@ def shutdown_all_workers():
491493
work_ids_queue,
492494
call_queue)
493495
# Wait for a result to be ready in the result_queue while checking
494-
# that worker process are still running. If a worker process
496+
# that worker process are still running.
495497
while not wakeup.get_and_unset():
496-
worker_sentinels = [p.sentinel for p in processes.values()]
498+
# Force cast long to int when running 64-bit Python 2.7 under
499+
# Windows
500+
sentinel_map = {int(p.sentinel): p
501+
for p in list(processes.values())}
502+
worker_sentinels = list(sentinel_map.keys())
497503
if len(worker_sentinels) == 0:
504+
# The processes dict is empty, let's get out of the wait loop
505+
# even if there is no result or worker sentinel event so
506+
# as to check whether the executor was terminated and shutdown
507+
# the QueueManager thread accordingly.
498508
wakeup.set()
499509
ready = wait([result_reader] + worker_sentinels,
500510
timeout=_poll_timeout)
@@ -519,7 +529,8 @@ def shutdown_all_workers():
519529
# Mark the process pool broken so that submits fail right now.
520530
executor_flags.flag_as_broken()
521531
mp.util.debug('The executor is broken as at least one process '
522-
'terminated abruptly')
532+
'terminated abruptly. Workers: %s'
533+
% ", ".join(str(sentinel_map[s]) for s in ready))
523534

524535
# All futures in flight must be marked failed
525536
for work_id, work_item in pending_work_items.items():
@@ -770,8 +781,8 @@ def __init__(self, max_workers=None, job_reducers=None,
770781
will use the same reducers
771782
timeout: int, optional (default: None)
772783
Idle workers exit after timeout seconds. If a new job is
773-
submitted after the timeout, the executor will launch enough
774-
job to make sure the pool of worker is full.
784+
submitted after the timeout, the executor will start enough
785+
new Python processes to make sure the pool of workers is full.
775786
context: A multiprocessing context to launch the workers. This
776787
object should provide SimpleQueue, Queue and Process.
777788
@@ -796,7 +807,7 @@ def __init__(self, max_workers=None, job_reducers=None,
796807
mp.util.debug("using context {}".format(self._ctx))
797808
_check_max_depth(self._ctx)
798809

799-
# Timeout and its lock.
810+
# Timeout
800811
self._timeout = timeout
801812

802813
# Connection to wakeup QueueManagerThread
@@ -842,6 +853,8 @@ def _start_queue_management_thread(self):
842853
# When the executor gets lost, the weakref callback will wake up
843854
# the queue management thread.
844855
def weakref_cb(_, wakeup=self._wakeup):
856+
mp.util.debug('Executor collected: triggering callback for'
857+
' QueueManager wakeup')
845858
wakeup.set()
846859

847860
# Start the processes so that their sentinels are known.

0 commit comments

Comments
 (0)
0