Skip to content

Commit 96c886c

Browse files
committed
issue6643 - Two locks held within the threading module on each thread instance
needed to be reinitialized after fork(). Adds tests to confirm that they are and that a potential deadlock and crasher bug are fixed (platform dependant).
1 parent 68530ac commit 96c886c

2 files changed

Lines changed: 151 additions & 4 deletions

File tree

Lib/test/test_threading.py

Lines changed: 147 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import unittest
1212
import weakref
1313
import os
14+
import subprocess
1415

1516
from test import lock_tests
1617

@@ -272,7 +273,6 @@ def test_finalize_runnning_thread(self):
272273
except ImportError:
273274
raise unittest.SkipTest("cannot import ctypes")
274275

275-
import subprocess
276276
rc = subprocess.call([sys.executable, "-c", """if 1:
277277
import ctypes, sys, time, _thread
278278
@@ -303,7 +303,6 @@ def waitingThread():
303303
def test_finalize_with_trace(self):
304304
# Issue1733757
305305
# Avoid a deadlock when sys.settrace steps into threading._shutdown
306-
import subprocess
307306
p = subprocess.Popen([sys.executable, "-c", """if 1:
308307
import sys, threading
309308
@@ -338,7 +337,6 @@ def func(frame, event, arg):
338337
def test_join_nondaemon_on_shutdown(self):
339338
# Issue 1722344
340339
# Raising SystemExit skipped threading._shutdown
341-
import subprocess
342340
p = subprocess.Popen([sys.executable, "-c", """if 1:
343341
import threading
344342
from time import sleep
@@ -445,7 +443,6 @@ def joiningfunc(mainthread):
445443
sys.stdout.flush()
446444
\n""" + script
447445

448-
import subprocess
449446
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
450447
rc = p.wait()
451448
data = p.stdout.read().decode().replace('\r', '')
@@ -512,6 +509,152 @@ def worker():
512509
"""
513510
self._run_and_join(script)
514511

512+
def assertScriptHasOutput(self, script, expected_output):
513+
p = subprocess.Popen([sys.executable, "-c", script],
514+
stdout=subprocess.PIPE)
515+
rc = p.wait()
516+
data = p.stdout.read().decode().replace('\r', '')
517+
self.assertEqual(rc, 0, "Unexpected error")
518+
self.assertEqual(data, expected_output)
519+
520+
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
521+
def test_4_joining_across_fork_in_worker_thread(self):
522+
# There used to be a possible deadlock when forking from a child
523+
# thread. See http://bugs.python.org/issue6643.
524+
525+
# Skip platforms with known problems forking from a worker thread.
526+
# See http://bugs.python.org/issue3863.
527+
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
528+
raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
529+
530+
# The script takes the following steps:
531+
# - The main thread in the parent process starts a new thread and then
532+
# tries to join it.
533+
# - The join operation acquires the Lock inside the thread's _block
534+
# Condition. (See threading.py:Thread.join().)
535+
# - We stub out the acquire method on the condition to force it to wait
536+
# until the child thread forks. (See LOCK ACQUIRED HERE)
537+
# - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
538+
# HERE)
539+
# - The main thread of the parent process enters Condition.wait(),
540+
# which releases the lock on the child thread.
541+
# - The child process returns. Without the necessary fix, when the
542+
# main thread of the child process (which used to be the child thread
543+
# in the parent process) attempts to exit, it will try to acquire the
544+
# lock in the Thread._block Condition object and hang, because the
545+
# lock was held across the fork.
546+
547+
script = """if 1:
548+
import os, time, threading
549+
550+
finish_join = False
551+
start_fork = False
552+
553+
def worker():
554+
# Wait until this thread's lock is acquired before forking to
555+
# create the deadlock.
556+
global finish_join
557+
while not start_fork:
558+
time.sleep(0.01)
559+
# LOCK HELD: Main thread holds lock across this call.
560+
childpid = os.fork()
561+
finish_join = True
562+
if childpid != 0:
563+
# Parent process just waits for child.
564+
os.waitpid(childpid, 0)
565+
# Child process should just return.
566+
567+
w = threading.Thread(target=worker)
568+
569+
# Stub out the private condition variable's lock acquire method.
570+
# This acquires the lock and then waits until the child has forked
571+
# before returning, which will release the lock soon after. If
572+
# someone else tries to fix this test case by acquiring this lock
573+
# before forking instead of reseting it, the test case will
574+
# deadlock when it shouldn't.
575+
condition = w._block
576+
orig_acquire = condition.acquire
577+
call_count_lock = threading.Lock()
578+
call_count = 0
579+
def my_acquire():
580+
global call_count
581+
global start_fork
582+
orig_acquire() # LOCK ACQUIRED HERE
583+
start_fork = True
584+
if call_count == 0:
585+
while not finish_join:
586+
time.sleep(0.01) # WORKER THREAD FORKS HERE
587+
with call_count_lock:
588+
call_count += 1
589+
condition.acquire = my_acquire
590+
591+
w.start()
592+
w.join()
593+
print('end of main')
594+
"""
595+
self.assertScriptHasOutput(script, "end of main\n")
596+
597+
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
598+
def test_5_clear_waiter_locks_to_avoid_crash(self):
599+
# Check that a spawned thread that forks doesn't segfault on certain
600+
# platforms, namely OS X. This used to happen if there was a waiter
601+
# lock in the thread's condition variable's waiters list. Even though
602+
# we know the lock will be held across the fork, it is not safe to
603+
# release locks held across forks on all platforms, so releasing the
604+
# waiter lock caused a segfault on OS X. Furthermore, since locks on
605+
# OS X are (as of this writing) implemented with a mutex + condition
606+
# variable instead of a semaphore, while we know that the Python-level
607+
# lock will be acquired, we can't know if the internal mutex will be
608+
# acquired at the time of the fork.
609+
610+
# Skip platforms with known problems forking from a worker thread.
611+
# See http://bugs.python.org/issue3863.
612+
if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
613+
raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
614+
script = """if True:
615+
import os, time, threading
616+
617+
start_fork = False
618+
619+
def worker():
620+
# Wait until the main thread has attempted to join this thread
621+
# before continuing.
622+
while not start_fork:
623+
time.sleep(0.01)
624+
childpid = os.fork()
625+
if childpid != 0:
626+
# Parent process just waits for child.
627+
(cpid, rc) = os.waitpid(childpid, 0)
628+
assert cpid == childpid
629+
assert rc == 0
630+
print('end of worker thread')
631+
else:
632+
# Child process should just return.
633+
pass
634+
635+
w = threading.Thread(target=worker)
636+
637+
# Stub out the private condition variable's _release_save method.
638+
# This releases the condition's lock and flips the global that
639+
# causes the worker to fork. At this point, the problematic waiter
640+
# lock has been acquired once by the waiter and has been put onto
641+
# the waiters list.
642+
condition = w._block
643+
orig_release_save = condition._release_save
644+
def my_release_save():
645+
global start_fork
646+
orig_release_save()
647+
# Waiter lock held here, condition lock released.
648+
start_fork = True
649+
condition._release_save = my_release_save
650+
651+
w.start()
652+
w.join()
653+
print('end of main thread')
654+
"""
655+
output = "end of worker thread\nend of main thread\n"
656+
self.assertScriptHasOutput(script, output)
657+
515658

516659
class ThreadingExceptionTests(BaseTestCase):
517660
# A RuntimeError should be raised if Thread.start() is called

Lib/threading.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,10 @@ def _after_fork():
10641064
# its new value since it can have changed.
10651065
ident = _get_ident()
10661066
thread._ident = ident
1067+
# Any condition variables hanging off of the active thread may
1068+
# be in an invalid state, so we reinitialize them.
1069+
thread._block.__init__()
1070+
thread._started._cond.__init__()
10671071
new_active[ident] = thread
10681072
else:
10691073
# All the others are already stopped.

0 commit comments

Comments
 (0)