68
68
class _ThreadWakeup :
69
69
def __init__ (self ):
70
70
self ._closed = False
71
+ self ._lock = threading .Lock ()
71
72
self ._reader , self ._writer = mp .Pipe (duplex = False )
72
73
73
74
def close (self ):
74
- # Please note that we do not take the shutdown lock when
75
+ # Please note that we do not take the self._lock when
75
76
# calling clear() (to avoid deadlocking) so this method can
76
77
# only be called safely from the same thread as all calls to
77
- # clear() even if you hold the shutdown lock. Otherwise we
78
+ # clear() even if you hold the lock. Otherwise we
78
79
# might try to read from the closed pipe.
79
- if not self ._closed :
80
- self ._closed = True
81
- self ._writer .close ()
82
- self ._reader .close ()
80
+ with self ._lock :
81
+ if not self ._closed :
82
+ self ._closed = True
83
+ self ._writer .close ()
84
+ self ._reader .close ()
83
85
84
86
def wakeup (self ):
85
- if not self ._closed :
86
- self ._writer .send_bytes (b"" )
87
+ with self ._lock :
88
+ if not self ._closed :
89
+ self ._writer .send_bytes (b"" )
87
90
88
91
def clear (self ):
89
- if not self ._closed :
90
- while self ._reader .poll ():
91
- self ._reader .recv_bytes ()
92
+ assert not self ._closed
93
+ while self ._reader .poll ():
94
+ self ._reader .recv_bytes ()
92
95
93
96
94
97
def _python_exit ():
@@ -167,10 +170,8 @@ def __init__(self, work_id, fn, args, kwargs):
167
170
168
171
class _SafeQueue (Queue ):
169
172
"""Safe Queue set exception to the future object linked to a job"""
170
- def __init__ (self , max_size = 0 , * , ctx , pending_work_items , shutdown_lock ,
171
- thread_wakeup ):
173
+ def __init__ (self , max_size = 0 , * , ctx , pending_work_items , thread_wakeup ):
172
174
self .pending_work_items = pending_work_items
173
- self .shutdown_lock = shutdown_lock
174
175
self .thread_wakeup = thread_wakeup
175
176
super ().__init__ (max_size , ctx = ctx )
176
177
@@ -179,8 +180,7 @@ def _on_queue_feeder_error(self, e, obj):
179
180
tb = format_exception (type (e ), e , e .__traceback__ )
180
181
e .__cause__ = _RemoteTraceback ('\n """\n {}"""' .format ('' .join (tb )))
181
182
work_item = self .pending_work_items .pop (obj .work_id , None )
182
- with self .shutdown_lock :
183
- self .thread_wakeup .wakeup ()
183
+ self .thread_wakeup .wakeup ()
184
184
# work_item can be None if another process terminated. In this
185
185
# case, the executor_manager_thread fails all work_items
186
186
# with BrokenProcessPool
@@ -296,12 +296,10 @@ def __init__(self, executor):
296
296
# if there is no pending work item.
297
297
def weakref_cb (_ ,
298
298
thread_wakeup = self .thread_wakeup ,
299
- shutdown_lock = self .shutdown_lock ,
300
299
mp_util_debug = mp .util .debug ):
301
300
mp_util_debug ('Executor collected: triggering callback for'
302
301
' QueueManager wakeup' )
303
- with shutdown_lock :
304
- thread_wakeup .wakeup ()
302
+ thread_wakeup .wakeup ()
305
303
306
304
self .executor_reference = weakref .ref (executor , weakref_cb )
307
305
@@ -429,11 +427,6 @@ def wait_result_broken_or_wakeup(self):
429
427
elif wakeup_reader in ready :
430
428
is_broken = False
431
429
432
- # No need to hold the _shutdown_lock here because:
433
- # 1. we're the only thread to use the wakeup reader
434
- # 2. we're also the only thread to call thread_wakeup.close()
435
- # 3. we want to avoid a possible deadlock when both reader and writer
436
- # would block (gh-105829)
437
430
self .thread_wakeup .clear ()
438
431
439
432
return result_item , is_broken , cause
@@ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None,
735
728
self ._call_queue = _SafeQueue (
736
729
max_size = queue_size , ctx = self ._mp_context ,
737
730
pending_work_items = self ._pending_work_items ,
738
- shutdown_lock = self ._shutdown_lock ,
739
731
thread_wakeup = self ._executor_manager_thread_wakeup )
740
732
# Killed worker processes can produce spurious "broken pipe"
741
733
# tracebacks in the queue's own worker thread. But we detect killed
0 commit comments