Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-125451: Fix deadlock in ProcessPoolExecutor shutdown #125492

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 21 additions & 29 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,31 @@
class _ThreadWakeup:
def __init__(self):
self._closed = False
self._lock = threading.Lock()
self._reader, self._writer = mp.Pipe(duplex=False)

def close(self):
# Please note that we do not take the shutdown lock when
# Please note that we do not take the self._lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# clear() even if you hold the lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()
with self._lock:
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()

def wakeup(self):
if not self._closed:
self._writer.send_bytes(b"")
with self._lock:
if not self._closed:
self._writer.send_bytes(b"")

def clear(self):
if not self._closed:
while self._reader.poll():
self._reader.recv_bytes()
if self._closed:
raise RuntimeError('operation on closed _ThreadWakeup')
while self._reader.poll():
self._reader.recv_bytes()


def _python_exit():
Expand Down Expand Up @@ -167,10 +171,8 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)

Expand All @@ -179,8 +181,7 @@ def _on_queue_feeder_error(self, e, obj):
tb = format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
with self.shutdown_lock:
self.thread_wakeup.wakeup()
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
Expand Down Expand Up @@ -296,12 +297,10 @@ def __init__(self, executor):
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock,
mp_util_debug=mp.util.debug):
mp_util_debug('Executor collected: triggering callback for'
' QueueManager wakeup')
with shutdown_lock:
thread_wakeup.wakeup()
thread_wakeup.wakeup()

self.executor_reference = weakref.ref(executor, weakref_cb)

Expand Down Expand Up @@ -429,11 +428,6 @@ def wait_result_broken_or_wakeup(self):
elif wakeup_reader in ready:
is_broken = False

# No need to hold the _shutdown_lock here because:
# 1. we're the only thread to use the wakeup reader
# 2. we're also the only thread to call thread_wakeup.close()
# 3. we want to avoid a possible deadlock when both reader and writer
# would block (gh-105829)
self.thread_wakeup.clear()

return result_item, is_broken, cause
Expand Down Expand Up @@ -721,10 +715,9 @@ def __init__(self, max_workers=None, mp_context=None,
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
# .wakeup(). Care must also be taken to not call clear or close from
# more than one thread since _ThreadWakeup.clear() is not protected by
# the _shutdown_lock
# Care must be taken to only call clear and close from the
# executor_manager_thread, since _ThreadWakeup.clear() is not protected
# by a lock.
self._executor_manager_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
Expand All @@ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None,
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
Expand Down
3 changes: 0 additions & 3 deletions Lib/test/test_concurrent_futures/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,6 @@ def test_cancel_futures_wait_false(self):


class ProcessPoolShutdownTest(ExecutorShutdownTest):
# gh-125451: 'lock' cannot be serialized, the test is broken
# and hangs randomly
@unittest.skipIf(True, "broken test")
def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down
concurrently with an error when feeding a job to a worker process.
Loading