Skip to content

Commit 2bb9173

Browse files
authored
Suspend Python threads before fork() (#7364)
* Suspend Python threads before fork() Add stop-the-world thread suspension around fork() to prevent deadlocks from locks held by dead parent threads in the child. - Thread states: DETACHED / ATTACHED / SUSPENDED with atomic CAS transitions matching _PyThreadState_{Attach,Detach,Suspend} - stop_the_world / start_the_world: park all non-requester threads before fork, resume after (parent) or reset (child) - allow_threads (Py_BEGIN/END_ALLOW_THREADS): detach around blocking syscalls (os.read/write, waitpid, Lock.acquire, time.sleep) so stop_the_world can force-park via CAS - Acquire/release import lock around fork lifecycle - zero_reinit_after_fork: generic lock reset for parking_lot types - gc_clear_raw: detach dict instead of clearing entries - Lock-free double-check for descriptor cache reads (no read-side seqlock); write-side seqlock retained for writer serialization - fork() returns PyResult, checks PythonFinalizationError, calls sys.audit
1 parent ed5bffe commit 2bb9173

File tree

18 files changed

+922
-107
lines changed

18 files changed

+922
-107
lines changed

.cspell.dict/cpython.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ NEWLOCALS
127127
newsemlockobject
128128
nfrees
129129
nkwargs
130-
nlocalsplus
131130
nkwelts
131+
nlocalsplus
132132
Nondescriptor
133133
noninteger
134134
nops
@@ -160,6 +160,7 @@ pylifecycle
160160
pymain
161161
pyrepl
162162
PYTHONTRACEMALLOC
163+
PYTHONUTF8
163164
pythonw
164165
PYTHREAD_NAME
165166
releasebuffer
@@ -171,9 +172,11 @@ saveall
171172
scls
172173
setdict
173174
setfunc
175+
setprofileallthreads
174176
SETREF
175177
setresult
176178
setslice
179+
settraceallthreads
177180
SLOTDEFINED
178181
SMALLBUF
179182
SOABI
@@ -190,8 +193,10 @@ subparams
190193
subscr
191194
sval
192195
swappedbytes
196+
sysdict
193197
templatelib
194198
testconsole
199+
threadstate
195200
ticketer
196201
tmptype
197202
tok_oldval

.cspell.json

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,6 @@
152152
"IFEXEC",
153153
// "stat"
154154
"FIRMLINK",
155-
// CPython internal names
156-
"PYTHONUTF",
157-
"sysdict",
158-
"settraceallthreads",
159-
"setprofileallthreads"
160155
],
161156
// flagWords - list of words to be always considered incorrect
162157
"flagWords": [

Lib/test/test_fork1.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020

2121
class ForkTest(ForkWait):
22-
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: process 44587 exited with code 1, but exit code 42 is expected
2322
def test_threaded_import_lock_fork(self):
2423
"""Check fork() in main thread works while a subthread is doing an import"""
2524
import_started = threading.Event()

Lib/test/test_os.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5574,7 +5574,6 @@ def test_fork_warns_when_non_python_thread_exists(self):
55745574
self.assertEqual(err.decode("utf-8"), "")
55755575
self.assertEqual(out.decode("utf-8"), "")
55765576

5577-
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: b"can't fork at interpreter shutdown" not found in b"Exception ignored in: <function AtFinalization.__del__ at 0xc508b30c0>\nAttributeError: 'NoneType' object has no attribute 'fork'\n"
55785577
def test_fork_at_finalization(self):
55795578
code = """if 1:
55805579
import atexit

crates/common/src/lock.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,43 +68,45 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock,
6868

6969
// can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to
7070

71-
/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
71+
/// Reset a lock to its initial (unlocked) state by zeroing its bytes.
7272
///
73-
/// After `fork()`, locks held by dead parent threads would deadlock in the
74-
/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor,
75-
/// bypassing the normal unlock path which may interact with parking_lot's
76-
/// internal waiter queues.
73+
/// After `fork()`, any lock held by a now-dead thread would remain
74+
/// permanently locked. We zero the raw bytes (the unlocked state for all
75+
/// `parking_lot` raw lock types) instead of using the normal unlock path,
76+
/// which would interact with stale waiter queues.
7777
///
7878
/// # Safety
7979
///
8080
/// Must only be called from the single-threaded child process immediately
8181
/// after `fork()`, before any other thread is created.
82-
#[cfg(unix)]
83-
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
84-
// Use Mutex::raw() to access the underlying lock without layout assumptions.
85-
// parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell<bool>) both
86-
// represent the unlocked state as all-zero bytes.
82+
/// The type `T` must represent the unlocked state as all-zero bytes
83+
/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.).
84+
pub unsafe fn zero_reinit_after_fork<T>(lock: *const T) {
8785
unsafe {
88-
let raw = mutex.raw() as *const RawMutex as *mut u8;
89-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>());
86+
core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::<T>());
9087
}
9188
}
9289

93-
/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`.
90+
/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`].
91+
///
92+
/// # Safety
9493
///
95-
/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or
96-
/// write locks would cause permanent deadlock in the child.
94+
/// Must only be called from the single-threaded child process immediately
95+
/// after `fork()`, before any other thread is created.
96+
#[cfg(unix)]
97+
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
98+
unsafe { zero_reinit_after_fork(mutex.raw()) }
99+
}
100+
101+
/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`].
97102
///
98103
/// # Safety
99104
///
100105
/// Must only be called from the single-threaded child process immediately
101106
/// after `fork()`, before any other thread is created.
102107
#[cfg(unix)]
103108
pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
104-
unsafe {
105-
let raw = rwlock.raw() as *const RawRwLock as *mut u8;
106-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
107-
}
109+
unsafe { zero_reinit_after_fork(rwlock.raw()) }
108110
}
109111

110112
/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.

crates/stdlib/src/select.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,9 @@ mod decl {
280280

281281
loop {
282282
let mut tv = timeout.map(sec_to_timeval);
283-
let res = super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut());
283+
let res = vm.allow_threads(|| {
284+
super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut())
285+
});
284286

285287
match res {
286288
Ok(_) => break,

crates/stdlib/src/socket.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,8 @@ mod _socket {
11051105
loop {
11061106
if deadline.is_some() || matches!(select, SelectKind::Connect) {
11071107
let interval = deadline.as_ref().map(|d| d.time_until()).transpose()?;
1108-
let res = sock_select(&*self.sock()?, select, interval);
1108+
let sock = self.sock()?;
1109+
let res = vm.allow_threads(|| sock_select(&sock, select, interval));
11091110
match res {
11101111
Ok(true) => return Err(IoOrPyException::Timeout),
11111112
Err(e) if e.kind() == io::ErrorKind::Interrupted => {
@@ -1118,8 +1119,9 @@ mod _socket {
11181119
}
11191120

11201121
let err = loop {
1121-
// loop on interrupt
1122-
match f() {
1122+
// Detach thread state around the blocking syscall so
1123+
// stop-the-world can park this thread (e.g. before fork).
1124+
match vm.allow_threads(&mut f) {
11231125
Ok(x) => return Ok(x),
11241126
Err(e) if e.kind() == io::ErrorKind::Interrupted => vm.check_signals()?,
11251127
Err(e) => break e,
@@ -1342,7 +1344,8 @@ mod _socket {
13421344
) -> Result<(), IoOrPyException> {
13431345
let sock_addr = self.extract_address(address, caller, vm)?;
13441346

1345-
let err = match self.sock()?.connect(&sock_addr) {
1347+
let sock = self.sock()?;
1348+
let err = match vm.allow_threads(|| sock.connect(&sock_addr)) {
13461349
Ok(()) => return Ok(()),
13471350
Err(e) => e,
13481351
};

crates/vm/src/frame.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7016,6 +7016,14 @@ impl ExecutingFrame<'_> {
70167016
Ok(None)
70177017
}
70187018

7019+
/// Read a cached descriptor pointer and validate it against the expected
7020+
/// type version, using a lock-free double-check pattern:
7021+
/// 1. read pointer → incref (try_to_owned)
7022+
/// 2. re-read version + pointer and confirm they still match
7023+
///
7024+
/// This matches the read-side pattern used in LOAD_ATTR_METHOD_WITH_VALUES
7025+
/// and friends: no read-side lock, relying on the write side to invalidate
7026+
/// the version tag before swapping the pointer.
70197027
#[inline]
70207028
fn try_read_cached_descriptor(
70217029
&self,
@@ -7026,7 +7034,12 @@ impl ExecutingFrame<'_> {
70267034
if descr_ptr == 0 {
70277035
return None;
70287036
}
7037+
// SAFETY: `descr_ptr` was a valid `*mut PyObject` when the writer
7038+
// stored it, and the writer keeps a strong reference alive in
7039+
// `InlineCacheEntry`. `try_to_owned_from_ptr` performs a
7040+
// conditional incref that fails if the object is already freed.
70297041
let cloned = unsafe { PyObject::try_to_owned_from_ptr(descr_ptr as *mut PyObject) }?;
7042+
// Double-check: version tag still matches AND pointer unchanged.
70307043
if self.code.instructions.read_cache_u32(cache_base + 1) == expected_type_version
70317044
&& self.code.instructions.read_cache_ptr(cache_base + 5) == descr_ptr
70327045
{
@@ -7044,8 +7057,9 @@ impl ExecutingFrame<'_> {
70447057
type_version: u32,
70457058
descr_ptr: usize,
70467059
) {
7047-
// Publish descriptor cache atomically as a tuple:
7060+
// Publish descriptor cache with version-invalidation protocol:
70487061
// invalidate version first, then write payload, then publish version.
7062+
// Reader double-checks version+ptr after incref, so no writer lock needed.
70497063
unsafe {
70507064
self.code.instructions.write_cache_u32(cache_base + 1, 0);
70517065
self.code
@@ -7065,7 +7079,6 @@ impl ExecutingFrame<'_> {
70657079
metaclass_version: u32,
70667080
descr_ptr: usize,
70677081
) {
7068-
// Same publish protocol as write_cached_descriptor(), plus metaclass guard.
70697082
unsafe {
70707083
self.code.instructions.write_cache_u32(cache_base + 1, 0);
70717084
self.code

crates/vm/src/object/core.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::{
1717
};
1818
use crate::object::traverse_object::PyObjVTable;
1919
use crate::{
20-
builtins::{PyDict, PyDictRef, PyType, PyTypeRef},
20+
builtins::{PyDictRef, PyType, PyTypeRef},
2121
common::{
2222
atomic::{Ordering, PyAtomic, Radium},
2323
linked_list::{Link, Pointers},
@@ -916,6 +916,12 @@ impl InstanceDict {
916916
pub fn replace(&self, d: PyDictRef) -> PyDictRef {
917917
core::mem::replace(&mut self.d.write(), d)
918918
}
919+
920+
/// Consume the InstanceDict and return the inner PyDictRef.
921+
#[inline]
922+
pub fn into_inner(self) -> PyDictRef {
923+
self.d.into_inner()
924+
}
919925
}
920926

921927
impl<T: PyPayload> PyInner<T> {
@@ -1668,11 +1674,19 @@ impl PyObject {
16681674
}
16691675

16701676
// 2. Clear dict and member slots (subtype_clear)
1671-
if let Some(ext) = obj.0.ext_ref() {
1672-
if let Some(dict) = ext.dict.as_ref() {
1673-
let dict_ref = dict.get();
1674-
// Clear dict entries to break cycles, then collect the dict itself
1675-
PyDict::clear(&dict_ref);
1677+
// Use mutable access to actually detach the dict, matching CPython's
1678+
// Py_CLEAR(*_PyObject_GetDictPtr(self)) which NULLs the dict pointer
1679+
// without clearing dict contents. This is critical because the dict
1680+
// may still be referenced by other live objects (e.g. function.__globals__).
1681+
if obj.0.has_ext() {
1682+
let self_addr = (ptr as *const u8).addr();
1683+
let ext_ptr = core::ptr::with_exposed_provenance_mut::<ObjExt>(
1684+
self_addr.wrapping_sub(EXT_OFFSET),
1685+
);
1686+
let ext = unsafe { &mut *ext_ptr };
1687+
if let Some(old_dict) = ext.dict.take() {
1688+
// Get the dict ref before dropping InstanceDict
1689+
let dict_ref = old_dict.into_inner();
16761690
result.push(dict_ref.into());
16771691
}
16781692
for slot in ext.slots.iter() {

crates/vm/src/stdlib/imp.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mod lock {
1616

1717
#[pyfunction]
1818
fn acquire_lock(_vm: &VirtualMachine) {
19-
IMP_LOCK.lock()
19+
acquire_lock_for_fork()
2020
}
2121

2222
#[pyfunction]
@@ -34,6 +34,16 @@ mod lock {
3434
IMP_LOCK.is_locked()
3535
}
3636

37+
pub(super) fn acquire_lock_for_fork() {
38+
IMP_LOCK.lock();
39+
}
40+
41+
pub(super) fn release_lock_after_fork_parent() {
42+
if IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread() {
43+
unsafe { IMP_LOCK.unlock() };
44+
}
45+
}
46+
3747
/// Reset import lock after fork() — only if held by a dead thread.
3848
///
3949
/// `IMP_LOCK` is a reentrant mutex. If the *current* (surviving) thread
@@ -47,22 +57,44 @@ mod lock {
4757
pub(crate) unsafe fn reinit_after_fork() {
4858
if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() {
4959
// Held by a dead thread — reset to unlocked.
50-
// Same pattern as RLock::_at_fork_reinit in thread.rs.
51-
unsafe {
52-
let old: &crossbeam_utils::atomic::AtomicCell<RawRMutex> =
53-
core::mem::transmute(&IMP_LOCK);
54-
old.swap(RawRMutex::INIT);
55-
}
60+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&IMP_LOCK) };
61+
}
62+
}
63+
64+
/// Match CPython's `_PyImport_ReInitLock()` + `_PyImport_ReleaseLock()`
65+
/// behavior in the post-fork child:
66+
/// 1) if ownership metadata is stale (dead owner / changed tid), reset;
67+
/// 2) if current thread owns the lock, release it.
68+
#[cfg(unix)]
69+
pub(super) unsafe fn after_fork_child_reinit_and_release() {
70+
unsafe { reinit_after_fork() };
71+
if IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread() {
72+
unsafe { IMP_LOCK.unlock() };
5673
}
5774
}
5875
}
5976

6077
/// Re-export for fork safety code in posix.rs
78+
#[cfg(feature = "threading")]
79+
pub(crate) fn acquire_imp_lock_for_fork() {
80+
lock::acquire_lock_for_fork();
81+
}
82+
83+
#[cfg(feature = "threading")]
84+
pub(crate) fn release_imp_lock_after_fork_parent() {
85+
lock::release_lock_after_fork_parent();
86+
}
87+
6188
#[cfg(all(unix, feature = "threading"))]
6289
pub(crate) unsafe fn reinit_imp_lock_after_fork() {
6390
unsafe { lock::reinit_after_fork() }
6491
}
6592

93+
#[cfg(all(unix, feature = "threading"))]
94+
pub(crate) unsafe fn after_fork_child_imp_lock_release() {
95+
unsafe { lock::after_fork_child_reinit_and_release() }
96+
}
97+
6698
#[cfg(not(feature = "threading"))]
6799
#[pymodule(sub)]
68100
mod lock {

0 commit comments

Comments
 (0)