|
62 | 62 | import os
|
63 | 63 | import gc
|
64 | 64 | import sys
|
65 |
| -import types |
66 | 65 | import struct
|
67 | 66 | import weakref
|
68 | 67 | import warnings
|
@@ -438,7 +437,6 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
|
438 | 437 | continue
|
439 | 438 | if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY:
|
440 | 439 | mem_usage = _get_memory_usage(pid)
|
441 |
| - print(mem_usage) |
442 | 440 | _last_memory_leak_check = time()
|
443 | 441 | if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
|
444 | 442 | # Memory usage stays within bounds: everything is fine.
|
@@ -618,34 +616,41 @@ def shutdown_all_workers():
|
618 | 616 | worker_sentinels = [p.sentinel for p in processes.values()]
|
619 | 617 | ready = wait(readers + worker_sentinels)
|
620 | 618 |
|
621 |
| - broken = ("A process in the executor was terminated abruptly", None) |
| 619 | + broken = ("A worker process managed by the executor was unexpectedly " |
| 620 | + "terminated. This could be caused by a segmentation fault " |
| 621 | + "while calling the function or by an excessive memory usage " |
| 622 | + "causing the Operating System to kill the worker.", None, |
| 623 | + TerminatedWorkerError) |
622 | 624 | if result_reader in ready:
|
623 | 625 | try:
|
624 | 626 | result_item = result_reader.recv()
|
625 | 627 | broken = None
|
626 | 628 | if isinstance(result_item, _RemoteTraceback):
|
627 |
| - cause = result_item.tb |
628 |
| - broken = ("A task has failed to un-serialize", cause) |
| 629 | + broken = ("A task has failed to un-serialize. Please " |
| 630 | + "ensure that the arguments of the function are " |
| 631 | + "all picklable.", result_item.tb, |
| 632 | + BrokenProcessPool) |
629 | 633 | except BaseException as e:
|
630 | 634 | tb = getattr(e, "__traceback__", None)
|
631 | 635 | if tb is None:
|
632 | 636 | _, _, tb = sys.exc_info()
|
633 |
| - broken = ("A result has failed to un-serialize", |
634 |
| - traceback.format_exception(type(e), e, tb)) |
| 637 | + broken = ("A result has failed to un-serialize. Please " |
| 638 | + "ensure that the objects returned by the function " |
| 639 | + "are always picklable.", |
| 640 | + traceback.format_exception(type(e), e, tb), |
| 641 | + BrokenProcessPool) |
635 | 642 | elif wakeup_reader in ready:
|
636 | 643 | broken = None
|
637 | 644 | result_item = None
|
638 | 645 | thread_wakeup.clear()
|
639 |
| - if broken: |
640 |
| - msg, cause = broken |
641 |
| - # Mark the process pool broken so that submits fail right now. |
642 |
| - executor_flags.flag_as_broken( |
643 |
| - msg + ", the pool is not usable anymore.") |
644 |
| - bpe = BrokenProcessPool( |
645 |
| - msg + " while the future was running or pending.") |
646 |
| - if cause is not None: |
| 646 | + if broken is not None: |
| 647 | + msg, cause_tb, exc_type = broken |
| 648 | + bpe = exc_type(msg) |
| 649 | + if cause_tb is not None: |
647 | 650 | bpe.__cause__ = _RemoteTraceback(
|
648 |
| - "\n'''\n{}'''".format(''.join(cause))) |
| 651 | + "\n'''\n{}'''".format(''.join(cause_tb))) |
| 652 | + # Mark the process pool broken so that submits fail right now. |
| 653 | + executor_flags.flag_as_broken(bpe) |
649 | 654 |
|
650 | 655 | # All futures in flight must be marked failed
|
651 | 656 | for work_id, work_item in pending_work_items.items():
|
@@ -808,6 +813,15 @@ class LokyRecursionError(RuntimeError):
|
808 | 813 |
|
809 | 814 |
|
810 | 815 | class BrokenProcessPool(_BPPException):
|
| 816 | + """ |
| 817 | + Raised when the executor is broken while a future was in the running state. |
| 818 | + The cause can be an error raised when unpickling the task in the worker |
| 819 | + process or when unpickling the result value in the parent process. It can |
| 820 | + also be caused by a worker process being terminated unexpectedly. |
| 821 | + """ |
| 822 | + |
| 823 | + |
| 824 | +class TerminatedWorkerError(BrokenProcessPool): |
811 | 825 | """
|
812 | 826 | Raised when a process in a ProcessPoolExecutor terminated abruptly
|
813 | 827 | while a future was in the running state.
|
@@ -998,8 +1012,8 @@ def _ensure_executor_running(self):
|
998 | 1012 |
|
999 | 1013 | def submit(self, fn, *args, **kwargs):
|
1000 | 1014 | with self._flags.shutdown_lock:
|
1001 |
| - if self._flags.broken: |
1002 |
| - raise BrokenProcessPool(self._flags.broken) |
| 1015 | + if self._flags.broken is not None: |
| 1016 | + raise self._flags.broken |
1003 | 1017 | if self._flags.shutdown:
|
1004 | 1018 | raise ShutdownExecutorError(
|
1005 | 1019 | 'cannot schedule new futures after shutdown')
|
|
0 commit comments