Skip to content

Commit 546d35b

Browse files
authored
impl multiprocessing SemLock (#6542)
1 parent 346519b commit 546d35b

File tree

10 files changed

+849
-42
lines changed

10 files changed

+849
-42
lines changed

.cspell.dict/cpython.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ fromlist
3030
heaptype
3131
HIGHRES
3232
IMMUTABLETYPE
33+
ismine
3334
Itertool
3435
keeped
3536
kwonlyarg
@@ -40,6 +41,7 @@ lsprof
4041
maxdepth
4142
mult
4243
multibytecodec
44+
newsemlockobject
4345
nkwargs
4446
noraise
4547
numer

Lib/test/_test_multiprocessing.py

Lines changed: 89 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from test.support import socket_helper
3939
from test.support import threading_helper
4040
from test.support import warnings_helper
41-
41+
from test.support import subTests
4242

4343
# Skip tests if _multiprocessing wasn't built.
4444
_multiprocessing = import_helper.import_module('_multiprocessing')
@@ -1109,7 +1109,7 @@ def test_put(self):
11091109
@classmethod
11101110
def _test_get(cls, queue, child_can_start, parent_can_continue):
11111111
child_can_start.wait()
1112-
#queue.put(1)
1112+
queue.put(1)
11131113
queue.put(2)
11141114
queue.put(3)
11151115
queue.put(4)
@@ -1133,15 +1133,16 @@ def test_get(self):
11331133
child_can_start.set()
11341134
parent_can_continue.wait()
11351135

1136-
time.sleep(DELTA)
1136+
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
1137+
if not queue_empty(queue):
1138+
break
11371139
self.assertEqual(queue_empty(queue), False)
11381140

1139-
# Hangs unexpectedly, remove for now
1140-
#self.assertEqual(queue.get(), 1)
1141+
self.assertEqual(queue.get_nowait(), 1)
11411142
self.assertEqual(queue.get(True, None), 2)
11421143
self.assertEqual(queue.get(True), 3)
11431144
self.assertEqual(queue.get(timeout=1), 4)
1144-
self.assertEqual(queue.get_nowait(), 5)
1145+
self.assertEqual(queue.get(), 5)
11451146

11461147
self.assertEqual(queue_empty(queue), True)
11471148

@@ -2970,6 +2971,8 @@ def test_map_no_failfast(self):
29702971
# check that we indeed waited for all jobs
29712972
self.assertGreater(time.monotonic() - t_start, 0.9)
29722973

2974+
# TODO: RUSTPYTHON - reference counting differences
2975+
@unittest.skip("TODO: RUSTPYTHON")
29732976
def test_release_task_refs(self):
29742977
# Issue #29861: task arguments and results should not be kept
29752978
# alive after we are done with them.
@@ -3882,6 +3885,8 @@ def _remote(cls, conn):
38823885

38833886
conn.close()
38843887

3888+
# TODO: RUSTPYTHON - hangs
3889+
@unittest.skip("TODO: RUSTPYTHON")
38853890
def test_pickling(self):
38863891
families = self.connection.families
38873892

@@ -4051,6 +4056,8 @@ def test_heap(self):
40514056
self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
40524057
self.assertEqual(len(heap._len_to_seq), 0)
40534058

4059+
# TODO: RUSTPYTHON - gc.enable() not implemented
4060+
@unittest.expectedFailure
40544061
def test_free_from_gc(self):
40554062
# Check that freeing of blocks by the garbage collector doesn't deadlock
40564063
# (issue #12352).
@@ -4103,6 +4110,8 @@ def _double(cls, x, y, z, foo, arr, string):
41034110
for i in range(len(arr)):
41044111
arr[i] *= 2
41054112

4113+
# TODO: RUSTPYTHON - ctypes Structure shared memory not working
4114+
@unittest.expectedFailure
41064115
def test_sharedctypes(self, lock=False):
41074116
x = Value('i', 7, lock=lock)
41084117
y = Value(c_double, 1.0/3.0, lock=lock)
@@ -4126,6 +4135,8 @@ def test_sharedctypes(self, lock=False):
41264135
self.assertAlmostEqual(arr[i], i*2)
41274136
self.assertEqual(string.value, latin('hellohello'))
41284137

4138+
# TODO: RUSTPYTHON - calls test_sharedctypes which fails
4139+
@unittest.expectedFailure
41294140
def test_synchronize(self):
41304141
self.test_sharedctypes(lock=True)
41314142

@@ -4140,6 +4151,19 @@ def test_copy(self):
41404151
self.assertEqual(bar.z, 2 ** 33)
41414152

41424153

4154+
def resource_tracker_format_subtests(func):
4155+
"""Run given test using both resource tracker communication formats"""
4156+
def _inner(self, *args, **kwargs):
4157+
tracker = resource_tracker._resource_tracker
4158+
for use_simple_format in False, True:
4159+
with (
4160+
self.subTest(use_simple_format=use_simple_format),
4161+
unittest.mock.patch.object(
4162+
tracker, '_use_simple_format', use_simple_format)
4163+
):
4164+
func(self, *args, **kwargs)
4165+
return _inner
4166+
41434167
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
41444168
@hashlib_helper.requires_hashdigest('sha256')
41454169
class _TestSharedMemory(BaseTestCase):
@@ -4417,6 +4441,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
44174441
smm.shutdown()
44184442

44194443
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4444+
@resource_tracker_format_subtests
44204445
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
44214446
# bpo-36867: test that a SharedMemoryManager uses the
44224447
# same resource_tracker process as its parent.
@@ -4667,6 +4692,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
46674692
"shared_memory objects to clean up at shutdown", err)
46684693

46694694
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4695+
@resource_tracker_format_subtests
46704696
def test_shared_memory_untracking(self):
46714697
# gh-82300: When a separate Python process accesses shared memory
46724698
# with track=False, it must not cause the memory to be deleted
@@ -4694,6 +4720,7 @@ def test_shared_memory_untracking(self):
46944720
mem.close()
46954721

46964722
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4723+
@resource_tracker_format_subtests
46974724
def test_shared_memory_tracking(self):
46984725
# gh-82300: When a separate Python process accesses shared memory
46994726
# with track=True, it must cause the memory to be deleted when
@@ -4787,6 +4814,8 @@ def test_finalize(self):
47874814
result = [obj for obj in iter(conn.recv, 'STOP')]
47884815
self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
47894816

4817+
# TODO: RUSTPYTHON - gc.get_threshold() and gc.set_threshold() not implemented
4818+
@unittest.expectedFailure
47904819
@support.requires_resource('cpu')
47914820
def test_thread_safety(self):
47924821
# bpo-24484: _run_finalizers() should be thread-safe
@@ -5414,6 +5443,8 @@ def run_in_child(cls, start_method):
54145443
flags = (tuple(sys.flags), grandchild_flags)
54155444
print(json.dumps(flags))
54165445

5446+
# TODO: RUSTPYTHON - SyntaxError in subprocess after fork
5447+
@unittest.expectedFailure
54175448
def test_flags(self):
54185449
import json
54195450
# start child process using unusual flags
@@ -6457,28 +6488,13 @@ def test_std_streams_flushed_after_preload(self):
64576488
if multiprocessing.get_start_method() != "forkserver":
64586489
self.skipTest("forkserver specific test")
64596490

6460-
# Create a test module in the temporary directory on the child's path
6461-
# TODO: This can all be simplified once gh-126631 is fixed and we can
6462-
# use __main__ instead of a module.
6463-
dirname = os.path.join(self._temp_dir, 'preloaded_module')
6464-
init_name = os.path.join(dirname, '__init__.py')
6465-
os.mkdir(dirname)
6466-
with open(init_name, "w") as f:
6467-
cmd = '''if 1:
6468-
import sys
6469-
print('stderr', end='', file=sys.stderr)
6470-
print('stdout', end='', file=sys.stdout)
6471-
'''
6472-
f.write(cmd)
6473-
64746491
name = os.path.join(os.path.dirname(__file__), 'mp_preload_flush.py')
6475-
env = {'PYTHONPATH': self._temp_dir}
6476-
_, out, err = test.support.script_helper.assert_python_ok(name, **env)
6492+
_, out, err = test.support.script_helper.assert_python_ok(name)
64776493

64786494
# Check stderr first, as it is more likely to be useful to see in the
64796495
# event of a failure.
6480-
self.assertEqual(err.decode().rstrip(), 'stderr')
6481-
self.assertEqual(out.decode().rstrip(), 'stdout')
6496+
self.assertEqual(err.decode().rstrip(), '__main____mp_main__')
6497+
self.assertEqual(out.decode().rstrip(), '__main____mp_main__')
64826498

64836499

64846500
class MiscTestCase(unittest.TestCase):
@@ -6804,3 +6820,52 @@ class SemLock(_multiprocessing.SemLock):
68046820
name = f'test_semlock_subclass-{os.getpid()}'
68056821
s = SemLock(1, 0, 10, name, False)
68066822
_multiprocessing.sem_unlink(name)
6823+
6824+
6825+
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
6826+
class TestSharedMemoryNames(unittest.TestCase):
6827+
@subTests('use_simple_format', (True, False))
6828+
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
6829+
self, use_simple_format):
6830+
# Test script that creates and cleans up shared memory with colon in name
6831+
test_script = textwrap.dedent("""
6832+
import sys
6833+
from multiprocessing import shared_memory
6834+
from multiprocessing import resource_tracker
6835+
import time
6836+
6837+
resource_tracker._resource_tracker._use_simple_format = %s
6838+
6839+
# Test various patterns of colons in names
6840+
test_names = [
6841+
"a:b",
6842+
"a:b:c",
6843+
"test:name:with:many:colons",
6844+
":starts:with:colon",
6845+
"ends:with:colon:",
6846+
"::double::colons::",
6847+
"name\\nwithnewline",
6848+
"name-with-trailing-newline\\n",
6849+
"\\nname-starts-with-newline",
6850+
"colons:and\\nnewlines:mix",
6851+
"multi\\nline\\nname",
6852+
]
6853+
6854+
for name in test_names:
6855+
try:
6856+
shm = shared_memory.SharedMemory(create=True, size=100, name=name)
6857+
shm.buf[:5] = b'hello' # Write something to the shared memory
6858+
shm.close()
6859+
shm.unlink()
6860+
6861+
except Exception as e:
6862+
print(f"Error with name '{name}': {e}", file=sys.stderr)
6863+
sys.exit(1)
6864+
6865+
print("SUCCESS")
6866+
""" % use_simple_format)
6867+
6868+
rc, out, err = script_helper.assert_python_ok("-c", test_script)
6869+
self.assertIn(b"SUCCESS", out)
6870+
self.assertNotIn(b"traceback", err.lower(), err)
6871+
self.assertNotIn(b"resource_tracker.py", err, err)

Lib/test/mp_fork_bomb.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import multiprocessing, sys
2+
3+
def foo():
4+
print("123")
5+
6+
# Because "if __name__ == '__main__'" is missing this will not work
7+
# correctly on Windows. However, we should get a RuntimeError rather
8+
# than the Windows equivalent of a fork bomb.
9+
10+
if len(sys.argv) > 1:
11+
multiprocessing.set_start_method(sys.argv[1])
12+
else:
13+
multiprocessing.set_start_method('spawn')
14+
15+
p = multiprocessing.Process(target=foo)
16+
p.start()
17+
p.join()
18+
sys.exit(p.exitcode)

Lib/test/mp_preload.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import multiprocessing
2+
3+
multiprocessing.Lock()
4+
5+
6+
def f():
7+
print("ok")
8+
9+
10+
if __name__ == "__main__":
11+
ctx = multiprocessing.get_context("forkserver")
12+
modname = "test.mp_preload"
13+
# Make sure it's importable
14+
__import__(modname)
15+
ctx.set_forkserver_preload([modname])
16+
proc = ctx.Process(target=f)
17+
proc.start()
18+
proc.join()

Lib/test/mp_preload_flush.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import multiprocessing
2+
import sys
3+
4+
print(__name__, end='', file=sys.stderr)
5+
print(__name__, end='', file=sys.stdout)
6+
if __name__ == '__main__':
7+
multiprocessing.set_start_method('forkserver')
8+
for _ in range(2):
9+
p = multiprocessing.Process()
10+
p.start()
11+
p.join()

Lib/test/test_importlib/test_threaded_import.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,7 @@ def test_concurrent_futures_circular_import(self):
256256
'partial', 'cfimport.py')
257257
script_helper.assert_python_ok(fn)
258258

259-
@unittest.skipUnless(hasattr(_multiprocessing, "SemLock"), "TODO: RUSTPYTHON, pool_in_threads.py needs _multiprocessing.SemLock")
260-
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
259+
@unittest.skip("TODO: RUSTPYTHON - fails on Linux due to multiprocessing issues")
261260
def test_multiprocessing_pool_circular_import(self):
262261
# Regression test for bpo-41567
263262
fn = os.path.join(os.path.dirname(__file__),

Lib/test/test_logging.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4058,7 +4058,8 @@ def _mpinit_issue121723(qspec, message_to_log):
40584058
# log a message (this creates a record put in the queue)
40594059
logging.getLogger().info(message_to_log)
40604060

4061-
@unittest.expectedFailure # TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
4061+
# TODO: RUSTPYTHON - SemLock not implemented on Windows
4062+
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
40624063
@skip_if_tsan_fork
40634064
@support.requires_subprocess()
40644065
def test_multiprocessing_queues(self):
@@ -4118,7 +4119,8 @@ def test_90195(self):
41184119
# Logger should be enabled, since explicitly mentioned
41194120
self.assertFalse(logger.disabled)
41204121

4121-
@unittest.expectedFailure # TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
4122+
# TODO: RUSTPYTHON - SemLock not implemented on Windows
4123+
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
41224124
def test_111615(self):
41234125
# See gh-111615
41244126
import_helper.import_module('_multiprocessing') # see gh-113692

0 commit comments

Comments
 (0)