|
11 | 11 | import unittest |
12 | 12 | import weakref |
13 | 13 | import os |
| 14 | +import subprocess |
14 | 15 |
|
15 | 16 | from test import lock_tests |
16 | 17 |
|
@@ -272,7 +273,6 @@ def test_finalize_runnning_thread(self): |
272 | 273 | except ImportError: |
273 | 274 | raise unittest.SkipTest("cannot import ctypes") |
274 | 275 |
|
275 | | - import subprocess |
276 | 276 | rc = subprocess.call([sys.executable, "-c", """if 1: |
277 | 277 | import ctypes, sys, time, _thread |
278 | 278 |
|
@@ -303,7 +303,6 @@ def waitingThread(): |
303 | 303 | def test_finalize_with_trace(self): |
304 | 304 | # Issue1733757 |
305 | 305 | # Avoid a deadlock when sys.settrace steps into threading._shutdown |
306 | | - import subprocess |
307 | 306 | p = subprocess.Popen([sys.executable, "-c", """if 1: |
308 | 307 | import sys, threading |
309 | 308 |
|
@@ -338,7 +337,6 @@ def func(frame, event, arg): |
338 | 337 | def test_join_nondaemon_on_shutdown(self): |
339 | 338 | # Issue 1722344 |
340 | 339 | # Raising SystemExit skipped threading._shutdown |
341 | | - import subprocess |
342 | 340 | p = subprocess.Popen([sys.executable, "-c", """if 1: |
343 | 341 | import threading |
344 | 342 | from time import sleep |
@@ -445,7 +443,6 @@ def joiningfunc(mainthread): |
445 | 443 | sys.stdout.flush() |
446 | 444 | \n""" + script |
447 | 445 |
|
448 | | - import subprocess |
449 | 446 | p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) |
450 | 447 | rc = p.wait() |
451 | 448 | data = p.stdout.read().decode().replace('\r', '') |
@@ -512,6 +509,152 @@ def worker(): |
512 | 509 | """ |
513 | 510 | self._run_and_join(script) |
514 | 511 |
|
| 512 | + def assertScriptHasOutput(self, script, expected_output): |
| 513 | + p = subprocess.Popen([sys.executable, "-c", script], |
| 514 | + stdout=subprocess.PIPE) |
| 515 | + rc = p.wait() |
| 516 | + data = p.stdout.read().decode().replace('\r', '') |
| 517 | + self.assertEqual(rc, 0, "Unexpected error") |
| 518 | + self.assertEqual(data, expected_output) |
| 519 | + |
| 520 | + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") |
| 521 | + def test_4_joining_across_fork_in_worker_thread(self): |
| 522 | + # There used to be a possible deadlock when forking from a child |
| 523 | + # thread. See http://bugs.python.org/issue6643. |
| 524 | + |
| 525 | + # Skip platforms with known problems forking from a worker thread. |
| 526 | + # See http://bugs.python.org/issue3863. |
| 527 | + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): |
| 528 | + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) |
| 529 | + |
| 530 | + # The script takes the following steps: |
| 531 | + # - The main thread in the parent process starts a new thread and then |
| 532 | + # tries to join it. |
| 533 | + # - The join operation acquires the Lock inside the thread's _block |
| 534 | + # Condition. (See threading.py:Thread.join().) |
| 535 | + # - We stub out the acquire method on the condition to force it to wait |
| 536 | + # until the child thread forks. (See LOCK ACQUIRED HERE) |
| 537 | + # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS |
| 538 | + # HERE) |
| 539 | + # - The main thread of the parent process enters Condition.wait(), |
| 540 | + # which releases the lock on the child thread. |
| 541 | + # - The child process returns. Without the necessary fix, when the |
| 542 | + # main thread of the child process (which used to be the child thread |
| 543 | + # in the parent process) attempts to exit, it will try to acquire the |
| 544 | + # lock in the Thread._block Condition object and hang, because the |
| 545 | + # lock was held across the fork. |
| 546 | + |
| 547 | + script = """if 1: |
| 548 | + import os, time, threading |
| 549 | +
|
| 550 | + finish_join = False |
| 551 | + start_fork = False |
| 552 | +
|
| 553 | + def worker(): |
| 554 | + # Wait until this thread's lock is acquired before forking to |
| 555 | + # create the deadlock. |
| 556 | + global finish_join |
| 557 | + while not start_fork: |
| 558 | + time.sleep(0.01) |
| 559 | + # LOCK HELD: Main thread holds lock across this call. |
| 560 | + childpid = os.fork() |
| 561 | + finish_join = True |
| 562 | + if childpid != 0: |
| 563 | + # Parent process just waits for child. |
| 564 | + os.waitpid(childpid, 0) |
| 565 | + # Child process should just return. |
| 566 | +
|
| 567 | + w = threading.Thread(target=worker) |
| 568 | +
|
| 569 | + # Stub out the private condition variable's lock acquire method. |
| 570 | + # This acquires the lock and then waits until the child has forked |
| 571 | + # before returning, which will release the lock soon after. If |
| 572 | + # someone else tries to fix this test case by acquiring this lock |
| 573 | + # before forking instead of reseting it, the test case will |
| 574 | + # deadlock when it shouldn't. |
| 575 | + condition = w._block |
| 576 | + orig_acquire = condition.acquire |
| 577 | + call_count_lock = threading.Lock() |
| 578 | + call_count = 0 |
| 579 | + def my_acquire(): |
| 580 | + global call_count |
| 581 | + global start_fork |
| 582 | + orig_acquire() # LOCK ACQUIRED HERE |
| 583 | + start_fork = True |
| 584 | + if call_count == 0: |
| 585 | + while not finish_join: |
| 586 | + time.sleep(0.01) # WORKER THREAD FORKS HERE |
| 587 | + with call_count_lock: |
| 588 | + call_count += 1 |
| 589 | + condition.acquire = my_acquire |
| 590 | +
|
| 591 | + w.start() |
| 592 | + w.join() |
| 593 | + print('end of main') |
| 594 | + """ |
| 595 | + self.assertScriptHasOutput(script, "end of main\n") |
| 596 | + |
| 597 | + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") |
| 598 | + def test_5_clear_waiter_locks_to_avoid_crash(self): |
| 599 | + # Check that a spawned thread that forks doesn't segfault on certain |
| 600 | + # platforms, namely OS X. This used to happen if there was a waiter |
| 601 | + # lock in the thread's condition variable's waiters list. Even though |
| 602 | + # we know the lock will be held across the fork, it is not safe to |
| 603 | + # release locks held across forks on all platforms, so releasing the |
| 604 | + # waiter lock caused a segfault on OS X. Furthermore, since locks on |
| 605 | + # OS X are (as of this writing) implemented with a mutex + condition |
| 606 | + # variable instead of a semaphore, while we know that the Python-level |
| 607 | + # lock will be acquired, we can't know if the internal mutex will be |
| 608 | + # acquired at the time of the fork. |
| 609 | + |
| 610 | + # Skip platforms with known problems forking from a worker thread. |
| 611 | + # See http://bugs.python.org/issue3863. |
| 612 | + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): |
| 613 | + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) |
| 614 | + script = """if True: |
| 615 | + import os, time, threading |
| 616 | +
|
| 617 | + start_fork = False |
| 618 | +
|
| 619 | + def worker(): |
| 620 | + # Wait until the main thread has attempted to join this thread |
| 621 | + # before continuing. |
| 622 | + while not start_fork: |
| 623 | + time.sleep(0.01) |
| 624 | + childpid = os.fork() |
| 625 | + if childpid != 0: |
| 626 | + # Parent process just waits for child. |
| 627 | + (cpid, rc) = os.waitpid(childpid, 0) |
| 628 | + assert cpid == childpid |
| 629 | + assert rc == 0 |
| 630 | + print('end of worker thread') |
| 631 | + else: |
| 632 | + # Child process should just return. |
| 633 | + pass |
| 634 | +
|
| 635 | + w = threading.Thread(target=worker) |
| 636 | +
|
| 637 | + # Stub out the private condition variable's _release_save method. |
| 638 | + # This releases the condition's lock and flips the global that |
| 639 | + # causes the worker to fork. At this point, the problematic waiter |
| 640 | + # lock has been acquired once by the waiter and has been put onto |
| 641 | + # the waiters list. |
| 642 | + condition = w._block |
| 643 | + orig_release_save = condition._release_save |
| 644 | + def my_release_save(): |
| 645 | + global start_fork |
| 646 | + orig_release_save() |
| 647 | + # Waiter lock held here, condition lock released. |
| 648 | + start_fork = True |
| 649 | + condition._release_save = my_release_save |
| 650 | +
|
| 651 | + w.start() |
| 652 | + w.join() |
| 653 | + print('end of main thread') |
| 654 | + """ |
| 655 | + output = "end of worker thread\nend of main thread\n" |
| 656 | + self.assertScriptHasOutput(script, output) |
| 657 | + |
515 | 658 |
|
516 | 659 | class ThreadingExceptionTests(BaseTestCase): |
517 | 660 | # A RuntimeError should be raised if Thread.start() is called |
|
0 commit comments