@@ -345,6 +345,7 @@ def _process_worker(call_queue, result_queue, processes_management_lock,
345
345
processes_management_lock .release ()
346
346
call_item = None
347
347
else :
348
+ mp .util .info ("Could not acquire processes_management_lock" )
348
349
continue
349
350
except BaseException as e :
350
351
traceback .print_exc ()
@@ -465,7 +466,8 @@ def shutdown_all_workers():
465
466
executor_flags .flag_as_shutting_down ()
466
467
# This is an upper bound
467
468
with processes_management_lock :
468
- nb_children_alive = sum (p .is_alive () for p in processes .values ())
469
+ nb_children_alive = sum (p .is_alive ()
470
+ for p in list (processes .values ()))
469
471
for i in range (0 , nb_children_alive ):
470
472
call_queue .put (None )
471
473
@@ -480,7 +482,7 @@ def shutdown_all_workers():
480
482
_ , p = processes .popitem ()
481
483
p .join ()
482
484
mp .util .debug ("queue management thread clean shutdown of worker "
483
- "processes: {}" .format (processes ))
485
+ "processes: {}" .format (list ( processes ) ))
484
486
485
487
result_reader = result_queue ._reader
486
488
_poll_timeout = .001
@@ -491,10 +493,18 @@ def shutdown_all_workers():
491
493
work_ids_queue ,
492
494
call_queue )
493
495
# Wait for a result to be ready in the result_queue while checking
494
- # that worker process are still running. If a worker process
496
+ # that worker process are still running.
495
497
while not wakeup .get_and_unset ():
496
- worker_sentinels = [p .sentinel for p in processes .values ()]
498
+ # Force cast long to int when running 64-bit Python 2.7 under
499
+ # Windows
500
+ sentinel_map = {int (p .sentinel ): p
501
+ for p in list (processes .values ())}
502
+ worker_sentinels = list (sentinel_map .keys ())
497
503
if len (worker_sentinels ) == 0 :
504
+ # The processes dict is empty, let's get out of the wait loop
505
+ # even if there is no result or worker sentinel event so
506
+ # as to check whether the executor was terminated and shutdown
507
+ # the QueueManager thread accordingly.
498
508
wakeup .set ()
499
509
ready = wait ([result_reader ] + worker_sentinels ,
500
510
timeout = _poll_timeout )
@@ -519,7 +529,8 @@ def shutdown_all_workers():
519
529
# Mark the process pool broken so that submits fail right now.
520
530
executor_flags .flag_as_broken ()
521
531
mp .util .debug ('The executor is broken as at least one process '
522
- 'terminated abruptly' )
532
+ 'terminated abruptly. Workers: %s'
533
+ % ", " .join (str (sentinel_map [s ]) for s in ready ))
523
534
524
535
# All futures in flight must be marked failed
525
536
for work_id , work_item in pending_work_items .items ():
@@ -770,8 +781,8 @@ def __init__(self, max_workers=None, job_reducers=None,
770
781
will use the same reducers
771
782
timeout: int, optional (default: None)
772
783
Idle workers exit after timeout seconds. If a new job is
773
- submitted after the timeout, the executor will launch enough
774
- job to make sure the pool of worker is full.
784
+ submitted after the timeout, the executor will start enough
785
+ new Python processes to make sure the pool of workers is full.
775
786
context: A multiprocessing context to launch the workers. This
776
787
object should provide SimpleQueue, Queue and Process.
777
788
@@ -796,7 +807,7 @@ def __init__(self, max_workers=None, job_reducers=None,
796
807
mp .util .debug ("using context {}" .format (self ._ctx ))
797
808
_check_max_depth (self ._ctx )
798
809
799
- # Timeout and its lock.
810
+ # Timeout
800
811
self ._timeout = timeout
801
812
802
813
# Connection to wakeup QueueManagerThread
@@ -842,6 +853,8 @@ def _start_queue_management_thread(self):
842
853
# When the executor gets lost, the weakref callback will wake up
843
854
# the queue management thread.
844
855
def weakref_cb (_ , wakeup = self ._wakeup ):
856
+ mp .util .debug ('Executor collected: triggering callback for'
857
+ ' QueueManager wakeup' )
845
858
wakeup .set ()
846
859
847
860
# Start the processes so that their sentinels are known.
0 commit comments